首页 > 代码库 > Java并发——Callable和Future
Java并发——Callable和Future
- Callable定义的方法是call( ),而Runnable定义的方法是run( )。
- Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
- Callable的call方法可抛出受检查的异常,而Runnable的run方法不能抛出异常。
在工具类Executors中有一些工具方法可以把Runnable任务转成Callable。你可以使用executor去执行一个Callable任务,也可以将Callable转成FutureTask对象,然后交由线程去执行。
Future是异步计算的结果,它描述了任务的生命周期,并提供了相关的方法来获得任务执行的结果、取消任务以及检查任务是否已经完成或者取消。
有多种方式可以创建一个Future。ExecutorService中的所有submit方法都会返回一个Future,利用这个返回的Future你可以获取任务的执行结果,或者取消任务。可以显示将Runnable或者Callable实例化一个FutureTask。
下面的例子演示了Callable和Future的一些方法,程序中定义了两个任务c1和c2,并且模拟c2的执行时间是8秒左右,然后依次调用future的相关方法
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableAndFuture { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(5); Callable<Integer> c1 = new Target(false); Callable<Integer> c2 = new Target(true); Future<Integer> f1 = es.submit(c1); Future<Integer> f2 = es.submit(c2); int res = 0; try { res = f1.get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } boolean isCancelled = f1.isCancelled(); boolean isDone = f1.isDone(); System.out.println(res); System.out.println(isCancelled); System.out.println(isDone); System.out.println("---------------------------"); try { boolean cancel = f2.cancel(true); int res2 = f2.get(); isCancelled = f1.isCancelled(); isDone = f1.isDone(); System.out.println(res2); System.out.println(cancel); System.out.println(isCancelled); System.out.println(isDone); } catch (CancellationException e) { // TODO Auto-generated catch block System.out.println("任务被取消."); } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("任务被中断."); } catch (ExecutionException e) { // TODO Auto-generated catch block System.out.println("任务执行异常."); } } } class Target implements Callable<Integer> { private boolean sleep = false; public Target(boolean sleep) { // TODO Auto-generated constructor stub this.sleep = sleep; } @Override public Integer call() throws Exception { // TODO Auto-generated method stub if(sleep) { Thread.sleep(8000); } int i = new Random().nextInt(1000); return i; } }
任务的执行结果:
982 false true --------------------------- 任务被取消.
Future接口的相关方法
cancel( )方法可以试图取消任务的执行,如果当前任务已经完成、或已经被取消、或由于某些原因无法取消,则取消操作失败,返回false;如果该任务尚未运行,调用cancel( )方法将会使该任务永不会运行;如果调用cancel( )方法时,该任务已经运行,那么取决于参数boolean的值,如果是true,则表示立即中断该任务的执行,否则,等待该运行的任务结束后,尝试cancel并返回false。
isCancel( ),如果在任务正常完成前将其取消,那么返回true,否则,返回false。
isDone( ) , 如果任务已完成,则返回true,由于正常终止、异常或取消而完成,也会返回true。
get( ) , 如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,get会阻塞直到它完成。如果任务抛出了异常,get会将该异常封装为ExecutionException,然后重新抛出;如果任务被取消,get会抛出CancellationException。
FutureTask
FutureTask类相当于同时实现了Runnable和Future接口,提供了Future接口的具体实现,可以使用FutureTask去包装Callable或Runnable任务。因为FutureTask实现了Runnable接口,所以可以将其交给Executor去执行,或者直接调用run( )去执行。
使用FutureTask的一个示例
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; public class MyFutureTask { public static void main(String[] args) throws Exception { Executor executor = Executors.newFixedThreadPool(5); Callable<Integer> callable = new MyTarget(); FutureTask<Integer> ft = new FutureTask<>(callable); executor.execute(ft); System.out.println(ft.get()); // 直接调用run // ft.run(); // System.out.println(ft.get()); System.out.println("-----------------------"); Runnable runnable = new MyRunnableTarget(); FutureTask<String> ft2 = new FutureTask<String>(runnable, "SUCCESS"); executor.execute(ft2); System.out.println(ft2.get()); } } class MyTarget implements Callable<Integer> { @Override public Integer call() throws Exception { // TODO Auto-generated method stub int i = new Random().nextInt(1000); return i; } } class MyRunnableTarget implements Runnable { @Override public void run() { // TODO Auto-generated method stub System.out.println("Runnable is invoke..."); } }
程序输出:
280 ----------------------- Runnable is invoke... SUCCESS
CompletionService
有时候我们需要利用executor去执行一批任务,每个任务都有一个返回值,利用Future就可以解决这个问题,为此我们需要保存每个任务提交后的Future,然后依次调用get方法轮询,获得已经执行完毕的任务的结果,这样的过程显得无趣。我们希望一次提交一批任务后,executor执行结束也是返回给我们一个已经执行完毕的Future集合。
CompletionService整合了Executor和BlockingQueue的功能。你可以将一批Callable任务交给它去执行,然后使用类似于队列中的take和poll方法,在结果完成时获得这个结果,就像一个打包的Future。将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit方法 执行的任务。使用者take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。ExecutorCompletionService类是一个实现了CompletionService接口的实现类,它将计算任务交给一个传入的Executor去执行。
下面是一个ExecutorCompletionService的示例
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; public class TestCompletionService { private class Target implements Callable<Integer> { @Override public Integer call() throws Exception { // TODO Auto-generated method stub int i = new Random().nextInt(1000); return i; } } public static void main(String[] args) throws Exception { Executor executor = Executors.newFixedThreadPool(5); ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(executor); Callable<Integer> c1 = new TestCompletionService().new Target(); Callable<Integer> c2 = new TestCompletionService().new Target(); Callable<Integer> c3 = new TestCompletionService().new Target(); ecs.submit(c1); ecs.submit(c2); ecs.submit(c3); System.out.println(ecs.take().get()); System.out.println(ecs.poll().get()); System.out.println(ecs.take().get()); } }
这样将Future分离开来,已经完成的任务的Future就会被加入到BlockingQueue中供用户直接获取。
关于poll方法和get方法的区别,poll方法是非阻塞的,有则返回,无则返回NULL,take方法是阻塞的,没有的话则会等待。
批处理与任务执行时限
在有些应用场景中,我们需要同时处理多个任务,并获取结果,使用上面的CompletionService将完成的任务与未完成的任务分隔开似乎能够解决,但是如果其中有一个任务相当耗时,就会影响整个批处理任务的完成速度。比如,在一个页面中,我们需要从多个数据源获取数据,并在页面展示,同时我们希望整个页面的加载过程不超过2秒,那么那些超过2秒没有响应成功的数据源数据则用默认值替换,ExecutorService提供了invokeAll( )来完成这个任务。
下面我们通过一个示例演示invokeAll方法,程序中定义了3个任务,c1、c2、c3模拟执行时间分别为1、2、3秒,程序允许的最大执行时间是2秒,超过2秒的任务就会被取消。
import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class TestCompletionService { private class Target implements Callable<Integer> { private int a = 0; public Target(int a) { // TODO Auto-generated constructor stub this.a = a; } @Override public Integer call() throws Exception { // TODO Auto-generated method stub Thread.sleep(1000*a); return a; } } public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(5); Callable<Integer> c1 = new TestCompletionService().new Target(1); Callable<Integer> c2 = new TestCompletionService().new Target(2); Callable<Integer> c3 = new TestCompletionService().new Target(3); List<Callable<Integer>> list = new ArrayList<>(); list.add(c1); list.add(c2); list.add(c3); try { List<Future<Integer>> res = es.invokeAll(list, 2, TimeUnit.SECONDS); Iterator<Future<Integer>> it = res.iterator(); while(it.hasNext()) { Future<Integer> f = it.next(); int i = f.get(); System.out.println(i); } } catch (CancellationException e ) { System.out.println("任务取消"); } catch (InterruptedException e) { System.out.println("中断异常"); } catch (ExecutionException e) { System.out.println("执行异常"); } } }
程序的输出:
1 2 任务取消
需要注意的是,java.util.concurrent中所有的关于时间的方法都将负数作为0处理,不需要额外的处理
Java并发——Callable和Future