首页 > 代码库 > AsyncTask中线程池调度分析

AsyncTask中线程池调度分析

尊重原创:http://blog.csdn.net/yuanzeyao/article/details/42583215

在Android中,和异步执行相关的两个类就是Handler和AsyncTask,所以Android开发人员对于这两个类是再熟悉不过了,所以这里我不是讲解AsyncTask怎么使用,而是想分析一下AsyncTask中线程池的调度过程,然后简单的介绍一下AsyncTask的源码以及Android3.0前后,AsyncTask中线程池的区别。


在正式学习AsyncTask中的线程池之前,我们首先回忆一下java中的线程池的内容。就从Executor开始吧,Executor就是一个接口,代码如下:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <tt>Executor</tt> implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution.
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

这个接口的内容非常简单,仅仅提供了一个execute方法,通过上面的描述,execute方法仅仅负责将任务保存起来,可能在未来某一个时刻执行(可能在一个新的线程,或者一个线程池中执行)。至于具体是在新的线程中执行还是在线程池中执行就是有实现该接口的类决定的。

我们来分析一下execute这个方法的方法签名,可以发现有一个Runnable类型参数和会抛出两个异常:

Runnable:这里我们就把他当作一个线程吧。

RejectedExecutionException:通过execute方法无法将该任务加入任务队列就会有该异常。

NullPointerException:就是加入的任务时空的。

通过jdk1.6官方文档,可以知道,有一个接口继承了Executor,这个接口就是ExecutorService

public interface ExecutorService extends Executor {

    /**
	不再接纳新的任务,但是之前的任务会继续执行
	*/
    void shutdown();
   
   /**
	不再接纳新的任务,并且之前的任务也会终止(但是不一定终止成功)
	返回还没有开始的任务列表
   */
    List<Runnable> shutdownNow();
   /**
    向任务队列添加一个任务,并且返回一个Future对象
	Future对象通过get方法可以拿到返回结果,注意这里可是Callable对象
   */
    <T> Future<T> submit(Callable<T> task);
	
   /**
	这个方法也是添加一个任务,不过这里传入的是Callable对象,并多了一个返回值类型参数
   */
    <T> Future<T> submit(Runnable task, T result);
	/**
	添加一个任务
	*/
    Future<?> submit(Runnable task);

}
这个接口比Executor对了几个方法,其中几个比较重要的方法我也已经贴出并给出了解释,这里出现了几个新的类型,Callable,Future。

首先来分析一下Callable

/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * <tt>call</tt>.
 *
 * <p>The <tt>Callable</tt> interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * <tt>Runnable</tt>, however, does not return a result and cannot
 * throw a checked exception.
 *
 * <p> The {@link Executors} class contains utility methods to
 * convert from other common forms to <tt>Callable</tt> classes.
 *
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method <tt>call</tt>
 */
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

原来Callable也是一个接口,有没有发现Callable接口和Runnable超像?其实他们的功能也超像的,区别主要如下:

1、Runnable是run方法,而Callable是call方法

2、Runnable中的run方法没有返回值,而call方法时有返回值的

3、Runnable中run方法不会抛异常,而call方法会抛异常

所以如果我们遇到了Callable方法就暂且将它和Runnable同等对待吧。


下面再来看看Future这个类

public interface Future<V> {

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
     * will always return <tt>true</tt> if this method returned <tt>true</tt>.
     *
     * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return <tt>false</tt> if the task could not be cancelled,
     * typically because it has already completed normally;
     * <tt>true</tt> otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns <tt>true</tt> if this task was cancelled before it completed
     * normally.
     *
     * @return <tt>true</tt> if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns <tt>true</tt> if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * <tt>true</tt>.
     *
     * @return <tt>true</tt> if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Future其实也是一个泛型接口,其中每个方法都比较重要我们一一来看:

cancel:取消一个任务,不过当一个任务已经完成时,取消就会失败,这个方法有一个参数mayInterruptIfRunning,如果传入的是true,那么即使你的任务已经开始,那么也会试图中断任务所在线程

isDone:如果一个任务执行完毕,那么这里返回true

get:该方法有两个重载方法,首先看没有参数的,这个方法时一个同步方法,只到任务成功执行完毕才返回,如果任务取消或者中断,都会抛出异常,有参数的get方法就是限定了等待时间,如果在指定时间内没有返回,那么就不再等待。

由于AsyncTask中涉及到了一个FutureTask这个类,并且这个类和Future有关,所以在这里一起也看看FutureTask的内容。由于FutureTask的代码比较多,这里我先贴出它的签名

public class FutureTask<V> implements RunnableFuture<V>

这个类有一个done方法由于在AsyncTask中会使用到,所以在这里说一下

