首页 > 代码库 > hadoop MapReduce Yarn运行机制

hadoop MapReduce Yarn运行机制

原 Hadoop MapReduce 框架的问题

技术分享

原hadoop的MapReduce框架图

从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路:

  1. 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
  2. TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况
  3. TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat 发送给 JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。上图虚线箭头就是表示消息的发送 - 接收的过程。

   可以看得出原来的 map-reduce 架构是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:

  1. JobTracker 是 Map-reduce 的集中处理点,存在单点故障
  2. JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  3. 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM
  4. 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
  5. 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  6. 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

 

新 Hadoop Yarn 框架原理及运作机制

  从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。

  为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:

技术分享

  重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理(ResourceManager)任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce job或者是一个 DAG( 有向无环图 ) job。ResourceManager 和每一台机器的节点管理服务器(NodeManager)能够管理用户在那台机器上的进程并能对计算进行组织。

  ResourceManager协调集群的资源利用,任何client或者运行着的applicatitonMaster想要运行job或者task都得向RM申请一定的资源。ApplicatonMaster是一个框架特殊的库,对于MapReduce框架而言有它自己的AM实现,用户也可以实现自己的AM,在运行的时候,AM结合从 ResourceManager 获得的资源与NM一起来启动和监视tasks。 

ResourceManager

  ResourceManager作为资源的协调者有两个主要的组件:Scheduler和ApplicationsManager(AsM)。

  Scheduler负责分配最少但满足application运行所需的资源量给Application。Scheduler只是基于应用程序资源的使用情况进行调度,资源包括:内存,CPU,磁盘,网络等等,可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。从某种意义上讲它就是一个纯粹的调度器,并不负责监视/跟踪application的状态,当然也不会处理失败的task。RM使用resource container概念来管理集群的资源,每一个应用程序需要不同类型的资源因此就需要不同的容器。resource container是资源的抽象,每个container包括一定的内存、IO、网络等资源,不过目前的实现只包括内存一种资源。ResourceManager 提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。

  ApplicationsManager(AsM)负责处理client提交的job以及协商第一个container以供applicationMaster运行,并且在applicationMaster失败的时候会重新启动applicationMaster。下面阐述RM具体完成的一些功能。

  1. 资源调度:Scheduler从所有运行着的application收到资源请求后构建一个全局的资源分配计划,然后根据application特殊的限制以及全局的一些限制条件分配资源。
  2. 资源监视:Scheduler会周期性的接收来自NM的资源使用率的监控信息,另外applicationMaster可以从Scheduler得到属于它的已完成的container的状态信息。
  3. Application提交:
    1. client向AsM获得一个applicationID
    2. client将applicationID以及需要的jar包文件等上传到hdfs的指定目录,由yarn-site.xml的yarn.app.mapreduce.am.staging-dir指定
    3. client构造资源请求的对象以及application的提交上下文发送给AsM
    4. AsM接收application的提交上下文
    5. AsM根据application的信息向Scheduler协商一个Container供applicationMaster运行,然后启动applicationMaster
    6. 向该container所属的NM发送launchContainer信息启动该container,也即启动applicationMaster、AsM向client提供运行着的AM的状态信息。

   4. AM的生命周期:AsM负责系统中所有AM的生命周期的管理。AsM负责AM的启动,当AM启动后,AM会周期性的向AsM发送heartbeat,默认是1s,AsM据此了解AM的存活情况,并且在AM失败时负责重启AM,若是一定时间过后(默认10分钟)没有收到AM的heartbeat,AsM就认为该AM失败了。

  

NodeManager

  NM是每一台机器框架的代理,主要负责启动RM分配给AM的container以及代表AM的container,并且会监视container的运行情况。在启动container的时候,NM会设置一些必要的环境变量以及将container运行所需的jar包、文件等从hdfs下载到本地,也就是所谓的资源本地化;当所有准备工作做好后,才会启动代表该container的脚本将程序启动起来。启动起来后,NM会周期性的监视该container运行占用的资源情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报,若是超过了该container所声明的资源量,则会kill掉该container所代表的进程。

  另外,NM还提供了一个简单的服务以管理它所在机器的本地目录。Applications可以继续访问本地目录即使那台机器上已经没有了属于它的container在运行。例如,Map-Reduce应用程序使用这个服务存储map output并且shuffle它们给相应的reduce task。

  在NM上还可以扩展自己的服务,yarn提供了一个yarn.nodemanager.aux-services的配置项,通过该配置,用户可以自定义一些服务,例如Map-Reduce的shuffle功能就是采用这种方式实现的。

  NM在本地为每个运行着的application生成如下的目录结构:

技术分享

  Container目录下的目录结构如下: 

