首页 > 代码库 > Hadoop - MapReduce MRAppMaster-剖析

Hadoop - MapReduce MRAppMaster-剖析

  一 概述

        MRv1主要由编程模型(MapReduce API)、资源管理与作业控制块(由JobTracker和TaskTracker组成)和数据处理引擎(由MapTask和ReduceTask组成)三部分组成。

而YARN出现之后。资源管理模块则交由YARN实现,这样为了让MapReduce框架执行在YARN上。仅须要一个ApplicationMaster组件完毕作业控制模块功能就可以,其他部分,包含编程模型和数据处理引擎等,可直接採用MRv1原有的部分。


二 MRAppMaster组成
   
     MRAppMaster是MapReduce的ApplicationMaster实现。它使得MapReduce应用程序能够直接执行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期。包含作业管理、资源申请与再分配、Container启动与释放、作业恢复等。


   MRAppMaster 主要由已下几种组件/服务组成:
      ConainterAllocator
          与RM通信,为MapReduce作业申请资源。

作业的每一个任务资源需求可描写叙述为5元组:         

   <Priority,hostname,capacity,containers,relax_locality>,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存和CPU两种资源)、Container数据是否松弛本地化
     ClientService
          ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,client能够通过该协议获取作业的运行状态(不必通过RM)和控制作业(比方杀死作业、改变作业优先级等)。

      Job
          表示一个MapReduce作业,与MRv1中的JobInProgress功能是一样的。负责监控作业的执行状态。

它维护了一个作业的状态机,以实现异步执行各种作业相关的操作。

      Task 
           表示一个MapReduce作业的某个任务。与MRv1中的TaskInProgress功能类似。负责监控一个任务的执行状态。它维护了一个任务状态机。以实现异步执行各种任务相关的操作。
       TaskAttempt 
           表示一个任务执行实例。它的执行逻辑与MRV1中的MapTask和ReduceTask执行实例全然一致。实际上,它直接使用了MRv1中的数据处理引擎,但经过了一些优化。
       TaskCleaner
            负责清理失败任务或被杀死任务使用的文件夹和产生的暂时结果(统称为垃圾数据),它维护了一个线程池和一个共享队列。异步删除任务产生的垃圾数据。
       Speculator 
            完毕猜測执行功能。当同一个作业的某个任务执行速度明显慢于其它任务时,会为该任务启动一个备份任务。

    

       ContainerLauncher
            负责与NM通信,以启动一个Container.当RM为作业分配资源后,ContainerLauncher会将任务执行相关信息填充到Container中。包含任务执行所需资源、任务执行命令、任务执行环境、任务依赖的外部文件等。然后与相应的NodeManager通信,要求它启动Container.
       TaskAttemptListener
             负责管理各个任务的心跳信息,假设一个任务一段时间内未汇报心跳,则觉得它死掉了。会将其从系统中移除。
          JobHistoryEventHandler
              负责对作业的各个事件记录日志。

当MRApMaster出现问题时。YARN会将其又一次调度到还有一个节点上。

未了避免又一次计算。MRAppMaster首先从HDFS上读取上次执行产生的日志,以恢复已经完毕的任务,进而可以仅仅执行尚未执行完毕的任务。


三 MapReduceclient

    MapReduceclient是MapReduce用户与YARN进行通信的唯一途径,通过该client。用户能够向YARN提交作业,获取作业的执行状态和控制作业(比方杀死作业、杀死任务等).MapReduceclient涉及两个RPC通信协议:
     1.ApplicationClientProtol
          在YARN中,RM实现了ApplicationClientProtocol协议,不论什么client须要使用该协议完毕提交作业、杀死作业、改变作业的优先级等操作。

     2.MRClientProtocol
        当作业的ApplicationMaster成功启动后,它会启动MRClientService服务,该服务实现了MRClientProtoclo协议,从而同意client直接通过该协议与ApplicationMater通信以控制作业和查询作业执行状态。以减轻ResourceManager负载。

四 MRAppMaster工作流程

       依照作业的大小不同。MRAppMaster提供了三种作业执行模式:
      本地模式(通经常使用于作业调试,同MRv1一样,不再赘述)、Uber模式Non-Uber模式
        对于小作业为了减少延迟。可採用Uber模式,在该模式下,全部Map Task和Reduce Task在同一个Container(MRAppMaster所在的Container)中顺次执行;对于大作业。则採用Non-Uber模式,在该模式下,MRAppMaster先为Map Task申请资源。当Ma Task执行完毕数目达到一定比例之后再为Reduce Task申请资源。
        对于Map Task而言。它的生命周期为Scheduled->assigned->completed;
        而对于Reduce Task而言,它的生命周期为pending->scheduled->assigned->completed.
       
        在YARN之上执行MapReduce作业须要解决两个关键问题:怎样确定Reduce Task启动时机以及怎样完毕Shuffle功能。
         为了避免Reduce Task过早启动造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态。以便可以依据Map Task执行情况决定是否对其进行调度。

         MRAppMaster在MRv1原有策略基础之上加入了更为严格的资源控制策略和抢占策略。

