首页 > 代码库 > Java_并发线程_CompletionService

Java_并发线程_CompletionService

1.CompletionService源码分析

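ExecutorCompletionService(CompletionService 的实现类)内部维护了一个阻塞队列(BlockingQueue),并通过代理(包装)的方式把每个提交的任务包装成 QueueingFuture:任务执行完成后,对应的 Future 会被放入该队列,调用方再通过 take() 按完成的先后顺序从队列中取出结果。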
    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null) {
            throw new NullPointerException();
        }
        this.executor = executor;
        // Keep a typed reference only when the executor derives from
        // AbstractExecutorService; otherwise record null.
        if (executor instanceof AbstractExecutorService) {
            this.aes = (AbstractExecutorService) executor;
        } else {
            this.aes = null;
        }
        // Create the completion queue that holds the futures of finished tasks.
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
	//通过submit提交Callable任务对象
    /**
     * Submits a value-returning task for execution.
     *
     * <p>The task is turned into a {@link RunnableFuture} via {@code newTaskFor},
     * wrapped in a {@code QueueingFuture}, and handed to the executor.
     *
     * @param task the task to run
     * @return the future representing the task's pending result
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) {
            throw new NullPointerException();
        }
        final RunnableFuture<V> future = newTaskFor(task);
        // The executor runs the wrapper, not the bare future.
        final QueueingFuture wrapper = new QueueingFuture(future);
        executor.execute(wrapper);
        return future;
    }
    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        /** The wrapped task; added to the completion queue once this wrapper is done. */
        private final Future<V> task;

        /**
         * Wraps {@code task} so that running this wrapper runs the task.
         * The wrapper's own result is always {@code null}.
         *
         * @param task the task to run and to enqueue on completion
         */
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }

        /**
         * Completion hook inherited from {@link FutureTask}: adds the wrapped
         * task to the enclosing service's completion queue.
         */
        @Override
        protected void done() {
            completionQueue.add(task);
        }
    }
	//通过take方法取得Future对象
    /**
     * Retrieves and removes the next future from the completion queue,
     * delegating to the queue's {@code take()}.
     *
     * @return the future taken from the completion queue
     * @throws InterruptedException if the queue's {@code take()} throws it
     */
    public Future<V> take() throws InterruptedException {
        final Future<V> finished = completionQueue.take();
        return finished;
    }


2.实例

	/**
	 * Demonstrates {@link CompletionService}: submits ten tasks to a pool of
	 * three threads and prints each result as soon as its task finishes, so
	 * the output follows completion order rather than submission order.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// Single source of truth for both the submit loop and the consume loop.
		final int taskCount = 10;

		ExecutorService threadPool = Executors.newFixedThreadPool(3);
		try {
			CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
			// All tasks are submitted up front, but the pool only has three
			// worker threads, so at most three of them run at the same time.
			for (int i = 1; i <= taskCount; i++) {
				final int seq = i;
				completionService.submit(new Callable<Integer>() {
					@Override
					public Integer call() throws Exception {
						Thread.sleep(new Random().nextInt(5000));
						return seq;
					}
				});
			}

			for (int i = 0; i < taskCount; i++) {
				try {
					// take() blocks until some task has completed and returns its
					// future; get() on that already-completed future then returns
					// the value produced by call() without further waiting.
					System.out.println(completionService.take().get());
				} catch (InterruptedException e) {
					e.printStackTrace();
					// Restore the interrupt flag and stop waiting: a further
					// take() would only be interrupted again.
					Thread.currentThread().interrupt();
					break;
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			}
		} finally {
			// The pool's worker threads are non-daemon; without a shutdown the
			// JVM would not exit after main returns. Any result not consumed by
			// now has been abandoned, so outstanding tasks are cancelled too.
			threadPool.shutdownNow();
		}
	}

Java_并发线程_CompletionService