首页 > 代码库 > Camel路由启动过程--续

Camel路由启动过程--续

上篇Camel启动路由过程中讲到启动Consumer,调用了DefaultCamelContext.startService(service)方法,下面是方法源码:

private void startService(Service service) throws Exception {
	if (service instanceof StartupListener) {
		StartupListener listener = (StartupListener) service;
		addStartupListener(listener);
	}
	service.start();
}

上面方法中调用了service的start()方法,在我们的示例中,这个service对象其实是FileConsumer对象,FileConsumer继承关系如下:
FileConsumer extends GenericFileConsumer<File> extends ScheduledBatchPollingConsumer extends ScheduledPollConsumer extends DefaultConsumer extends ServiceSupport,  FileConsumer的start()方法是从ServiceSupport继承而来,在ServiceSupport的start()方法中调用了doStart()方法,而FileConsumer的doStart()方法又是从GenericFileConsumer继承而来,GenericFileConsumer又调用父类ScheduledPollConsumer的doStart()方法,这里经过一系列的多态方法调用。

最重要的就是ScheduledPollConsumer的doStart()方法:

protected void doStart() throws Exception {
	//先调用父类的doStart()方法
	super.doStart();

	//省略...

	//如果调用器为空,则创建一个
	if (scheduler == null) {
		scheduler = new DefaultScheduledPollConsumerScheduler();
	}
	scheduler.setCamelContext(getEndpoint().getCamelContext());
	//设置scheduler的consumer成员变量
	scheduler.onInit(this);
	//设置scheduler的task成员变量
	scheduler.scheduleTask(this);

	//省略...

	//启动scheduler,该方法调用了DefaultScheduledPollConsumerScheduler的doStart()方法
	//为scheduler获取java.util.concurrent.ScheduledExecutorService,即线程池
	//因为要轮询文件肯定是要开启新的线程的,所以需要一个线程池对象
	ServiceHelper.startService(scheduler);

	if (isStartScheduler()) {
		//运行scheduler
		startScheduler();
	}
}


这里最重要的就是startScheduler()方法,该方法调用了scheduler.startScheduler()方法,下面是DefaultScheduledPollConsumerScheduler.startScheduler()方法源码:

@Override
public void startScheduler() {
	// only schedule task if we have not already done that
	if (future == null) {
		if (isUseFixedDelay()) {
			//省略...

			future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit());
		} else {
			//省略...

			future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit());
		}
	}
}
该方法就是根据设置参数调用线程池不能的方法,注意,这里执行的任务(task)就是FileConsumer对象,所以会执行FileConsumer的run()方法,run()方法从ScheduledPollConsumer继承而来,里面调用doRun()方法,下面是源码:

private void doRun() {
	boolean done = false;
	int polledMessages = 0;
	
	while (!done) {
		try {
			if (isPollAllowed()) {
				polling = true;
				try {
					boolean begin = pollStrategy.begin(this, getEndpoint());
					if (begin) {
						retryCounter++;
						//调用poll()方法进行轮询
						polledMessages = poll();
						LOG.trace("Polled {} messages", polledMessages);
						
						//判断是否要发送空消息
						if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
							// send an "empty" exchange
							processEmptyMessage();
						}
					}
				}
			}
		}
	}
}

上面列出的代码中省略了很多代码,最重要的就是poll()方法,下面是GenericFileConsumer.poll()方法源码:

protected int poll() throws Exception {
	//省略...

	//收集需要处理的文件
	List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
	String name = endpoint.getConfiguration().getDirectory();
	
	//该方法将文件轮询出来
	boolean limitHit = !pollDirectory(name, files, 0);

	//将轮询出来的文本绑定到Exchange
	LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
	for (GenericFile<T> file : files) {
		Exchange exchange = endpoint.createExchange(file);
		endpoint.configureExchange(exchange);
		endpoint.configureMessage(file, exchange.getIn());
		exchanges.add(exchange);
	}
	//是否需要排序
	if (endpoint.getSortBy() != null) {
		Collections.sort(exchanges, endpoint.getSortBy());
	}

	// use a queue for the exchanges
	Deque<Exchange> q = exchanges;

	//省略...
	
	//传入创建的Exchange对象进行批量处理
	int polledMessages = processBatch(CastUtils.cast(q));

	postPollCheck();

	return polledMessages;
}

