首页 > 代码库 > 几种线程池的实现分析(转)

几种线程池的实现分析(转)

1. 前言

在阅读研究线程池的源码之前,一直感觉线程池是一个框架中最高深的技术。研究后才发现,线程池的实现是如此精巧。本文从技术角度分析了线程池的本质原理和组成,同时分析了JDK、Jetty6、Jetty8、Tomcat的源码实现,对于想了解线程池本质、更好的使用线程池或者定制实现自己的线程池的业务场景具有一定指导意义。

2. 使用线程池的意义

  • 复用:类似WEB服务器等系统,长期来看内部需要使用大量的线程处理请求,而单次请求响应时间通常比较短,此时Java基于操作系统的本地调用方式大量的创建和销毁线程本身会成为系统的一个性能瓶颈和资源浪费。若使用线程池技术可以实现工作线程的复用,即一个工作线程创建和销毁的生命周期期间内可以执行处理多个任务,从而总体上降低线程创建和销毁的频率和时间,提升了系统性能。
  • 流控:服务器资源有限,超过服务器性能的过高并发设置反而成为系统的负担,造成CPU大量耗费于上下文切换、内存溢出等后果。通过线程池技术可以控制系统最大并发数和最大处理任务量,从而很好的实现流控,保证系统不至于崩溃。
  • 功能:JDK的线程池实现的非常灵活,并提供了很多功能,一些场景基于功能的角度会选择使用线程池。

3. 线程池技术要点:

从内部实现上看,线程池技术可主要划分为如下6个要点实现:

 

图1线程池技术要点

  • 工作者线程worker:即线程池中可以重复利用起来执行任务的线程,一个worker的生命周期内会不停的处理多个业务job。线程池“复用”的本质就是复用一个worker去处理多个job,“流控“的本质就是通过对worker数量的控制实现并发数的控制。通过设置不同的参数来控制worker的数量可以实现线程池的容量伸缩从而实现复杂的业务需求
  • 待处理工作job的存储队列:工作者线程workers的数量是有限的,同一时间最多只能处理最多workers数量个job。对于来不及处理的job需要保存到等待队列里,空闲的工作者work会不停的读取空闲队列里的job进行处理。基于不同的队列实现,可以扩展出多种功能的线程池,如定制队列出队顺序实现带处理优先级的线程池、定制队列为阻塞有界队列实现可阻塞能力的线程池等。流控一方面通过控制worker数控制并发数和处理能力,一方面可基于队列控制线程池处理能力的上限。
  • 线程池初始化:即线程池参数的设定和多个工作者workers的初始化。通常有一开始就初始化指定数量的workers或者有请求时逐步初始化工作者两种方式。前者线程池启动初期响应会比较快但造成了空载时的少量性能浪费,后者是基于请求量灵活扩容但牺牲了线程池启动初期性能达不到最优。
  • 处理业务job算法:业务给线程池添加任务job时线程池的处理算法。有的线程池基于算法识别直接处理job还是增加工作者数处理job或者放入待处理队列,也有的线程池会直接将job放入待处理队列,等待工作者worker去取出执行。
  • workers的增减算法:业务线程数不是持久不变的,有高低峰期。线程池要有自己的算法根据业务请求频率高低调节自身工作者workers的数量来调节线程池大小,从而实现业务高峰期增加工作者数量提高响应速度,而业务低峰期减少工作者数来节省服务器资源。增加算法通常基于几个维度进行:待处理工作job数、线程池定义的最大最小工作者数、工作者闲置时间。

线程池终止逻辑:应用停止时线程池要有自身的停止逻辑,保证所有job都得到执行或者抛弃。

4. 几种线程池的实现细节

