首页 > 代码库 > java并发编程框架 Executor ExecutorService invokeall

java并发编程框架 Executor ExecutorService invokeall

首先介绍两个重要的接口,Executor和ExecutorService,定义如下: 

Java代码  技术分享
  1. public interface Executor {  
  2.     void execute(Runnable command);  
  3. }  

 

Java代码  技术分享
  1. public interface ExecutorService extends Executor {  
  2.     //不再接受新任务,待所有任务执行完毕后关闭ExecutorService  
  3.     void shutdown();  
  4.     //不再接受新任务,直接关闭ExecutorService,返回没有执行的任务列表  
  5.     List<Runnable> shutdownNow();  
  6.     //判断ExecutorService是否关闭  
  7.     boolean isShutdown();  
  8.     //判断ExecutorService是否终止  
  9.     boolean isTerminated();  
  10.     //等待ExecutorService到达终止状态  
  11.     boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
  12.     <T> Future<T> submit(Callable<T> task);  
  13.     //当task执行成功的时候future.get()返回result  
  14.     <T> Future<T> submit(Runnable task, T result);  
  15.     //当task执行成功的时候future.get()返回null  
  16.     Future<?> submit(Runnable task);  
  17.     //批量提交任务并获得他们的future,Task列表与Future列表一一对应  
  18.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
  19.         throws InterruptedException;  
  20.     //批量提交任务并获得他们的future,并限定处理所有任务的时间  
  21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  
  22. long timeout, TimeUnit unit) throws InterruptedException;  
  23.     //批量提交任务并获得一个已经成功执行的任务的结果  
  24.     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;  
  25.   
  26.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
  27.                     long timeout, TimeUnit unit)  
  28.         throws InterruptedException, ExecutionException, TimeoutException;  
  29. }  



为了配合使用上面的并发编程接口,有一个Executors工厂类,负责创建各类满足ExecutorService接口的线程池,具体如下: 
newFixedThreadPool:创建一个固定长度的线程池,线程池中线程的数量从1增加到最大值后保持不变。如果某个线程坏死掉,将会补充一个新的线程。 
newCachedThreadPool:创建长度不固定的线程池,线程池的规模不受限制,不常用。 
newSingleThreadExecutor:创建一个单线程的Executor,他其中有一个线程来处理任务,如果这个线程坏死掉,将补充一个新线程。 
newScheduledThreadPool:创建固定长度的线程池,以延时或定时的方式来执行任务。 

下面是Executor和ExecutorService中常用方法的示例: 

Java代码  技术分享
  1. import java.util.ArrayList;  
  2. import java.util.Collection;  
  3. import java.util.Iterator;  
  4. import java.util.List;  
  5. import java.util.concurrent.Callable;  
  6. import java.util.concurrent.Executor;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.TimeUnit;  
  11.   
  12. public class Demo{  
  13.     public static void main(String [] args){  
  14.         //--------Executor示例------------//  
  15.         Executor s=Executors.newSingleThreadExecutor();  
  16.         s.execute(new MyRunnableTask("1"));  
  17.           
  18.         //--------ExecutorService示例------------//  
  19.         ExecutorService es=Executors.newFixedThreadPool(2);  
  20.           
  21.         //--------get()示例------------//  
  22.         Future<String> future=es.submit(new MyCallableTask("10"));  
  23.         try{  
  24.             System.out.println(future.get());             
  25.         }catch(Exception e){}  
  26.           
  27.         //--------get(timeout, timeunit)示例------------//  
  28.         future=es.submit(new MyCallableTask("11"));  
  29.         try{  
  30.             System.out.println(future.get(500,TimeUnit.MILLISECONDS));  
  31.         }catch(Exception e){  
  32.             System.out.println("cancle because timeout");  
  33.         }  
  34.           
  35.         //--------invokeAll(tasks)示例------------//  
  36.         List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>();  
  37.         for(int i=0;i<6;i++){  
  38.             myCallableTasks.add(new MyCallableTask(i+""));  
  39.         }  
  40.         try {  
  41.             List<Future<String>> results = es.invokeAll(myCallableTasks);  
  42.             Iterator<Future<String>> iterator=results.iterator();  
  43.             while(iterator.hasNext()){  
  44.                 future=iterator.next();  
  45.                 System.out.println(future.get());  
  46.             }  
  47.         } catch (Exception e) {}  
  48.   
  49.         //--------invokeAll(tasks,timeout,timeunit))示例------------//  
  50.         try {  
  51.             //限定执行时间为2100ms,每个任务需要1000ms,线程池的长度为2,因此最多只能处理4个任务。一共6个任务,有2个任务会被取消。  
  52.             List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS);  
  53.             Iterator<Future<String>> iterator=results.iterator();  
  54.             while(iterator.hasNext()){  
  55.                 future=iterator.next();  
  56.                 if(!future.isCancelled())  
  57.                     System.out.println(future.get());  
  58.                 else  
  59.                     System.out.println("cancle because timeout");  
  60.             }  
  61.         } catch (Exception e) {}  
  62.         es.shutdown();  
  63.     }  
  64. }  
  65.   
  66. class MyRunnableTask implements Runnable{  
  67.     private String name;  
  68.     public MyRunnableTask(String name) {  
  69.         this.name=name;  
  70.     }  
  71.     @Override  
  72.     public void run() {  
  73.         try {  
  74.             Thread.sleep(1000);  
  75.         } catch (InterruptedException e) {  
  76.             e.printStackTrace();  
  77.         }  
  78.         System.out.println("runnable task--"+name);  
  79.     }  
  80. }  
  81. class MyCallableTask implements Callable<String>{  
  82.     private String name;  
  83.     public MyCallableTask(String name) {  
  84.         this.name=name;  
  85.     }  
  86.     @Override  
  87.     public String call() throws Exception {  
  88.         try {  
  89.             Thread.sleep(1000);  
  90.         } catch (InterruptedException e) {}  
  91.         StringBuilder sb=new StringBuilder("callable task--");  
  92.         return sb.append(name).toString();  
  93.     }  
  94. }  



