首页 > 代码库 > Camel路由启动过程

Camel路由启动过程

路由启动由CamelContext的start()方法开始,在该方法中调用了super.start(),即调用父类ServiceSupport的start()方法,ServiceSupport的start()方法中调用了doStart()方法又回到CamelContext的doStart()方法,该方法中调用了doStartCamel()方法,在doStartCamel()方法中有两个最重要的方法:startRouteDefinitions()与doStartOrResumeRoutes()方法。

我们先看startRouteDefinitions()方法:

protected void startRouteDefinitions(Collection<RouteDefinition> list) throws Exception {
	if (list != null) {
		for (RouteDefinition route : list) {
			startRoute(route);
		}
	}
}

该方法遍历所有路由定义,调用startRoute()方法,下面是源码:

public void startRoute(RouteDefinition route) throws Exception {
    //省略一些代码...

	//设置正在启动路由为true
    isStartingRoutes.set(true);
    try {
        //确定路由已经预处理完毕
        route.prepare(this);
		//创建一个路由list
        List<Route> routes = new ArrayList<Route>();
		//调用RouteDefinition的addRoutes方法,返回一个路由上下文list
        List<RouteContext> routeContexts = route.addRoutes(this, routes);
		//创建路由服务
        RouteService routeService = new RouteService(this, route, routeContexts, routes);
		//启动路由服务
        startRouteService(routeService, true);
    } finally {
        // we are done staring routes
        isStartingRoutes.remove();
    }
}

下面是RouteDefinition的.addRoutes方法源码:

public List<RouteContext> addRoutes(ModelCamelContext camelContext, Collection<Route> routes) throws Exception {
    List<RouteContext> answer = new ArrayList<RouteContext>();

    @SuppressWarnings("deprecation")
    ErrorHandlerFactory handler = camelContext.getErrorHandlerBuilder();
    if (handler != null) {
        setErrorHandlerBuilderIfNull(handler);
    }
	
	//迭代路由定义输入集体,在示例中只有一个输入
    for (FromDefinition fromType : inputs) {
        RouteContext routeContext;
        try {
        	//调用另一重载addRoutes方法
            routeContext = addRoutes(camelContext, routes, fromType);
        } catch (FailedToCreateRouteException e) {
            throw e;
        } catch (Exception e) {
            // wrap in exception which provide more details about which route was failing
            throw new FailedToCreateRouteException(getId(), toString(), e);
        }
        answer.add(routeContext);
    }
    return answer;
}

下面是重载addRoutes(camelContext, routes, fromType)方法源码:

protected RouteContext addRoutes(CamelContext camelContext, Collection<Route> routes, FromDefinition fromType) throws Exception {
    RouteContext routeContext = new DefaultRouteContext(camelContext, this, fromType, routes);

	//省略很多代码...
	//解析Endpoint
	routeContext.getEndpoint();
    
    //省略很多代码...
	
	//创建一list将路由定义的所有输出添加进去
    List<ProcessorDefinition<?>> list = new ArrayList<ProcessorDefinition<?>>(outputs);
    //迭代所有输出
    for (ProcessorDefinition<?> output : list) {
        try {
        	//调用输出ProcessorDefinition的addRoutes方法
            output.addRoutes(routeContext, routes);
        } catch (Exception e) {
            RouteDefinition route = routeContext.getRoute();
            throw new FailedToCreateRouteException(route.getId(), route.toString(), output.toString(), e);
        }
    }

    routeContext.commit();
    return routeContext;
}

在该方法中先调用了RouteContext.getEndpoint()方法,下面是getEndpoint方法源码:

public Endpoint getEndpoint() {
    if (endpoint == null) {
    	//调用FromDefinition的resolveEndpoint方法返回一个Endpoint对象并赋给endpoint成员变量
        endpoint = from.resolveEndpoint(this);
    }
    return endpoint;
}

FromDefinition的resolveEndpoint方法中调用RouteContext的resolveEndpoint(uri, ref)方法,下面源码:

public Endpoint resolveEndpoint(String uri, String ref) {
    Endpoint endpoint = null;
    if (uri != null) {
        endpoint = resolveEndpoint(uri);
        if (endpoint == null) {
            throw new NoSuchEndpointException(uri);
        }
    }
    if (ref != null) {
        endpoint = lookup(ref, Endpoint.class);
        if (endpoint == null) {
            throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
        }
        // Check the endpoint has the right CamelContext 
        if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
            throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
        }
    }
    if (endpoint == null) {
        throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
    } else {
        return endpoint;
    }
}

