首页 > 代码库 > 从源码上,分析AsyncTask的实现

从源码上,分析AsyncTask的实现

Android开发者们应该都知道AsyncTask这个类,它是系统提供的一个异步任务类,可以方便的让我们实现异步操作。在本篇文章中,我将带大家进入源码,简单分析一下AsyncTask的实现。

首先,贴上AsyncTask类的源码:

package android.os;

import java.util.ArrayDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


public abstract class AsyncTask<Params, Progress, Result> {
    private static final String LOG_TAG = "AsyncTask";

	// 获取可用CPU的数目
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;	
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE = 1;

	// 线程工厂类,供线程池创建新线程时使用
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    // 创建一个线程池,用来进行我们的任务
    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

 
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

    private static final int MESSAGE_POST_RESULT = 0x1;
    private static final int MESSAGE_POST_PROGRESS = 0x2;

    private static final InternalHandler sHandler = new InternalHandler();

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    private final WorkerRunnable<Params, Result> mWorker;	// 可以理解为:要执行的任务
    private final FutureTask<Result> mFuture;	

    private volatile Status mStatus = Status.PENDING;
    
    private final AtomicBoolean mCancelled = new AtomicBoolean();    // 原子布尔值,指示任务是否已经取消
    private final AtomicBoolean mTaskInvoked = new AtomicBoolean();  // 指示任务是否被调用执行