在processBatch()方法中遍历各个Exchange对象,然后调用processExchange方法,下面是源码:

protected boolean processExchange(final Exchange exchange) {
        //省略很多代码...

		getAsyncProcessor().process(exchange, new AsyncCallback() {
			public void done(boolean doneSync) {
				// noop
				if (log.isTraceEnabled()) {
					log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
				}
			}
		});

	}
	return true;
}

getAsyncProcessor()方法就是将Consumer中的processor成员变量包装成了AsyncProcessor类型,然后调用其process方法在Camel路由启动过程讲到,Consumer中的processor就是经过层层包装了的CamelInternalProcessor类型,CamelInternalProcessor包装了Pipeline,Pipeline包装了DefaultChannel,DefaultChannel包装了路由定义输出ProcessorDefiniton。

下面我们就去看看CamelInternalProcessor的process方法:

@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
	////省略...
	Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
	if (exchange.isTransacted() || synchronous != null) {
		//省略...
		try {
			processor.process(exchange);//同步处理
		} catch (Throwable e) {
			exchange.setException(e);
		}
		callback.done(true);
		return true;
	} else {
		//省略...

		boolean sync = processor.process(exchange, async);//异步处理
		
		//省略...
	}
}

在示例中调用的是异步处理方法,而processor就是CamelInternalProcessor包装的Pipeline对象了,下面是Pipeline的process方法:

public boolean process(Exchange exchange, AsyncCallback callback) {
    Iterator<Processor> processors = getProcessors().iterator();
    Exchange nextExchange = exchange;
    boolean first = true;

    while (continueRouting(processors, nextExchange)) {
        if (first) {
            first = false;
        } else {
            // prepare for next run
            nextExchange = createNextExchange(nextExchange);
        }

        // get the next processor
        Processor processor = processors.next();

        AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
        boolean sync = process(exchange, nextExchange, callback, processors, async);
    }

    callback.done(true);
    return true;
}

该方法中就是在遍历Pipeline内部的DefaultChannel对象,并调用其process方法,最后调用路由定义输出ProcessorDefinition的createProcess方法创建一个Processor对象,在示例中即ProcessDefinition与ToDefinition的createProcess方法,ProcessDefinition的process方法返回的就是我们自己在配置路由定义时传入的
processor对象,并且调用其process方法。这样就调用到了我们自定义的处理器。
下面我们去看看ToDefinition的createProcess方法(从SendDefiniton继承而来):

@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    Endpoint endpoint = resolveEndpoint(routeContext);
    return new SendProcessor(endpoint, getPattern());
}
这就是创建一个SendProcessor对象,下面是SendProcessor对象的process方法:

public boolean process(Exchange exchange, final AsyncCallback callback) {
    //省略...
    final ExchangePattern existingPattern = exchange.getPattern();

    // if we have a producer then use that as its optimized
    if (producer != null) {
		//省略...
        return producer.process(exchange, new AsyncCallback() {
            @Override
            public void done(boolean doneSync) {
                try {
                    // restore previous MEP
                    target.setPattern(existingPattern);
                    // emit event that the exchange was sent to the endpoint
                    long timeTaken = watch.stop();
                    EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken);
                } finally {
                    callback.done(doneSync);
                }
            }
        });
    }

    //省略...
}

可以看到,这时调用了producer的process方法,而这个producer就是GenericFileProducer,因为FileEndpoint创建的Proceducer就是GenericFileProducer类型,在GenericFileProducer的process方法中调用了其processExchange方法,下面是源码:

protected void processExchange(Exchange exchange, String target) throws Exception {
    try {
        //省略很多代码...

        // write/upload the file
        writeFile(exchange, tempTarget != null ? tempTarget : target);

        //省略很多代码...
        exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
    } catch (Exception e) {
        handleFailedWrite(exchange, e);
    }

    postWriteCheck();
}

看到writeFile方法大家应该明白了,就是把Exchange对象当中的文件写到指定目录了,至于是怎么写文件的应该就不用多说了。

至此,路由定义的配置与构建过程,路由定义启动过程就说完了,如果有错误之处尽请指正。



Camel路由启动过程--续