结合上面的技术点,列举几种线程池实现方式。

 
  • 工作者workers与待处理工作队列实现方式举例:

    实现

    工作者workers结构与并发保护

    待处理工作队列结构

    JDK

    使用了HashSet来存储工作者workers,通过可重入锁ReentrantLock对其进行并发保护。每个worker都是一个Runnable接口。

    使用了实现接口BlockingQueue的阻塞队列来存储待处理工作job,并把队列作为构造函数参数,从而实现业务可以灵活的扩展定制线程池的队列。业务也可使用JDK自身的同步阻塞队列SynchronousQueue、有界队列ArrayBlockingQueue、无界队列LinkedBlockingQueue、优先级队列PriorityBlockingQueue。

    Jetty6

    同样使用了HashSet存储工作者workers,通过synchronized一个对象进行HashSet的并发保护。每个工作者实际上是一个Thread的扩展。

    使用了数组存储待处理的job对象Runnable。数组初始化容量为_maxThreads个,使用变量_queued计算保存当前内部待处理job的个数即数组length。超过数组最大值时,扩大_maxThreads个容量,因此数组永远够用够大,容量无界。同样是用synchronized一个对象的方式实现同步。

    Jetty8

    使用了ConcurrentLinkedQueue存储工作者workers,利用JDK基于CSA算法的实现提高了并发效率,同时也降低了线程池并发保护的复杂程度。针对队列ConcurrentLinkedQueue无法保证size()实时性问题引入原子变量AtomicInteger统计工作者数量。

    与JDK相同实现,使用了基于接口BlockingQueue的阻塞队列来存储待处理工作job,也支持在线程池构造函数的参数中传入队列类型。同时,Jetty8内部默认未设置队列类型场景可自动设置使用2种队列:有界无法扩容的ArrayBlockingQueue及Jetty自身定制扩展实现的可扩容队列BlockingArrayQueue。

    Tomcat

    基于JDK的ThreadPoolExecutors实现,复用JDK业务

    复用JDK业务

  • 线程池初始化与处理业务job算法举例:

    实现

    线程池构造与工作者初始化

    处理业务job的算法

    JDK

    1. 基于多个构造参数实现灵活初始化,几个核心参数如下:

    corePoolSize:核心工作者数

    maximumPoolSize:最大工作者数

    keepAliveTime:超过核心工作者数时闲置工作者的存活时间。

    workQueue:待处理job队列,即前面提到的BlockingQueue接口。

    2. 默认初始化后不启动工作者,等待有请求时才启动。可以通过调用线程池接口提前启动核心工作数个工作者线程,也可以启动业务期望的多个工作者线程。

    1. 工作者workers数量低于核心工作者数corePoolSize时会优先创建一个工作者worker处理job,处理成功则返回。

    2. 工作者workers数量高于核心工作者数时会优先把job放入到待处理队列,放入队列成功时处理结束。

    3. 步骤2中入队失败会识别工作者数是否还小于最大工作者数maximumPoolsize,小于的话也会新创建一个工作者worker处理job。

    4. 拒绝处理

    Jetty6

    1. 同样支持设置多个参数:

    _spawnOrShrinkAt:扩容/缩容阀值

    _minThreads:最小工作者数

    _maxThreads:最大工作者数

    _maxIdleTimeMs:闲置工作者最大闲置超时时间

    2. 初始化后直接启动_minThreads个工作者线程

    1. 查找闲置的工作者worker,找到则派发job。

    2. 没有闲置的工作者,将job存入待处理数组。

    3. 当识别到数组中待处理job超过扩容阀值参数时,扩容增加工作者处理job

    4. 否则不处理

    Jetty8

    1. 配置参数类似Jetty6,去除了_spawnOrShrinkAt阀值参数。

    2. 初始化后直接启动_minThreads个工作者线程

    非常简单,直接将待处理job入队。

    Tomcat

    1. 基于JDK线程池的构造方法

    2. 来请求时启动工作者

    处理方法复用JDK的,但是在开始提交前扩展了JDK的功能,实现了可以统计提交数submittedCount的能力

  • 线程池工作者worker的增减机制举例:

    实现

    工作者增加算法

    工作者减少算法

    JDK

    1. 待处理job来时,工作者workers数量低于核心工作者数corePoolSize时。

    2. 待处理job来时,workers数超过核心数小于最大工作者数且入待处理队列失败场景。

    3. 业务调用线程池的更新核心工作者数接口时,若发现扩容,会增加工作者数。

    1. 待处理任务队列里没有job并且工作者workers数量超过了核心工作者数corePoolSize。

    2. 待处理任务队列里没有job并且允许工作者数量小于核心工作者参数为true,此场景会至少保留一个工作者线程。

    Jetty6

    1. 启动线程池时会启动_minThreads个工作者线程

    2. 待处理的job数量高于了阀值参数且工作者数没有达到最大值时会增加工作者。

    3. 调用线程池接口setMinThreads更新最小工作者数时会根据需要增加工作者。

    如下三个条件同时满足时会减少工作者:

    1. 待处理任务数组中没有待处理job

    2. 工作者workers数量超过了最小工作者数_minThreads

    3. 闲置工作者线程数高于了阀值参数

    Jetty8

    1. 启动线程池时启动最小工作者参数个工作者线程

    2. 已经没有闲置工作者或者闲置工作者的数量已经小于待处理的job的总数

    3. 调用线程池接口setMinThreads更新最小工作者数时

    如下三个条件同时满足时会减少工作者:

    1. 待处理任务队列里没有待处理的job

    2. 工作者workers总数超过了最小工作者参数配置_minThreads

    3. 工作者线程的闲置时间超时

    Tomcat

    同JDK增加工作者算法

    复用JDK减少算法,同时定制扩展延迟参数,超过参数时,直接抛出异常到外面来终止线程池工作者。

5. 小结

对比几种线程池实现,JDK的实现是最为灵活、功能最强且扩展性最好的,Tomcat即基于JDK线程池功能扩展实现,复用原有业务的同时扩充了自己的业务。Jetty6是完全自己定制的线程池业务,耦合线程池众多复杂的业务逻辑到线程池类里面,逻辑相对最为复杂,扩展性也非常差。Jetty8相对Jetty6的实现简化了很多,其中利用了JDK中的同步容器和原子变量,同时实现方式也越来越接近JDK。

6. 参考源码

  • JDK源码类:java.util.concurrent.ThreadPoolExecutor
  • Jetty6源码类:org.mortbay.thread.QueuedThreadPool
  • Jetty8源码类:org.eclipse.jetty.util.thread.QueuedThreadPool
  • Tomcat源码类:org.apache.tomcat.util.threads.ThreadPoolExecutor

感谢郭蕾对本文的审校。