	// 实现自己的任务执行器,其实作用就是安排任务到线程中执行
	// 在下面的具体实现中,我们可以知道,任务是按顺序进行执行
	// 的;也就是当一个任务执行完成,才会部署开始下一个任务,
	// 即:这个AsyncTask是串行处理任务的。
	// 
    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;	// 当前正在执行的任务

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();		// 上一个任务完成后,部署进行下一个任务
                    }
                }
            });
            if (mActive == null) {		// 当前没有正在执行的任务,那就启动下一个任务
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
			// 从任务队列中抽取出队头任务
            if ((mActive = mTasks.poll()) != null) {
				// 在这里可以看到,任务实际上还是安排到线程池中进行
                THREAD_POOL_EXECUTOR.execute(mActive);		
            }
        }
    }

	// 定义任务的三种状态,只有处于预备状态的任务才能够调用execute执行,
	// 否则将会报错
    public enum Status {
       
        PENDING,	// 预备状态
       
        RUNNING,	// 正在执行状态
       
        FINISHED,	// 完成
    }

    // 进行消息循环
    public static void init() {
        sHandler.getLooper();
    }

	// 默认执行器
    public static void setDefaultExecutor(Executor exec) {
        sDefaultExecutor = exec;
    }

   
    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
              
				// 在这里调用了doInBackground,也即是在后台线程中执行我们的任务。
				// 执行完doInBackground,调用postResult把执行结果发送出去
                return postResult(doInBackground(mParams));		
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
			// 当任务执行完成的时候调用,我会在后面讲到这个方法在何处调用。
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occured while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }

    private void postResultIfNotInvoked(Result result) {
        final boolean wasTaskInvoked = mTaskInvoked.get();
        if (!wasTaskInvoked) {
            postResult(result);
        }
    }

	// 通过Handler把我们完成的结果发送到主线程的消息队列中,然后由指定的
	// sHandler进行处理。注意:这个方法是在子线程中进行操作的
    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
		// 任务执行完成,把执行完的结果封装成AsyncTaskResult
        Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }

   
    public final Status getStatus() {
        return mStatus;
    }
	/**
	* 这个方法由子类实现,我们要把程序中想要异步进行的任务放到这个方法中执行
	* 注意:这个方法是在子线程中执行的,所以我们不能在这里进行UI相关的操作。
	*/
    protected abstract Result doInBackground(Params... params);

	// 这个方法在doInBackground之前执行;主要是进行异步任务前的准备操作
    protected void onPreExecute() {
    }

	// 这个方法在任务执行完成后调用,参数result就是任务执行完成后的结果
    @SuppressWarnings({"UnusedDeclaration"})
    protected void onPostExecute(Result result) {
    }

    // 这个方法能提供任务的执行进度
    @SuppressWarnings({"UnusedDeclaration"})
    protected void onProgressUpdate(Progress... values) {
    }

	// 这个方法在任务取消时调用的
    @SuppressWarnings({"UnusedParameters"})
    protected void onCancelled(Result result) {
        onCancelled();
    }    
    
    // 如果我们想要在任务取消时进行一些操作,可以重写这个方法中进行处理
    protected void onCancelled() {
    }

    // 判断当前任务是否被取消了。返回的值就是前面的原子布尔值中的值
    public final boolean isCancelled() {
        return mCancelled.get();
    }

    // 取消正在执行的任务,主要是通过Future类中的cancel方法来处理的,
	// 后面我会对这个方法进行分析
    public final boolean cancel(boolean mayInterruptIfRunning) {
        mCancelled.set(true);
        return mFuture.cancel(mayInterruptIfRunning);
    }

   
    public final Result get() throws InterruptedException, ExecutionException {
        return mFuture.get();
    }

   
    public final Result get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return mFuture.get(timeout, unit);
    }

    // 调用这个方法,则开始了我们的异步任务。该方法需要传入异步任务需要的参数
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

    
	// 前面提到了,AsyncTask是串行的,不过,实际上也是可以改成并行的。下面这个方法可以指定
	// 我们自己的执行器来执行操作,因此,只需要传入并行的执行器,则可以使AsyncTask并行起来
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {

				// 前面已经提到了,只有在预备状态(PENDING)下,才能够调用execute来执行任务
				// 否则将会出现异常,这里就是它抛出异常的源头。
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;		// 任务状态改为执行

        onPreExecute();		// 主要是让调用者在任务执行前做一些准备操作。

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

	// 使用默认的执行器来执行任务
    public static void execute(Runnable runnable) {
        sDefaultExecutor.execute(runnable);
    }

	// 调用这个方法,可以通知调用者任务执行的进度,注意:这个方法在子线程中进行
    protected final void publishProgress(Progress... values) {
        if (!isCancelled()) {
			// 把任务进度封装成AsyncTaskRusult类
            sHandler.obtainMessage(MESSAGE_POST_PROGRESS,
                    new AsyncTaskResult<Progress>(this, values)).sendToTarget();
        }
    }

	// 任务完成时调用;任务完成有两种情况:
	// 1. 任务被取消,这时将会回调onCanceled()方法,因此,如果我们在任务取消时,
	//    想要执行一些处理操作,那么就要重写onCanceled()方法了。
	// 2. 任务顺利执行完成。这时会调用onPostExecute方法,同时把执行结果传递给这个
	//    方法处理。
    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }

	// 定义一个Handler,处理任务相关的操作
    private static class InternalHandler extends Handler {
        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult result = (AsyncTaskResult) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // 任务执行完成,
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
					// 任务执行进度
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

	// 这个类封装了任务所需要的参数。它继承了Callable接口,后面我会讲继承这个接口的作用
    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }

    @SuppressWarnings({"RawUseOfParameterizedType"})
    private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;

        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = http://www.mamicode.com/data;>我在代码上面做了一些注释。基本上我们需要了解的也就是这些注释到的地方。下面就来开始分析了:

首先,先看看SerialExecutor。

private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;	// 当前正在执行的任务

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();		// 上一个任务完成后,部署进行下一个任务
                    }
                }
            });
            if (mActive == null) {		// 当前没有正在执行的任务,那就启动下一个任务
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
			// 从任务队列中抽取出队头任务
            if ((mActive = mTasks.poll()) != null) {
				// 在这里可以看到,任务实际上还是安排到线程池中进行
                THREAD_POOL_EXECUTOR.execute(mActive);		
            }
        }
    }

在这个类是AsyncTask默认的执行器,也就是说我们的任务都是在这里面执行的。在execute()方法中,可以知道,使用了一个ArrayDeque来存放我们的任务。它先构建了一个Runnable对象,然后在Runnable中的run方法调用了execute传入的Runable r参数的run方法,然后调用scheduleNext(),这样做的作用是:在完成上一个任务后,接着启动下一个任务。

execute()方法的最后,会判断当前是否有执行的任务,即mActive是否为空,如果不为空,那就调用scheduleNext()来启动下一个任务。现在重点分析scheduleNext()方法。在该方法中,可以看出,任务最终还是被安排到已经创建好的线程池中执行。这里就是实现AsyncTask串行的地方,可以注意一下。

