首页 > 代码库 > YARN的 AM与RM通信,申请资源分配过程

YARN的 AM与RM通信,申请资源分配过程

AppMaster向RM请求资源

MRAppMaster :serviceinit  // service to allocate containers from RM (if non-uber) or to fake it (uber)      containerAllocator = createContainerAllocator(null, context);      addIfService(containerAllocator);      dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);              protected ContainerAllocator createContainerAllocator(      final ClientService clientService, final AppContext context) {    return new ContainerAllocatorRouter(clientService, context);  //  }        private final class ContainerAllocatorRouter extends AbstractService      implements ContainerAllocator, RMHeartbeatHandler {    private final ClientService clientService;    private final AppContext context;    private ContainerAllocator containerAllocator;   .....    @Override    protected void serviceStart() throws Exception {      if (job.isUber()) {        this.containerAllocator = new LocalContainerAllocator(            this.clientService, this.context, nmHost, nmPort, nmHttpPort            , containerID);      } else {        this.containerAllocator = new RMContainerAllocator(              ///            this.clientService, this.context);      }      ((Service)this.containerAllocator).init(getConfig());      ((Service)this.containerAllocator).start();      super.serviceStart();                 org.apache.hadoop.mapreduce.v2.app.rm; RMContainerAllocator类有该方法             protected synchronized void heartbeat() throws Exception {    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");    List<Container> allocatedContainers = getResources();  //发远程RM发送心跳信息,注意心跳里可能没有新的资源请求信息    //只是告诉RM自己还活着,或者只是从RM取得分配资源    if (allocatedContainers.size() > 0) {      scheduledRequests.assign(allocatedContainers); //获得的container具体分配到任务task (应该是重排序)    }        资源请求包括的字段:    优先级,期望在的host,内存大小等 (默认三路复制,可能会有7个资源请求,3个local,3个 rack,1个随机)    }        RMContainerAllocator父类RMCommunicator的方法  protected void startAllocatorThread() {    allocatorThread = new Thread(new Runnable() {      
@Override      public void run() {        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {          try {            Thread.sleep(rmPollInterval);  //默认每秒            try {              heartbeat();   //发送心跳              ...        private List<Container> getResources() throws Exception {    int headRoom = getAvailableResources() != null        ? getAvailableResources().getMemory() : 0;//first time it would be null    AllocateResponse response;    /*     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS     * milliseconds before aborting. During this interval, AM will still try     * to contact the RM.     */    try {      response = makeRemoteRequest();  //关键            makeRemoteRequest方法为其父类RMContainerRequestor定义的方法              protected AllocateResponse makeRemoteRequest() throws IOException {    ResourceBlacklistRequest blacklistRequest =        ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),            new ArrayList<String>(blacklistRemovals));    AllocateRequest allocateRequest =   //新建个资源请求        AllocateRequest.newInstance(lastResponseID,          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),  //这个ask是集合类,存ResourceRequest实例,          //只有个新建方法,在哪赋值的呢          new ArrayList<ContainerId>(release), blacklistRequest);    AllocateResponse allocateResponse;    try {      allocateResponse = scheduler.allocate(allocateRequest);   //关键,分配资源,此处的scheduler 并非是调度器      //而是ApplicationMasterProtocol,他会终调用到调度器            scheduler为其父类RMCommunicator新建       protected ApplicationMasterProtocol scheduler;       ...         protected void serviceStart() throws Exception {    scheduler= createSchedulerProxy();    ..          
protected ApplicationMasterProtocol createSchedulerProxy() {    final Configuration conf = getConfig();    try {      return ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);  //ApplicationMasterProtocol协议是关键      //通过他远程调用ApplicationMasterService中的方法    } catch (IOException e) {      throw new YarnRuntimeException(e);    }  }  
//下面追踪 ask 的赋值最终是在哪里被调用的
//ask的赋值方法,最后是由  addContainerReq方法,该方法在RMContainerAllocator调用    private void addResourceRequestToAsk(ResourceRequest remoteRequest) {    // because objects inside the resource map can be deleted ask can end up     // containing an object that matches new resource object but with different    // numContainers. So exisintg values must be replaced explicitly    if(ask.contains(remoteRequest)) {      ask.remove(remoteRequest);    }    ask.add(remoteRequest);      }      protected void addContainerReq(ContainerRequest req) {    // Create resource requests    for (String host : req.hosts) {      // Data-local      if (!isNodeBlacklisted(host)) {        addResourceRequest(req.priority, host, req.capability);      }          }    // Nothing Rack-local for now    for (String rack : req.racks) {      addResourceRequest(req.priority, rack, req.capability);    }    // Off-switch    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);  }  RMContainerAllocator内void addMap(ContainerRequestEvent event) {  //addMap方法      ContainerRequest request = null;            if (event.getEarlierAttemptFailed()) {        earlierFailedMaps.add(event.getAttemptID());        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);        LOG.info("Added "+event.getAttemptID()+" to list of failed maps");      } else {        for (String host : event.getHosts()) {          LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);          if (list == null) {            list = new LinkedList<TaskAttemptId>();            mapsHostMapping.put(host, list);          }          list.add(event.getAttemptID());          if (LOG.isDebugEnabled()) {            LOG.debug("Added attempt req to host " + host);          }       }       for (String rack: event.getRacks()) {         LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);         if (list == null) {           list = new LinkedList<TaskAttemptId>();           mapsRackMapping.put(rack, list);         }         
list.add(event.getAttemptID());         if (LOG.isDebugEnabled()) {            LOG.debug("Added attempt req to rack " + rack);         }       }       request = new ContainerRequest(event, PRIORITY_MAP);      }      maps.put(event.getAttemptID(), request);      addContainerReq(request);           //调用                  //addMap在该方法内被调用    protected synchronized void handleEvent(ContainerAllocatorEvent event) {    recalculateReduceSchedule = true;    ..................        scheduledRequests.addMap(reqEvent);//maps are immediately scheduled                                          protected void serviceStart() throws Exception {    this.eventHandlingThread = new Thread() {      @SuppressWarnings("unchecked")      @Override      public void run() {        ContainerAllocatorEvent event;        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {          try {            event = RMContainerAllocator.this.eventQueue.take();  //取出事件          } catch (InterruptedException e) {            if (!stopped.get()) {              LOG.error("Returning, interrupted : " + e);            }            return;          }          try {            handleEvent(event);   //调用                       // 事件加入在MRAppMaster内,加入的事件在上面的方法被处理,该方法在哪里调用了呢?              public void handle(ContainerAllocatorEvent event) {      this.containerAllocator.handle(event);    }   

RM端接受AppMaster心跳请求

 //总结,applicationmaster最终通过ApplicationMasterProtocol#allocate向RM汇报资源需求,RM端的ApplicationMasterService提供服务,并最终调用调度器的allocate  //将新的资源需求写入内存结构,并返回已经分配的资源  public class ApplicationMasterService extends AbstractService implements    ApplicationMasterProtocol {  public AllocateResponse allocate(AllocateRequest request)      throws YarnException, IOException {          ..             // Allow only one thread in AM to do heartbeat at a time.    synchronized (lastResponse) {      // Send the status update to the appAttempt.      this.rmContext.getDispatcher().getEventHandler().handle(          new RMAppAttemptStatusupdateEvent(appAttemptId, request              .getProgress()));      List<ResourceRequest> ask = request.getAskList();  //ask,release为封装的请求      List<ContainerId> release = request.getReleaseList(                           // Send new requests to appAttempt.      Allocation allocation =          this.rScheduler.allocate(appAttemptId, ask, release,               blacklistAdditions, blacklistRemovals);  //调有RM端的调度器 rScheduler              ..                allocateResponse.setUpdatedNodes(updatedNodeReports);      }//封装一个response返回      allocateResponse.setAllocatedContainers(allocation.getContainers());      allocateResponse.setCompletedContainersStatuses(appAttempt          .pullJustFinishedContainers());      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);      allocateResponse.setAvailableResources(allocation.getResourceLimit());            allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());         // add preemption to the allocateResponse message (if any)      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));      // Adding NMTokens for allocated containers.      if (!allocation.getContainers().isEmpty()) {        allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()            .createAndGetNMTokens(app.getUser(), appAttemptId,//FIFO Scheduler的allocate方法...    
// Update application requests        application.updateResourceRequests(ask);  //将此次资源请求写入application的请求内存结构,等待nm发送心跳分配完后,写入application的分配内存结构,        //最终要更新到这样的一个内存结构 final Map<Priority, Map<String, ResourceRequest>> requests =    // new HashMap<Priority, Map<String, ResourceRequest>>();...      return new Allocation(          application.pullNewlyAllocatedContainers(),    //application内部的集合类,从分配好的内存结构里取          application.getHeadroom());          //application为FiCaSchedulerApp类  synchronized public List<Container> pullNewlyAllocatedContainers() {    List<Container> returnContainerList = new ArrayList<Container>(        newlyAllocatedContainers.size());    for (RMContainer rmContainer : newlyAllocatedContainers) {   //只是从newlyAllocatedContainers里面取,newlyAllocatedContainers的赋值是NM发送心跳后调用assignContainer后赋值的      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),          RMContainerEventType.ACQUIRED));      returnContainerList.add(rmContainer.getContainer());    }    newlyAllocatedContainers.clear();    return returnContainerList;  }  synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,      Priority priority, ResourceRequest request,       Container container) {   ....    // Add it to allContainers list.    newlyAllocatedContainers.add(rmContainer);  //给其赋值                //FIFO scheduler类调用上面方法,该方法是NM发送心跳最终调用的方法     private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,       Priority priority, int assignableContainers,       ResourceRequest request, NodeType type) {  ....        }        // Create the container        Container container =            BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()              .getHttpAddress(), capability, priority, containerToken);                // Allocate!                // Inform the application        RMContainer rmContainer =            application.allocate(type, node, priority, request, container);
//总结以上看到的:AppMaster 向 RM 发送请求(心跳)时,RM 先把本次发送的资源请求保存下来,写入 application(FiCaSchedulerApp)里的资源请求内存结构(参见 application.showRequests()),然后从当前"已分配"的内存结构中取出已经分配好的资源返回。这个过程是异步的:当 NM 发送心跳时,调度器才会根据已保存的 AppMaster 资源请求分配资源,并写到内存结构里,等 AppMaster 下一次心跳来取。

YARN的 AM与RM通信,申请资源分配过程