首页 > 代码库 > ThreadPoolExecutor参数解析

ThreadPoolExecutor参数解析

ThreadPoolExecutor是一个非常重要的类,用来构建带有线程池的任务执行器,通过配置不同的参数来构造具有不同规格线程池的任务执行器

写在前面的是:

线程池任务执行器,线程池的定义比较直接,可以看做多个线程的集合。而任务执行器的概念比较的具有针对性,它用来执行任务,通过对线程池的管理实现多任务的并发,是线程池的载体。

线程和任务的区别,线程不是任务,线程是用来执行任务的。

队列是用来存放任务的,不是用来存放线程的。

(37JUW4~P@]KZ]FYI@8@8`B

主要的几个参数解析:

  • 核心线程数(core pool sizes)和最大线程数(maxmum  pool sizes)

一开始两者的存在很让人摸不着头脑,简单的想法是用一个线程数(pool size)表示线程池的大小不就完了吗,不到规定的线程数就创建新的线程来执行新的任务,到了规定的线程数就等待其他线程处理完成,怎么还出现两个控制线程数的参数?

那这两个参数是什么意思干什么用的?

核心线程数这个数与上面那个简单想法中的数有一个共同点,就是如果当前线程数达不到核心线程数时,不会使用已有的空闲的线程(如果有的话),来了新任务就会创建新的线程

如果当前线程数达到核心线程数,而且没有空闲线程,那么来了新任务是否要创建新的线程呢?这取决于两点:

  1. 当前的任务队列是否已满。
  2. 线程池的最大线程数。

通过这个问题可以引出最大线程数的概念

最大线程数 : 最大线程数是和任务队列匹配使用的,确切的说是和有长度限制的任务队列(即有界任务队列)匹配使用的。

补充回答上面的问题,ThreadPoolExecutor的线程池拥有一个任务队列,这个任务队列只有在当前线程数>核心线程数的时候才开始使用,如果该线程池使用的任务队列是有界队列,比如10,那么当该队列被新任务填满时也就是说队列中有10个新任务时ThreadPoolExecutor才会创建一个新的线程来执行队列中的一个任务,如果再发生队列被填满,而且依旧没有空闲线程时ThreadPoolExecutor再次创建新的线程,一旦线程的数量等于最大线程数就不再创建新的线程了,如果此时队列中还有10个任务,那么新来的任务就会被拒绝(reject)。

上述是针对有界队列,如果这个任务执行器的队列是无界队列呢?

由于无界队列不会被填满,所以永远不能达到创建新线程所需要的条件,所以也就不会有新线程被创建,所以最大线程数在这种情况下也就失去了其存在的意义。

  • 线程空闲存活时间(keepAliveTime)

在介绍上面的核心线程数和最大线程数时有提到空闲的线程,所谓空闲的线程就是执行完任务之后闲着的线程。

超过这个时间会使得那么核心线程之外的空闲线程被杀死,如果想把这个时间也作用在核心线程上需要设置allowCoreThreadTimeOut(boolean)为true

这里有必要说一下的是,任务执行器如何实现线程的重复利用,当任务执行器执行execute(task)的时候会创建一个worker,它是一个Runnable类,可以看做task的载体,worker包含一个thread对象,这个thread启动的时候执行worker本身的run方法,这样worker和线程就融为一体。当worker的thread start的时候,就会执行worker的run方法,而worker的run会调用任务执行器的runWorker(worker),并将自身传递过去,意思是任务执行器启动了一个worker,而线程重复利用关键就在runWorker中,在启动了一个worker后,worker会从任务执行器中寻找可以运行的任务,而一开始创建worker使用的task就是它的第一个任务。

 

下面是jdk1.7的源码

//执行一个任务 taskpublic void execute(Runnable command) {         if (command == null)             throw new NullPointerException();         int c = ctl.get();         if (workerCountOf(c) < corePoolSize) {             if (addWorker(command, true)) // 将task装配到一个worker中                 return;             c = ctl.get();         }         if (isRunning(c) && workQueue.offer(command)) {             int recheck = ctl.get();             if (! isRunning(recheck) && remove(command))                 reject(command);             else if (workerCountOf(recheck) == 0)                 addWorker(null, false);         }         else if (!addWorker(command, false))             reject(command);     }

 

添加一个worker 

private boolean addWorker(Runnable firstTask, boolean core) {         retry:         for (;;) {             int c = ctl.get();             int rs = runStateOf(c);            // Check if queue empty only if necessary.             if (rs >= SHUTDOWN &&                 ! (rs == SHUTDOWN &&                    firstTask == null &&                    ! workQueue.isEmpty()))                 return false;            for (;;) {                 int wc = workerCountOf(c);                 if (wc >= CAPACITY ||                     wc >= (core ? corePoolSize : maximumPoolSize))                     return false;                 if (compareAndIncrementWorkerCount(c))                     break retry;                 c = ctl.get();  // Re-read ctl                 if (runStateOf(c) != rs)                     continue retry;                 // else CAS failed due to workerCount change; retry inner loop             }         }        boolean workerStarted = false;         boolean workerAdded = false;         Worker w = null;         try {             final ReentrantLock mainLock = this.mainLock;             w = new Worker(firstTask);             final Thread t = w.thread;             if (t != null) {                 mainLock.lock();                 try {                     // Recheck while holding lock.                     // Back out on ThreadFactory failure or if                     // shut down before lock acquired.                     int c = ctl.get();                     int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                         (rs == SHUTDOWN && firstTask == null)) {                         if (t.isAlive()) // precheck that t is startable                             throw new IllegalThreadStateException();                         workers.add(w);                         int s = workers.size();                         if (s > largestPoolSize)                             largestPoolSize = s;                         workerAdded = true;                     }                 } finally {                     mainLock.unlock();                 }                 if (workerAdded) {//如果worker创建成功,就启动它的对应的thread                     t.start(); //worker中的tread启动                     workerStarted = true;                 }             }         } finally {             if (! workerStarted)                 addWorkerFailed(w);         }         return workerStarted;     }
运行worker
final void runWorker(Worker w) {         Thread wt = Thread.currentThread();         Runnable task = w.firstTask;         w.firstTask = null;         w.unlock(); // allow interrupts         boolean completedAbruptly = true;         try {             while (task != null || (task = getTask()) != null) {//这里是关键,使用一个while来寻找任务执行器中(主要还是从任务队列中获取)还未执行的task。                 w.lock();                 // If pool is stopping, ensure thread is interrupted;                 // if not, ensure thread is not interrupted.  This                 // requires a recheck in second case to deal with                 // shutdownNow race while clearing interrupt                 if ((runStateAtLeast(ctl.get(), STOP) ||                      (Thread.interrupted() &&                       runStateAtLeast(ctl.get(), STOP))) &&                     !wt.isInterrupted())                     wt.interrupt();                 try {                     beforeExecute(wt, task);                     Throwable thrown = null;                     try {                         task.run();                     } catch (RuntimeException x) {                         thrown = x; throw x;                     } catch (Error x) {                         thrown = x; throw x;                     } catch (Throwable x) {                         thrown = x; throw new Error(x);                     } finally {                         afterExecute(task, thrown);                     }                 } finally {                     task = null;                     w.completedTasks++;                     w.unlock();                 }             }             completedAbruptly = false;         } finally {             processWorkerExit(w, completedAbruptly);         }     }

  

 

 

ThreadPoolExecutor参数解析