首页 > 代码库 > YARN NM与RM通信

YARN NM与RM通信

NM端发送心跳

//NM发送心跳,增加一个NODE_UPDATE事件,简单返回一个respone,异步驱动型,事件再驱动assignContainers,从资源请求结构里取出需求分配资源
//AsyncDispatcher原理
//一个event队列,一个eventtype.class 到处理器对应关系(仅仅是一个class对应一个处理器,class是个Enum可能会有很多种值,具体逻辑在处理器内部)
//从队列取出event,再从event取到type类型,再找到处理器,处理器调用handler(event)方法

//nodeHeartBeat增加一个RMStatusEvent事件(事件类型是RMNodeType.Status_UPDATE)
RM register到他对应的处理器
该处理器 最终调用RMNodeImpl
RMNodeImpl会增加SchedulerEvent
//
NodeManager类会调以下这个类
  
  NodeStatusUpdaterImpl类
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
 ....
            response = resourceTracker.nodeHeartbeat(request); //发送心跳到ResourceTrackerService
            ..
            
            
会rpc远程调用   ResourceTrackerService类里
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * Here is the node heartbeat sequence...
     * 1. Check if it's a registered node
     * 2. Check if it's a valid (i.e. not excluded) node 
     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat 
     * 4. Send healthStatus to RMNode
     */
....

             // 4. Send status to RMNode, saving the latest response.
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),  //RMNodeStatusEvent是RMNodeEvent的子类,构造器指定RMNodeEventType.STATUS_UPDATE 类型 事件
        //在RM会通过register给asyncDispatcher指定类型对应的处理器,可查看后面代码,对应到NodeEventDispatcher处理器,该类内部会用RMNodeImpl,该类又会引起
        //scheduler相关事件
            remoteNodeStatus.getContainersStatuses(),  // 包含各个container状态,是一个list
            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); //新建个事件,把他放入AsyncDispatcher里的队列,最后应该会激起ResourceScheduler来处理
            
   rmContext是在ResourceManager里构建,这里重点知道Dispatcher用的是哪个
   ..
    this.rmContext =
        new RMContextImpl(this.rmDispatcher, rmStore,
          this.containerAllocationExpirer, amLivelinessMonitor,
          amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
          this.containerTokenSecretManager, this.nmTokenSecretManager,
          this.clientToAMSecretManager);     
          
      。。
      
        protected Dispatcher createDispatcher() {
    return new AsyncDispatcher();  //rmDispatcher 通过该方法构建,org.apache.hadoop.yarn.event.AsyncDispatcher
    //有个事件队列,和事件类型到事件处理器的map关系,异步线程根据event内部取出事件类型(包含事件是哪种事件类型是在其内部设置的)
    //,再找到哪个处理器,具体处理器内部处理逻辑根据不同类型enum特定值区分
    //类型.class与处理器对应关系,通过register
    
      
  }
   //同时RM里register注册各个,事件类型对应的事件处理器,  在AsyncDispatcher内的异步线程里再根据这个map对应关系知道用哪个事件处理器
   
       this.rmDispatcher.register(SchedulerEventType.class,  //enum类也有NODE-UPDATE的值
        this.schedulerDispatcher);
        
   rmDispatcher   
       // Register event handler for RmAppEvents
    this.rmDispatcher.register(RMAppEventType.class,
        new ApplicationEventDispatcher(this.rmContext));

    // Register event handler for RmAppAttemptEvents
    this.rmDispatcher.register(RMAppAttemptEventType.class,
        new ApplicationAttemptEventDispatcher(this.rmContext));

    // Register event handler for RmNodes
    this.rmDispatcher.register(RMNodeEventType.class,  //枚举值,有NODE-UPDATE,NodeEventDispatcher里的处理逻辑会根据RMNodeEventType里的值做
    //分别的处理,类似case when .... 
        new NodeEventDispatcher(this.rmContext));     //注册事件处理器
 
 
 NodeEventDispatcher类,在RM内部
     public void handle(RMNodeEvent event) {  //事件处理方法
      NodeId nodeId = event.getNodeId();
      RMNode node = this.rmContext.getRMNodes().get(nodeId);
      if (node != null) {
        try {
          ((EventHandler<RMNodeEvent>) node).handle(event)    ;  //通过RMNode强制转换成处理器,对RMNodeImpl同时也继承EventHandler,其内部
          //会调用scheduler相关
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for node " + nodeId, t);
        }
        
   
   
   //总结
   NodeManager发送心跳到RM端的ResourceManagerService,调用nodeHeartbeat方法,发送STATUS_UPDATE 类型的事件给到RMNode,RMNodeImpl类内
   .addTransition(NodeState.RUNNING, 
         EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
         
         
   StatusUpdateWhenHealthyTransition类 transition方法
    rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode)); //会触发调度器
            
    //下面分析调度        
    

RM端接受心跳后调度器分配

接上面分析
          
   StatusUpdateWhenHealthyTransition类
    rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode)); //会触发调度器
            