技术分享

  在启动一个container的时候,NM就执行该container的default_container_executor.sh,该脚本内部会执行launch_container.sh。launch_container.sh会先设置一些环境变量,最后启动执行程序的命令。对于MapReduce而言,启动AM就执行org.apache.hadoop.mapreduce.v2.app.MRAppMaster;启动map/reduce task就执行org.apache.hadoop.mapred.YarnChild。

ApplicationMaster

  ApplicationMaster是一个框架特殊的库,每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

  对于Map-Reduce计算模型而言有它自己的ApplicationMaster实现,对于其他的想要运行在yarn上的计算模型而言,必须得实现针对该计算模型的ApplicationMaster用以向RM申请资源运行task,比如运行在yarn上的spark框架也有对应的ApplicationMaster实现,归根结底,yarn是一个资源管理的框架,并不是一个计算框架,要想在yarn上运行应用程序,还得有特定的计算框架的实现。由于yarn是伴随着MRv2一起出现的,所以下面简要概述MRv2在yarn上的运行流程。

MRv2运行流程:

  1. MR JobClient向resourceManager(AsM)提交一个job
  2. AsM向Scheduler请求一个供MR AM运行的container,然后启动它
  3. MR AM启动起来后向AsM注册
  4. MR JobClient向AsM获取到MR AM相关的信息,然后直接与MR AM进行通信
  5. MR AM计算splits并为所有的map构造资源请求
  6. MR AM做一些必要的MR OutputCommitter的准备工作
  7. MR AM向RM(Scheduler)发起资源请求,得到一组供map/reduce task运行的container,然后与NM一起对每一个container执行一些必要的任务,包括资源本地化等
  8. MR AM 监视运行着的task 直到完成,当task失败时,申请新的container运行失败的task
  9. 当每个map/reduce task完成后,MR AM运行MR OutputCommitter的cleanup 代码,也就是进行一些收尾工作
  10. 当所有的map/reduce完成后,MR AM运行OutputCommitter的必要的job commit或者abort APIs
  11. MR AM退出。

在Yarn上写应用程序

在yarn上写应用程序并不同于我们熟知的MapReduce应用程序,必须牢记yarn只是一个资源管理的框架,并不是一个计算框架,计算框架可以运行在yarn上。我们所能做的就是向RM申请container,然后配合NM一起来启动container。就像MRv2一样,jobclient请求用于MR AM运行的container,设置环境变量和启动命令,然后交由NM去启动MR AM,随后map/reduce task就由MR AM全权负责,当然task的启动也是由MR AM向RM申请container,然后配合NM一起来启动的。所以要想在yarn上运行非特定计算框架的程序,我们就得实现自己的client和applicationMaster。另外我们自定义的AM需要放在各个NM的classpath下,因为AM可能运行在任何NM所在的机器上。

 

新旧 Hadoop MapReduce 框架比对

  让我们来对新旧 MapReduce 框架做详细的分析和对比,可以看到有以下几点显著变化:

  首先客户端不变,其调用 API 及接口大部分保持兼容,这也是为了对开发使用者透明化,使其不必对原有代码做大的改变 ( 详见 2.3 Demo 代码开发及详解),但是原框架中核心的 JobTracker 和 TaskTracker 不见了,取而代之的是 ResourceManager, ApplicationMaster 与 NodeManager 三个部分。

  我们来详细解释这三个部分,首先 ResourceManager 是一个中心的服务,它做的事情是调度、启动每一个 Job 所属的 ApplicationMaster、另外监控 ApplicationMaster 的存在情况。细心的读者会发现:Job 里面所在的 task 的监控、重启等等内容不见了。这就是 AppMst 存在的原因。ResourceManager 负责作业与资源的调度。接收 JobSubmitter 提交的作业,按照作业的上下文 (Context) 信息,以及从 NodeManager 收集来的状态信息,启动调度过程,分配一个 Container 作为 App Mstr

  NodeManager 功能比较专一,就是负责 Container 状态的维护,并向 RM 保持心跳。

  ApplicationMaster 负责一个 Job 生命周期内的所有工作,类似老的框架中 JobTracker。但注意每一个 Job(不是每一种)都有一个 ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。

  

