
How Hadoop Works: Job Execution (Part 3), Starting and Initializing the TaskTracker

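  Like the JobTracker, the TaskTracker has a main() method and runs as a thread (it implements the Runnable interface). main() does two things: it creates a TaskTracker object, and then it runs the TaskTracker's main loop by calling its run() method.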

 

```java
  public static void main(String argv[]) throws Exception {
    ...
    try {
      JobConf conf = new JobConf();
      ...
      TaskTracker tt = new TaskTracker(conf);
      ...
      tt.run();
    } catch (Throwable e) {
      ...
    }
  }
```

 

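  1. TaskTracker tt = new TaskTracker(conf);

  The constructor first reads the number of map and reduce slots, the disk health-check interval, and the JobTracker's address. It then constructs an HttpServer (a Jetty server), which lets you view the TaskTracker's status and all of its tasks through a web UI. Next it constructs a TaskController object, which controls task initialization, termination, and cleanup. It then calls initialize() to initialize various fields and finish constructing the TaskTracker. Finally, it starts the HttpServer.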
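  The first step, reading the slot counts and addresses from configuration, can be sketched roughly as follows. This sketch uses a java.util.Properties object as a stand-in for JobConf, so it runs without Hadoop on the classpath. The class TrackerSettings is hypothetical. The slot keys mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum (default 2) and mapred.job.tracker are Hadoop 1.x names. The disk-check key mapred.disk.healthChecker.interval and its 60-second default are assumptions.

```java
import java.util.Properties;

// Hypothetical stand-in for the settings the TaskTracker constructor reads.
// Properties replaces Hadoop's JobConf so the sketch has no Hadoop dependency.
final class TrackerSettings {
    final int maxMapSlots;
    final int maxReduceSlots;
    final long diskHealthCheckIntervalMs;
    final String jobTrackerAddress;

    TrackerSettings(Properties conf) {
        // Number of map and reduce slots, defaulting to 2 each.
        maxMapSlots = Integer.parseInt(
            conf.getProperty("mapred.tasktracker.map.tasks.maximum", "2"));
        maxReduceSlots = Integer.parseInt(
            conf.getProperty("mapred.tasktracker.reduce.tasks.maximum", "2"));
        // ASSUMPTION: key name and 60 s default for the disk health check.
        diskHealthCheckIntervalMs = Long.parseLong(
            conf.getProperty("mapred.disk.healthChecker.interval", "60000"));
        // JobTracker address as host:port; "local" selects the local runner.
        jobTrackerAddress = conf.getProperty("mapred.job.tracker", "local");
    }
}
```

  Unset keys fall back to their defaults. For example, setting only the map slot count to 4 leaves two reduce slots.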

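  initialize() deserves a closer look. This method does the real work of constructing the TaskTracker object and initializes its fields. Because this work lives in a separate method, it can be called more than once; run() calls it again to reinitialize the TaskTracker after it goes stale.

  initialize() proceeds as follows:
  - It cleans up files left over from earlier runs (there are none the first time a TaskTracker starts) and creates some directories.
  - It initializes several fields.
  - It constructs a JvmManager to manage the task JVMs.
  - It initializes and starts taskReportServer, the RPC server that tasks use to report their progress.
  - It initializes distributedCacheManager, which manages the distributed cache.
  - It initializes the InterTrackerProtocol proxy, through which the TaskTracker communicates with the JobTracker.
  - Finally, it constructs and starts two TaskLauncher threads, mapLauncher and reduceLauncher.

  Each launcher's run() method continually checks its tasksToLaunch queue for newly added TaskTracker.TaskInProgress objects. When one arrives, the launcher removes it from the queue and calls TaskTracker.startNewTask(TaskInProgress tip) to launch the task.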
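  The hand-off between the heartbeat loop and a TaskLauncher is a producer-consumer queue guarded by wait/notifyAll. The sketch below is minimal and rests on a few assumptions: tasks are identified by plain strings, the launch step is a callback, and slot accounting is left out entirely. SimpleTaskLauncher and its members are hypothetical names, not Hadoop's API.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified launcher thread: waits on a queue of task IDs and
// hands each one to a launch callback (standing in for startNewTask()).
final class SimpleTaskLauncher extends Thread {
    private final List<String> tasksToLaunch = new LinkedList<>();
    private final Consumer<String> startNewTask;

    SimpleTaskLauncher(String name, Consumer<String> startNewTask) {
        super(name);
        this.startNewTask = startNewTask;
        setDaemon(true);
    }

    // Producer side: called when a launch action arrives in a heartbeat.
    void addToTaskQueue(String taskId) {
        synchronized (tasksToLaunch) {
            tasksToLaunch.add(taskId);
            tasksToLaunch.notifyAll();   // wake the launcher thread
        }
    }

    // Consumer side: block until a task is queued, then launch it.
    @Override
    public void run() {
        while (!isInterrupted()) {
            String task;
            synchronized (tasksToLaunch) {
                try {
                    while (tasksToLaunch.isEmpty()) {
                        tasksToLaunch.wait();   // loop guards against spurious wakeups
                    }
                } catch (InterruptedException e) {
                    return;   // interrupted while idle: stop the launcher
                }
                task = tasksToLaunch.remove(0);   // FIFO order
            }
            startNewTask.accept(task);   // launch outside the lock
        }
    }
}
```

  The launch callback runs outside the synchronized block. This way, the heartbeat thread is never blocked on addToTaskQueue() while a task is being started.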

  2. tt.run();

  This method maintains the connection to the JobTracker, periodically sending it heartbeats and receiving new tasks in return.

```java
  public void run() {
    try {
      getUserLogManager().start();
      startCleanupThreads();
      boolean denied = false;
      while (running && !shuttingDown) {
        boolean staleState = false;
        try {
          // This while-loop attempts reconnects if we get network errors.
          // It keeps serving (retrying after errors) until the TaskTracker goes
          // stale, is denied, or shuts down; a stale state returns control to
          // the outer loop, which reinitializes the TaskTracker.
          while (running && !staleState && !shuttingDown && !denied) {
            try {
              // Enter the main service loop. offerService() contacts the
              // JobTracker at regular intervals via transmitHeartBeat() and
              // receives a HeartbeatResponse each time.
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
            } catch (Exception ex) {
              if (!shuttingDown) {
                LOG.info("Lost connection to JobTracker [" +
                         jobTrackAddr + "].  Retrying...", ex);
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
              }
            }
          }
        } finally {
          // If denied we'll close via shutdown below. We should close
          // here even if shuttingDown as shuttingDown can be set even
          // if shutdown is not called.
          if (!denied) {
            close();
          }
        }
        if (shuttingDown) { return; }
        if (denied) { break; }
        LOG.warn("Reinitializing local state");
        initialize();
      }
      if (denied) {
        shutdown();    // shut down this TaskTracker
      }
    } catch (IOException iex) {
      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
                StringUtils.stringifyException(iex));
      return;
    } catch (InterruptedException i) {
      LOG.error("Got interrupted while reinitializing TaskTracker: " +
          i.getMessage());
      return;
    }
  }
```

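  The body of run() consists of two nested while loops. The inner loop calls offerService(), which does the actual communication with the JobTracker. If offerService() throws an exception (for example, because the connection to the JobTracker was lost), the error is logged, the thread sleeps for 5 seconds, and the inner loop tries again. If offerService() returns STALE, the inner loop exits; the outer loop then closes the TaskTracker, calls initialize() to rebuild its local state, and re-enters the inner loop. If it returns DENIED, control leaves both loops and the TaskTracker is shut down.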
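  The control flow of the two loops can be traced with a small simulation. Here offerService() is replaced by a scripted sequence of outcomes, and close(), initialize(), and shutdown() are recorded as log entries instead of being executed. RunLoopSketch and Step are hypothetical; running out of scripted steps stands in for shuttingDown being set.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the nested loops in TaskTracker.run().
final class RunLoopSketch {
    enum State { NORMAL, STALE, INTERRUPTED, DENIED }

    // One scripted call to offerService(): returns a State or throws
    // to mimic a lost connection.
    interface Step { State call() throws Exception; }

    static List<String> run(Iterator<Step> script) {
        List<String> log = new ArrayList<>();
        boolean denied = false;
        boolean shuttingDown = false;
        while (!shuttingDown) {                               // outer loop: reinitialize
            boolean stale = false;
            while (!stale && !denied && !shuttingDown) {      // inner loop: serve / retry
                if (!script.hasNext()) { shuttingDown = true; break; }
                try {
                    State s = script.next().call();           // stands in for offerService()
                    if (s == State.STALE) stale = true;
                    else if (s == State.DENIED) denied = true;
                } catch (Exception e) {
                    log.add("retry");                         // the real code sleeps 5 s here
                }
            }
            if (!denied) log.add("close");                    // the finally block
            if (shuttingDown) return log;
            if (denied) break;
            log.add("initialize");                            // rebuild local state
        }
        log.add("shutdown");                                  // reached only when denied
        return log;
    }
}
```

  Consider the script "connection error, STALE, NORMAL, DENIED". It produces the log retry, close, initialize, shutdown: one retry inside the inner loop, one reinitialization by the outer loop, and a final shutdown.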

```java
  State offerService() throws Exception {
    long lastHeartbeat = System.currentTimeMillis();    // time of the last heartbeat

    while (running && !shuttingDown) {
      try {
        long now = System.currentTimeMillis();

        // accelerate to account for multiple finished tasks up-front
        // Time left until the next heartbeat is due; the interval is
        // adjusted according to the number of finished tasks.
        long remaining =
          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

        if (remaining <= 0) {    // the heartbeat interval has elapsed
          finishedCount.set(0);
        }

        while (remaining > 0) {    // the heartbeat interval has not elapsed yet
          // sleeps for the wait time or
          // until there are *enough* empty slots to schedule tasks
          synchronized (finishedCount) {
            finishedCount.wait(remaining);    // wait for the remaining time

            // Recompute
            now = System.currentTimeMillis();
            remaining =
              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

            if (remaining <= 0) {
              // Reset count
              finishedCount.set(0);
              break;
            }
          }
        }

        // If the TaskTracker is just starting up:
        // 1. Verify the versions matches with the JobTracker
        // 2. Get the system directory & filesystem
        if(justInited) {
          String jtBuildVersion = jobClient.getBuildVersion();
          String jtVersion = jobClient.getVIVersion();
          if (!isPermittedVersion(jtBuildVersion, jtVersion)) {
            String msg = "Shutting down. Incompatible version or build version." +
              "TaskTracker version '" + VersionInfo.getVersion() +
              "' and build '" + VersionInfo.getBuildVersion() +
              "' and JobTracker version '" + jtVersion +
              "' and build '" + jtBuildVersion +
              " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
              " is " + (relaxedVersionCheck ? "enabled" : "not enabled") +
              " and " + CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
              " is " + (skipVersionCheck ? "enabled" : "not enabled");
            LOG.fatal(msg);
            try {
              jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
            } catch(Exception e ) {
              LOG.info("Problem reporting to jobtracker: " + e);
            }
            return State.DENIED;
          }

          String dir = jobClient.getSystemDir();
          while (dir == null) {
            LOG.info("Failed to get system directory...");

            // Re-try
            try {
              // Sleep interval: 1000 ms - 5000 ms
              int sleepInterval = 1000 + r.nextInt(4000);
              Thread.sleep(sleepInterval);
            } catch (InterruptedException ie)
            {}
            dir = jobClient.getSystemDir();
          }
          systemDirectory = new Path(dir);
          systemFS = systemDirectory.getFileSystem(fConf);
        }

        now = System.currentTimeMillis();
        if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {    // time for a disk health check?
          localStorage.checkDirs();
          lastCheckDirsTime = now;
          int numFailures = localStorage.numFailures();
          // Re-init the task tracker if there were any new failures
          if (numFailures > lastNumFailures) {
            lastNumFailures = numFailures;
            return State.STALE;
          }
        }

        // Send the heartbeat and process the jobtracker's directives
        // (send a heartbeat to the JobTracker and get its response)
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

        // Note the time when the heartbeat returned, use this to decide when to send the
        // next heartbeat
        lastHeartbeat = System.currentTimeMillis();

        // Check if the map-event list needs purging
        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
        if (jobs.size() > 0) {
          synchronized (this) {
            // purge the local map events list
            for (JobID job : jobs) {
              RunningJob rjob;
              synchronized (runningJobs) {
                rjob = runningJobs.get(job);
                if (rjob != null) {
                  synchronized (rjob) {
                    FetchStatus f = rjob.getFetchStatus();
                    if (f != null) {
                      f.reset();
                    }
                  }
                }
              }
            }

            // Mark the reducers in shuffle for rollback
            synchronized (shouldReset) {
              for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                   : runningTasks.entrySet()) {
                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                  this.shouldReset.add(entry.getKey());
                }
              }
            }
          }
        }

        // HeartbeatResponse.getActions() returns every directive the JobTracker
        // sent, as an array of TaskTrackerAction
        TaskTrackerAction[] actions = heartbeatResponse.getActions();
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                    heartbeatResponse.getResponseId() + " and " +
                    ((actions != null) ? actions.length : 0) + " actions");
        }
        if (reinitTaskTracker(actions)) {
          return State.STALE;
        }

        // resetting heartbeat interval from the response.
        heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
        justStarted = false;
        justInited = false;
        if (actions != null){
          // Walk through the array of actions
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              // LaunchTaskAction: put the task on the launch queue via addToTaskQueue()
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              // CommitTaskAction: add the task to the list of tasks cleared to commit
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " +
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }
            } else {
              // KillJobAction or KillTaskAction: add it to the tasksToCleanup
              // queue, which is processed by the taskCleanupThread
              addActionToCleanup(action);
            }
          }
        }
        // Kill tasks that have not reported progress within the timeout
        markUnresponsiveTasks();

        // If free disk space drops below mapred.local.dir.minspacekill
        // (default 0), choose suitable tasks and kill them to free space
        killOverflowingTasks();

        //we've cleaned up, resume normal operation
        if (!acceptNewTasks && isIdle()) {
          acceptNewTasks=true;
        }
        //The check below may not be required every iteration but we are
        //erring on the side of caution here. We have seen many cases where
        //the call to jetty's getLocalPort() returns different values at
        //different times. Being a real paranoid here.
        checkJettyPort(server.getPort());
      } catch (InterruptedException ie) {
        LOG.info("Interrupted. Closing down.");
        return State.INTERRUPTED;
      } catch (DiskErrorException de) {
        String msg = "Exiting task tracker for disk error:\n" +
          StringUtils.stringifyException(de);
        LOG.error(msg);
        synchronized (this) {
          jobClient.reportTaskTrackerError(taskTrackerName,
                                           "DiskErrorException", msg);
        }
        // If we caught a DEE here we have no good dirs, therefore shutdown.
        return State.DENIED;
      } catch (RemoteException re) {
        String reClass = re.getClassName();
        if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
          LOG.info("Tasktracker disallowed by JobTracker.");
          return State.DENIED;
        }
      } catch (Exception except) {
        String msg = "Caught exception: " +
          StringUtils.stringifyException(except);
        LOG.error(msg);
      }
    }

    return State.NORMAL;
  }
```
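  The timing at the top of offerService() comes down to a few lines: compute remaining from lastHeartbeat plus an interval, then wait on finishedCount until remaining drops to zero or below. The sketch below rests on one labeled assumption: getHeartbeatInterval() divides the base interval by (finishedTasks * damper + 1), so that finished tasks pull the next heartbeat forward. That formula, the damper parameter, and the class HeartbeatTiming are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the heartbeat wait in offerService().
final class HeartbeatTiming {
    // ASSUMPTION: each finished task shrinks the interval by this formula.
    static long interval(long baseMs, int finishedTasks, long damper) {
        return baseMs / (finishedTasks * damper + 1);
    }

    // Milliseconds until the next heartbeat is due; <= 0 means "send now".
    static long remaining(long lastHeartbeatMs, long nowMs, long baseMs,
                          int finishedTasks, long damper) {
        return lastHeartbeatMs + interval(baseMs, finishedTasks, damper) - nowMs;
    }

    // Wait until the (possibly shortened) interval has elapsed, then reset
    // the finished-task count, as the original loop does.
    static void awaitNextHeartbeat(AtomicInteger finishedCount, long lastHeartbeatMs,
                                   long baseMs, long damper) throws InterruptedException {
        synchronized (finishedCount) {   // holding the lock avoids a lost wakeup
            long left = remaining(lastHeartbeatMs, System.currentTimeMillis(),
                                  baseMs, finishedCount.get(), damper);
            while (left > 0) {
                finishedCount.wait(left);   // woken early by taskFinished()
                left = remaining(lastHeartbeatMs, System.currentTimeMillis(),
                                 baseMs, finishedCount.get(), damper);
            }
            finishedCount.set(0);
        }
    }

    // Called when a task completes: bump the count and wake the heartbeat loop.
    static void taskFinished(AtomicInteger finishedCount) {
        synchronized (finishedCount) {
            finishedCount.incrementAndGet();
            finishedCount.notifyAll();
        }
    }
}
```

  With a 3000 ms base interval and a damper of 1, two finished tasks cut the interval to 1000 ms. A large damper makes a single finished task trigger an almost immediate heartbeat.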

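  The way offerService() routes the TaskTrackerAction array can be modeled on its own. Launch actions go to the launch queue; commit actions are recorded once in commitResponses; every other action (kill job or kill task) goes to the cleanup queue. The classes Launch, Commit, Kill, and ActionDispatchSketch below are simplified hypothetical stand-ins, not Hadoop's action classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the action dispatch at the end of offerService().
final class ActionDispatchSketch {
    static class Action {
        final String taskId;
        Action(String taskId) { this.taskId = taskId; }
    }
    static final class Launch extends Action { Launch(String id) { super(id); } }
    static final class Commit extends Action { Commit(String id) { super(id); } }
    static final class Kill extends Action { Kill(String id) { super(id); } }

    final List<String> taskQueue = new ArrayList<>();        // stands in for addToTaskQueue()
    final List<String> commitResponses = new ArrayList<>();  // tasks cleared to commit
    final List<Action> tasksToCleanup = new ArrayList<>();   // drained by a cleanup thread

    void process(Action[] actions) {
        if (actions == null) return;   // a heartbeat response may carry no actions
        for (Action action : actions) {
            if (action instanceof Launch) {
                taskQueue.add(action.taskId);
            } else if (action instanceof Commit) {
                if (!commitResponses.contains(action.taskId)) {   // record each commit once
                    commitResponses.add(action.taskId);
                }
            } else {
                tasksToCleanup.add(action);   // kill-job / kill-task actions
            }
        }
    }
}
```

  The else branch acts as a catch-all, so any action type the dispatcher does not recognize is routed to cleanup rather than silently dropped.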