再来分析一下AsyncTask的构造方法:

public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
              
				// 在这里调用了doInBackground,也即是在后台线程中执行我们的任务。
				// 执行完doInBackground,调用postResult把执行结果发送出去
                return postResult(doInBackground(mParams));		
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
			// 当任务执行完成的时候调用,我会在后面讲到这个方法在何处调用。
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occured while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }
可以知道,构造方法中生成了两个对象,mWorker和mFuture。先看看WorkerRunnable<Params, Result>是如何定义的:

 private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }
它封装了任务所需要的参数,然后就是继承了Callable接口(Callable中只定义了call()方法)。因此,在mWorker对象的初始化的时候,我们必须要实现call()方法;好,现在回到mWorker中的call方法,它调用了doInBackground(),原来,我们在使用AsyncTask的时候,在doINBackground方法中,实现的操作就是在这里被调用的!!!此时,它已经是在子线程中进行的了。为什么呢?我后面再讲!再看看mFuture对象,它需要一个传入一个Callable类型的参数,而mWorker恰好实现了Callable接口!那我们不妨看一下Future类吧。
public class FutureTask<V> implements RunnableFuture<V> {
  
    private Callable<V> callable;
	......

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
       ......
    
在Future的构造方法中,可以知道,它把我们传进来的Callable对象保存在变量callable中了;我们先简单记住这个变量!后面它还会为我们解答一些问题。回到mFuture的实现上,它重写了done()方法,这里我们只需要知道它在任务完成的时候回调用就行了!这样,我们就把AsyncTask的构造方法简单的分析好了。

接下来,分析AsyncTask的execute(Params... params)方法;

  public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;		// 任务状态改为执行

        onPreExecute();		// 主要是让调用者在任务执行前做一些准备操作。

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }
execute(Params... params)方法实际上是调用了executeOnExecutor(Executor exec, Params... params)方法,那我们具体看看这个它里面做了些什么。它调用了onPostExecute(),可以让调用者做一些执行任务前的准备操作、把传入的params参数保存到mWorker中,然后再调用exec的execute(mFuture)方法。其中,exec就是我们传进来的默认执行器sDefaultExecutor!!!sDefaultExecutor的execute()方法我们在前面已经分析了!它是使用线程池来执行我们的任务的。由于TheadPoolExecutor的实现比较复杂,所以这里暂时不做分析。我们只需知道它接下来会调用到mFuture的run()方法即可,那我们先看看这个run()方法中,做了些什么。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
我们只看try里面的代码。可以看出,如果mFuture中的callable对象不为null,那么就会回调它的call方法了。在前面,我们已经知道了mFuture把我们传进来的mWorker对象保存在了Callable变量里面了,因此,这里调用的call()方法就是mWorker的。而这个时候,call()方法是在子线程中调用的,由此就可以知道,doInBackground()也就是在子线程进行的!如果任务正常完成,那么变量ran将会是true,那么它会调用set(result),看看set()方法的源码:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
它把任务完成的结果保存在outcome变量中,接着调用了finishCompletion()方法,了解下这个方法的实现:

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
finishCompletion中,它会调用done()方法,让调用者对完成的任务结果进行处理。

我们接下来分析一下,AsyncTask是如何取消任务的!

找到AsyncTask的cancel方法:

public final boolean cancel(boolean mayInterruptIfRunning) {
        mCancelled.set(true);
        return mFuture.cancel(mayInterruptIfRunning);
    }
它调用 了mFuture的cancel方法。下面是改方法的实现:

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
从代码中可以知道,取消操作实际上是让运行当前要取消的任务的线程中断了,同样,该方法也会调用finishCompletion()方法,然后在finishCompletion方法中调用done()方法,让调用者能够对取消任务后进行相关处理。

至此,AsyncTask执行异步任务的基本流程就简单的分析完了。在文章前面的AsyncTask源码里面,我已经做了比较详细的注释,读者可以结合注释,然后对AsyncTask整个流程执行分析,相信读者能够掌握这些步骤!


欢迎大家交流和转载,转载请标明:http://blog.csdn.net/wuzhipeng1991 




从源码上,分析AsyncTask的实现