Yarn 框架相对于老的 MapReduce 框架什么优势呢?我们可以看到:

  1. 这个设计大大减小了 JobTracker(也就是现在的 ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。
  2. 在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
  3. 对于资源的表示以内存为单位 ( 在目前版本的 Yarn 中,没有考虑 cpu 的占用 ),比之前以剩余 slot 数目更合理。
  4. 老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsManager( 注意不是 ApplicationMaster),它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。
  5. Container 是 Yarn 为了将来作资源隔离而提出的一个框架。这一点应该借鉴了 Mesos 的工作,目前是一个框架,仅仅提供 java 虚拟机内存的隔离 ,hadoop 团队的设计思路应该后续能支持更多的资源调度和控制 , 既然资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资源闲置的尴尬情况。

 

  新的 Yarn 框架相对旧 MapRduce 框架而言,其配置文件 , 启停脚本及全局变量等也发生了一些变化,主要的改变如下:

表 1. 新旧 Hadoop 脚本 / 变量 / 位置变化表

改变项原框架中新框架中(Yarn)备注
配置文件位置 ${hadoop_home_dir}/conf ${hadoop_home_dir}/etc/hadoop/ Yarn 框架也兼容老的 ${hadoop_home_dir}/conf 位置配置,启动时会检测是否存在老的 conf 目录,如果存在将加载 conf 目录下的配置,否则加载 etc 下配置
启停脚本 ${hadoop_home_dir}/bin/start(stop)-all.sh ${hadoop_home_dir}/sbin/start(stop)-dfs.sh
${hadoop_home_dir}/bin/start(stop)-all.sh
新的 Yarn 框架中启动分布式文件系统和启动 Yarn 分离,启动 / 停止分布式文件系统的命令位于 ${hadoop_home_dir}/sbin 目录下,启动 / 停止 Yarn 框架位于 ${hadoop_home_dir}/bin/ 目录下
JAVA_HOME 全局变量 ${hadoop_home_dir}/bin/start-all.sh 中 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh
${hadoop_home_dir}/etc/hadoop/Yarn-env.sh
Yarn 框架中由于启动 hdfs 分布式文件系统和启动 MapReduce 框架分离,JAVA_HOME 需要在 hadoop-env.sh 和 Yarn-env.sh 中分别配置
HADOOP_LOG_DIR 全局变量 不需要配置 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 老框架在 LOG,conf,tmp 目录等均默认为脚本启动的当前目录下的 log,conf,tmp 子目录 
Yarn 新框架中 Log 默认创建在 Hadoop 用户的 home 目录下的 log 子目录,因此最好在 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 配置 HADOOP_LOG_DIR,否则有可能会因为你启动 hadoop 的用户的 .bashrc 或者 .bash_profile 中指定了其他的 PATH 变量而造成日志位置混乱,而该位置没有访问权限的话启动过程中会报错

  由于新的 Yarn 框架与原 Hadoop MapReduce 框架相比变化较大,核心的配置文件中很多项在新框架中已经废弃,而新框架中新增了很多其他配置项,看下表所示会更加清晰:

 表 2. 新旧 Hadoop 框架配置项变化表

 

配置文件配置项Hadoop 0.20.X 配置Hadoop 0.23.X 配置说明
core-site.xml 系统默认分布式文件 URI fs.default.name fs.defaultFS  
hdfs-site.xml DFS name node 存放 name table 的目录 dfs.name.dir dfs.namenode.name.dir 新框架中 name node 分成 dfs.namenode.name.dir( 存放 naname table 和 dfs.namenode.edits.dir(存放 edit 文件),默认是同一个目录
  DFS data node 存放数据 block 的目录 dfs.data.dir dfs.datanode.data.dir 新框架中 DataNode 增加更多细节配置,位于 dfs.datanode. 配置项下,如dfs.datanode.data.dir.perm(datanode local 目录默认权限);dfs.datanode.address(datanode 节点监听端口);等
  分布式文件系统数据块复制数 dfs.replication dfs.replication 新框架与老框架一致,值建议配置为与分布式 cluster 中实际的 DataNode 主机数一致
mapred-site.xml Job 监控地址及端口 mapred.job.tracker 新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapreduce.jobtracker.jobhistory 相关配置,
  第三方 MapReduce 框架 mapreduce.framework.name 新框架支持第三方 MapReduce 开发框架以支持如 SmartTalk/DGSG 等非 Yarn 架构,注意通常情况下这个配置的值都设置为 Yarn,如果没有配置这项,那么提交的 Yarn job 只会运行在 locale 模式,而不是分布式模式。
         
Yarn-site.xml The address of the applications manager interface in the RM Yarn.resourcemanager.address 新框架中 NodeManager 与 RM 通信的接口地址
  The address of the scheduler interface Yarn.resourcemanager.scheduler.address 同上,NodeManger 需要知道 RM 主机的 scheduler 调度服务接口地址
  The address of the RM web application Yarn.resourcemanager.webapp.address 新框架中各个 task 的资源调度及运行状况通过通过该 web 界面访问
  The address of the resource tracker interface Yarn.resourcemanager.resource-tracker.address 新框架中 NodeManager 需要向 RM 报告任务运行状态供 Resouce 跟踪,因此 NodeManager 节点主机需要知道 RM 主机的 tracker 接口地址

 

Hadoop Yarn 框架 Demo 示例

 

 

 参考:

Hadoop 新 MapReduce 框架 Yarn 详解 

更快、更强——解析Hadoop新一代MapReduce框架Yarn

MapReduce和YARN的关系 - Hadoop分布式数据分析平台   

hadoop2提交到Yarn: Mapreduce执行过程分析1    

 

hadoop MapReduce Yarn运行机制