一 概述
MRv1主要由编程模型(MapReduce API)、资源管理与作业控制块(由JobTracker和TaskTracker组成)和数据处理引擎(由MapTask和ReduceTask组成)三部分组成。而YARN出现之后,资源管理模块则交由YARN实现,这样为了让MapReduce框架运行在YARN上,仅需要一个ApplicationMaster组件完成作业控制模块功能即可,其它部分,包括编程模型和数据处理引擎等,可直接采用MRv1原有的部分。
二 MRAppMaster组成
MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce应用程序可以直接运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括作业管理、资源申请与再分配、Container启动与释放、作业恢复等。
MRAppMaster 主要由已下几种组件/服务组成:
ConainterAllocator
与RM通信,为MapReduce作业申请资源。作业的每个任务资源需求可描述为5元组:
<Priority,hostname,capacity,containers,relax_locality>,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存和CPU两种资源)、Container数据是否松弛本地化
ClientService
ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,客户端可以通过该协议获取作业的执行状态(不必通过RM)和控制作业(比如杀死作业、改变作业优先级等)。
Job
表示一个MapReduce作业,与MRv1中的JobInProgress功能是一样的,负责监控作业的运行状态。它维护了一个作业的状态机,以实现异步执行各种作业相关的操作。
Task
表示一个MapReduce作业的某个任务,与MRv1中的TaskInProgress功能类似,负责监控一个任务的运行状态。它维护了一个任务状态机,以实现异步执行各种任务相关的操作。
TaskAttempt
表示一个任务运行实例,它的执行逻辑与MRV1中的MapTask和ReduceTask运行实例完全一致,实际上,它直接使用了MRv1中的数据处理引擎,但经过了一些优化。
TaskCleaner
负责清理失败任务或被杀死任务使用的目录和产生的临时结果(统称为垃圾数据),它维护了一个线程池和一个共享队列,异步删除任务产生的垃圾数据。
Speculator
完成推测执行功能,当同一个作业的某个任务运行速度明显慢于其他任务时,会为该任务启动一个备份任务。
ContainerLauncher
负责与NM通信,以启动一个Container.当RM为作业分配资源后,ContainerLauncher会将任务执行相关信息填充到Container中,包括任务运行所需资源、任务运行命令、任务运行环境、任务依赖的外部文件等,然后与对应的NodeManager通信,要求它启动Container.
TaskAttemptListener
负责管理各个任务的心跳信息,如果一个任务一段时间内未汇报心跳,则认为它死掉了,会将其从系统中移除。
JobHistoryEventHandler
负责对作业的各个事件记录日志。当MRApMaster出现故障时,YARN会将其重新调度到另一个节点上。未了避免重新计算,MRAppMaster首先从HDFS上读取上次运行产生的日志,以恢复已经完成的任务,进而能够只运行尚未运行完成的任务。
三 MapReduce客户端
MapReduce客户端是MapReduce用户与YARN进行通信的唯一途径,通过该客户端,用户可以向YARN提交作业,获取作业的运行状态和控制作业(比如杀死作业、杀死任务等).MapReduce客户端涉及两个RPC通信协议:
1.ApplicationClientProtol
在YARN中,RM实现了ApplicationClientProtocol协议,任何客户端需要使用该协议完成提交作业、杀死作业、改变作业的优先级等操作。
2.MRClientProtocol
当作业的ApplicationMaster成功启动后,它会启动MRClientService服务,该服务实现了MRClientProtoclo协议,从而允许客户端直接通过该协议与ApplicationMater通信以控制作业和查询作业运行状态,以减轻ResourceManager负载。
四 MRAppMaster工作流程
按照作业的大小不同,MRAppMaster提供了三种作业运行模式:
本地模式(通常用于作业调试,同MRv1一样,不再赘述)、Uber模式和Non-Uber模式。
对于小作业为了降低延迟,可采用Uber模式,在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在的Container)中顺次执行;对于大作业,则采用Non-Uber模式,在该模式下,MRAppMaster先为Map Task申请资源,当Ma Task运行完成数目达到一定比例之后再为Reduce Task申请资源。
对于Map Task而言,它的生命周期为Scheduled->assigned->completed;
而对于Reduce Task而言,它的生命周期为pending->scheduled->assigned->completed.
在YARN之上运行MapReduce作业需要解决两个关键问题:如何确定Reduce Task启动时机以及如何完成Shuffle功能。
为了避免Reduce Task过早启动造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态,以便能够根据Map Task运行情况决定是否对其进行调度。
MRAppMaster在MRv1原有策略基础之上添加了更为严格的资源控制策略和抢占策略。在YARN中,NodeManager作为一种组合服务模式,允许动态加载应用程序临时需要的附属服务,利用这一特性,YARN将Shuffle HTTP Sever组成一种服务,以便让各个NodeManager启动时加载它。
当用户向YARN提交一个MapReduce应用程序后,YARN 将分两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MARppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。
步骤1 用户向YARN中(RM)提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
步骤2 ResourceManager为该应用程序分配第一个Container,ResouceManage与某个NodeManager通信,启动应用程序ApplicationMaster,NodeManager接到命令后,首先从HDFS上下载文件(缓存),然后启动ApplicationMaser。
当ApplicationMaster启动后,它与ResouceManager通信,以请求和获取资源。ApplicationMaster获取到资源后,与对应的NodeManager通信以启动任务。
注:1.如果该应用程序第一次在给节点上启动任务,则NodeManager首先从HDFS上下载文件缓存到本地,这个是由分布式缓存实现的,然后启动该任务。
2. 分布式缓存并不是将文件缓存到集群中各个结点的内存中,而是将文件换到各个结点的磁盘上,以便执行任务时候直接从本地磁盘上读取文件。
步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤4~7。
步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5 一旦ApplicationMaster申请到资源后,ApplicationMaster就会将启动命令交给NodeManager,要求它启动任务。启动命令里包含了一些信息使得Container可以与Application Master进行通信。
步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务(Container)。
步骤7 在Container内执行用户提交的代码,各个Container通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
步骤8 在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤9 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己
五 MRAppMaster 生命周期
MRAppMaster根据InputFormat组件的具体实现(通常是根据数据量切分数据),将作业分解成若干个Map Task和Reduce Task,其中每个Map Task 负责处理一片Inputsplit数据,而每个Reduce Task则进一步处理Map Task产生的中间结果。每个Map/Reduce Task只是一个具体计算任务的描述,真正的任务计算工作则是由运行实例TaskAttempt完成的,每个Map/Reduce Task可能顺次启动多个运行实例,比如第一个运行实例失败了,则另起一个新的实例重新计算,直到这一份数据处理完成或者尝试次数达到上限。
Job状态机
Job状态机维护了一个MapReduce应用程序的生命周期,即从提交到运行结束的整个过程。一个Job由多个Map Task和Reduce Task构成,而Job状态机负责管理这些任务。Job状态机由类JobImpl实现。
Task状态机
Task维护了一个任务的生命周期,即从创建到运行结束整个过程。一个任务可能存在多次运行尝试,每次运行尝试被称为一个“运行实例”,Task状态机负责管理这些运行实例。Task状态机由TaskImpl实现。
注意:1.MRAppMaster为任务申请到资源后,与对应的NodeManager通信成功启动Container。需要注意的是,在某一个时刻,一个任务可能有多个运行实例,且可能存在运行失败的实例,但是只要有一个实例运行成功,则意味着该任务运行完成。
2. 每个任务的运行实例数目都有一定上限,一旦超过该上限,才认为该任务运行失败,其中Map Task运行实例数目上限默认位4,Reduce Task运行实例默认也是 4.一个任务的失败并不一定导致整个作业运行失败,这取决于作业的错误容错率。
TaskAttempt状态机
TaskAttempt状态机维护了 一个任务运行实例的生命周期,即从创建到运行结束整个过程。它由TaskAttempImpl类实现。
在YARN 中,任务实例是运行在Container中的,因此,Container状态的变化往往伴随任务实例的状态变化,比如任务实例运行完成后,会清理Container占用的空间,而Container空间的清理实际上就是任务实例空间的清理。任务实例运行完后,需向MRAppMaster请求提交最终结果,一旦提交完成后,该任务的其它实例就将被杀死。
总结一个作业的执行过程大致如下:
创建实例=》MRApMaster向ResourceManager申请资源=》获得Container=》启动Container(运行实例)=》提交运行结果=》清理结果
当一个Container运行结束后,MRAppMaster可直接从ResourceManager上获知。各个任务运行实例需定期向MRAppMaster汇报进度和状态,否则MRAppMaster认为该任务处于僵死状态,会将它杀死,每次汇报均会触发一个TA_UPDATE事件。
注:1.MRAppMaster可以由两条路径来得知Conainer的当前运行状态:
a. 通过ResourceManager(MRAppMaster与ResouceManager中维护一个心跳信息)
b. 另一个是直接通过Task Attempt(每个Task Attempt与MRAppMaster之间有专用的协议)
2. 这两条路径是独立的,没有先后顺序之分,如果MRAppMaster直接从ResouceManager获取Container运行完成信息,则任务实例直接从Running转化为SUCCESS_CONTAINER_CLEANUP状态,如果首先从TaskAttempt中获知任务完成信息。则将首先转化为COMMIT_PENDING状态,然后再转化为SUCCESS_CONTAINER_CLEANUP状态。
当任务执行失败或者被杀死时,需清理它占用的磁盘空间和产生的结果。当Reduce Task远程复制一个已经运行完成的Map Task输出数据时,可能因为磁盘或者网络等原因,导致数据损坏或者数据丢失,这是会触发一个TA_TOO_MANY_FETCH_FAILURE事件,从而触发MRAppMaster重新调度执行该Map Task.
六 资源申请和再分配
ContainerAllocator是MRAppMaster中负责资源申请和分配的模块。用户提交的作业被分解成Map Task和Reduce Task后,这些Task所需的资源统一由ContainerAllocator模块负责从ResourceManager中申请,而一旦ContainAllocator得到资源后,需采用一定的策略进一步分配给作业的各个任务。
在YARN中,作业的资源描述可以被描述为五元组:priority,hostname,capabiity,containers,relax_locality分别表示 作业优先级 期望资源所在的host 资源量(当前支持内存与CPU两种资源) 、Containers数目 是否松弛本地化。例如:
<10,"node1","memeory:1G,CPU:1",3,true)// 优先级是一个正整数,优先级值越小,优先级越高
ContainerAllocator周期性的通过心跳与ResourceManager通信,以获取已经分配的Contaienr列表,完成的Container列表、最近更新的节点*+列表等信息,而ContanerAllocator根据这些信息完成相应的操作。
当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列的Map Task和TaskReduce Task任务,由于Reduce Task依赖于Map Task之间的结果,所以Reduce Task会延后调度。
任务状态描述
Map: scheduled->assigned->completed
Task: pending-> scheduled->assigned->completed
pending 表示等待ContainerAllocator发送资源请求的集合
scheduled 标识已经发送了资源申请给RM,但还没收到分配的资源的任务集合
assignd 已经受到RM分配的资源的任务集合
complet 表示已完成的任务集合
三种作业状态:Failed Map Task ,Map Task,Reduce Task分别赋予它们优先级5 20 10也就是说,当三种任务同时有资源请求的时候,会优先分配给Failed Map Task,然后是Reduce Task,最后是Map Task.
如果一个任务运行失败,则会重新为该任务申请资源
如果一个任务运行速度过慢,则会为其额外申请资源已启动备份任务(如果启动了推测执行过程)
如果一个节点的失败任务数目过多,则会撤销对该节点的所有资源的申请请求。
注:在大多数数的情况下,RMAppMaster与RM的心跳信息都是空的,即心跳信息不包含新的资源请求信息,这种心跳信息有一下几个作用:
1. 周期性发送心跳,告诉RM自己还活着
2. 周期性询问RM,以获取新分配的资源和各个Container运行状况。
资源再分配
一旦MRAppMaster收到新分配的Container后,会将这些Container进一步分配给各个任务,Container分配过程如下:
1.判断新收到的Container包含的资源是否满足,如果不满足,则通过下次心跳通知ResourceManager释放该Container.
2.判断收到的Container所在的节点是否被加入到黑名单中,如果是,则寻找一个与该Container匹配的任务,并重新为该任务申请资源,同时通过下次心跳通知ResourceManager释放该Container.
3.根据Container的优先级,将它分配给对应类型的任务。
七 Contianer启动和释放
当ContainerAllocator为某个任务申请到资源后,会将运行该任务相关的所有信息封装到Container中,并要求对应的节点启动该Container。需要注意的是,Container中运行的任务对应的数据处理引擎与MRv1中完全一致,仍为Map Task和 Reduce Task。正因为如此,MRv1的程序与YARN中的MapReduce程序完全兼容。
ContainerLaunche负责与各个NodeManager通信,已启动或者释放Container。在YARN中,运行的Task所需的全部信息被封装到Container中,包括所需的资源、依赖的外部文件、JAR包、运行时环境变量、运行命令等。ContainerLauncher通过RPC协议ContainerManager与NodeManager通信,以控制Container的启动和释放,进而控制任务的执行(比如启动任务、杀死任务等)。
有多种可能触发停止/杀死一个Container,常见的有:
1.推测执行时一个任务运行完成,需杀死另一个相同输入数据的任务。
2.用户发送一个杀死任务请求。
3.任意一个任务运行结束时,YARN会触发一个杀死任务的命令,以释放对应的Container占用的资源。
八 推测执行机制
为了防止执行速度慢的任务拖慢整体的执行进度,使用推测执行机制,Hadoop会为该任务启动一个备份任务,让该备份任务与原始任务同时处理同一份数据,谁先运行完,则将谁的结果作为最终结果。
注:1.每个任务最多只能有一个备份任务实例
2. 启动备份的时候,必须保证已经有足够多的Map任务已经完成,根据这些完成的任务来估算是否来启动备份任务。
这种算法的优点是可最大化备份任务的有效率,其中有效率指有效备份任务数与所有备份任务数的比值,有效任务是指完成时间早于原始任务完成时间的备份任务(即带来实际收益的备份任务)。备份任务的有效率越高,推测执行算法越优秀,带来的收益也就越大。
推测执行机制实际上采用了经典的算法优化方法,以空间换时间,它同时启动多个相同的任务处理相同的数据,并让这些任务竞争以缩短数据的处理时间。
八 作业恢复
从作业恢复粒度角度来看,当前存在三种不同级别的恢复机制,级别由低到高依次是作业级别、任务级别和记录级别,其中级别越低实现越简单,但造成的资源浪费也越严重。当前MRAppMaster采用了任务级别的恢复机制,即以任务为基本单位进行恢复,这种机制是基于事务型日志完成作业恢复的,它只关注两种任务:运行完成的任务和未完成的任务。作业执行过程中,MRAppMaster会以日志的形式将作业以及状态记录下来,一旦MRAppMaster重启,则可从日志中恢复作业的运行状态。
当前MRAppMaster的作业恢复机制仅能做到恢复上一次已经运行完成的任务,对于正在运行的任务,则在前一次MRAppMaster运行实例退出时由ResourceManager强制将其杀死并回收资源。
MRAppMaster采用了开源数据序列化工具Apache Avro记录这些事件。Avro是一个数据序列化系统,通常用于支持大批数据交换和跨语言RPC的应用。
九 MRv1与MRv2简单对比
MRAppMaster仍采用了MRv1中的数据处理引擎,分别由数据处理引擎MapTask和ReduceTask完成Map任务和Reduce任务的处理。
MRv1与MRv2的比较
MRv2中在Map端 用Netty代替Jetty. Reduce端采用批拷贝、shuffle和排序插件化
应用程序编程接口 新旧API 新旧API
运行时环境 由JobTracker与TaskTracker组成 YARN (由RM和NM组成)和MRAppMaster
数据处理引擎 MapTask/Reduce Task MapTask/Reduce Task
需要注意的是,YARN并不会改变MapReduce编程模型,它只是应用开发人员使用的API。YARN提供了一种新的资源管理模型和实现,用来 执行MapReduce任务。因此,在最简单的情况下,现有的MapReduce应用仍然能照原样运行(需要重新编译),YARN只不过能让开发人员更精 确地指定执行参数。
十 小结
MapRecuce On YARN的运行时环境由YARN与ApplicationMaster构成,这种新颖的运行时环境使得MapReduce可以与其他计算框架运行在一个集群中,从而达到共享集群资源、提高资源利用率的目的。随着YARN的程序与完善,MRv1的独立运行模式将被MapRedcue On YARN取代。