首页 > 代码库 > ThreadPoolExecutor 分析

ThreadPoolExecutor 分析

一。从用法入手

Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.

These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute 

will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to

the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains

idle for long enough will not consume any resources. 

    public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }

2.

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads 

threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until

a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if

needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }

 

 

二。构造函数

    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException();
this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

corePoolSize:正在运行的线程数

maximumPoolSize:当大于了这个值就会将Thread由一个丢弃处理机制来处理

workQueue:工作队列

keepAliveTime:当线程没有任务处理后,保持多长时间

threadFactory:实现newThread方法即可

handler:达到maximumPoolSize后处理的方法,实现RejectedExecutionHandler接口:

1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程

2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。

3、DiscardPolicy:什么也不做

4、AbortPolicy:抛出一个异常:RejectedExecutionException。

 

三。

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task

cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the

task is handled by the currentRejectedExecutionHandler.

    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {      //当前线程数大于corePoolSize时进入            if (runState == RUNNING && workQueue.offer(command)) {               //运行状态 且 成功在队列末尾插入元素                if (runState != RUNNING || poolSize == 0)                    ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }

1.

poolSize >= corePoolSize 或 线程池已经不是在running状态的时候,没有成功创建Thread才会返回false

注意:这里在再一次比较了poolSize 和 corePoolSize

第一次是初步判定,第二次是加锁后判定的,以得到更为准确的结果。如果 poolSize >= corePoolSize ,就没必要进入addIfUnderCorePoolSize

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {        Thread t = null;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (poolSize < corePoolSize && runState == RUNNING)                t = addThread(firstTask);        } finally {            mainLock.unlock();        }        if (t == null)            return false;        t.start();        return true;    }

2.

    private Thread addThread(Runnable firstTask) {        Worker w = new Worker(firstTask);        Thread t = threadFactory.newThread(w);        if (t != null) {            w.thread = t;            workers.add(w);            int nt = ++poolSize;            if (nt > largestPoolSize)                largestPoolSize = nt;        }        return t;    }

3.

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {        Thread t = null;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (poolSize < maximumPoolSize && runState == RUNNING)                t = addThread(firstTask);        } finally {            mainLock.unlock();        }        if (t == null)            return false;        t.start();        return true;    }

 

ThreadPoolExecutor 分析