在YARN中。NodeManager作为一种组合服务模式。同意动态载入应用程序暂时须要的附属服务,利用这一特性,YARN将Shuffle HTTP Sever组成一种服务,以便让各个NodeManager启动时载入它。

   
       当用户向YARN提交一个MapReduce应用程序后,YARN 将分两个阶段执行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MARppMaster创建应用程序。为它申请资源。并监控它的整个执行过程。直到执行完毕。
        
        步骤1 用户向YARN中(RM)提交应用程序,当中包含ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

          
        步骤2 ResourceManager为该应用程序分配第一个Container,ResouceManage与某个NodeManager通信,启动应用程序ApplicationMaster,NodeManager接到命令后,首先从HDFS上下载文件(缓存),然后启动ApplicationMaser。

       当ApplicationMaster启动后,它与ResouceManager通信,以请求和获取资源。

ApplicationMaster获取到资源后,与相应的NodeManager通信以启动任务。


        注:1.假设该应用程序第一次在给节点上启动任务。则NodeManager首先从HDFS上下载文件缓存到本地,这个是由分布式缓存实现的。然后启动该任务。
               2. 分布式缓存并非将文件缓存到集群中各个结点的内存中,而是将文件换到各个结点的磁盘上,以便运行任务时候直接从本地磁盘上读取文件。


       步骤3 ApplicationMaster首先向ResourceManager注冊。这样用户能够直接通过ResourceManage查看应用程序的执行状态,然后它将为各个任务申请资源。并监控它们的执行状态。直到执行结束,即反复步骤4~7。

       步骤4 ApplicationMaster採用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

       步骤5 一旦ApplicationMaster申请到资源后,ApplicationMaster就会将启动命令交给NodeManager,要求它启动任务。启动命令里包括了一些信息使得Container能够与Application Master进行通信。

       步骤6 NodeManager为任务设置好执行环境(包含环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过执行该脚本启动任务(Container)。

       步骤7   在Container内执行用户提交的代码,各个Container通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的执行状态,从而能够在任务失败时又一次启动任务。

       步骤8  在应用程序执行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前执行状态。

       步骤9 应用程序执行完毕后。ApplicationMaster向ResourceManager注销并关闭自己


五 MRAppMaster 生命周期
    
        MRAppMaster依据InputFormat组件的详细实现(一般是依据数据量切分数据),将作业分解成若干个Map Task和Reduce Task,当中每一个Map Task 负责处理一片Inputsplit数据,而每一个Reduce Task则进一步处理Map Task产生的中间结果。每一个Map/Reduce Task仅仅是一个详细计算任务的描写叙述,真正的任务计算工作则是由执行实例TaskAttempt完毕的。每一个Map/Reduce Task可能顺次启动多个执行实例,比方第一个执行实例失败了,则另起一个新的实例又一次计算,直到这一份数据处理完毕或者尝试次数达到上限。

       Job状态机
       Job状态机维护了一个MapReduce应用程序的生命周期,即从提交到执行结束的整个过程。一个Job由多个Map Task和Reduce Task构成。而Job状态机负责管理这些任务。Job状态机由类JobImpl实现。

      Task状态机
      Task维护了一个任务的生命周期。即从创建到执行结束整个过程

一个任务可能存在多次执行尝试。每次执行尝试被称为一个“执行实例”,Task状态机负责管理这些执行实例。Task状态机由TaskImpl实现。


      注意:1.MRAppMaster为任务申请到资源后,与相应的NodeManager通信成功启动Container。

须要注意的是,在某一个时刻,一个任务可能有多个执行实例,且可能存在执行失败的实例。可是仅仅要有一个实例执行成功,则意味着该任务执行完毕。

               2. 每一个任务的执行实例数目都有一定上限,一旦超过该上限,才觉得该任务执行失败,当中Map Task执行实例数目上限默认位4,Reduce Task执行实例默认也是 4.一个任务的失败并不一定导致整个作业执行失败,这取决于作业的错误容错率。
     
      TaskAttempt状态机
       TaskAttempt状态机维护了 一个任务执行实例的生命周期,即从创建到执行结束整个过程。它由TaskAttempImpl类实现。

        在YARN 中,任务实例是执行在Container中的。因此。Container状态的变化往往伴随任务实例的状态变化,比方任务实例执行完毕后。会清理Container占用的空间,而Container空间的清理实际上就是任务实例空间的清理。任务实例执行完后,需向MRAppMaster请求提交终于结果,一旦提交完毕后。该任务的其他实例就将被杀死。

    
   总结一个作业的运行过程大致例如以下:
        创建实例=》MRApMaster向ResourceManager申请资源=》获得Container=》启动Container(执行实例)=》提交执行结果=》清理结果
        当一个Container执行结束后,MRAppMaster可直接从ResourceManager上获知。

各个任务执行实例需定期向MRAppMaster汇报进度和状态,否则MRAppMaster觉得该任务处于僵死状态。会将它杀死,每次汇报均会触发一个TA_UPDATE事件。

      注:1.MRAppMaster能够由两条路径来得知Conainer的当前执行状态:
              a. 通过ResourceManager(MRAppMaster与ResouceManager中维护一个心跳信息)
              b. 还有一个是直接通过Task Attempt(每一个Task Attempt与MRAppMaster之间有专用的协议) 
            2. 这两条路径是独立的,没有先后顺序之分,假设MRAppMaster直接从ResouceManager获取Container执行完毕信息。则任务实例直接从Running转化为SUCCESS_CONTAINER_CLEANUP状态,假设首先从TaskAttempt中获知任务完毕信息。则将首先转化为COMMIT_PENDING状态。然后再转化为SUCCESS_CONTAINER_CLEANUP状态。

       当任务运行失败或者被杀死时,需清理它占用的磁盘空间和产生的结果。当Reduce Task远程复制一个已经运行完毕的Map Task输出数据时,可能由于磁盘或者网络等原因,导致数据损坏或者数据丢失。这是会触发一个TA_TOO_MANY_FETCH_FAILURE事件。从而触发MRAppMaster又一次调度运行该Map Task.

六  资源申请和再分配
     
         ContainerAllocator是MRAppMaster中负责资源申请和分配的模块。

用户提交的作业被分解成Map Task和Reduce Task后,这些Task所需的资源统一由ContainerAllocator模块负责从ResourceManager中申请,而一旦ContainAllocator得到资源后,需採用一定的策略进一步分配给作业的各个任务。

       在YARN中,作业的资源描写叙述能够被描写叙述为五元组:priority,hostname,capabiity,containers,relax_locality分别表示 作业优先级    期望资源所在的host  资源量(当前支持内存与CPU两种资源) 、Containers数目  是否松弛本地化。

比如:

     <10,"node1","memeory:1G,CPU:1",3,true)//  优先级是一个正整数,优先级值越小,优先级越高
        ContainerAllocator周期性的通过心跳与ResourceManager通信。以获取已经分配的Contaienr列表,完毕的Container列表、近期更新的节点*+列表等信息,而ContanerAllocator依据这些信息完毕对应的操作。
       当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列的Map Task和TaskReduce  Task任务,因为Reduce Task依赖于Map Task之间的结果,所以Reduce Task会延后调度。

     
 任务状态描写叙述
       Map:  scheduled->assigned->completed
       Task:  pending-> scheduled->assigned->completed
            pending 表示等待ContainerAllocator发送资源请求的集合
            scheduled 标识已经发送了资源申请给RM。但还没收到分配的资源的任务集合
            assignd 已经受到RM分配的资源的任务集合
            complet 表示已完毕的任务集合
     
        三种作业状态:Failed Map Task ,Map Task,Reduce Task分别赋予它们优先级5 20 10也就是说,当三种任务同一时候有资源请求的时候。会优先分配给Failed Map Task,然后是Reduce Task,最后是Map Task.
       假设一个任务执行失败,则会又一次为该任务申请资源
       假设一个任务执行速度过慢。则会为其额外申请资源已启动备份任务(假设启动了猜測执行过程)
       假设一个节点的失败任务数目过多,则会撤销对该节点的全部资源的申请请求。


        注:在大多数数的情况下,RMAppMaster与RM的心跳信息都是空的。即心跳信息不包括新的资源请求信息,这样的心跳信息有一下几个作用:
              1. 周期性发送心跳。告诉RM自己还活着
              2. 周期性询问RM,以获取新分配的资源和各个Container执行状况。
  
  资源再分配
    一旦MRAppMaster收到新分配的Container后,会将这些Container进一步分配给各个任务。Container分配步骤例如以下:
      1.推断新收到的Container包括的资源是否满足,假设不满足。则通过下次心跳通知ResourceManager释放该Container.
      2.推断收到的Container所在的节点是否被增加到黑名单中,假设是。则寻找一个与该Container匹配的任务,并又一次为该任务申请资源。同一时候通过下次心跳通知ResourceManager释放该Container.
      3.依据Container的优先级。将它分配给相应类型的任务。



