首页 > 代码库 > JobTracker启动流程源码级分析
JobTracker启动流程源码级分析
org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数。JobTracker是在网络环境中提交及运行MR任务的核心位置。
main方法主要代码有两句:
1 //创建jobTracker对象 2 JobTracker tracker = startTracker(new JobConf()); 3 //启动各个服务,包括JT内部一些重要的服务或者线程 4 tracker.offerService();
一、startTracker(new JobConf())根据配置文件启动JobTracker,这个方法会调用startTracker(conf, generateNewIdentifier())方法进行启动操作,generateNewIdentifier()将会返回一个以节点当前时间格式化成“yyyyMMddHHmm”的字符串。
startTracker函数是一个静态函数,它调用JobTracker的构造函数生成一个JobTracker类的实例,构造函数主要工作是对一些重要的变量进行初始化,名为result。然后,进行了一系列初始化活动,包括启动RPC server,启动内置的jetty服务器,检查是否需要重启JobTracker等。
初始化的重要对象包括:
1、secretManager:DelegationTokenSecretManager的实例,MR安全管理相关类;
2、aclsManager:ACLsManager的实例,作业级别和队列级别的管理和访问权限控制;
3、taskScheduler:TaskScheduler的实例,调度器对象,hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler;
4、interTrackerServer:Server的实例,RPC Server;
5、infoServer:HttpServer的实例,将Job、Task、TaskTracker相关信息显示到WEB前端,封装的是jetty;
6、recoveryManager:RecoveryManager的实例,作业恢复管理,即JobTracker启动时,恢复上次停止时正在运行的作业,并恢复各个任务的运行状态;recoveryManager.checkAndAddJob(status)会检查出那些作业需要恢复并放入Set<JobID> jobsToRecover; // set of jobs to be recovered,为后面的recoveryManager.recover()做准备;
7、jobHistoryServer:JobHistoryServer的实例,用于查看作业历史信息的Server;
8、dnsToSwitchMapping:DNSToSwitchMapping的实例,用于构建集群的网络拓扑结构,它能将节点地址(IP或者host)映射成网络位置。
二、 tracker.offerService()
1 /** 2 * Run forever 3 */ 4 public void offerService() throws InterruptedException, IOException { 5 // Prepare for recovery. This is done irrespective of the status of restart 6 // flag. 7 while (true) { 8 try { 9 recoveryManager.updateRestartCount(); 10 break; 11 } catch (IOException ioe) { 12 LOG.warn("Failed to initialize recovery manager. ", ioe); 13 // wait for some time 14 Thread.sleep(FS_ACCESS_RETRY_PERIOD); 15 LOG.warn("Retrying..."); 16 } 17 } 18 19 taskScheduler.start(); 20 21 // Start the recovery after starting the scheduler 22 try { 23 recoveryManager.recover(); 24 } catch (Throwable t) { 25 LOG.warn("Recovery manager crashed! Ignoring.", t); 26 } 27 // refresh the node list as the recovery manager might have added 28 // disallowed trackers 29 refreshHosts(); 30 //用于发现和清理死掉的TaskTracker 31 this.expireTrackersThread = new Thread(this.expireTrackers, 32 "expireTrackers"); 33 this.expireTrackersThread.start(); 34 //用于清理长时间驻留在内存中的已经运行完成的作业信息 35 this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); 36 this.retireJobsThread.start(); 37 //用于发现已经被分配给某个TaskTracker但一直未汇报信息的任务 38 expireLaunchingTaskThread.start(); 39 40 if (completedJobStatusStore.isActive()) { 41 completedJobsStoreThread = new Thread(completedJobStatusStore, 42 "completedjobsStore-housekeeper"); 43 //将已经运行完成的作业运行信息保存到HDFS上,并提供了一套存取这些信息的API。 44 completedJobsStoreThread.start(); 45 } 46 47 // start the inter-tracker server once the jt is ready 48 this.interTrackerServer.start(); 49 50 synchronized (this) { 51 state = State.RUNNING; 52 } 53 LOG.info("Starting RUNNING"); 54 55 this.interTrackerServer.join(); 56 LOG.info("Stopped interTrackerServer"); 57 }
1、首先是不论重启是什么状态都必须要做的recoveryManager.updateRestartCount()更新JobTracker集群重启次数,更新文件${hadoop.tmp.dir}/mapred/system/jobtracker.info。该方法首先判断如果有restartFile(就是前面说的更新文件),就删除tmpRestartFile(不管存在与否);如果不存在restartFile而存在tmpRestartFile,则将tmpRestartFile重命名为restartFile;如果两个文件都没有可能是第一次启动也可能是文件丢失了,这时就不用恢复操作了shouldRecover = false,并且创建一个restartFile写入0;再读出restartFile文件的数字,并+1,创建一个tmpRestartFile将增加后的重启次数计数器restartCount写入这个文件,删除restartFile文件,将tmpRestartFile改名为restartFile。这个重启次数存在的目的官方说法是“The whole purpose of this api is to obtain restart counts across restarts to avoid attempt-id clashes.”
2、taskScheduler.start()是启动调度器。默认的调度器是JobQueueTaskScheduler,其start()方法如下:
1 //JobQueueTaskScheduler类的start方法主要注册了两个非常重要的监听 器: 2 //jobQueueJobInProgressListener和eagerTaskInitializationListener。 3 //前者是 JobQueueJobInProgressListener类的一个实例,该类以先进先出的方式维持一个JobInProgress的队列, 4 //并且监听各 个JobInProgress实例在生命周期中的变化;后者是EagerTaskInitializationListener类的一个实例, 5 //该类不断监 听jobInitQueue,一旦发现有新的job被提交(即有新的JobInProgress实例被加入), 6 //则立即调用该实例的initTasks方 法,对job进行初始化。 7 @Override 8 public synchronized void start() throws IOException { 9 //调用TaskScheduler.start()方法,实际上没有做任何事情 10 super.start(); 11 //注册一个JobInProgressListerner监听器 12 taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); 13 eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); 14 eagerTaskInitializationListener.start(); 15 taskTrackerManager.addJobInProgressListener( 16 eagerTaskInitializationListener); 17 }
taskTrackerManager其实就是JobTracker。eagerTaskInitializationListener.start()会启动一个线程始终监控List<JobInProgress> jobInitQueue一旦发现里面有新的JobInProgress就构造一个InitJob线程放入threadPool线程池中运行,该线程通过JobTracker.initJob(JobInProgress job)对Job进行初始化。然后向JobTracker注册eagerTaskInitializationListener。
3、recoveryManager.recover()。JobTracker节点由于意外情况而宕机的话,那么可能有一部分Job正在执行,也有一部分Job被用户成功提交了可还没有开始被调度执行,那么当我们重启JobTracker节点的时候就需要恢复或者重做这些还没有完成的Job。这里要说的是RecoveryManager启动对未完成Job的恢复是在JobTracker节点的主线程中完成的,而且是在JobTracker节点的所有后台线程启动之前,这个调用必须要在所有的未完成的Job被完成之后才返回。也就是说,JobTracker的作业恢复管理器在恢复作业的处理过程中,JobTracker节点不会接受客户端的任何请求,也不接受TaskTracker的任何请求。这个比较复杂以后再讲解。
4、refreshHosts()方法会先重新加载mapred.hosts和mapred.hosts.exclude指定的文件中主机信息到相应的Set中;然后从taskTrackers中找出没在mapred.hosts中但在mapred.hosts.exclude中的taskTracker从相关的数据结构中删除此taskTracker。节点均以mapred.hosts和mapred.hosts.exclude中的为准。
5、启动一个ExpireTrackers线程会监控trackerExpiryQueue一旦里面TaskTracker有超过10分钟没有心跳的,JobTracker就认为它死了,将其从相关的数据结构trackerToMarkedTasksMap、trackerToJobsToCleanup、trackerToTasksToCleanup以及trackerToTaskMap删除。这在lostTaskTracker(TaskTracker taskTracker)方法中进行。
6、启动一个RetireJobs线程,会将jobs中的完成的job存储一定时长后且,从taskidToTrackerMap、trackerToTaskMap、taskidToTIPMap、jobs、userToJobsMap(每个用户完成的job数要>100)数据结构中删除。
7、启动一个ExpireLaunchingTasks线程,如果一个TaskAttemptID超过10分钟没有回报信息,则JobTracker认为这个task已经失败,从launchingTasks删除相关信息,并将此task状态标注为FAILED。
8、启动一个CompletedJobStatusStore线程,默认"mapred.job.tracker.persist.jobstatus.active"是false表示不启动这个线程,如果启用则需要指定保存时间"mapred.job.tracker.persist.jobstatus.hours"(默认是0,不保存)和保存路径"mapred.job.tracker.persist.jobstatus.dir"(默认是/jobtracker/jobsInfo)。如果不启用该线程则所有的作业运行信息全部在内存中,且随着时间及运行任务的增多早期的作业信息会被删除。
这样JobTracker就启动了。。。。就等着Client提交Job。。。
参考:1、董西成,《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》
2、http://blog.csdn.net/xhh198781/article/details/7354257