首页 > 代码库 > 浅析Java CompletionService
浅析Java CompletionService
JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。
可以简单查看一下CompletionService的唯一实现类ExecutorCompletionService源码
通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际执行任务,内部管理了一个阻塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步执行该RunnableFuture,当其状态变为done后,添加CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,为空返回null)方法获取执行结果。
举个例子:现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以将每个请求接受之后,提交到CompletionService异步处理,等执行完毕之后,在返回给客户端
package com.yf.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CompletionServiceTest { private ExecutorService threadPool = Executors.newCachedThreadPool(); private CompletionService<Response> completionService = new ExecutorCompletionService<Response>( Executors.newCachedThreadPool()); public CompletionServiceTest() { new Thread() { public void run() { while (true) { try { Future<Response> f = completionService.take(); /** * 获取响应信息,返回给客户端 * 如果completionService任务队列为空,此处将阻塞 */ Response resp = f.get(); System.out.println(resp.getId()); } catch (Exception e) { System.out.println("Exception happened:"+e.getMessage()); } } }; }.start(); } class Request{ private int rid; private String body; public int getRid() { return rid; } public void setRid(int rid) { this.rid = rid; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } class Response { private int id; private String body; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } class HTTPExecutor { public Future<Response> execute(final Request request) { Future<Response> f = threadPool.submit(new Callable<Response>() { public Response call() throws Exception { Response response = new Response(); Thread.currentThread().sleep(3000); response.setId(request.getRid()); response.setBody("response"); return response; } }); return f; } } public void submitHTTP(final Request request) { completionService.submit(new Callable<Response>() { public Response call() throws Exception { return new HTTPExecutor().execute(request).get(); } }); } public static void main(String[] args) { CompletionServiceTest t = new CompletionServiceTest(); for (int i = 0; i < 10; i++) { /** * 发送10个HTTP请求 */ Request request =t.new Request(); request.setRid(i); request.setBody("request"); t.submitHTTP(request); } } }
可以简单查看一下CompletionService的唯一实现类ExecutorCompletionService源码
关键代码如下:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际执行任务,内部管理了一个阻塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步执行该RunnableFuture,当其状态变为done后,添加CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,为空返回null)方法获取执行结果。
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。