首页 > 代码库 > YARN NM与RM通信
YARN NM与RM通信
NM端发送心跳
//NM发送心跳,增加一个NODE_UPDATE事件,简单返回一个respone,异步驱动型,事件再驱动assignContainers,从资源请求结构里取出需求分配资源 //AsyncDispatcher原理 //一个event队列,一个eventtype.class 到处理器对应关系(仅仅是一个class对应一个处理器,class是个Enum可能会有很多种值,具体逻辑在处理器内部) //从队列取出event,再从event取到type类型,再找到处理器,处理器调用handler(event)方法 //nodeHeartBeat增加一个RMStatusEvent事件(事件类型是RMNodeType.Status_UPDATE) RM register到他对应的处理器 该处理器 最终调用RMNodeImpl RMNodeImpl会增加SchedulerEvent // NodeManager类会调以下这个类 NodeStatusUpdaterImpl类 protected void startStatusUpdater() { statusUpdaterRunnable = new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { int lastHeartBeatID = 0; while (!isStopped) { .... response = resourceTracker.nodeHeartbeat(request); //发送心跳到ResourceTrackerService .. 会rpc远程调用 ResourceTrackerService类里 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus remoteNodeStatus = request.getNodeStatus(); /** * Here is the node heartbeat sequence... * 1. Check if it's a registered node * 2. Check if it's a valid (i.e. not excluded) node * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 4. Send healthStatus to RMNode */ .... // 4. Send status to RMNode, saving the latest response. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), //RMNodeStatusEvent是RMNodeEvent的子类,构造器指定RMNodeEventType.STATUS_UPDATE 类型 事件 //在RM会通过register给asyncDispatcher指定类型对应的处理器,可查看后面代码,对应到NodeEventDispatcher处理器,该类内部会用RMNodeImpl,该类又会引起 //scheduler相关事件 remoteNodeStatus.getContainersStatuses(), // 包含各个container状态,是一个list remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); //新建个事件,把他放入AsyncDispatcher里的队列,最后应该会激起ResourceScheduler来处理 rmContext是在ResourceManager里构建,这里重点知道Dispatcher用的是哪个 .. this.rmContext = new RMContextImpl(this.rmDispatcher, rmStore, this.containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager, this.containerTokenSecretManager, this.nmTokenSecretManager, this.clientToAMSecretManager); 。。 protected Dispatcher createDispatcher() { return new AsyncDispatcher(); //rmDispatcher 通过该方法构建,org.apache.hadoop.yarn.event.AsyncDispatcher //有个事件队列,和事件类型到事件处理器的map关系,异步线程根据event内部取出事件类型(包含事件是哪种事件类型是在其内部设置的) //,再找到哪个处理器,具体处理器内部处理逻辑根据不同类型enum特定值区分 //类型.class与处理器对应关系,通过register } //同时RM里register注册各个,事件类型对应的事件处理器, 在AsyncDispatcher内的异步线程里再根据这个map对应关系知道用哪个事件处理器 this.rmDispatcher.register(SchedulerEventType.class, //enum类也有NODE-UPDATE的值 this.schedulerDispatcher); rmDispatcher // Register event handler for RmAppEvents this.rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(this.rmContext)); // Register event handler for RmAppAttemptEvents this.rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(this.rmContext)); // Register event handler for RmNodes this.rmDispatcher.register(RMNodeEventType.class, //枚举值,有NODE-UPDATE,NodeEventDispatcher里的处理逻辑会根据RMNodeEventType里的值做 //分别的处理,类似case when .... new NodeEventDispatcher(this.rmContext)); //注册事件处理器 NodeEventDispatcher类,在RM内部 public void handle(RMNodeEvent event) { //事件处理方法 NodeId nodeId = event.getNodeId(); RMNode node = this.rmContext.getRMNodes().get(nodeId); if (node != null) { try { ((EventHandler<RMNodeEvent>) node).handle(event) ; //通过RMNode强制转换成处理器,对RMNodeImpl同时也继承EventHandler,其内部 //会调用scheduler相关 } catch (Throwable t) { LOG.error("Error in handling event type " + event.getType() + " for node " + nodeId, t); } //总结 NodeManager发送心跳到RM端的ResourceManagerService,调用nodeHeartbeat方法,发送STATUS_UPDATE 类型的事件给到RMNode,RMNodeImpl类内 .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) StatusUpdateWhenHealthyTransition类 transition方法 rmNode.context.getDispatcher().getEventHandler().handle( new NodeUpdateSchedulerEvent(rmNode)); //会触发调度器 //下面分析调度
RM端接受心跳后调度器分配
接上面分析 StatusUpdateWhenHealthyTransition类 rmNode.context.getDispatcher().getEventHandler().handle( new NodeUpdateSchedulerEvent(rmNode)); //会触发调度器 会增加个scheduler的事件 在RM构造方法内已经注册了对应类型的处理事件,如下: // Initialize the scheduler this.scheduler = createScheduler(); this.schedulerDispatcher = createSchedulerEventDispatcher(); addIfService(this.schedulerDispatcher); this.rmDispatcher.register(SchedulerEventType.class, this.schedulerDispatcher); //事件处理器 protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { return new SchedulerEventDispatcher(this.scheduler); } SchedulerEventDispatcher内部又构建了个队列,将事件放入,异步处理,最后调用scheduler来处理该事件 public void run() { SchedulerEvent event; while (!stopped && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); scheduler.handle(event); //该方法调用调度器 ... public void handle(SchedulerEvent event) { try { int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0) { LOG.info("Size of scheduler event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { LOG.info("Very low remaining capacity on scheduler event queue: " + remCapacity); } this.eventQueue.put(event); } catch (InterruptedException e) { throw new YarnRuntimeException(e); } } //FIFOScheduler public void handle(SchedulerEvent event) { switch(event.getType()) { case NODE_ADDED: { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: { NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode()); .... nodeUpdate方法 private synchronized void nodeUpdate(RMNode rmNode) { FiCaSchedulerNode node = getNode(rmNode.getNodeID()); List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates(); .... assignContainers(node); //核心方法,分配containers private void assignContainers(FiCaSchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + " #applications=" + applications.size()); // Try to assign containers to applications in fifo order for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications .entrySet()) { FiCaSchedulerApp application = e.getValue(); LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { // Check if this resource is on the blacklist if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) { continue; } for (Priority priority : application.getPriorities()) { int maxContainers = getMaxAllocatableContainers(application, priority, node, NodeType.OFF_SWITCH); // Ensure the application needs containers of this priority if (maxContainers > 0) { int assignedContainers = assignContainersOnNode(node, application, priority); //分配方法 // Do not assign out of order w.r.t priorities if (assignedContainers == 0) { break; } } } } LOG.debug("post-assignContainers"); application.showRequests(); // Done if (Resources.lessThan(resourceCalculator, clusterResource, node.getAvailableResource(), minimumAllocation)) { break; } } // Update the applications' headroom to correctly take into // account the containers assigned in this update. for (FiCaSchedulerApp application : applications.values()) { application.setHeadroom(Resources.subtract(clusterResource, usedResource)); } assignContainersOnNode private int assignContainersOnNode(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority ) { // Data-local int nodeLocalContainers = assignNodeLocalContainers(node, application, priority); // Rack-local int rackLocalContainers = assignRackLocalContainers(node, application, priority); ..... assignNodeLocalContainers .. int assignableContainers = Math.min( getMaxAllocatableContainers(application, priority, node, NodeType.NODE_LOCAL), request.getNumContainers()); assignedContainers = assignContainer(node, application, priority, assignableContainers, request, NodeType.NODE_LOCAL); //总结:NM发送心跳到RM,发送NODE_UPDATE事件,激发相关事件,最终到RMNode RMNodeImpl,将事件加入RMNodeImpl ,RMNodeImpl是一个状态机 //addTransition内可以看到会调用到StatusUpdateWhenHealthyTransition,StatusUpdateWhenHealthyTransition类 transition方法会将NodeUpdateSchedulerEvent //事件加入到异步处理器, 最终会调用scheduler的assignContainers方法,该方法从application里资源请求的内存结构里取资源请求,进行分配 //并将结果保存在application的分配内存结构等待appmaster来取 //appmaster来取的时候,首先更新资源请求内存结构,再取分配内存结构
YARN NM与RM通信
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。