首页 > 代码库 > 线程池之ThreadPoolExecutor

线程池之ThreadPoolExecutor

所属包:

java.util.concurrent.ThreadPoolExecutor

 类关系:

public class ThreadPoolExecutor extends AbstractExecutorService

1. 继承关系

ThreadPoolExecutor 继承了一个抽象类:AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService

 

而这个AbstractExecutorService实现了一个接口:ExecutorService

public interface ExecutorService extends Executor

 

这个ExecutorService接口又继承了一个类:Executor

public interface Executor

 

可以看出:

Executor是一个顶层接口,它的子接口ExecutorService继承了它(其实还有一个子接口: ScheduledExecutorService),抽象类AbstractExecutorService实现了这个子接口ExecutorService,最终ThreadPoolExecutor 继承了抽象类AbstractExecutorService并且同时实现了子接口ExecutorService。

 

2. 构造方法 

最简单的一个构造方法:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

 

有五个参数:

corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
maximumPoolSize - 池中允许的最大线程数 
keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。 
unit - keepAliveTime参数的时间单位 
workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务

 

 实际上它调用了同类的另外一个“全能”构造方法,通过 this() 的形式,最后两个参数用的默认值

另外一个构造方法:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

 

参数:

corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
maximumPoolSize - 池中允许的最大线程数 
keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。 
unit - keepAliveTime参数的时间单位 
workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。 
threadFactory - 执行程序创建新线程时使用的工厂 
handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量 

3. 如何使用

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor {

    /**
     * 使用有界队列:
     *  1、当线程数小于corePoolSize时,创建线程执行任务。
     *  2、当线程数大于等于corePoolSize并且workQueue没有满时,放入workQueue中
     *  3、线程数大于等于corePoolSize并且当workQueue满时,新任务新建线程运行,线程总数要小于maximumPoolSize
     *  4、当线程总数等于maximumPoolSize并且workQueue满了的时候执行handler的rejectedExecution。也就是拒绝策略。
     * 
     * ThreadPoolExecutor默认有四个拒绝策略:
     *  1、ThreadPoolExecutor.AbortPolicy()           直接抛出异常RejectedExecutionException
     *  2、ThreadPoolExecutor.CallerRunsPolicy()        直接调用run方法并且阻塞执行
     *  3、ThreadPoolExecutor.DiscardPolicy()           直接丢弃后来的任务
     *  4、ThreadPoolExecutor.DiscardOldestPolicy()    丢弃在队列中队首的任务
     *  
     * 
     *  
     */
    
    public static void main(String[] args) {
        
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,
                2,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3));
        
        Runnable[] runs = new Runnable[6];
        for (int i = 0; i < runs.length; i++) {
            runs[i] = new MyTask(i);
        }
        
        pool.execute(runs[0]);    //线程1个,队列0个
        pool.execute(runs[1]);    //线程1个,队列1个
        pool.execute(runs[2]);    //线程1个,队列2个
        pool.execute(runs[3]);    //线程1个,队列3个
        pool.execute(runs[4]);    //线程2个,队列3个
        pool.execute(runs[5]);    //线程2个,队列3个,拒绝第六个
        
        pool.shutdown();
        
    }
}

 

线程:

public class MyTask implements Runnable {

    private int id;
    
    public MyTask(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task-" + id);
    }

}

 

这个就是最简单的一个使用方法了。ps:最后那个(线程1个,队列0个....)指的是,你仅仅执行runs[0];runs[0]+runs[1];runs[0]+runs[1]+runs[2];....的时候,任务被放到哪里。

但是推荐使用这种方式创建线程池:

package cn.ying.thread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyTest {
    
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 10; i++) {
            pool.execute(new MyTask(i));
        }
        pool.shutdown();
    }

    public void note(){
        Executors.newCachedThreadPool();    //无界线程池,可以进行自动线程回收
//        public static ExecutorService newCachedThreadPool() {
//            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
//                                          60L, TimeUnit.SECONDS,
//                                          new SynchronousQueue<Runnable>());//SynchronousQueue:长度为1的队列
//        }
        Executors.newFixedThreadPool(10);    //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
//        public static ExecutorService newFixedThreadPool(int nThreads) {
//            return new ThreadPoolExecutor(nThreads, nThreads,
//                                          0L, TimeUnit.MILLISECONDS,
//                                          new LinkedBlockingQueue<Runnable>());
//        }
        Executors.newScheduledThreadPool(10);    //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行
//        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
//            return new ScheduledThreadPoolExecutor(corePoolSize);
//        }
//        public ScheduledThreadPoolExecutor(int corePoolSize) {
//            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
//                  new DelayedWorkQueue());
//        }
        Executors.newSingleThreadExecutor();    //创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程
//        public static ExecutorService newSingleThreadExecutor() {
//            return new FinalizableDelegatedExecutorService
//                (new ThreadPoolExecutor(1, 1,
//                                        0L, TimeUnit.MILLISECONDS,
//                                        new LinkedBlockingQueue<Runnable>()));
//        }
        
    }
}

 

main方法里面就是相关代码,而note方法则是简单介绍几种方法,注释部分是源码。

 

就先写这么多吧,以后再补充

线程池之ThreadPoolExecutor