首页 > 代码库 > Camel路由启动过程--续
Camel路由启动过程--续
上篇Camel启动路由过程中讲到启动Consumer,调用了DefaultCamelContext.startService(service)方法,下面是方法源码:
private void startService(Service service) throws Exception { if (service instanceof StartupListener) { StartupListener listener = (StartupListener) service; addStartupListener(listener); } service.start(); }
上面方法中调用了service的start()方法,在我们的示例中,这个service对象其实是FileConsumer对象,FileConsumer继承关系如下:
FileConsumer extends GenericFileConsumer<File> extends ScheduledBatchPollingConsumer extends ScheduledPollConsumer extends DefaultConsumer extends ServiceSupport, FileConsumer的start()方法是从ServiceSupport继承而来,在ServiceSupport的start()方法中调用了doStart()方法,而FileConsumer的doStart()方法又是从GenericFileConsumer继承而来,GenericFileConsumer又调用父类ScheduledPollConsumer的doStart()方法,这里经过一系列的多态方法调用。
最重要的就是ScheduledPollConsumer的doStart()方法:
protected void doStart() throws Exception { //先调用父类的doStart()方法 super.doStart(); //省略... //如果调用器为空,则创建一个 if (scheduler == null) { scheduler = new DefaultScheduledPollConsumerScheduler(); } scheduler.setCamelContext(getEndpoint().getCamelContext()); //设置scheduler的consumer成员变量 scheduler.onInit(this); //设置scheduler的task成员变量 scheduler.scheduleTask(this); //省略... //启动scheduler,该方法调用了DefaultScheduledPollConsumerScheduler的doStart()方法 //为scheduler获取java.util.concurrent.ScheduledExecutorService,即线程池 //因为要轮询文件肯定是要开启新的线程的,所以需要一个线程池对象 ServiceHelper.startService(scheduler); if (isStartScheduler()) { //运行scheduler startScheduler(); } }
这里最重要的就是startScheduler()方法,该方法调用了scheduler.startScheduler()方法,下面是DefaultScheduledPollConsumerScheduler.startScheduler()方法源码:
@Override public void startScheduler() { // only schedule task if we have not already done that if (future == null) { if (isUseFixedDelay()) { //省略... future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit()); } else { //省略... future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit()); } } }该方法就是根据设置参数调用线程池不能的方法,注意,这里执行的任务(task)就是FileConsumer对象,所以会执行FileConsumer的run()方法,run()方法从ScheduledPollConsumer继承而来,里面调用doRun()方法,下面是源码:
private void doRun() { boolean done = false; int polledMessages = 0; while (!done) { try { if (isPollAllowed()) { polling = true; try { boolean begin = pollStrategy.begin(this, getEndpoint()); if (begin) { retryCounter++; //调用poll()方法进行轮询 polledMessages = poll(); LOG.trace("Polled {} messages", polledMessages); //判断是否要发送空消息 if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { // send an "empty" exchange processEmptyMessage(); } } } } } } }
上面列出的代码中省略了很多代码,最重要的就是poll()方法,下面是GenericFileConsumer.poll()方法源码:
protected int poll() throws Exception { //省略... //收集需要处理的文件 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); String name = endpoint.getConfiguration().getDirectory(); //该方法将文件轮询出来 boolean limitHit = !pollDirectory(name, files, 0); //将轮询出来的文本绑定到Exchange LinkedList<Exchange> exchanges = new LinkedList<Exchange>(); for (GenericFile<T> file : files) { Exchange exchange = endpoint.createExchange(file); endpoint.configureExchange(exchange); endpoint.configureMessage(file, exchange.getIn()); exchanges.add(exchange); } //是否需要排序 if (endpoint.getSortBy() != null) { Collections.sort(exchanges, endpoint.getSortBy()); } // use a queue for the exchanges Deque<Exchange> q = exchanges; //省略... //传入创建的Exchange对象进行批量处理 int polledMessages = processBatch(CastUtils.cast(q)); postPollCheck(); return polledMessages; }
在processBatch()方法中遍历各个Exchange对象,然后调用processExchange方法,下面是源码:
protected boolean processExchange(final Exchange exchange) { //省略很多代码... getAsyncProcessor().process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // noop if (log.isTraceEnabled()) { log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); } } }); } return true; }
getAsyncProcessor()方法就是将Consumer中的processor成员变量包装成了AsyncProcessor类型,然后调用其process方法在Camel路由启动过程讲到,Consumer中的processor就是经过层层包装了的CamelInternalProcessor类型,CamelInternalProcessor包装了Pipeline,Pipeline包装了DefaultChannel,DefaultChannel包装了路由定义输出ProcessorDefiniton。
下面我们就去看看CamelInternalProcessor的process方法:
@Override public boolean process(Exchange exchange, AsyncCallback callback) { ////省略... Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC); if (exchange.isTransacted() || synchronous != null) { //省略... try { processor.process(exchange);//同步处理 } catch (Throwable e) { exchange.setException(e); } callback.done(true); return true; } else { //省略... boolean sync = processor.process(exchange, async);//异步处理 //省略... } }
在示例中调用的是异步处理方法,而processor就是CamelInternalProcessor包装的Pipeline对象了,下面是Pipeline的process方法:
public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = getProcessors().iterator(); Exchange nextExchange = exchange; boolean first = true; while (continueRouting(processors, nextExchange)) { if (first) { first = false; } else { // prepare for next run nextExchange = createNextExchange(nextExchange); } // get the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, nextExchange, callback, processors, async); } callback.done(true); return true; }
该方法中就是在遍历Pipeline内部的DefaultChannel对象,并调用其process方法,最后调用路由定义输出ProcessorDefinition的createProcess方法创建一个Processor对象,在示例中即ProcessDefinition与ToDefinition的createProcess方法,ProcessDefinition的process方法返回的就是我们自己在配置路由定义时传入的
processor对象,并且调用其process方法。这样就调用到了我们自定义的处理器。
下面我们去看看ToDefinition的createProcess方法(从SendDefiniton继承而来):
@Override public Processor createProcessor(RouteContext routeContext) throws Exception { Endpoint endpoint = resolveEndpoint(routeContext); return new SendProcessor(endpoint, getPattern()); }这就是创建一个SendProcessor对象,下面是SendProcessor对象的process方法:
public boolean process(Exchange exchange, final AsyncCallback callback) { //省略... final ExchangePattern existingPattern = exchange.getPattern(); // if we have a producer then use that as its optimized if (producer != null) { //省略... return producer.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { try { // restore previous MEP target.setPattern(existingPattern); // emit event that the exchange was sent to the endpoint long timeTaken = watch.stop(); EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken); } finally { callback.done(doneSync); } } }); } //省略... }
可以看到,这时调用了producer的process方法,而这个producer就是GenericFileProducer,因为FileEndpoint创建的Proceducer就是GenericFileProducer类型,在GenericFileProducer的process方法中调用了其processExchange方法,下面是源码:
protected void processExchange(Exchange exchange, String target) throws Exception { try { //省略很多代码... // write/upload the file writeFile(exchange, tempTarget != null ? tempTarget : target); //省略很多代码... exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target); } catch (Exception e) { handleFailedWrite(exchange, e); } postWriteCheck(); }
看到writeFile方法大家应该明白了,就是把Exchange对象当中的文件写到指定目录了,至于是怎么写文件的应该就不用多说了。
至此,路由定义的配置与构建过程,路由定义启动过程就说完了,如果有错误之处尽请指正。
Camel路由启动过程--续
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。