首页 > 代码库 > Java并发基础(六) - 线程池

Java并发基础(六) - 线程池

Java并发基础(六) - 线程池

1. 概述


这里讲一下Java并发编程的线程池的原理及其实现

2. 线程池的基本用法


2.1 线程池的处理流程图
该图来自《Java并发编程的艺术》:
技术分享

从图中我们可以看出当一个新任务到线程池时,线程池的处理流程如下:

  1. 线程池首先判断线程池里面线程数是否达到核心线程数。如果不是则直接创建新线程作为核心线程来执行该任务(该线程作为核心线程不会由于任务的完成而销毁),否则进入下一流程。
  2. 判断阻塞队列是否已经满了。如果没满则将该任务放入阻塞队列中,等待核心线程处理,否则进入下一流程。
  3. 判断线程池中线程是否到达最大线程数,如果没有到达则创建新线程执行该任务(该线程会由于任务的完成而销毁),否则进入会按照该线程池的饱和(拒绝)策略处理该任务。

2.2 线程池的执行示意图
该图是Java并发包中ThreadPoolExecutor的执行示意图,图来自《Java并发编程的艺术》:
技术分享

从图中我们可以看出来ThreadPoolExecutor执行任务时的处理流程基本如上所诉,每一步的处理情况如下所诉:

  1. 如果当前运行的线程数小于corePoolSize(核心线程数),则会创建新线程作为核心线程来执行任务(注意,执行这一步需要获取全局锁)。
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue中。
  3. 如果无法将任务加入BlockingQueue(队列已满),则将创建线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  4. 如果第三步创建线程使得当前线程数大于maximumPoolSize,任务将被拒绝,并调用RejectdExecutionHandler.rejectedExecution()方法。

2.3 Java ThreadPoolExecutor类
首先来看ThreadPoolExecutor的构造函数的几个参数:

  • corePoolSize - 池中所保存的线程数,包括空闲线程。
  • maximumPoolSize - 池中允许的最大线程数。
  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit - keepAliveTime 参数的时间单位。
  • workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
  • threadFactory - 执行程序创建新线程时使用的工厂。
  • handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