七 Contianer启动和释放
     
      当ContainerAllocator为某个任务申请到资源后,会将执行该任务相关的全部信息封装到Container中。并要求相应的节点启动该Container。须要注意的是。Container中执行的任务相应的数据处理引擎与MRv1中全然一致,仍为Map Task和 Reduce Task。

正由于如此。MRv1的程序与YARN中的MapReduce程序全然兼容。


        ContainerLaunche负责与各个NodeManager通信,已启动或者释放Container。在YARN中。执行的Task所需的所有信息被封装到Container中,包含所需的资源、依赖的外部文件、JAR包、执行时环境变量、执行命令等。ContainerLauncher通过RPC协议ContainerManager与NodeManager通信,以控制Container的启动和释放。进而控制任务的执行(比方启动任务、杀死任务等)。
   有多种可能触发停止/杀死一个Container,常见的有:
       1.猜測执行时一个任务执行完毕,需杀死还有一个同样输入数据的任务。
       2.用户发送一个杀死任务请求。

       3.随意一个任务执行结束时,YARN会触发一个杀死任务的命令,以释放相应的Container占用的资源。

八 猜測运行机制

        为了防止运行速度慢的任务拖慢总体的运行进度,使用猜測运行机制,Hadoop会为该任务启动一个备份任务,让该备份任务与原始任务同一时候处理同一份数据,谁先运行完。则将谁的结果作为终于结果。
      注:1.每一个任务最多仅仅能有一个备份任务实例
     2. 启动备份的时候。必须保证已经有足够多的Map任务已经完毕,依据这些完毕的任务来估算是否来启动备份任务。

       这样的算法的长处是可最大化备份任务的有效率。当中有效率指有效备份任务数与全部备份任务数的比值,有效任务是指完毕时间早于原始任务完毕时间的备份任务(即带来实际收益的备份任务)。备份任务的有效率越高。猜測运行算法越优秀,带来的收益也就越大。
       猜測运行机制实际上採用了经典的算法优化方法,以空间换时间,它同一时候启动多个同样的任务处理同样的数据,并让这些任务竞争以缩短数据的处理时间。