上面的ExecutorSerivce接口中的invokeAll(tasks)方法用于批量执行任务,并且将结果按照task列表中的顺序返回。此外,还存在一个批量执行任务的接口CompletionTask。ExecutorCompletionService是实现CompletionService接口的一个类,该类的实现原理很简单: 

用Executor类来执行任务,同时把在执行任务的Future放到BlockingQueue<Future<V>>队列中。该类实现的关键就是重写FutureTask类的done()方法,FutureTask类的done()方法是一个钩子函数(关于钩子函数,请读者自行查询),done()方法在FutureTask任务被执行的时候被调用。 

ExecutorCompletionService类的核心代码如下: 

Java代码  技术分享
  1. public Future<V> submit(Runnable task, V result) {  
  2.     if (task == null) throw new NullPointerException();  
  3.     RunnableFuture<V> f = newTaskFor(task, result);  
  4.     executor.execute(new QueueingFuture(f));  
  5.     return f;  
  6. }  
  7. private class QueueingFuture extends FutureTask<Void> {  
  8.     QueueingFuture(RunnableFuture<V> task) {  
  9.         super(task, null);  
  10.         this.task = task;  
  11.     }  
  12.     protected void done() { completionQueue.add(task); }  
  13.     private final Future<V> task;  
  14. }  


其中的done()方法定义如下: 

Java代码  技术分享
  1. /** 
  2.     * Protected method invoked when this task transitions to state 
  3.     * <tt>isDone</tt> (whether normally or via cancellation). The 
  4.     * default implementation does nothing.  Subclasses may override 
  5.     * this method to invoke completion callbacks or perform 
  6.     * bookkeeping. Note that you can query status inside the 
  7.     * implementation of this method to determine whether this task 
  8.     * has been cancelled. 
  9.     */  
  10.    protected void done() { }  



ExecutorCompletionService的使用示例如下: 

Java代码  技术分享
    1. import java.util.concurrent.Callable;  
    2. import java.util.concurrent.CompletionService;  
    3. import java.util.concurrent.ExecutionException;  
    4. import java.util.concurrent.ExecutorCompletionService;  
    5. import java.util.concurrent.Executors;  
    6. import java.util.concurrent.Future;  
    7.   
    8. public class Demo{  
    9.     public static void main(String [] args) throws InterruptedException, ExecutionException{  
    10.         CompletionService<String> cs=new ExecutorCompletionService<String>(  
    11.                 Executors.newFixedThreadPool(2));  
    12.         for(int i=0;i<6;i++){  
    13.             cs.submit(new MyCallableTask(i+""));  
    14.         }  
    15.         for(int i=0;i<6;i++){  
    16.             Future<String> future=cs.take();  
    17.             //Retrieves and removes the Future representing the next completed task,   
    18.             //waiting if none are yet present.  
    19.             System.out.println(future.get());  
    20.         }  
    21.     }  
    22. }  
    23.   
    24. class MyCallableTask implements Callable<String>{  
    25.     private String name;  
    26.     public MyCallableTask(String name) {  
    27.         this.name=name;  
    28.     }  
    29.     @Override  
    30.     public String call() throws Exception {  
    31.         try {  
    32.             Thread.sleep(1000);  
    33.         } catch (InterruptedException e) {}  
    34.         StringBuilder sb=new StringBuilder("callable task--");  
    35.         return sb.append(name).toString();  
    36.     }  
    37. }  

java并发编程框架 Executor ExecutorService invokeall