
Yarn Job Submission Flow (Source Code Analysis)

Based on Hadoop 2.7.1

JobSubmitter

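  • addMRFrameworkToDistributedCache(Configuration conf): reads mapreduce.application.framework.path, which points to the HDFS path of an alternative framework to ship with the job. With the default YARN setup this can be ignored.
  • Token-related methods: read credentials (binary or JSON format) and attach them to the job's credentials for the corresponding file systems, so that the job can access those file systems with the same permissions.
  • copyAndConfigureFiles(Job job, Path jobSubmitDir): uploads the configuration, job jar, files, libjars, archives, etc.
  • submitJobInternal: the method that actually submits the job.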
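Together, copyAndConfigureFiles and submitJobInternal populate a per-job submit directory under the staging root (configurable via yarn.app.mapreduce.am.staging-dir). The sketch below lists that layout; the file names follow the JobSubmissionFiles helpers in Hadoop 2.x, but SubmitDirLayout and layout() are hypothetical names and the staging root is whatever you pass in, so verify the names against your version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Hadoop API): lists the paths that end up under
// <stagingDir>/<jobId> during job submission, following JobSubmissionFiles naming.
final class SubmitDirLayout {
    private SubmitDirLayout() {}

    static Map<String, String> layout(String stagingDir, String jobId) {
        String submitDir = stagingDir + "/" + jobId;
        Map<String, String> paths = new LinkedHashMap<>();
        paths.put("conf", submitDir + "/job.xml");         // serialized job configuration
        paths.put("jar", submitDir + "/job.jar");          // the job's main jar
        paths.put("files", submitDir + "/files");          // -files
        paths.put("libjars", submitDir + "/libjars");      // -libjars
        paths.put("archives", submitDir + "/archives");    // -archives
        paths.put("split", submitDir + "/job.split");      // serialized input splits
        paths.put("splitMeta", submitDir + "/job.splitmetainfo");
        return paths;
    }
}
```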

Core submission call chain

  1. JobSubmitter -> 
  2. ClientProtocol (YARNRunner) -> 
  3. ResourceMgrDelegate -> 
  4. YarnClient (YarnClientImpl).submitApplication(ApplicationSubmissionContext appContext) -> 
  5. [RM] ApplicationClientProtocol (ClientRMService).submitApplication(SubmitApplicationRequest request) -> // fills the ASC with default values
  6. RMAppManager.submitApplication(ApplicationSubmissionContext submissionContext, long submitTime, String user) -> 
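Structurally, each hop in the chain above wraps the next one and forwards the submission. The following is a deliberately minimal sketch of that delegation, assuming hypothetical SubmitHop/Hop types; the real classes add RPC, protobuf conversion, and default-filling at the ClientRMService boundary, none of which is modeled here.

```java
import java.util.List;

// Hypothetical stand-in for one layer of the submission chain.
interface SubmitHop {
    void submit(String appId, List<String> trace);
}

final class Hop implements SubmitHop {
    private final String name;
    private final SubmitHop next; // null for the last hop (RMAppManager)

    Hop(String name, SubmitHop next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void submit(String appId, List<String> trace) {
        trace.add(name);            // record that this layer saw the submission
        if (next != null) {
            next.submit(appId, trace); // delegate to the next layer
        }
    }

    // Builds the chain so that names[0] is the entry point.
    static SubmitHop chain(String... names) {
        SubmitHop head = null;
        for (int i = names.length - 1; i >= 0; i--) {
            head = new Hop(names[i], head);
        }
        return head;
    }
}
```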

  • ApplicationSubmissionContext: the submission context, carrying all of the application's metadata
  • SubmitApplicationRequest: the submission request object
RMAppManager.submitApplication then hands a START event for the new application to the RM's dispatcher:

    // Dispatcher is not yet started at this time, so these START events
    // enqueued should be guaranteed to be first processed when dispatcher
    // gets started.
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
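The comment above relies on the dispatcher being queue-based: events handed in before it starts simply wait, and are processed in FIFO order once it starts. Below is a minimal single-threaded sketch of that guarantee, assuming a hypothetical MiniDispatcher; the real AsyncDispatcher drains a BlockingQueue on its own thread, so here start() drains synchronously only to make the ordering easy to check.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of a queue-based dispatcher.
final class MiniDispatcher {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Consumer<String> handler;
    private boolean started = false;

    MiniDispatcher(Consumer<String> handler) {
        this.handler = handler;
    }

    // Before start(), events only accumulate in FIFO order.
    void handle(String event) {
        queue.addLast(event);
        if (started) {
            drain();
        }
    }

    // Events enqueued before start() are processed first.
    void start() {
        started = true;
        drain();
    }

    private void drain() {
        while (!queue.isEmpty()) {
            handler.accept(queue.pollFirst());
        }
    }
}
```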

START -> APP_NEW_SAVED (the RMApp moves NEW -> NEW_SAVING -> SUBMITTED)

    stateMachineFactory.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
        // ...

    private static final class RMAppNewlySavingTransition extends RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {
        // If recovery is enabled then store the application information in a
        // non-blocking call so make sure that RM has stored the information
        // needed to restart the AM after RM restart without further client
        // communication
        LOG.info("Storing application with id " + app.applicationId);
        app.rmContext.getStateStore().storeNewApplication(app);
      }
    }

    // In RMStateStore: persists the app asynchronously via a STORE_APP event
    public synchronized void storeNewApplication(RMApp app) {
      ApplicationSubmissionContext context = app
          .getApplicationSubmissionContext();
      assert context instanceof ApplicationSubmissionContextPBImpl;
      ApplicationStateData appState =
          ApplicationStateData.newInstance(
              app.getSubmitTime(), app.getStartTime(), context, app.getUser());
      dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
    }

    // Registered for NEW_SAVING --APP_NEW_SAVED--> SUBMITTED
    private static final class AddApplicationToSchedulerTransition extends
        RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {
        app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
            app.submissionContext.getQueue(), app.user,
            app.submissionContext.getReservationID()));
      }
    }
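The addTransition calls build a lookup table from (current state, event type) to the target state, plus a hook that runs on the way. A minimal sketch of that table for the first three RMApp transitions follows; MiniAppStateMachine is a hypothetical class, the hooks are omitted, and IllegalStateException stands in for YARN's own invalid-transition exception.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical miniature of StateMachineFactory/RMAppImpl: only the
// (state, event) -> next-state bookkeeping is modeled, not the hooks.
final class MiniAppStateMachine {
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }
    enum EventType { START, APP_NEW_SAVED, APP_ACCEPTED }

    private final Map<State, Map<EventType, State>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    MiniAppStateMachine() {
        addTransition(State.NEW, State.NEW_SAVING, EventType.START);
        addTransition(State.NEW_SAVING, State.SUBMITTED, EventType.APP_NEW_SAVED);
        addTransition(State.SUBMITTED, State.ACCEPTED, EventType.APP_ACCEPTED);
    }

    private void addTransition(State from, State to, EventType type) {
        table.computeIfAbsent(from, s -> new EnumMap<>(EventType.class)).put(type, to);
    }

    // Applies one event; (state, event) pairs missing from the table are rejected.
    State handle(EventType type) {
        State next = table.getOrDefault(current, Collections.emptyMap()).get(type);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + type + " in state " + current);
        }
        current = next;
        return current;
    }
}
```

Keying on both state and event is what lets the same event type (say, a KILL) mean different things in different states.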

The AppAddedSchedulerEvent is handled by whichever scheduler is configured (yarn.resourcemanager.scheduler.class).
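Schedulers such as CapacityScheduler handle scheduler events by switching on the event type. The sketch below models only that dispatch shape: MiniScheduler and its nested types are hypothetical, queue bookkeeping is reduced to lists, and the "accepted" list stands in for the APP_ACCEPTED RMAppEvent that a real scheduler sends back once the app is added.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical scheduler showing the switch-on-event-type pattern.
final class MiniScheduler {
    enum SchedulerEventType { APP_ADDED, APP_REMOVED }

    static final class SchedulerEvent {
        final SchedulerEventType type;
        final String appId;
        final String queue;

        SchedulerEvent(SchedulerEventType type, String appId, String queue) {
            this.type = type;
            this.appId = appId;
            this.queue = queue;
        }
    }

    private final Map<String, List<String>> queues = new HashMap<>();
    private final List<String> accepted = new ArrayList<>();

    void handle(SchedulerEvent event) {
        switch (event.type) {
            case APP_ADDED: {
                queues.computeIfAbsent(event.queue, q -> new ArrayList<>()).add(event.appId);
                accepted.add(event.appId); // stands in for sending APP_ACCEPTED
                break;
            }
            case APP_REMOVED: {
                List<String> apps = queues.get(event.queue);
                if (apps != null) {
                    apps.remove(event.appId);
                }
                break;
            }
            default:
                throw new IllegalArgumentException("Unknown event " + event.type);
        }
    }

    List<String> appsIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    List<String> accepted() {
        return accepted;
    }
}
```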

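P.S. A method for reading the event-driven code:

  1. Identify the state or event, e.g. APP_NEW_SAVED.
  2. Find the class that handles this event.
  3. Find the concrete logic that processes the event (this is usually the most complex part).
  4. Find the next event it emits.
  5. Repeat.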
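The loop in steps 1 to 5 can be kept as reading notes: for each event, which class handles it and which event it emits next. The sketch below is a hypothetical note-taking aid (EventChainNotes is not part of Hadoop) that walks such notes from a starting event; the sample entries in the usage follow the walkthrough above and should be checked against the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical aid for the reading method: event -> (handler, next event).
final class EventChainNotes {
    static final class Step {
        final String handler;
        final String nextEvent; // null when the chain ends (or you stopped reading)

        Step(String handler, String nextEvent) {
            this.handler = handler;
            this.nextEvent = nextEvent;
        }
    }

    private final Map<String, Step> steps = new HashMap<>();

    EventChainNotes note(String event, String handler, String nextEvent) {
        steps.put(event, new Step(handler, nextEvent));
        return this;
    }

    // Follows the notes from startEvent; the size cap guards against cycles.
    List<String> walk(String startEvent) {
        List<String> path = new ArrayList<>();
        String event = startEvent;
        while (event != null && steps.containsKey(event) && path.size() < 100) {
            Step s = steps.get(event);
            path.add(event + " -> " + s.handler);
            event = s.nextEvent;
        }
        return path;
    }
}
```

For example, notes for the submission path might be START -> RMAppNewlySavingTransition, STORE_APP -> RMStateStore, APP_NEW_SAVED -> AddApplicationToSchedulerTransition, APP_ADDED -> the configured scheduler.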

ApplicationMaster

START -> APP_NEW_SAVED -> APP_ACCEPTED -> ...

What follows is a long back-and-forth of events for starting the application attempt and so on. Let's skip straight to the AM part.

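ResourceManager has createApplicationMasterLauncher() and createApplicationMasterService(). The first creates the ApplicationMasterLauncher, which launches AM containers on NodeManagers through AMLauncher (whose launch() method is shown below); the second creates the ApplicationMasterService, the RPC endpoint the running AM uses to register, request resources, and unregister.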

    private void launch() throws IOException, YarnException {
      connect();
      ContainerId masterContainerID = masterContainer.getId();
      ApplicationSubmissionContext applicationContext =
        application.getSubmissionContext();
      LOG.info("Setting up container " + masterContainer
          + " for AM " + application.getAppAttemptId());
      ContainerLaunchContext launchContext =
          createAMContainerLaunchContext(applicationContext, masterContainerID);

      StartContainerRequest scRequest =
          StartContainerRequest.newInstance(launchContext,
            masterContainer.getContainerToken());
      List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
      list.add(scRequest);
      StartContainersRequest allRequests =
          StartContainersRequest.newInstance(list);

      StartContainersResponse response =
          containerMgrProxy.startContainers(allRequests);
      if (response.getFailedRequests() != null
          && response.getFailedRequests().containsKey(masterContainerID)) {
        Throwable t =
            response.getFailedRequests().get(masterContainerID).deSerialize();
        parseAndThrowException(t);
      } else {
        LOG.info("Done launching container " + masterContainer + " for AM "
            + application.getAppAttemptId());
      }
    }

    private ContainerLaunchContext createAMContainerLaunchContext(
        ApplicationSubmissionContext applicationMasterContext,
        ContainerId containerID) throws IOException {

      // Construct the actual Container
      ContainerLaunchContext container =
          applicationMasterContext.getAMContainerSpec();
      LOG.info("Command to launch container "
          + containerID
          + " : "
          + StringUtils.arrayToString(container.getCommands().toArray(
              new String[0])));

      // Finalize the container
      setupTokens(container, containerID);

      return container;
    }

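Note two lines in the code above:

  • ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID): builds the launch context for the AM container. As createAMContainerLaunchContext shows, this is the AM container spec taken from the ApplicationSubmissionContext, with security tokens set up.
  • StartContainersResponse response = containerMgrProxy.startContainers(allRequests): asks the NodeManager to start the AM container, and the AM is started inside that container.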
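startContainers is a batch call: one request carries a list of containers, and the response reports failures per container ID instead of failing the whole call, which is why launch() looks up masterContainerID in getFailedRequests(). The sketch below models that contract with a hypothetical MiniContainerManager; the failure rule (an empty launch command) is invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the batch-start contract used by launch().
final class MiniContainerManager {
    static final class StartResponse {
        final List<String> started = new ArrayList<>();
        final Map<String, String> failed = new HashMap<>(); // containerId -> error
    }

    // Starts every requested container (containerId -> launch command);
    // a bad entry fails only that container.
    StartResponse startContainers(Map<String, String> requests) {
        StartResponse response = new StartResponse();
        for (Map.Entry<String, String> e : requests.entrySet()) {
            if (e.getValue() == null || e.getValue().isEmpty()) {
                response.failed.put(e.getKey(), "empty launch command");
            } else {
                response.started.add(e.getKey());
            }
        }
        return response;
    }

    // Mirrors launch(): wrap the single AM request in a batch, then check
    // whether the AM container's own entry is among the failures.
    static void launchAm(MiniContainerManager nm, String amContainerId, String command) {
        StartResponse r = nm.startContainers(Collections.singletonMap(amContainerId, command));
        if (r.failed.containsKey(amContainerId)) {
            throw new IllegalStateException("AM container failed: " + r.failed.get(amContainerId));
        }
    }
}
```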
getAMContainerSpec() is implemented in ApplicationSubmissionContextPBImpl, so the AM launch command is exactly what the client put into the ApplicationSubmissionContext:

    @Override
    public ContainerLaunchContext getAMContainerSpec() {
      ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
      if (this.amContainer != null) {
        return amContainer;
      } // Else via proto
      if (!p.hasAmContainerSpec()) {
        return null;
      }
      amContainer = convertFromProtoFormat(p.getAmContainerSpec());
      return amContainer;
    }

    public class ApplicationSubmissionContextPBImpl
        extends ApplicationSubmissionContext {
      ApplicationSubmissionContextProto proto =
          ApplicationSubmissionContextProto.getDefaultInstance();
      ApplicationSubmissionContextProto.Builder builder = null;
      boolean viaProto = false;

      private ApplicationId applicationId = null;
      private Priority priority = null;
      private ContainerLaunchContext amContainer = null;
      private Resource resource = null;
      private Set<String> applicationTags = null;
      private ResourceRequest amResourceRequest = null;
      private LogAggregationContext logAggregationContext = null;
      private ReservationId reservationId = null;
      // ...
    }
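getAMContainerSpec() follows the lazy-conversion pattern used by YARN's PBImpl records: the record is backed by a protobuf message, and the richer Java object is converted on first access and cached in a field. The dependency-free sketch below models only that caching; FakeProto, LaunchSpec, and SubmissionContextPB are hypothetical, and the real viaProto/builder switching is left out.

```java
// Hypothetical stand-in for the protobuf message.
final class FakeProto {
    private final String amCommand; // null means "field not set"

    FakeProto(String amCommand) { this.amCommand = amCommand; }

    boolean hasAmContainerSpec() { return amCommand != null; }
    String getAmContainerSpec() { return amCommand; }
}

// Hypothetical stand-in for ContainerLaunchContext.
final class LaunchSpec {
    final String command;

    LaunchSpec(String command) { this.command = command; }
}

final class SubmissionContextPB {
    private final FakeProto proto;
    private LaunchSpec amContainer; // cache, filled on first access
    int conversions = 0;            // counts proto -> object conversions

    SubmissionContextPB(FakeProto proto) { this.proto = proto; }

    LaunchSpec getAMContainerSpec() {
        if (amContainer != null) {
            return amContainer;      // already converted
        }
        if (!proto.hasAmContainerSpec()) {
            return null;             // field absent in the message
        }
        amContainer = convertFromProtoFormat(proto.getAmContainerSpec());
        return amContainer;
    }

    private LaunchSpec convertFromProtoFormat(String raw) {
        conversions++;
        return new LaunchSpec(raw);
    }
}
```

Repeated getter calls return the same cached object, so the conversion cost is paid once per field.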

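Next, the launched AppMaster creates the job and requests resources from the ResourceManager over ApplicationMasterProtocol (served by ApplicationMasterService), for which AMRMClient is the client-side library.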
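Resource requests are heartbeat-driven: the AM queues container requests, and each allocate call returns whatever the RM has granted since the previous call, so the AM keeps heartbeating until it has enough containers. The sketch below models that loop under stated assumptions: MiniRm and MiniAppMaster are hypothetical, the "RM" grants at most grantsPerHeartbeat containers per call, and container IDs are simplified strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical RM side: grants at most grantsPerHeartbeat containers per allocate().
final class MiniRm {
    private final int grantsPerHeartbeat;
    private int pending = 0;
    private int nextId = 2; // container 1 is taken by the AM itself

    MiniRm(int grantsPerHeartbeat) {
        if (grantsPerHeartbeat < 1) {
            throw new IllegalArgumentException("must grant at least one container per heartbeat");
        }
        this.grantsPerHeartbeat = grantsPerHeartbeat;
    }

    void addContainerRequest() { pending++; }

    List<String> allocate() {
        List<String> granted = new ArrayList<>();
        while (pending > 0 && granted.size() < grantsPerHeartbeat) {
            granted.add(String.format("container_%06d", nextId++));
            pending--;
        }
        return granted;
    }
}

// Hypothetical AM side: one request per task, then heartbeat until all are granted.
final class MiniAppMaster {
    static int heartbeatsNeeded(MiniRm rm, int tasks) {
        for (int i = 0; i < tasks; i++) {
            rm.addContainerRequest();
        }
        int received = 0;
        int heartbeats = 0;
        while (received < tasks) {
            received += rm.allocate().size();
            heartbeats++;
        }
        return heartbeats;
    }
}
```

For instance, with 5 tasks and 2 grants per heartbeat, the AM needs 3 heartbeats (2 + 2 + 1).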