八 作业恢复

       从作业恢复粒度角度来看,当前存在三种不同级别的恢复机制,级别由低到高依次是作业级别、任务级别和记录级别。当中级别越低实现越简单,但造成的资源浪费也越严重。

当前MRAppMaster採用了任务级别的恢复机制,即以任务为基本单位进行恢复,这样的机制是基于事务型日志完毕作业恢复的,它仅仅关注两种任务:执行完毕的任务和未完毕的任务。作业执行过程中,MRAppMaster会以日志的形式将作业以及状态记录下来,一旦MRAppMaster重新启动,则可从日志中恢复作业的执行状态。

      当前MRAppMaster的作业恢复机制仅能做到恢复上一次已经执行完毕的任务,对于正在执行的任务。则在前一次MRAppMaster执行实例退出时由ResourceManager强制将其杀死并回收资源。
       MRAppMaster採用了开源数据序列化工具Apache Avro记录这些事件。

Avro是一个数据序列化系统,通经常使用于支持大批数据交换和跨语言RPC的应用。

 
九 MRv1与MRv2简单对照
  
     MRAppMaster仍採用了MRv1中的数据处理引擎。分别由数据处理引擎MapTask和ReduceTask完毕Map任务和Reduce任务的处理。


 MRv1与MRv2的比較
   MRv2中在Map端 用Netty取代Jetty. Reduce端採用批拷贝、shuffle和排序插件化

  应用程序编程接口                新旧API                                                      新旧API 
  执行时环境              由JobTracker与TaskTracker组成    YARN (由RM和NM组成)和MRAppMaster
  数据处理引擎                   MapTask/Reduce Task                                   MapTask/Reduce Task


       须要注意的是。YARN并不会改变MapReduce编程模型,它仅仅是应用开发者使用的API。YARN提供了一种新的资源管理模型和实现,用来 执行MapReduce任务。

因此。在最简单的情况下。现有的MapReduce应用仍然能照原样执行(须要又一次编译),YARN仅仅只是能让开发者更精 确地指定运行參数。


十 小结

     MapRecuce On YARN的执行时环境由YARN与ApplicationMaster构成,这样的新颖的执行时环境使得MapReduce能够与其它计算框架执行在一个集群中,从而达到共享集群资源、提高资源利用率的目的。

随着YARN的程序与完好,MRv1的独立执行模式将被MapRedcue On YARN代替。


Hadoop - MapReduce MRAppMaster-剖析