    /**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

通过注释,done方法是任务状态变为isdone时调用,该任务可能正常执行,也可能取消了。其实FutureTask应该是弥补Thread的不足出现的,比如一个异步线程我们如果想拿到它的返回值,通常有两种方法,一种是使用另一个线程轮询这个线程是否执行完毕,第二个就是使用回调,但是使用FutureTask就很简单,调用get方法即可。

我们发现这个类实现了一个叫做RunnableFuture的接口,我暂且不看RunnableFuture接口,先看看FutrueTask的几个构造函数吧

/**
     * Creates a <tt>FutureTask</tt> that will upon running, execute the
     * given <tt>Callable</tt>.
     *
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    /**
     * Creates a <tt>FutureTask</tt> that will upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     * @param  runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don‘t need a particular result, consider using
     * constructions of the form:
     * <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

主要有两个构造函数,一个传入的是Callable,另一个传入的是Runnable,


下面继承看RunnableFuture这个接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}


这个接口继承了Runnable和Future接口,也就是说FutureTask这个类具备了线程和Future两者的特性。

上面是jdk中和线程池关系比较紧密的几个接口和类,下面我就来看看jdk中的一个具体的线程池类ThreadPoolExecuter,这个类实现了ExecuteService接口,并且在Android3.0系统之前实用的的就是这个线程池


先看看ThreadPoolExecuter的构造函数的签名:(还有其他的构造函数,这里我那最简单的构造函数说明)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 

简单分析一下几个参数的意义:

corePoolSize:池中所保存的线程数,包括空闲线程

maximumPoolSize:池中允许的最大线程数

keepAliveTime:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间

unit:keepAliveTime 参数的时间单位

workQueue:执行前用于保持任务的队列

这里我们要重点理解corePoolSize,maximumPoolSize,workQueue这三个参数的意义。


当我们向线程池中添加一个任务时

1、如果线程数<corePoolSize,那么创建一个线程,不管此时是否有线程空闲

2、如果线程数=corePoolSize,如果有空闲线程,那么空闲线程处理加入的任务,如果没有空闲线程,那么加入workQueue中。只到workQueue线程已经满了,才会创建新的线程来处理新加入的任务,如果此时创建的线程数超过了maximumPoolSize,

那么就会抛RejectedExecutionException异常。

3、如果线程数>corePoolSize时,那么说明workQueue已经满了。

通过以上的描述,说明如果workQueue是一个无界队列,那么maximumPoolSize就没有意义了。


好了,关于线程池的分析就到这里,我们开始看看AsyncTask的源码吧。

先看看它的一些比较重要的属性吧

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 128;
    private static final int KEEP_ALIVE = 10;

    private static final BlockingQueue<Runnable> sWorkQueue =
            new LinkedBlockingQueue<Runnable>(10);

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final ThreadPoolExecutor sExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
            MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

结合上面介绍的线程池相关知识,可以知道如下几点:

1、AsyncTask中的线程池,保存的线程数量是5个,数量大于5个的线程如果空闲10s中就会销毁掉。

2、最大的线程数是128个,任务队列的容量是10,按照前面的分析,最多可以添加128+10个任务,加入139个任务时,程序就会崩溃

3、线程池是 static的,也就是说所有的AsyncTask公用一个线程池(一个应用之类的AsyncTask)。

下面看看AsyncTask的构造方法


 public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                return doInBackground(mParams);
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                Message message;
                Result result = null;

                try {
                    result = get();
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occured while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    message = sHandler.obtainMessage(MESSAGE_POST_CANCEL,
                            new AsyncTaskResult<Result>(AsyncTask.this, (Result[]) null));
                    message.sendToTarget();
                    return;
                } catch (Throwable t) {
                    throw new RuntimeException("An error occured while executing "
                            + "doInBackground()", t);
                }

                message = sHandler.obtainMessage(MESSAGE_POST_RESULT,
                        new AsyncTaskResult<Result>(AsyncTask.this, result));
                message.sendToTarget();
            }
        };
    }

这里出现了一个WorkerRunnable和FutureTask类型,其中FutureTask类型前面已经介绍过,我们先看看看WorkerRunnable是什么类型

  private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }

这里使用到了泛型,Params,Result是什么意思相信用过AsyncTask都不陌生,不熟悉的可以查一下,还记得前面我说过什么吗,Callable就相当于Runnable,所以我们这里就把WorkRunnable当作一个线程吧,在AsyncTask的构造函数中,定义了一个匿名内部类,并改写了call方法(可以理解改写了run方法),在call方法里,就是调用了doInBackground方法,原来AsyncTask方法的doInBackground是在这里调用的,现在明白为什么doInBackgound是在后台线程执行的吧。


接下来使用mWorker为参数创建了一个FutureTask对象,同样改写了done方法,前面我们已经介绍了done方法是在任务状态变为isdone时调用的,在done方法里面,我们会通过Future的get方法拿到结果。然后通过Handler将结果发送到UI线程处理,这里Handler处理的消息可能是MESSAGE_POST_CANCEL(取消),也可能是MESSAGE_POST_RESULT(正常完毕)。其中Message的obj是一个AsyncTaskResult类型,这个类型就是包装了当前AsyncTask和返回结果的。


我们启动一个AsyncTask就是调用execute方法,那么我们看看execute方法做了什么吧

    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        sExecutor.execute(mFuture);

        return this;
    }

mStatus默认值就是Status.PENDING,如果mStatus不等于这个值,那么就会抛出异常,如果等于这个值,那么就会将mStatus变为Status.RUNNING。然后调用onPreExecute方法,由于execute通常是在主线程执行,所以onPreExecute就是在UI线程中调用的的。onPreExecute执行完毕后将mFuture加入到了线程池,

加入线程池,那么其实调用的就是mWorker中的call方法,其实调用的就是doInBackground方法,当任务状态变为isdone时,就会向Handler发送消息,这个Handler是InternalHandler类型

    private static class InternalHandler extends Handler {
        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult result = (AsyncTaskResult) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
                case MESSAGE_POST_CANCEL:
                    result.mTask.onCancelled();
                    break;
            }
        }
    }

当消息是MESSAGE_POST_RESULT时,那么就说明正常返回结果,那么就调用当前AsyncTask的finish方法

  private void finish(Result result) {
        if (isCancelled()) result = null;
        onPostExecute(result);
        mStatus = Status.FINISHED;
    }
在finish中如果AsycnTask已经取消了,那么将结果设置为null,否则调用onPostExecute并将mStatus设置为FINISH。所以一个AsyncTask只能被执行一次,相同的任务需要多执行多次时,只能创建多次AsyncTask。


当消息是MESSAGE_POST_CANCEL时,就是调用了AsyncTask的cancel方法时,会出现的情况。MESSAGE_POST_PROGRESS消息大家可以自己分析。


到这里2.2中AsyncTask的调度过程也算是分析完了,下面分析4.1中AsyncTask的不同点,其中最大的不同点就是线程池的不一样

     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    /**
     * An {@link Executor} that executes tasks one at a time in serial
     * order.  This serialization is global to a particular process.
     */
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

这个有两个线程池THREAD_POOL_EXECUTOR和2.2中是一样的,这个多了一个SERIAL_EXECUTOR线程池。

我们看看execute方法到底用的是哪个线程池

  public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

在4.1版本中,execute调用的其实是executeOnExecutor方法完成,并将sDefaultExecutor传递进来了

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

到这里其实我们就知道使用的是SERIAL_EXECUTOR线程池了,这个线程池和之前的线程池有什么区别吗,我们看看源码 就知道了

    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

看了源码之后,你可以看到其实SERIAL_EXECUTOR并不是一个线程池,它仅仅实现了Executor接口然后改写了execute方法。在execute方法中将任务队列加入到了ArrayDeque中保存,这个队列是没有容量限制的,加入任务队列后,如果mActive为null,那么就会执行scheduleNext方法,这个方法就是从ArrayDeque中取出一个任务然后放入到THREAD_POOL_EXECUTOR这个线程池了。

所以最大的改变也很容易看出来就是限制了向线程池THREAD_POOL_EXECUTOR中加入任务的速度,只能上个任务执行完毕后才能加入下一个任务。所以相当于是一个线程在工作


所以在3.0后的版本中,我们通常不调用execute方法,而是调用executeonExecutor方法,自己定义自己的线程池,然后传递进入。




AsyncTask中线程池调度分析