我们由线程池的执行示意图可以看出来,当任务队列满了并且线程池中线程数已经到了线程池容许的最大线程数后,将会有饱和(拒绝)策略来处理该任务。下面是Java线程池提供的4种饱和(拒绝)策略(当然你也可以自定义的自己的饱和策略),注:自己有2个策略也不是太懂,先写在这里:

  • ThreadPoolExecutor.AbortPolicy:不执行该任务,并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,如果执行程序已关闭,则会丢弃该任务。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最老的一个任务,然后重新尝试执行任务(重复此过程。
  • ThreadPoolExecutor.DiscardPolicy:不执行该任务,也不抛异常。

3. 线程池的实现原理


注:这里使用了ThreadPoolExecutor源码(JDK 1.6)来说明Java线程池的实现原理。自己一些具体的细节也不是太懂,如:线程池的几个状态。自己在这里只是说个线程池的基本处理流程。

3.1 线程池的几个状态

    /**
     * runState provides the main lifecyle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don‘t accept new tasks, but process queued tasks
     *   STOP:     Don‘t accept new tasks, don‘t process queued tasks,
     *             and interrupt in-progress tasks
     *   TERMINATED: Same as STOP, plus all threads have terminated
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TERMINATED
     *    When both queue and pool are empty
     * STOP -> TERMINATED
     *    When pool is empty
     */
    volatile int runState;
    static final int RUNNING    = 0;
    static final int SHUTDOWN   = 1;
    static final int STOP       = 2;
    static final int TERMINATED = 3;

线程池的几个状态具体表示注释中说的挺明白。

3.2 ThreadPoolExecutor中一些重要的字段
注:我把JDK 1.6中该类的字段的注释保留了,这样能够更好的理解。

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  Note that when using this queue, we do not require
     * that workQueue.poll() returning null necessarily means that
     * workQueue.isEmpty(), so must sometimes check both. This
     * accommodates special-purpose queues such as DelayQueues for
     * which poll() is allowed to return null even if it may later
     * return non-null when delays expire.
     * 阻塞队列:用于存放执行的任务。但可能不存储任务,
     * 仅仅作为线程间通信使用,如:Synchronous
     */
    private final BlockingQueue workQueue;

    /**
     * Lock held on updates to poolSize, corePoolSize,
     * maximumPoolSize, runState, and workers set.
     * 全局锁
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     * 存放执行任务的Set
     */
    private final HashSet workers = new HashSet();

    /**
     * Core pool size, updated only while holding mainLock, but
     * volatile to allow concurrent readability even during updates.
     * 核心线程数
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size, updated only while holding mainLock but
     * volatile to allow concurrent readability even during updates.
     * 最大线程数
     */
    private volatile int maximumPoolSize;

    /**
     * Current pool size, updated only while holding mainLock but
     * volatile to allow concurrent readability even during updates.
     * 当前线程数
     */
    private volatile int poolSize;

3.3 ThreadPoolExecutor执行任务
注:这里使用的execute()方法来说明执行Runnable任务,还有submit()方法执行Callable任务,具体可以看源码:

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current RejectedExecutionHandler.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     * RejectedExecutionHandler, if task cannot be accepted
     * for execution
     * @throws NullPointerException if command is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 如果当前线程数小于核心线程数,则创建线程并执行该任务
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            // 如果当前线程数大于核心线程数,则将该任务插入任务队列等待核心线程执行
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            // 如果当前线程数大于核心数线程数并且该任务插入任务队列失败,
            // 则将会判断当前线程数是否小于最大线程数,如果小于则创建线程
            // 并执行该任务,否则使用饱和(任务)策略执行该任务
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

    /**
     * Creates and starts a new thread running firstTask as its first
     * task, only if fewer than corePoolSize threads are running
     * and the pool is not shut down.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return true if successful
     * 仅仅当前线程数小于核心线程数并且线程池状态为运行状态时,
     * 创建一个线程作为核心线程并执行该任务。
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                // 创建新线程并执行该任务
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

    /**
     * Creates and starts a new thread running firstTask as its first
     * task, only if fewer than maximumPoolSize threads are running
     * and pool is not shut down.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * 仅仅当前线程数小于最大线程数并且线程池状态为运行状态时,
     * 创建一个新线程并执行该任务。
     */
    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                // 创建新线程并执行该任务
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

上面的execute()方法不太好理解,需要结合线程池的执行方式并且理解上面其他两个方法的作用进行理解,还有就是上面两个方法在创建线程时都加锁了的。注:自己想不明白为何这样做,这样做仅仅使得代码看起来简洁,但是不太好理解其逻辑。

下面的方法时上面两个方法的共有方法,该方法的意义是:创建新线程并执行该任务

    /**
     * Creates and returns a new thread running firstTask as its first
     * task. Call only while holding mainLock.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return the new thread, or null if threadFactory fails to create thread
     */
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        // 调用线程工厂创建线线程
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            if (t.isAlive()) // precheck that t is startable
                throw new IllegalThreadStateException();
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                // 运行该线程执行任务
                t.start();
                workerStarted = true;
            }
            finally {
                if (!workerStarted)
                    workers.remove(w);
            }
        }
        return t;
    }

3.4 Worker类
Worker类作为工作线程的任务类,主要是执行任务,该Worker会循环获取任务队列中的任务来执行。注:该类自己有许多不太明白的地方,如shutdown、interrupt等等,所以主要讲执行逻辑。

private final class Worker implements Runnable {
}

Worker类的run()方法,该方法首先执行初始任务,然后通过getTask()方法获取任务:

    /**
     * Main run loop
     */
    public void run() {
        try {
            hasRun = true;
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            // 如果获得任务为空,则销毁该工作线程,具体可见源码,这里就没有列出来。
            workerDone(this);
        }
    }

