首页 > 代码库 > 揭密FutureTask
揭密FutureTask
在java多线程编程中,我们经常使用线程池提交任务,并且通过Future来获取任务执行的结果,以此达到异步或者并行执行的效果。在jdk1.7以前,FutureTask是Future唯一的实现类,1.7后加入了ForkJoinTask类。本文主要总结一下我对FutureTask的理解。
Future类
Future接口定义了5个方法,分别是
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
分别介绍一下这五个接口的用途:
- boolean cancel(boolean mayInterruptInRunning) 取消一个正在执行中的任务,并且返回调用结果。如果取消成功则返回true,反之返回false。这里要注意,即使方法返回true,当前任务也未必真的被取消了,后面会介绍。
- boolean isCancelled() 返回当前任务是否被取消。
- Boolean isDone() 返回当前任务是否执行完毕。这里done的概念比较广,包括了futureTask被执行后的任意状态,例如正常执行完毕、执行异常或者任务被取消。
- V get() 这个接口就是用来获取futureTask执行结果,调用这个接口时会被阻塞,直到拿到结果或者异常。
- V get(long timeout, TimeUnit unit) 这个接口多了一个超时时间,如果过了这个时间task仍然没有结果返回,则抛出timeout异常
写个demo便于理解
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ExecutorService executorService = Executors.newCachedThreadPool(); 4 Future future = executorService.submit(new Callable<Object>() { 5 @Override 6 public Object call() throws Exception { 7 Long start = System.currentTimeMillis(); 8 while (true) { 9 Long current = System.currentTimeMillis();10 if ((current - start) > 1000) {11 return 1;12 }13 }14 }15 });16 17 try {18 Integer result = (Integer)future.get();19 System.out.println(result);20 }catch (Exception e){21 e.printStackTrace();22 }23 }24 }
这里我们模拟了1s钟的CPU空转,当执行future.get()的时候,主线程阻塞了大约一秒后,把结果打印出来:1。
当然我们也可以使用V get(long timeout, TimeUnit unit),这个方法提供了一个超时时间的设置,如果超过当前时间任务线程还未返回,那么就会停止阻塞状态,并且抛出一个timeout异常。如下
1 try {2 Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);3 System.out.println(result);4 } catch (Exception e) {5 e.printStackTrace();6 }
这里我们设置的超时时间是500毫秒,由于一开始我们模拟了1s的CPU计算时间,这里便会抛出超时异常,打印出堆栈信息
当然,如果我们把超时时间设置的长一些,还是可以得到预期的结果的。
FutureTask内部实现机制
刚我们测试了最常用的两个方法,接下来我们来探一探FutureTask的内部实现机制。首先我们看一下FutureTask的继承结构:
FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既可以当做一个Runnable,也可以当做一个Future。
FutureTask内部定义了7个状态,代表了FutureTask当前所处状态。如下
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
当一个任务刚提交的时候,状态为NEW,由FutureTask的构造器可知:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
任务执行正常结束前,state会被设置成COMPLETING,代表任务即将完成,接下来很快就会被设置为NARMAL或者EXCEPTIONAL,这取决于调用Runnable中的call()方法是否抛出了异常。如果没有异常,则state设为NARMAL,反之为EXCEPTIONAL。
如果任务提交后,在任务执行结束之前调用cancel(boolean mayInterruptIfRunning) 取消任务,那么有可能进入到后3个状态。如果传入的参数是false,state会被置为CANCELLED,反之如果传入true,state先被置为INTERRUPTING,后被置为INTERRUPTED。
总结下,FutureTask的状态流转过程,可以出现以下三种状态:
1. 正常执行完毕。 NEW -> COMPLETING -> NORMAL
2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL
3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED
4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED
那么以上状态为什么会这么流转呢?接下来我们一起扒一扒FutureTask的源码。我们从futureTask的方法看起。
1 public void run()
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result;10 boolean ran;11 try {12 result = c.call();13 ran = true;14 } catch (Throwable ex) {15 result = null;16 ran = false;17 setException(ex);18 }19 if (ran)20 set(result);21 }22 } finally {23 // runner must be non-null until state is settled to24 // prevent concurrent calls to run()25 runner = null;26 // state must be re-read after nulling runner to prevent27 // leaked interrupts28 int s = state;29 if (s >= INTERRUPTING)30 handlePossibleCancellationInterrupt(s);31 }32 }
翻译一下,这个方法经历了以下几步
1. 校验Task状态和当前线程引用runner,如果state不为NEW或者runner引用为null,直接返回。
2. 调用runner的call()方法执行主逻辑,并且尝试获得返回值result。
3. 如果抛出异常,调用setException(Throwable t)方法
4. 如果没有异常,调用set(V v)方法
5. 一些扫尾工作
那么setException(Throwable t)和set(V v)做了什么呢?我们看一下源码
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
set(V v) 方法首先做一个CAS操作,将state字段由NEW->COMPLETING,这里的CAS操作读者可以自行百度原理。如果成功,那么把执行结果v赋给成员变量outcome,再把state的值设置为NORMAL,最后做一些清理工作,唤醒所有等待线程并把callable对象置为null。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
同理,setException(Throwable t) 方法大同小异,只不过state字段流转为NEW->COMPLETING->EXCEPTION。同时把异常对象赋予v。
这里我们就清楚了,当一个任务被提交后,状态流转中1、2是怎么来的了。同时我们可以确定,outcome变量,存着是执行结果或者抛出的异常对象。
2 public V get() throws InterruptionException,ExecutionException
get() 和 get(long timeout, TimeUnit unit)方法是获取执行结果的两个方法,我们这里就看get()方法即可。首先贴源码
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
首先检查state值,如果小于COMPLETING,则阻塞,阻塞时可能会抛出异常,这里我们不纠结这个,往下看。如果没有抛出异常,获取执行后返回的state值,最后调用report(s)方法。接着我们看report方法,如果s为NORMAL,返回执行结果outcome,否则抛出异常。结合之前的run()方法,我们这里可以得出,如果主逻辑正常执行完毕,则返回执行结果,如果抛出异常,那么这里会封装该异常为ExecutionException并且抛出。如果任务执行过程中被取消了,则可能抛出CancellationException()。
3 public boolean cancel(boolean mayInterruptIfRunning)
这个方法个人认为是最具争议的方法。这里我们先贴个demo
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4 // 预创建线程 5 executorService.prestartCoreThread(); 6 7 Future future = executorService.submit(new Callable<Object>() { 8 @Override 9 public Object call() {10 System.out.println("start to run callable");11 Long start = System.currentTimeMillis();12 while (true) {13 Long current = System.currentTimeMillis();14 if ((current - start) > 1000) {15 System.out.println("当前任务执行已经超过1s");16 return 1;17 }18 }19 }20 });21 22 System.out.println(future.cancel(false));23 24 try {25 Thread.currentThread().sleep(3000);26 executorService.shutdown();27 } catch (Exception e) {28 //NO OP29 }30 }31 }
我们多次测试后发现,出现了2种打印结果,如图
结果1
结果2
咦,两个结果和预期的都好像不太一样?第一种是任务压根没取消,第二种则是任务压根没提交成功,似乎和方法签名cancel不太一致?
我们先看一下方法签名上的作者注释
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
这里我们可以看到,"尝试"取消任务的执行,如果当前任务已经结束或者已经取消,则当前取消操作会失败。如果任务还没开始就被取消,那么任务则不会被执行。
这里我们就知道了,如果任务还没开始执行时cancel(false)就被调用,那么这个任务是不会被执行的,这就解释了出现上图结果2的情况。那如果任务已经开始执行,并且
调用cancel(false),是不会终止任务的。我们还是从源码去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
执行逻辑如下
1. 如果当前futureTask状态不为NEW,直接返回false,表示取消操作失败。
2. 如果传入true,代表可能会引发线程中断。一个CAS操作,把状态由NEW->INTERRUPTING,如果执行失败则直接返回false。设置当前工作线程中断标识为true,然后把futureTask状态设置为INTERRUPTED。
3. 如果传入false,把futureTask状态设置为CANCELLED。
4. 做一些清理工作
可见,cancel()方法仅仅是改变了futureTask的状态位!如果传入的是false,当前任务是不会被终止的,而是会继续执行,直到异常或者执行完毕。如果传入的是true,会调用当前线程的interrupt()方法,把中断标志位设为true。所以cancel()方法其实个人理解是有歧义的,它并不能真正取消一个任务的执行。事实上,除非线程自己停止自己的任务,或者退出JVM,是没有其他方法完全终止一个线程的任务的。cancel(true)方法也只是希望当前线程可以响应中断而已,当线程被阻塞,抛出InterruptedException。同时,由之前的future.get()方法可知,如果一个futureTask被cancel()了,调用get()方法会抛出CancellationException。
总结
理解FutureTask,我们使用Future类才能更加得心应手。这里也只是作者自己的理解,如有不对之处,还望读者批评指正。
作者:mayday芋头
揭密FutureTask