首页 > 代码库 > 揭密FutureTask

揭密FutureTask

      在java多线程编程中,我们经常使用线程池提交任务,并且通过Future来获取任务执行的结果,以此达到异步或者并行执行的效果。在jdk1.7以前,FutureTask是Future唯一的实现类,1.7后加入了ForkJoinTask类。本文主要总结一下我对FutureTask的理解。

Future类

  Future接口定义了5个方法,分别是 

 boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled();  boolean isDone();  V get() throws InterruptedException, ExecutionException;  V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

    分别介绍一下这五个接口的用途:

  • boolean cancel(boolean mayInterruptInRunning) 取消一个正在执行中的任务,并且返回调用结果。如果取消成功则返回true,反之返回false。这里要注意,即使方法返回true,当前任务也未必真的被取消了,后面会介绍。
  • boolean isCancelled() 返回当前任务是否被取消。
  • Boolean isDone() 返回当前任务是否执行完毕。这里done的概念比较广,包括了futureTask被执行后的任意状态,例如正常执行完毕、执行异常或者任务被取消。
  • V get() 这个接口就是用来获取futureTask执行结果,调用这个接口时会被阻塞,直到拿到结果或者异常。
  • V get(long timeout, TimeUnit unit) 这个接口多了一个超时时间,如果过了这个时间task仍然没有结果返回,则抛出timeout异常

    写个demo便于理解

 1 public class FutureDemo { 2     public static void main(String[] args) { 3         ExecutorService executorService = Executors.newCachedThreadPool(); 4         Future future = executorService.submit(new Callable<Object>() { 5             @Override 6             public Object call() throws Exception { 7                 Long start = System.currentTimeMillis(); 8                 while (true) { 9                     Long current = System.currentTimeMillis();10                     if ((current - start) > 1000) {11                         return 1;12                     }13                 }14             }15         });16 17         try {18             Integer result = (Integer)future.get();19             System.out.println(result);20         }catch (Exception e){21             e.printStackTrace();22         }23     }24 }

   这里我们模拟了1s钟的CPU空转,当执行future.get()的时候,主线程阻塞了大约一秒后,把结果打印出来:1。

        当然我们也可以使用V get(long timeout, TimeUnit unit),这个方法提供了一个超时时间的设置,如果超过当前时间任务线程还未返回,那么就会停止阻塞状态,并且抛出一个timeout异常。如下

1         try {2             Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);3             System.out.println(result);4         } catch (Exception e) {5             e.printStackTrace();6         }

        这里我们设置的超时时间是500毫秒,由于一开始我们模拟了1s的CPU计算时间,这里便会抛出超时异常,打印出堆栈信息

     技术分享

      当然,如果我们把超时时间设置的长一些,还是可以得到预期的结果的。

FutureTask内部实现机制

  刚我们测试了最常用的两个方法,接下来我们来探一探FutureTask的内部实现机制。首先我们看一下FutureTask的继承结构:

          技术分享

      FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既可以当做一个Runnable,也可以当做一个Future。

  FutureTask内部定义了7个状态,代表了FutureTask当前所处状态。如下

    private volatile int state;    private static final int NEW          = 0;    private static final int COMPLETING   = 1;    private static final int NORMAL       = 2;    private static final int EXCEPTIONAL  = 3;    private static final int CANCELLED    = 4;    private static final int INTERRUPTING = 5;    private static final int INTERRUPTED  = 6;

      当一个任务刚提交的时候,状态为NEW,由FutureTask的构造器可知:

public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        this.callable = callable;        this.state = NEW;       // ensure visibility of callable    }

  任务执行正常结束前,state会被设置成COMPLETING,代表任务即将完成,接下来很快就会被设置为NARMAL或者EXCEPTIONAL,这取决于调用Runnable中的call()方法是否抛出了异常。如果没有异常,则state设为NARMAL,反之为EXCEPTIONAL。

  如果任务提交后,在任务执行结束之前调用cancel(boolean mayInterruptIfRunning) 取消任务,那么有可能进入到后3个状态。如果传入的参数是false,state会被置为CANCELLED,反之如果传入true,state先被置为INTERRUPTING,后被置为INTERRUPTED。

     总结下,FutureTask的状态流转过程,可以出现以下三种状态:

        1. 正常执行完毕。 NEW -> COMPLETING -> NORMAL

    2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL

        3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED

   4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED  

  那么以上状态为什么会这么流转呢?接下来我们一起扒一扒FutureTask的源码。我们从futureTask的方法看起。

