How Hadoop Works, Job Execution (Part 3): TaskTracker Startup and Initialization
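Like the JobTracker, the TaskTracker has its own main() method, and the TaskTracker class implements the Runnable interface. main() does two things: it creates a TaskTracker object, and then it calls that object's run() method to start the service. As the code below shows, main() invokes run() directly on the main thread rather than starting a new Thread.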
```java
public static void main(String argv[]) throws Exception {
  ...
  try {
    JobConf conf = new JobConf();
    ...
    TaskTracker tt = new TaskTracker(conf);
    ...
    tt.run();
  } catch (Throwable e) {
    ...
  }
}
```
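I. TaskTracker tt = new TaskTracker(conf); The constructor first reads the number of map and reduce slots, the disk health-check interval, and the JobTracker address. It then creates an HttpServer (a Jetty server) so that the TaskTracker's status and all of its tasks can be viewed over the web, and constructs a TaskController object, which handles task initialization, finalization, and cleanup. Next it calls initialize() to set up various fields and finish building the TaskTracker object, and finally it starts the HttpServer.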
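To make the first step of the constructor concrete, here is a minimal standalone sketch that reads the same kind of settings from a plain java.util.Properties object instead of Hadoop's JobConf. The slot keys mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum (default 2 each) and mapred.job.tracker (default "local") are the Hadoop 1.x names; the disk-check key mapred.disk.healthChecker.interval and its 60-second default are assumptions, and the TaskTrackerSettings class itself is hypothetical.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of the first step of the TaskTracker constructor:
 * reading slot counts, the disk-check interval and the JobTracker address.
 * Uses java.util.Properties instead of Hadoop's JobConf so it runs standalone.
 */
class TaskTrackerSettings {
    final int maxMapSlots;
    final int maxReduceSlots;
    final long diskHealthCheckIntervalMs;
    final String jobTrackerAddress;

    TaskTrackerSettings(Properties conf) {
        // Slot counts per task type (Hadoop 1.x keys, default 2 each).
        maxMapSlots = Integer.parseInt(
            conf.getProperty("mapred.tasktracker.map.tasks.maximum", "2"));
        maxReduceSlots = Integer.parseInt(
            conf.getProperty("mapred.tasktracker.reduce.tasks.maximum", "2"));
        // ASSUMPTION: key name and 60 s default for the disk health check.
        diskHealthCheckIntervalMs = Long.parseLong(
            conf.getProperty("mapred.disk.healthChecker.interval", "60000"));
        // JobTracker address, normally host:port.
        jobTrackerAddress = conf.getProperty("mapred.job.tracker", "local");
    }
}
```

Only keys that are missing fall back to their defaults, so a configuration that sets just the map slots and the JobTracker address still gets two reduce slots.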
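The initialize() method deserves a closer look. This is where the TaskTracker object is actually built and its fields initialized; keeping this work in a separate method means it can be called more than once (run() calls it again to reinitialize local state). initialize() first cleans up leftover files from earlier runs (there are none the first time a TaskTracker starts) and creates the directories it needs. It then initializes various fields and:

- constructs a JvmManager object to manage the task JVMs;
- initializes and starts taskReportServer, the RPC server through which tasks report their progress;
- initializes distributedCacheManager to manage the distributed cache;
- initializes the InterTrackerProtocol object through which the TaskTracker communicates with the JobTracker;
- creates and starts two TaskLauncher threads, mapLauncher and reduceLauncher. Each thread's run() method keeps checking its tasksToLaunch queue for newly added TaskTracker.TaskInProgress objects; when one appears, it takes it off the queue and calls TaskTracker's startNewTask(TaskInProgress tip) to start the task.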
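The TaskLauncher threads follow a producer/consumer pattern: the heartbeat loop adds tasks to a queue, and a launcher thread blocks until something arrives. The sketch below is a simplified, hypothetical version: SimpleTaskLauncher, plain string task IDs, and a callback standing in for startNewTask() are all illustrative. As far as I know, the real TaskLauncher in Hadoop 1.x also waits until enough free slots are available before launching; that step is left out here.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of a TaskLauncher-style thread: it blocks until a task
 * is queued, takes it off the queue, and hands it to a start callback.
 * Free-slot accounting is deliberately omitted.
 */
class SimpleTaskLauncher extends Thread {
    private final List<String> tasksToLaunch = new LinkedList<>();
    private final Consumer<String> startNewTask;

    SimpleTaskLauncher(String name, Consumer<String> startNewTask) {
        super(name);
        this.startNewTask = startNewTask;
        setDaemon(true); // do not keep the JVM alive on its own
    }

    /** Producer side: called once per launch action from the heartbeat loop. */
    void addToTaskQueue(String task) {
        synchronized (tasksToLaunch) {
            tasksToLaunch.add(task);
            tasksToLaunch.notifyAll(); // wake the launcher if it is waiting
        }
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                String task;
                synchronized (tasksToLaunch) {
                    while (tasksToLaunch.isEmpty()) {
                        tasksToLaunch.wait(); // sleep until a task is queued
                    }
                    task = tasksToLaunch.remove(0); // FIFO order
                }
                startNewTask.accept(task); // launch outside the lock
            }
        } catch (InterruptedException e) {
            // interrupted while waiting: stop launching
        }
    }
}
```

Launching outside the synchronized block keeps the queue available to the heartbeat thread while a (possibly slow) task start is in progress.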
II. tt.run(); This method maintains the connection to the JobTracker, periodically sending it heartbeats and receiving new tasks.
```java
public void run() {
  try {
    getUserLogManager().start();
    startCleanupThreads();
    boolean denied = false;
    while (running && !shuttingDown) {
      boolean staleState = false;
      try {
        // This while-loop attempts reconnects if we get network errors
        // Inner loop: keep serving while the local state is valid and the
        // JobTracker has not denied us; exceptions are caught and retried below.
        while (running && !staleState && !shuttingDown && !denied) {
          try {
            // offerService() is the main service loop: at each heartbeat
            // interval it calls transmitHeartBeat() and processes the
            // HeartbeatResponse returned by the JobTracker.
            State osState = offerService();
            if (osState == State.STALE) {
              staleState = true;
            } else if (osState == State.DENIED) {
              denied = true;
            }
          } catch (Exception ex) {
            if (!shuttingDown) {
              LOG.info("Lost connection to JobTracker [" +
                  jobTrackAddr + "]. Retrying...", ex);
              try {
                Thread.sleep(5000);
              } catch (InterruptedException ie) {
              }
            }
          }
        }
      } finally {
        // If denied we'll close via shutdown below. We should close
        // here even if shuttingDown as shuttingDown can be set even
        // if shutdown is not called.
        if (!denied) {
          close();
        }
      }
      if (shuttingDown) { return; }
      if (denied) { break; }
      LOG.warn("Reinitializing local state");
      initialize();
    }
    if (denied) {
      shutdown(); // shut this TaskTracker down
    }
  } catch (IOException iex) {
    LOG.error("Got fatal exception while reinitializing TaskTracker: " +
        StringUtils.stringifyException(iex));
    return;
  } catch (InterruptedException i) {
    LOG.error("Got interrupted while reinitializing TaskTracker: " +
        i.getMessage());
    return;
  }
}
```
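The body of run() consists of two nested while loops. If offerService() throws an exception (for example because the connection to the JobTracker was lost), the inner loop logs it, sleeps for 5 seconds, and calls offerService() again, repeating until the connection is restored. If offerService() returns STALE (after a new disk failure, or when the JobTracker asks the TaskTracker to reinitialize), the inner loop exits and the outer loop calls close(), then initialize() to rebuild local state, and enters the inner loop again. If it returns DENIED, the outer loop breaks and shutdown() is called. All actual communication with the JobTracker happens inside offerService().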
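The interplay of the two loops is easier to follow in a stripped-down model. In the hypothetical RunLoopSketch below, offerService() is replaced by a scripted sequence of outcomes (a State or an Exception), the running/shuttingDown flags are omitted, and each step is written to a log: an exception leads to a retry, STALE leads to close() plus initialize(), and DENIED ends in shutdown().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical, simplified model of the control flow in TaskTracker.run().
 * offerService() replays a scripted list of outcomes; running and
 * shuttingDown from the real code are left out.
 */
class RunLoopSketch {
    enum State { STALE, DENIED }

    final List<String> log = new ArrayList<>();
    private final Iterator<?> script;

    RunLoopSketch(List<?> outcomes) {
        this.script = outcomes.iterator();
    }

    /** Stand-in for offerService(); reports DENIED once the script runs out. */
    private State offerService() throws Exception {
        Object next = script.hasNext() ? script.next() : State.DENIED;
        if (next instanceof Exception) {
            throw (Exception) next;
        }
        return (State) next;
    }

    void run() {
        boolean denied = false;
        while (true) {                          // outer loop: (re)initialization
            boolean staleState = false;
            while (!staleState && !denied) {    // inner loop: serve and retry
                try {
                    State s = offerService();
                    if (s == State.STALE) {
                        staleState = true;
                    } else if (s == State.DENIED) {
                        denied = true;
                    }
                } catch (Exception ex) {
                    log.add("retry");           // the real code sleeps 5000 ms first
                }
            }
            if (denied) {
                break;                          // no close() here; shutdown() handles it
            }
            log.add("close");
            log.add("initialize");              // rebuild local state, then serve again
        }
        log.add("shutdown");
    }
}
```

Scripting a lost connection, then STALE, then DENIED produces the log retry, close, initialize, shutdown, which is exactly the path described above.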
```java
State offerService() throws Exception {
  long lastHeartbeat = System.currentTimeMillis(); // time of the last heartbeat

  while (running && !shuttingDown) {
    try {
      long now = System.currentTimeMillis();

      // accelerate to account for multiple finished tasks up-front
      // Time left until the next heartbeat is due; the interval is
      // adjusted dynamically according to the number of finished tasks.
      long remaining =
        (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

      if (remaining <= 0) { // heartbeat is already due
        finishedCount.set(0);
      }

      while (remaining > 0) { // heartbeat not yet due
        // sleeps for the wait time or
        // until there are *enough* empty slots to schedule tasks
        synchronized (finishedCount) {
          finishedCount.wait(remaining); // wait for the remaining time

          // Recompute
          now = System.currentTimeMillis();
          remaining =
            (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

          if (remaining <= 0) {
            // Reset count
            finishedCount.set(0);
            break;
          }
        }
      }

      // If the TaskTracker is just starting up:
      // 1. Verify the versions matches with the JobTracker
      // 2. Get the system directory & filesystem
      if (justInited) {
        String jtBuildVersion = jobClient.getBuildVersion();
        String jtVersion = jobClient.getVIVersion();
        if (!isPermittedVersion(jtBuildVersion, jtVersion)) {
          String msg = "Shutting down. Incompatible version or build version." +
            "TaskTracker version '" + VersionInfo.getVersion() +
            "' and build '" + VersionInfo.getBuildVersion() +
            "' and JobTracker version '" + jtVersion +
            "' and build '" + jtBuildVersion +
            " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
            " is " + (relaxedVersionCheck ? "enabled" : "not enabled") +
            " and " + CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
            " is " + (skipVersionCheck ? "enabled" : "not enabled");
          LOG.fatal(msg);
          try {
            jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
          } catch (Exception e) {
            LOG.info("Problem reporting to jobtracker: " + e);
          }
          return State.DENIED;
        }

        String dir = jobClient.getSystemDir();
        while (dir == null) {
          LOG.info("Failed to get system directory...");

          // Re-try
          try {
            // Sleep interval: 1000 ms - 5000 ms
            int sleepInterval = 1000 + r.nextInt(4000);
            Thread.sleep(sleepInterval);
          } catch (InterruptedException ie)
          {}
          dir = jobClient.getSystemDir();
        }
        systemDirectory = new Path(dir);
        systemFS = systemDirectory.getFileSystem(fConf);
      }

      now = System.currentTimeMillis();
      if (now > (lastCheckDirsTime + diskHealthCheckInterval)) { // is a disk check due?
        localStorage.checkDirs();
        lastCheckDirsTime = now;
        int numFailures = localStorage.numFailures();
        // Re-init the task tracker if there were any new failures
        if (numFailures > lastNumFailures) {
          lastNumFailures = numFailures;
          return State.STALE;
        }
      }

      // Send the heartbeat and process the jobtracker's directives
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

      // Note the time when the heartbeat returned, use this to decide when to send the
      // next heartbeat
      lastHeartbeat = System.currentTimeMillis();

      // Check if the map-event list needs purging
      Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
      if (jobs.size() > 0) {
        synchronized (this) {
          // purge the local map events list
          for (JobID job : jobs) {
            RunningJob rjob;
            synchronized (runningJobs) {
              rjob = runningJobs.get(job);
              if (rjob != null) {
                synchronized (rjob) {
                  FetchStatus f = rjob.getFetchStatus();
                  if (f != null) {
                    f.reset();
                  }
                }
              }
            }
          }

          // Mark the reducers in shuffle for rollback
          synchronized (shouldReset) {
            for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                 : runningTasks.entrySet()) {
              if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                this.shouldReset.add(entry.getKey());
              }
            }
          }
        }
      }

      // getActions() returns every directive sent by the JobTracker
      // as an array of TaskTrackerAction
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                  heartbeatResponse.getResponseId() + " and " +
                  ((actions != null) ? actions.length : 0) + " actions");
      }
      if (reinitTaskTracker(actions)) {
        return State.STALE;
      }

      // resetting heartbeat interval from the response.
      heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
      justStarted = false;
      justInited = false;
      if (actions != null) {
        // Dispatch each action according to its type
        for (TaskTrackerAction action : actions) {
          if (action instanceof LaunchTaskAction) {
            // LaunchTaskAction: addToTaskQueue() puts the task on the launch queue
            addToTaskQueue((LaunchTaskAction) action);
          } else if (action instanceof CommitTaskAction) {
            // CommitTaskAction: add the task to the list of tasks cleared to commit
            CommitTaskAction commitAction = (CommitTaskAction) action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              LOG.info("Received commit task action for " +
                       commitAction.getTaskID());
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            // KillJobAction or KillTaskAction: put it on the tasksToCleanup
            // queue, where the taskCleanupThread handles it
            addActionToCleanup(action);
          }
        }
      }
      // Kill tasks that have not reported progress within the timeout
      markUnresponsiveTasks();

      // When free disk space drops below mapred.local.dir.minspacekill
      // (default 0), pick suitable tasks and kill them to free up space
      killOverflowingTasks();

      //we've cleaned up, resume normal operation
      if (!acceptNewTasks && isIdle()) {
        acceptNewTasks = true;
      }
      //The check below may not be required every iteration but we are
      //erring on the side of caution here. We have seen many cases where
      //the call to jetty's getLocalPort() returns different values at
      //different times. Being a real paranoid here.
      checkJettyPort(server.getPort());
    } catch (InterruptedException ie) {
      LOG.info("Interrupted. Closing down.");
      return State.INTERRUPTED;
    } catch (DiskErrorException de) {
      String msg = "Exiting task tracker for disk error:\n" +
        StringUtils.stringifyException(de);
      LOG.error(msg);
      synchronized (this) {
        jobClient.reportTaskTrackerError(taskTrackerName,
                                         "DiskErrorException", msg);
      }
      // If we caught a DEE here we have no good dirs, therefore shutdown.
      return State.DENIED;
    } catch (RemoteException re) {
      String reClass = re.getClassName();
      if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
        LOG.info("Tasktracker disallowed by JobTracker.");
        return State.DENIED;
      }
    } catch (Exception except) {
      String msg = "Caught exception: " +
        StringUtils.stringifyException(except);
      LOG.error(msg);
    }
  }

  return State.NORMAL;
}
```
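The wait loop at the top of offerService() lets the TaskTracker send its heartbeat sooner when tasks finish, since finished tasks free slots that the JobTracker can fill. The hypothetical HeartbeatPacer below reproduces that pattern, using an AtomicInteger both as the counter and as the monitor for wait/notify. Its interval formula (base interval divided by finished tasks + 1) is an assumption for illustration, not Hadoop's actual getHeartbeatInterval(), and taskFinished() is a made-up name for whatever code bumps the counter when a task completes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of the heartbeat pacing in offerService(): wait until
 * lastHeartbeat + interval, where the interval shrinks as tasks finish,
 * and wake up early whenever a finished task is reported.
 */
class HeartbeatPacer {
    private final AtomicInteger finishedCount = new AtomicInteger();
    private final long baseIntervalMs;

    HeartbeatPacer(long baseIntervalMs) {
        this.baseIntervalMs = baseIntervalMs;
    }

    /** ASSUMED formula: base interval divided by (finished tasks + 1). */
    long getHeartbeatInterval(int finishedTasks) {
        return baseIntervalMs / (finishedTasks + 1);
    }

    int finishedCount() {
        return finishedCount.get();
    }

    /** Hypothetical hook: a task finished, so bump the count and wake the waiter. */
    void taskFinished() {
        synchronized (finishedCount) {
            finishedCount.incrementAndGet();
            finishedCount.notifyAll();
        }
    }

    /** Blocks until the next heartbeat is due, then resets the counter. */
    void awaitNextHeartbeat(long lastHeartbeat) throws InterruptedException {
        long remaining = lastHeartbeat
            + getHeartbeatInterval(finishedCount.get()) - System.currentTimeMillis();
        while (remaining > 0) {
            synchronized (finishedCount) {
                finishedCount.wait(remaining); // returns on timeout or notify
                // Recompute: more finished tasks means a shorter interval.
                remaining = lastHeartbeat
                    + getHeartbeatInterval(finishedCount.get()) - System.currentTimeMillis();
            }
        }
        finishedCount.set(0); // heartbeat is due: start counting afresh
    }
}
```

Because the remaining time is recomputed after every wakeup, a notify that does not yet make the heartbeat due (or a spurious wakeup) simply sends the thread back to waiting for the new, shorter remainder.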