首页 > 代码库 > Camel之AsyncProcessor
Camel之AsyncProcessor
Camel支持一种更复杂的异步的处理模型,异步处理器实现一个继承自Processor接口的AsyncProcessor接口,使用异步Processor的优点:
a.异步Processor不会因等待阻塞调用而耗尽线程,这样在处理同样工作量的情况下,通过减少线程的数量可以增加系统的伸缩性
b.使用异步Processor,可以将路由分阶段处理,不同的线程池处理其相应的路由阶段,这就意味着路由可以并行处理。
缺点:实现异步的Processor要比同步的Processor复杂得多。
异步Processor与同步Processor的区别:
a.必须提供一个AsyncCallback对象,该对象在exchange处理完成后被通知
b.在异步Processor处理exchange的时候不能抛出任何异常,而应该将异常存储在exchange的Exception属性中
c.异步Processor必须知道它将以什么方式完成处理,异步或同步,如果process方法返回true,则是同步完成,如果process方法返回false,则是异步完成。
d.当处理器处理完exchange时,它必须调用callback.done(boolean sync)方法,sync参数必须与process方法的返回值一致。
对于一个路由来说,完全使用异步模式可以降低线程的使用量,这要求从Consumer开始就必须使用异步的处理API(即调用异步的
process方法),如果Consumer调用的是同步process()方法,那么消费者线程在处理Exchange时将被强制阻塞。
有一点必须注意的是当你调用了异步的API,这并不意味着处理过程就是异步的,这仅仅是为不捆绑在调用者线程提供了可能。
当我看到异步两个字时,直觉就是使用异步Processor时会启用新的线程进行处理,但在上面的例子中,三个线程名称是一样的,
并且在阻塞了10秒后process2才打印出来,这说明上面的三个processor是在同一个线程中执行的,这也是阻塞10秒的原因。
我个人认为是对Camel异步Processor的"异步"两字理解出现了偏差,这里的异步只为processor的processor方法,提供一个
回调函数,而不是另启线程。而且我们自己写Processor处理器对这个异步的使用也很有限,因为我们写的处理器是被调用者,AsyncCallback是由上层提供的,我们只是能调用其done方法通知上层本次处理完成,而我们更多的需求应该是自己去注册回调函数,并且我们能够控制这个回调函数的回调时机,而现在我们无法提供回调函数的注册。那我们不禁要问,这个AsyncCallback对象那到底是谁提供的呢?AsyncCallback对象的源头当然是在消费者类提供的,对上面的例子来说是在FileConsumer类中,如下是GenericFileConsumer的processExchange方法的一个片段(FileConsumer继承自GenericFileConsumer)
这时创建的AsyncCallback对象就是源始的回调对象,当然在路由执行的后续过程中,该回调对象可以被包装,其中CamelInternalProcessor的process(Exchange exchange, AsyncCallback callback)方法就是一个例子:
callback = new InternalCallback(states, exchange, callback);
这里我们不禁又会问,既然CamelInternalProcessor能够对源始AsyncCallback对象进行包装加入自己的回调逻辑,为什么我们自己不行呢,其原来还是我们写的Processor是被调用者,是被包装者,具体过程可参看Camel路由启动过程。
如果非要添加自己的回调逻辑也不是不可能,就只能自己写消费者,自己写消费者就能控制源AsyncCallback对象,其后续只是对
源AsyncCallback对象的一个包装的过程,只要保证最外层的AsyncCallback对象被调用,那么源AsyncCallback对象也一定会被调用。所以在上例中,如果在第二个Processor中如果不执行callback.done(false);的话路由过程将永远不会结束,因为上层一直认为下层处理还未结束。当然如果我们不写异常Processor,路由过程还是会正常结束的,Camel内部会自行处理,但是如果我们写了异步Processor就一定要调用callback.done方法。
所以这么一通下来,并没有感受到官方提及的不阻塞调用、降价线程使用、路由分阶段处理等,个人的感觉就是多了一个回调方法,而且这个回调功能还很有限,当然这也有可能是自己什么地方理解错了,如若如此,尽请指正......
a.异步Processor不会因等待阻塞调用而耗尽线程,这样在处理同样工作量的情况下,通过减少线程的数量可以增加系统的伸缩性
b.使用异步Processor,可以将路由分阶段处理,不同的线程池处理其相应的路由阶段,这就意味着路由可以并行处理。
缺点:实现异步的Processor要比同步的Processor复杂得多。
异步Processor与同步Processor的区别:
a.必须提供一个AsyncCallback对象,该对象在exchange处理完成后被通知
b.在异步Processor处理exchange的时候不能抛出任何异常,而应该将异常存储在exchange的Exception属性中
c.异步Processor必须知道它将以什么方式完成处理,异步或同步,如果process方法返回true,则是同步完成,如果process方法返回false,则是异步完成。
d.当处理器处理完exchange时,它必须调用callback.done(boolean sync)方法,sync参数必须与process方法的返回值一致。
对于一个路由来说,完全使用异步模式可以降低线程的使用量,这要求从Consumer开始就必须使用异步的处理API(即调用异步的
process方法),如果Consumer调用的是同步process()方法,那么消费者线程在处理Exchange时将被强制阻塞。
有一点必须注意的是当你调用了异步的API,这并不意味着处理过程就是异步的,这仅仅是为不捆绑在调用者线程提供了可能。
至于是否是进行异步处理依赖于Camel路由的配置.
以上是Camel官方对异步Processor的解释,下面是本人用于测试的一个例子:
public static void main(String[] args) throws Exception { RouteBuilder builder = new RouteBuilder() { @Override public void configure() throws Exception { RouteDefinition definition1 = this.from("file:H:/temp/in"); RouteDefinition definition2 = definition1.process(new Processor() { @Override public void process(Exchange exchange) throws Exception { System.out.println(Thread.currentThread().getName()); System.out.println("process1"); } }).process(new AsyncProcessor() { @Override public void process(Exchange exchange) throws Exception { System.out.println("process"); } @Override public boolean process(Exchange exchange, AsyncCallback callback) { System.out.println(Thread.currentThread().getName()); System.out.println("async process"); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } callback.done(false); return false; } }).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { System.out.println(Thread.currentThread().getName()); System.out.println("process2"); } }); definition2.to("file:H:/temp/out"); } }; DefaultCamelContext camelContext = new DefaultCamelContext(); camelContext.addRoutes(builder); camelContext.start(); Object object = new Object(); synchronized (object) { object.wait(); } }
当我看到异步两个字时,直觉就是使用异步Processor时会启用新的线程进行处理,但在上面的例子中,三个线程名称是一样的,
并且在阻塞了10秒后process2才打印出来,这说明上面的三个processor是在同一个线程中执行的,这也是阻塞10秒的原因。
我个人认为是对Camel异步Processor的"异步"两字理解出现了偏差,这里的异步只为processor的processor方法,提供一个
回调函数,而不是另启线程。而且我们自己写Processor处理器对这个异步的使用也很有限,因为我们写的处理器是被调用者,AsyncCallback是由上层提供的,我们只是能调用其done方法通知上层本次处理完成,而我们更多的需求应该是自己去注册回调函数,并且我们能够控制这个回调函数的回调时机,而现在我们无法提供回调函数的注册。那我们不禁要问,这个AsyncCallback对象那到底是谁提供的呢?AsyncCallback对象的源头当然是在消费者类提供的,对上面的例子来说是在FileConsumer类中,如下是GenericFileConsumer的processExchange方法的一个片段(FileConsumer继承自GenericFileConsumer)
getAsyncProcessor().process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // noop if (log.isTraceEnabled()) { log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); } } });
这时创建的AsyncCallback对象就是源始的回调对象,当然在路由执行的后续过程中,该回调对象可以被包装,其中CamelInternalProcessor的process(Exchange exchange, AsyncCallback callback)方法就是一个例子:
callback = new InternalCallback(states, exchange, callback);
这里我们不禁又会问,既然CamelInternalProcessor能够对源始AsyncCallback对象进行包装加入自己的回调逻辑,为什么我们自己不行呢,其原来还是我们写的Processor是被调用者,是被包装者,具体过程可参看Camel路由启动过程。
如果非要添加自己的回调逻辑也不是不可能,就只能自己写消费者,自己写消费者就能控制源AsyncCallback对象,其后续只是对
源AsyncCallback对象的一个包装的过程,只要保证最外层的AsyncCallback对象被调用,那么源AsyncCallback对象也一定会被调用。所以在上例中,如果在第二个Processor中如果不执行callback.done(false);的话路由过程将永远不会结束,因为上层一直认为下层处理还未结束。当然如果我们不写异常Processor,路由过程还是会正常结束的,Camel内部会自行处理,但是如果我们写了异步Processor就一定要调用callback.done方法。
所以这么一通下来,并没有感受到官方提及的不阻塞调用、降价线程使用、路由分阶段处理等,个人的感觉就是多了一个回调方法,而且这个回调功能还很有限,当然这也有可能是自己什么地方理解错了,如若如此,尽请指正......
Camel之AsyncProcessor
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。