首页 > 代码库 > Java并发之任务的描述和执行

Java并发之任务的描述和执行

简单概念

《Java编程思想》对并发概念的重要性阐述:

Java是一种多线程语言,并且提出了并发问题,不管你是否意识到了。因此,有很多使用中的Java程序,要么只是偶尔工作,要么是在大多数时间里工作,并且会由于未发现的并发缺陷而时不时地神秘崩溃。有事这种崩溃是温和的,但有时却意味着重要数据的丢失,并且如果没有意识到并发问题,你能最终会认为问题出现在其他什么地方,而不是你的软件中。如果程序被迁移到多处理器系统中,这些种类的问题还会被暴露或放大。基本上,了解并发可以使你意识到明显正确的程序可能展示出不正确的行为。

所以在你编写任何复杂程序之前,应该学习一下专门讨论并发主题的数据。

使用并发解决的问题可以分为两种

  • 更快的速度
    更快的速度是针对阻塞(通常是I/O)来讲的,其实如果没有阻塞使用并发是没有任何意义,反而比顺序执行还增加了“上下文切换(从一个任务切换到另一个任务)”的开销。因为有了阻塞,才把程序断开为多个片段,然后在单处理器上运行每个片段。

  • 改进代码的设计
    线程通常使你能够创建更加松散耦合的设计如用户交互,否则,你的代码中各个部分都必须显示地关注那些通常可以由线程来处理的任务。

进程与线程

  • 进程:
    进程是运行在它自己的地址空间内的自包容的程序,进程是自愿分配的基本单位,它也是抢占处理器的调度单。多任务操作系统可以通过周期性地将CPU从一个进程切换到另一个进程,来实现同时运行多个进程(程序)。

  • 线程:
    台湾称为"执行者",我认为更确切些,它是进程中某个单一顺序的控制流,是程序(进程)执行流的最小单位,也是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属于一个进程的其他线程共享进程所拥有的全部资源。

发生进程切换与发生线程切换时相比较,进程切换时涉及到有关资源指针的保存以及地址空间的变化等问题;线程切换时,由于同进程内的线程共享资源和地址 空间,将不涉及资源信息的保存和地址变化问题,从而减少了操作系统的开销时间。而且,进程的调度与切换都是由操作系统内核完成,而线程则既可由操作系统内 核完成,也可由用户程序进行。

任务描述

线程可以驱动,因此可以说线程是任务的执行载体,而任务是真正的业务逻辑。而描述任务可以使用继承Thread类或使用Runnablecallable两个接口。

实现Runnable接口即可定义为任务,然后使用线程驱动,Callable 接口也类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常,而Callable可以。

使用Runable定义任务和执行简单示例:

public class App {		public static void main(String[] args) {		for(int i=0;i<10;i++){//开10个线程			new Thread(new Task()).start();		}	}}/** * 定义任务 * @author Administrator */class Task implements Runnable{	@Override	public void run() {		System.out.println("线程ID:"+Thread.currentThread().getId());	}}

输出:

线程ID:8
线程ID:10
线程ID:12
线程ID:14
线程ID:16
线程ID:9
线程ID:11
线程ID:13
线程ID:15
线程ID:17

使用继承Thread类的方式,与此类似就不做举例了。

Executor框架

Runnable 的任务可以直接使用Thread类来执行,但Callable却不能,需要使用Executor框架来执行,其实最好不要直接使用Thread来执行Runnable任务,而是使用Executor框架。

Executor是JAVA SE5的java.util.concurrent包中的执行器,它为你管理Thread对象。Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。Executor存在的目的是提供一种将"任务提交"与"任务如何运行"分离开来的机制。架构如下图: 

Executor接口定义如下:

public interface Executor {      void execute(Runnable command);  }

虽然只有一个方法,但却为灵活强大的异步任务执行框架提供了基础。它将任务的提交过程与执行过程解耦:用Runnable/Callable来表示任务,执行的任务放入run/call方法中即可,将Runnable/Callable接口的实现类交给线程池的execute方法来执行。实际上我们并不是直接使用Execuotr接口的,而是使用更为方便的ExecutorService接口,因为它对任务的生命周期做了管理:

public interface ExecutorService extends Executor {      void shutdown();      List<Runnable> shutdownNow();      boolean isShutdown();      boolean isTerminated();      boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;      ......}

对于上述框架图:

ExecutorService: 真正的线程池接口。
ScheduledExecutorService接口: 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
ThreadPoolExecutor: ExecutorService的默认实现。
ScheduledThreadPoolExecutor: 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

线程池

那么线程池从而而来呢?这时Executors就出场了,它为Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些工具方法,类似于集合中的Collections类的功能。

Executors可以很方便的创建线程池

static ExecutorService newSingleThreadExecutor();static ExecutorService newFixedThreadPool(int nThreads);static ExecutorService newCachedThreadPool();static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
  • newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

  • newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  • newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

  • newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

其实还有很多与线程池相关的方法,具体看参考JDK API文档,其实上面列出的方法都是使用ThreadPoolExecutor类来实现的,因为ThreadPoolExecutor是ExecutorService的默认实现。我们来看看ThreadPoolExecutor的几个构造函数:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

各参数含义如下:

  • corePoolSize(基本线程池的大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界(不限制线程数)的任务队列这个参数就没什么效果。

  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

  • workQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字或后台线程等等。

  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。

    • AbortPolicy:直接抛出异常。

    • CallerRunsPolicy:只用调用者所在线程来运行任务。

    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

    • DiscardPolicy:不处理,丢弃掉。

    • 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

如Executors的“newCachedThreadPool()”方法的实现使用的ThreadPoolExecutor的构造函数如:

public static ExecutorService newCachedThreadPool() {	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,			      60L, TimeUnit.SECONDS,			      new SynchronousQueue<Runnable>());}

由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时

  1. 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的;

  2. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理

  3. 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程才创建新的线程去处理请求;

  4. 如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求;

  5. 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务

也就是说,处理任务的优先级为: 

  1. 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

  2. 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。

使用Executors来提交和执行任务

  1. 使用Runnable来表示的无返回值的任务:


    1. import java.util.concurrent.ExecutorService;

    2. import java.util.concurrent.Executors;

    3.  

    4. public class App {

    5. public static void main(String[] args) {

    6. ExecutorService exec=Executors.newFixedThreadPool(10);

    7. for(int i=0;i<10;i++){

    8. exec.execute(new Task());

    9. }

    10. exec.shutdown();

    11. }

    12. }

    13. /**

    14. * 定义任务

    15. * @author Administrator

    16. */

    17. class Task implements Runnable{

    18.  

    19. @Override

    20. public void run() {

    21. System.out.println(Thread.currentThread().getId());

    22. }

    23. }

  2. 使用Callable来表示的有返回值的任务:


    Callable接口只能使用submit来提交任务,submit会产生Future对象结果,可以用阻塞的get()来获取这个结果,但你使用isDone方法来查询是否已经产生了Future结果,然后再获取。

    1. import java.util.ArrayList;

    2. import java.util.concurrent.Callable;

    3. import java.util.concurrent.ExecutorService;

    4. import java.util.concurrent.Executors;

    5. import java.util.concurrent.Future;

    6.  

    7. public class App {

    8. public static void main(String[] args) throws Exception {

    9. ExecutorService exec=Executors.newFixedThreadPool(10);

    10. ArrayList<Future<Long>> results=new ArrayList<Future<Long>>();

    11. for(int i=0;i<10;i++){

    12. results.add(exec.submit(new Task()));

    13. }

    14. for(Future<Long> fs :results){

    15. System.out.println(fs.get());

    16. }

    17. exec.shutdown();

    18. }

    19. }

    20. /**

    21. * 定义任务

    22. * @author Administrator

    23. */

    24. class Task implements Callable<Long>{

    25.  

    26. @Override

    27. public Long call() throws Exception {

    28. return Thread.currentThread().getId();

    29. }

    30. }

关闭线程池:

  1. shutdown():平缓的关闭线程池。线程池停止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列还没有开始的任务;

  2. shutdownNow():立即关闭线程池。线程池停止接受新的任务,同时线程池取消所有执行的任务和已经进入队列但是还没有执行的任务;

向任务传值

Runnable的run和Callable的call都是无参接口,那么在运行状态如何传递值呢?网上说又三种方式,其实本质是一种:向Runnable/Callable的实现类传值,所以你有多种方式比如构造函数,属性等等:

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class App {		public static void main(String[] args) {		ExecutorService exec=Executors.newFixedThreadPool(10);		for(int i=0;i<10;i++){			exec.execute(new Task("线程ID"));		}		exec.shutdown();	}}/** * 定义任务 * @author Administrator */class Task implements Runnable{	private String outVal;		public Task(String outVal){		this.outVal=outVal;	}	@Override	public void run() {		System.out.println(outVal+":"+Thread.currentThread().getId());	}}

输出:

线程ID:8
线程ID:10
线程ID:12
线程ID:14
线程ID:16
线程ID:9
线程ID:11
线程ID:13
线程ID:15
线程ID:17


Java并发之任务的描述和执行