在上面的方法中一是根据uri解析出Endpoint,二是根据ref值从注册表中获取Endpoint,这里看前一种方式:在resolveEndpoint(uri)方法中调用了RouteDefinition的resolveEndpoint方法,该方法又调用CamelContextHelper.getMandatoryEndpoint(camelContext, uri)方法,
getMandatoryEndpoint再调用CamelContext的getEndpoint(String uri),在这个方法中调用了getComponent(uri)方法,然后由getComponent(uri)返回的Component调用createEndpoint()方法获取Endpoint,这里就和Camel查找组件方式串接起来了。


现在返回到RouteDefinition的addRoutes方法中,接下来就是迭代路由定义输出了,在示例中,两个输出ProcessorDefinition分别为ProcessDefinition与ToDefinition,两者都继承自ProcessorDefinition,最终都是调用ProcessorDefinition的addRoutes方法,下面是该方法源码:

public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
    Processor processor = makeProcessor(routeContext);
    if (processor == null) {
        // no processor to add
        return;
    }

    if (!routeContext.isRouteAdded()) {
        //省略一些代码...

        // only add regular processors as event driven
        if (endpointInterceptor) {
            log.debug("Endpoint interceptor should not be added as an event driven consumer route: {}", processor);
        } else {
            log.trace("Adding event driven processor: {}", processor);
            routeContext.addEventDrivenProcessor(processor);
        }

    }
}

这里最重要的是makeProcessor方法,就是层层包装创建出一个处理器,它其实是一个DefaultChanel实例,然后调用addEventDrivenProcessor将其添加到RouteContext中。下面是makeProcessor(routeContext)方法源码:

protected Processor makeProcessor(RouteContext routeContext) throws Exception {
    Processor processor = null;
	
    //省略很多代码...
    
    if (processor == null) {
    	//这里就是在调用路由定义输出ProcessDefinition与ToDefinition的createProcessor方法
        processor = createProcessor(routeContext);
    }

    if (processor == null) {
        // no processor to make
        return null;
    }
    //processor创建出来了进行包装
    return wrapProcessor(routeContext, processor);
}

对于ProcessDefinition的createProcessor方法返回的就是我们传入的Processor对象,ToDefinition的createProcessor方法返回的是一个SendProcessor对象。

在wrapProcessor方法调用了wrapChannel方法,就是要将Processor包装成一个DefaultChannel对象,然后对该对象进行一些初始化的操作。

再回到RouteDefinition的addRoutes方法,调用完ProcessorDefinition的addRoutes方法后调用RouteContext的commit()方法,下面是commit()方法源码:

public void commit() {
    //前面说到创建出的DefaultChannel对象已经全部添加到RouteContext中了所以eventDrivenProcessors就不会为空
    if (!eventDrivenProcessors.isEmpty()) {
    	//创建Pipeline对象,将eventDrivenProcessors传进去
        Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);

        String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());

        //将Pipeline包装进CamelInternalProcessor中
        CamelInternalProcessor internal = new CamelInternalProcessor(target);
        
        //省略很多代码
        
        //创建EventDrivenConsumerRoute对象,这才是真正的路由对象,前面的都是路由定义
        //getEndpoint()得到的Endpoint对象就是FromDefinition.resolveEndpoint返回的Endpoint对象
        //示例中就是FileEndpoint对象
        //将CamelInternalProcessor对象传入Route
        Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
        
        //省略很多代码
        
		//将创建出来的路由对象添加进路由集合
        routes.add(edcr);
    }
}

现在回到DefaultCamelContext.startRoute(RouteDefinition route)方法,routeContexts list返回后,用于创建一个RouteService对象然后调用startRouteService方法,该方法中调用safelyStartRouteServices方法时有一判断,shouldStartRoutes是否应该启动路由,对示例中来说,由于这里CamelContext还没有启动完成,所以并没有调用到safelyStartRouteServices()方法,如果是CamelContext先启动的,添加的路由则会启动。

现在回到DefaultCamelContext.doStartCamel()中,接下来就要调用doStartOrResumeRoutes()方法:

protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash,
                                         boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception {
    // 过滤掉已经启动的路由服务
    Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
    for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
        boolean startable = false;

        Consumer consumer = entry.getValue().getRoutes().iterator().next().getConsumer();
        if (consumer instanceof SuspendableService) {
            // consumer could be suspended, which is not reflected in the RouteService status
            startable = ((SuspendableService) consumer).isSuspended();
        }

        if (!startable && consumer instanceof StatefulService) {
            // consumer could be stopped, which is not reflected in the RouteService status
            startable = ((StatefulService) consumer).getStatus().isStartable();
        } else if (!startable) {
            // no consumer so use state from route service
            startable = entry.getValue().getStatus().isStartable();
        }

        if (startable) {
            filtered.put(entry.getKey(), entry.getValue());
        }
    }

    if (!filtered.isEmpty()) {
        //启动还未启动的路由服务
        safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
    }

    // now notify any startup aware listeners as all the routes etc has been started,
    // allowing the listeners to do custom work after routes has been started
    for (StartupListener startup : startupListeners) {
        startup.onCamelContextStarted(this, isStarted());
    }
}

上面方法中又调用了safelyStartRouteServices方法,然后其调用一重载方法,重载方法中调用了doWarmUpRoutes与doStartRouteConsumers方法,在doWarmUpRoutes方法中调用了RouteService.warmUp(),下面是warmUp方法源码:

public synchronized void warmUp() throws Exception {
    if (endpointDone.compareAndSet(false, true)) {
        for (Route route : routes) {
            // 确保Endpoint先启动
            ServiceHelper.startService(route.getEndpoint());
        }
    }

    if (warmUpDone.compareAndSet(false, true)) {

        for (Route route : routes) {
            // warm up the route first
            route.warmUp();
			
	    //该方法中调用addServices方法,addServices方法中创建了Consumer并赋给了Route的consumer属性
	    route.onStartingServices(services);
            //省略很多代码
            
            List<Service> childServices = new ArrayList<Service>();
            for (Service service : list) {

                // inject the route
                if (service instanceof RouteAware) {
                    ((RouteAware) service).setRoute(route);
                }

                if (service instanceof Consumer) {
                	//如果是Consumer实例则添加到inputs中
                    inputs.put(route, (Consumer) service);
                } else {
                    childServices.add(service);
                }
            }
            startChildService(route, childServices);
        }

        // ensure lifecycle strategy is invoked which among others enlist the route in JMX
        for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
            strategy.onRoutesAdd(routes);
        }

        // add routes to camel context
        camelContext.addRouteCollection(routes);
    }
}

这里说明一个Consumer的创建,下面是EventDrivenConsumerRoute.addServices方法源码:

@Override
protected void addServices(List<Service> services) throws Exception {
	Endpoint endpoint = getEndpoint();
	//这里调用了endpoint(示例中即FileEndpoint)的createConsumer方法 
	//传入了一个processor,而这个processor就是前面经过层层包装了的CamelInternalProcessor
	consumer = endpoint.createConsumer(processor);
	if (consumer != null) {
		services.add(consumer);
		if (consumer instanceof RouteAware) {
			((RouteAware) consumer).setRoute(this);
		}
	}
	Processor processor = getProcessor();
	if (processor instanceof Service) {
		services.add((Service)processor);
	}
}


doStartRouteConsumers方法中调用doStartOrResumeRouteConsumers方法,下面是源码:

private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception {
    List<Endpoint> routeInputs = new ArrayList<Endpoint>();

    for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
        Integer order = entry.getKey();
        Route route = entry.getValue().getRoute();
        //取出RouteService
        RouteService routeService = entry.getValue().getRouteService();

        //省略...

        // start the service
        for (Consumer consumer : routeService.getInputs().values()) {
            Endpoint endpoint = consumer.getEndpoint();

            //省略...

            if (resumeOnly && route.supportsSuspension()) {
                // if we are resuming and the route can be resumed
                ServiceHelper.resumeService(consumer);
                log.info("Route: " + route.getId() + " resumed and consuming from: " + endpoint);
            } else {
                // when starting we should invoke the lifecycle strategies
                for (LifecycleStrategy strategy : lifecycleStrategies) {
                    strategy.onServiceAdd(this, consumer, route);
                }
                //启动Consumer,在示例中就是FileConsumer
                startService(consumer);
                log.info("Route: " + route.getId() + " started and consuming from: " + endpoint);
            }

            //省略...
        }

        if (resumeOnly) {
            routeService.resume();
        } else {
            //启动RouteService,其内部就是调用各个Route的start()方法,但因其doStart没做什么操作
            //所以该方法也就没什么操作了
            routeService.start(false);
        }
    }
}

该方法中就把Consumer(FileConsumer)启动起来了,这篇就先讲到这里,下篇讲解文件是如何被轮询出来再经过处理最后搬运到另一目录的

Camel路由启动过程