1 public void run()

 1 public void run() { 2         if (state != NEW || 3             !UNSAFE.compareAndSwapObject(this, runnerOffset, 4                                          null, Thread.currentThread())) 5             return; 6         try { 7             Callable<V> c = callable; 8             if (c != null && state == NEW) { 9                 V result;10                 boolean ran;11                 try {12                     result = c.call();13                     ran = true;14                 } catch (Throwable ex) {15                     result = null;16                     ran = false;17                     setException(ex);18                 }19                 if (ran)20                     set(result);21             }22         } finally {23             // runner must be non-null until state is settled to24             // prevent concurrent calls to run()25             runner = null;26             // state must be re-read after nulling runner to prevent27             // leaked interrupts28             int s = state;29             if (s >= INTERRUPTING)30                 handlePossibleCancellationInterrupt(s);31         }32     }

  翻译一下,这个方法经历了以下几步

      1. 校验Task状态和当前线程引用runner,如果state不为NEW或者runner引用为null,直接返回。

  2. 调用runner的call()方法执行主逻辑,并且尝试获得返回值result。

  3. 如果抛出异常,调用setException(Throwable t)方法

  4. 如果没有异常,调用set(V v)方法

  5. 一些扫尾工作

 那么setException(Throwable t)和set(V v)做了什么呢?我们看一下源码

protected void set(V v) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = v;            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state            finishCompletion();        }    }

  set(V v) 方法首先做一个CAS操作,将state字段由NEW->COMPLETING,这里的CAS操作读者可以自行百度原理。如果成功,那么把执行结果v赋给成员变量outcome,再把state的值设置为NORMAL,最后做一些清理工作,唤醒所有等待线程并把callable对象置为null。

 protected void setException(Throwable t) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = t;            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state            finishCompletion();        }    }

  同理,setException(Throwable t) 方法大同小异,只不过state字段流转为NEW->COMPLETING->EXCEPTION。同时把异常对象赋予v。

  这里我们就清楚了,当一个任务被提交后,状态流转中1、2是怎么来的了。同时我们可以确定,outcome变量,存着是执行结果或者抛出的异常对象。

2  public V get() throws InterruptionException,ExecutionException

    get() 和 get(long timeout, TimeUnit unit)方法是获取执行结果的两个方法,我们这里就看get()方法即可。首先贴源码

  

public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s <= COMPLETING)            s = awaitDone(false, 0L);        return report(s);    }private V report(int s) throws ExecutionException {        Object x = outcome;        if (s == NORMAL)            return (V)x;        if (s >= CANCELLED)            throw new CancellationException();        throw new ExecutionException((Throwable)x);    }

  首先检查state值,如果小于COMPLETING,则阻塞,阻塞时可能会抛出异常,这里我们不纠结这个,往下看。如果没有抛出异常,获取执行后返回的state值,最后调用report(s)方法。接着我们看report方法,如果s为NORMAL,返回执行结果outcome,否则抛出异常。结合之前的run()方法,我们这里可以得出,如果主逻辑正常执行完毕,则返回执行结果,如果抛出异常,那么这里会封装该异常为ExecutionException并且抛出。如果任务执行过程中被取消了,则可能抛出CancellationException()。

3 public boolean cancel(boolean mayInterruptIfRunning)

  这个方法个人认为是最具争议的方法。这里我们先贴个demo

  

 1 public class FutureDemo { 2     public static void main(String[] args) { 3         ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4         // 预创建线程 5         executorService.prestartCoreThread(); 6  7         Future future = executorService.submit(new Callable<Object>() { 8             @Override 9             public Object call() {10                 System.out.println("start to run callable");11                 Long start = System.currentTimeMillis();12                 while (true) {13                     Long current = System.currentTimeMillis();14                     if ((current - start) > 1000) {15                         System.out.println("当前任务执行已经超过1s");16                         return 1;17                     }18                 }19             }20         });21 22         System.out.println(future.cancel(false));23 24         try {25             Thread.currentThread().sleep(3000);26             executorService.shutdown();27         } catch (Exception e) {28             //NO OP29         }30     }31 }

我们多次测试后发现,出现了2种打印结果,如图

技术分享

                结果1

技术分享

                结果2    

咦,两个结果和预期的都好像不太一样?第一种是任务压根没取消,第二种则是任务压根没提交成功,似乎和方法签名cancel不太一致?

我们先看一下方法签名上的作者注

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
  这里我们可以看到,"尝试"取消任务的执行,如果当前任务已经结束或者已经取消,则当前取消操作会失败。如果任务还没开始就被取消,那么任务则不会被执行。
这里我们就知道了,如果任务还没开始执行时cancel(false)就被调用,那么这个任务是不会被执行的,这就解释了出现上图结果2的情况。那如果任务已经开始执行,并且
调用cancel(false),是不会终止任务的。我们还是从源码去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) {        if (state != NEW)            return false;        if (mayInterruptIfRunning) {            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))                return false;            Thread t = runner;            if (t != null)                t.interrupt();            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state        }        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))            return false;        finishCompletion();        return true;    }

  执行逻辑如下

     1. 如果当前futureTask状态不为NEW,直接返回false,表示取消操作失败。

  2. 如果传入true,代表可能会引发线程中断。一个CAS操作,把状态由NEW->INTERRUPTING,如果执行失败则直接返回false。设置当前工作线程中断标识为true,然后把futureTask状态设置为INTERRUPTED。

  3. 如果传入false,把futureTask状态设置为CANCELLED。

  4. 做一些清理工作

    可见,cancel()方法仅仅是改变了futureTask的状态位!如果传入的是false,当前任务是不会被终止的,而是会继续执行,直到异常或者执行完毕。如果传入的是true,会调用当前线程的interrupt()方法,把中断标志位设为true。所以cancel()方法其实个人理解是有歧义的,它并不能真正取消一个任务的执行。事实上,除非线程自己停止自己的任务,或者退出JVM,是没有其他方法完全终止一个线程的任务的。cancel(true)方法也只是希望当前线程可以响应中断而已,当线程被阻塞,抛出InterruptedException。同时,由之前的future.get()方法可知,如果一个futureTask被cancel()了,调用get()方法会抛出CancellationException。

总结

  理解FutureTask,我们使用Future类才能更加得心应手。这里也只是作者自己的理解,如有不对之处,还望读者批评指正。

 

 

作者:mayday芋头

出处:http://www.cnblogs.com/maypattis/
本博客中未标明转载的文章归作者mayday芋头和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利

   

 

 

揭密FutureTask