会增加个scheduler的事件
            
  
  在RM构造方法内已经注册了对应类型的处理事件,如下:
    // Initialize the scheduler
    this.scheduler = createScheduler();
    this.schedulerDispatcher = createSchedulerEventDispatcher();
    addIfService(this.schedulerDispatcher);
    this.rmDispatcher.register(SchedulerEventType.class,
        this.schedulerDispatcher);  //事件处理器
        
        
 
  protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
    return new SchedulerEventDispatcher(this.scheduler);
  }


SchedulerEventDispatcher内部又构建了个队列,将事件放入,异步处理,最后调用scheduler来处理该事件

   public void run() {

        SchedulerEvent event;

        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            event = eventQueue.take();
             scheduler.handle(event); //该方法调用调度器
            ...
            

    public void handle(SchedulerEvent event) {
      try {
        int qSize = eventQueue.size();
        if (qSize !=0 && qSize %1000 == 0) {
          LOG.info("Size of scheduler event-queue is " + qSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < 1000) {
          LOG.info("Very low remaining capacity on scheduler event queue: "
              + remCapacity);
        }
        this.eventQueue.put(event);
      } catch (InterruptedException e) {
        throw new YarnRuntimeException(e);
      }
    }


//FIFOScheduler

  public void handle(SchedulerEvent event) {
    switch(event.getType()) {
    case NODE_ADDED:
    {
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
      addNode(nodeAddedEvent.getAddedRMNode());
    }
    break;
    case NODE_REMOVED:
    {
      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
      removeNode(nodeRemovedEvent.getRemovedRMNode());
    }
    break;
    case NODE_UPDATE:
    {
      NodeUpdateSchedulerEvent nodeUpdatedEvent = 
      (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
      ....
      
      
  nodeUpdate方法
     private synchronized void nodeUpdate(RMNode rmNode) {
    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
    
    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
....
      assignContainers(node);   
      
   
   
  //核心方法,分配containers
    private void assignContainers(FiCaSchedulerNode node) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " #applications=" + applications.size());

    // Try to assign containers to applications in fifo order
    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
        .entrySet()) {
      FiCaSchedulerApp application = e.getValue();
      LOG.debug("pre-assignContainers");
      application.showRequests();
      synchronized (application) {
        // Check if this resource is on the blacklist
        if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
          continue;
        }
        
        for (Priority priority : application.getPriorities()) {
          int maxContainers = 
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.OFF_SWITCH); 
          // Ensure the application needs containers of this priority
          if (maxContainers > 0) {
            int assignedContainers = 
              assignContainersOnNode(node, application, priority); //分配方法
            // Do not assign out of order w.r.t priorities
            if (assignedContainers == 0) {
              break;
            }
          }
        }
      }
      
      LOG.debug("post-assignContainers");
      application.showRequests();

      // Done
      if (Resources.lessThan(resourceCalculator, clusterResource,
              node.getAvailableResource(), minimumAllocation)) {
        break;
      }
    }

    // Update the applications' headroom to correctly take into
    // account the containers assigned in this update.
    for (FiCaSchedulerApp application : applications.values()) {
      application.setHeadroom(Resources.subtract(clusterResource, usedResource));
    }    
    
    
  assignContainersOnNode  
    private int assignContainersOnNode(FiCaSchedulerNode node, 
      FiCaSchedulerApp application, Priority priority 
  ) {
    // Data-local
    int nodeLocalContainers = 
      assignNodeLocalContainers(node, application, priority); 

    // Rack-local
    int rackLocalContainers = 
      assignRackLocalContainers(node, application, priority);
    .....
    
    
   assignNodeLocalContainers
   ..
         int assignableContainers = 
        Math.min(
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.NODE_LOCAL), 
                request.getNumContainers());
      assignedContainers = 
        assignContainer(node, application, priority, 
            assignableContainers, request, NodeType.NODE_LOCAL);   
            
            
         //总结:NM发送心跳到RM,发送NODE_UPDATE事件,激发相关事件,最终到RMNode RMNodeImpl,将事件加入RMNodeImpl ,RMNodeImpl是一个状态机
         //addTransition内可以看到会调用到StatusUpdateWhenHealthyTransition,StatusUpdateWhenHealthyTransition类 transition方法会将NodeUpdateSchedulerEvent
         //事件加入到异步处理器, 最终会调用scheduler的assignContainers方法,该方法从application里资源请求的内存结构里取资源请求,进行分配
         //并将结果保存在application的分配内存结构等待appmaster来取
         //appmaster来取的时候,首先更新资源请求内存结构,再取分配内存结构


YARN NM与RM通信