getTask()方法:

    /**
     * Gets the next task for a worker thread to run.  The general
     * approach is similar to execute() in that worker threads trying
     * to get a task to run do so on the basis of prevailing state
     * accessed outside of locks.  This may cause them to choose the
     * "wrong" action, such as trying to exit because no tasks
     * appear to be available, or entering a take when the pool is in
     * the process of being shut down.  These potential problems are
     * countered by (1) rechecking pool state (in workerCanExit)
     * before giving up, and (2) interrupting other workers upon
     * shutdown, so they can recheck state. All other user-based state
     * changes (to allowCoreThreadTimeOut etc) are OK even when
     * performed asynchronously wrt getTask.
     *
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                // 如果当前线程数大于核心线程数或者允许为核心池线程设置空闲时间
                // 将会通过poll(long time,TimeUtile util)方法超时等待任务
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                // 如果当前线程池数小于或等于核心线程数,该线程就会作为核心线程
                // 将会阻塞等待下去,直到任务队列中有任务。
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

从上面我们基本可以看出线程池的执行整个过程。

3.5 初始化线程池
默认情况下,创建线程池后线程池中没有工作线程,需要等任务提交后才会创建线程。

实际使用中,ThreadPoolExecutor提供了两个方法创建线程。

  • prestartCoreThread():初始化一个核心线程
  • prestartAllCoreThreads():初始化所有核心线程

代码如下:

    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

3.6 关闭线程池
首先来看ThreadPoolExecutor关闭线程池的API:

  • shutdown():线程池停止接受新任务,已经接受的任务会继续执行下去。
  • shutdownNow():线程池停止接受新任务,已经接受的任务也会停止执行。

关闭线程池的原理(注:自己不是太懂其原理,所以直接使用的《Java并发编程实战》中关于关闭线程池的说法):

Java线程池中使用shutdown()、shutdownNow()方法来关闭线程池。它们的原理普遍是遍历线程池中的工作线程,然后逐个调用线程的interrupt()方法来中断线程。shutdownNow()方法首先将线程池状态设置为STOP,然后尝试停止所有正在执行或暂停任务的线程,并放回等待执行任务的列表。而shutdown()方法只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

3.7 总结
从上面ThreadPoolExecutor的实现代码中我们可以总结出:

  • 如果线程池中的当前线程数小于核心线程数(corePoolSize),则每来一个任务,就会创建一个线程去执行这个任务。该线程之后会作为核心线程,从任务队列中拿任务继续执行下去(在这里可能会被任务队列阻塞)。
  • 如果线程池中的当前线程数大于核心线程数(corePoolSize),则每来一个任务会尝试把其添加到任务队列中,如果添加成功则会等待核心线程来执行。
  • 如果添加任务队列失败,则会判断当前线程数是否大于最大线程数。如果当前线程数小于最大线程数,则会创建新线程来执行该任务。该新建线程执行任务完成后会继续从任务队列中获取任务来执行,只是这一次通过poll(keepAliveTime, TimeUnit)的方法超时获取任务,如果没有在规定的时间内(keepAliveTime)获取则会销毁该线程。
  • 如果当前线程数大于最大线程数,则会使用饱和(拒绝)策略来执行该任务。

3.8 问题
在上面的代码中我发现在创建线程执行任务会加锁,也就是addIfUnderCorePoolSize()和addIfUnderMaximumPoolSize()方法里面的调用的addThread()方法,该方法会创建线程并执行它。但是,其执行过程会在加锁过程中,这样就造成了加锁时间过长。如下代码:

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }
    
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            if (t.isAlive()) // precheck that t is startable
                throw new IllegalThreadStateException();
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start();
                workerStarted = true;
            }
            finally {
                if (!workerStarted)
                    workers.remove(w);
            }
        }
        return t;
    }

这不是最关键的,最关键的是我在深入理解Java之线程池这篇博客看它的代码是这样的(这样的话就没有在执行任务时继续加锁了):

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //创建线程去执行firstTask任务   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务   
    if (t != null) {
        w.thread = t;            //将创建的线程的引用赋值为w的成员变量       
        workers.add(w);
        int nt = ++poolSize;     //当前线程数加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

然后他说他参考的也是JDK 1.6,然后我就蒙了,难道我的是假的JDK 1.6 ..............^ - ^

4. 线程池(ThreadPoolExecutor)的扩展


我们在使用ThreadPoolExecutor时可以通过以下API来监控线程池:

  • getTaskCount(): 返回线程池需要执行的任务数量
  • getCompletedTaskCount(): 返回线程池运行过程中已经完成的任务数量
  • getLargestPoolSize(): 返回线程池曾经创建过的最大线程数量。通过这个数据我们可以知道线程是曾经是否满过。如果该数值等于线程池的最大大小,则表示该线程池曾经满过。
  • getPoolSize():返回线程池线程数目。
  • getActiveCount():返回线程池活动的线程数。

然后还可以通过继承ThreadPoolExecutor类,通过重写beforeExecute()、afterExecute()等方法来自定义线程池。如下是JDK 1.6 API Docs中扩展示例,该示例添加了一个简单的暂停/恢复功能的子类:

public class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private ReentrantLock pauseLock  = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();
    private boolean isPaused;

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * 暂停
     */
    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * 恢复
     */
    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}

该类能够正确执行是因为工作线程(Worker)执行任务时,会调用beforeExecute(thread, task)方法,具体代码如下(JDK 1.6中ThreadPoolExecutor的Worker内部类的runTask()方法):

    /**
     * Runs a single task between before/after methods.
     */
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if ((runState >= STOP || (Thread.interrupted() && runState >= STOP)) && hasRun)
                thread.interrupt();
            boolean ran = false;
            // 这里调用了beforeExecute()方法
            beforeExecute(thread, task);
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    // 这里调用了afterExecute()方法
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }

5. 应用


自己依照该线程池的逻辑写的线程池

6. References


《Java并发编程的艺术》
深入理解Java之线程池

Java并发基础(六) - 线程池