首页 > 代码库 > Quartz与Spring集成——启动调度器
Quartz与Spring集成——启动调度器
前言
在《Quartz与Spring集成——创建调度器》一文中介绍了调度器的创建过程,本文将分析其启动过程。熟悉Spring原理的人都知道AbstractApplicationContext的refresh方法的重要性,在refresh方法中调用了finishRefresh方法,最后会调用到SchedulerFactoryBean的start方法,其调用栈如图1所示。
图1 SchedulerFactoryBean的start方法的调用栈
根据图1的内容,我们知道spring容器初始化完毕的最后会启动所有的bean,SchedulerFactoryBean的start方法就是这时候被调用的。
启动调度器
SchedulerFactoryBean的start方法的实现见代码清单1所示。
代码清单1
@Override public void start() throws SchedulingException { if (this.scheduler != null) { try { startScheduler(this.scheduler, this.startupDelay); } catch (SchedulerException ex) { throw new SchedulingException("Could not start Quartz Scheduler", ex); } } }
startScheduler方法的实现,见代码清单2。
代码清单2
protected void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException { if (startupDelay <= 0) { logger.info("Starting Quartz Scheduler now"); scheduler.start(); } else { //此分支启动一个后台线程,sleep参数startupDelay指定的秒数后,再启动调度器(即调用scheduler.start()),主要用于延迟启动 } }startScheduler方法中的第二个条件分支用于延迟启动调度器,即当参数startupDelay大于0时,启动一个后台线程,睡眠(sleep)参数startupDelay指定的秒数后,再启动调度器(即调用scheduler.start();)。由于实际上也是执行scheduler.start();故此没有列出其代码。
以默认的Scheduler实现类StdScheduler为例,其start方法的实现如下:
public void start() throws SchedulerException { sched.start(); }根据《Quartz与Spring集成——创建调度器》一文的内容我们知道,这里的sched实际是QuartzScheduler的实例,其start方法的实现见代码清单3。
public void start() throws SchedulerException { if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); } // QTZ-212 : calling new schedulerStarting() method on the listeners // right after entering start() notifySchedulerListenersStarting(); if (initialStart == null) { initialStart = new Date(); this.resources.getJobStore().schedulerStarted(); startPlugins(); } else { resources.getJobStore().schedulerResumed(); } schedThread.togglePause(false); getLog().info( "Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted(); }
阅读代码清单3,整理启动QuartzScheduler的步骤如下:
- 启动前对关闭中(shuttingDown)、已关闭(closed)的状态校验,此时会抛出SchedulerException,避免进入不正确的状态;
- 通知所有对调度器监听的监听器——现在正在启动调度器,notifySchedulerListenersStarting方法的实现见代码清单4;
- 启动调度器,以LocalDataSourceJobStore为例,实际调用了其父类JobStoreSupport的schedulerStarted方法;
- 启动插件,startPlugins方法的实现见代码清单5;
- 调用QuartzSchedulerThread的togglePause方法(见代码清单6),其作用为在保证线程安全的前提下将paused设置为false,同时唤醒所有等待sigLock这个锁的线程。在《Quartz与Spring集成——创建调度器》一文中介绍QuartzSchedulerThread的启动的时候,曾经说过由于paused为true时导致不断轮询和等待sigLock。到这里QuartzSchedulerThread被唤醒后,run方法将挑出这个轮询继续执行,QuartzSchedulerThread的启动才真正开始;
- 通知所有对调度器监听的监听器——现在启动调度器已完成,notifySchedulerListenersStarted方法的实现见代码清单7;
public void notifySchedulerListenersStarting() { // build a list of all scheduler listeners that are to be notified... List<SchedulerListener> schedListeners = buildSchedulerListenerList(); // notify all scheduler listeners for (SchedulerListener sl : schedListeners) { try { sl.schedulerStarting(); } catch (Exception e) { getLog().error( "Error while notifying SchedulerListener of startup.", e); } } }
代码清单5
private void startPlugins() { java.util.Iterator<SchedulerPlugin> itr = resources.getSchedulerPlugins().iterator(); while (itr.hasNext()) { SchedulerPlugin plugin = itr.next(); plugin.start(); } }
void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); } else { sigLock.notifyAll(); } } }
public void notifySchedulerListenersStarted() { // build a list of all scheduler listeners that are to be notified... List<SchedulerListener> schedListeners = buildSchedulerListenerList(); // notify all scheduler listeners for(SchedulerListener sl: schedListeners) { try { sl.schedulerStarted(); } catch (Exception e) { getLog().error( "Error while notifying SchedulerListener of startup.", e); } } }
启动JobStore
以LocalDataSourceJobStore为例,特别来分析下其schedulerStarted方法(见代码清单8)的实现,其处理步骤如下:
- 如果是集群部署,则创建集群管理器ClusterManager(直接继承了Thread,),并调用其initialize方法(见代码清单9)执行ClusterManager自身;
- 如果不是集群部署,则调用recoverJobs方法恢复任何失败的或者触发失常的作业;
- 创建MisfireHandler(也直接继承了Thread,用于恢复任何失败的或者触发失常的作业),并调用其initialize方法(见代码清单10)执行MisfireHandler自身;
public void schedulerStarted() throws SchedulerException { if (isClustered()) { clusterManagementThread = new ClusterManager(); if(initializersLoader != null) clusterManagementThread.setContextClassLoader(initializersLoader); clusterManagementThread.initialize(); } else { try { recoverJobs(); } catch (SchedulerException se) { throw new SchedulerConfigException( "Failure occured during job recovery.", se); } } misfireHandler = new MisfireHandler(); if(initializersLoader != null) misfireHandler.setContextClassLoader(initializersLoader); misfireHandler.initialize(); schedulerRunning = true; getLog().debug("JobStore background threads started (as scheduler was started)."); }
代码清单9
public void initialize() { this.manage(); ThreadExecutor executor = getThreadExecutor(); executor.execute(ClusterManager.this); }
public void initialize() { ThreadExecutor executor = getThreadExecutor(); executor.execute(MisfireHandler.this); }
小结
经过以上分析,对Quartz如何启动调度器的原理有了较深入的了解。前面说过当paused设置为false,QuartzSchedulerThread才正式启动,有关QuartzSchedulerThread的正式启动将专门开一篇博文介绍。后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。
京东:http://item.jd.com/11846120.html
当当:http://product.dangdang.com/23838168.html
Quartz与Spring集成——启动调度器