首页 > 代码库 > Hadoop之 - 剖析 MapReduce 作业的运行机制(MapReduce 2)

Hadoop之 - 剖析 MapReduce 作业的运行机制(MapReduce 2)

在0.20版本及更早期的系列中,mapred.job.tracker 决定了执行MapReduce程序的方式。如果这个配置属性被设置为local(默认值),则使用本地的作业运行器。运行器在耽搁JVM上运行整个作业。它被设计用来在小的数据集上测试和运行MapReduce程序。


如果 mapred.job.tracker 被设置为用冒号分开的主机和端口对(主机:端口),那么该配置属性就被解释为一个jobtracker地址,运行器则将作业提交给该地址的jobtracker。


Hadoop 2.x引入了一种新的执行机制。这种新机制(MapReduce 2)建立在一个名为YARN的系统上。目前,用于执行的框架通过 mapreduce.framework.name 属性进行设置,值local表示本地的作业运行器,“classic”表示经典的 MapReduce 框架(也称MapReduce 1,它使用一个jobtracker和多个tasktracker),yarn表示新的框架。


YARN (MapReduce 2)

对于节点数超出4000的大型集群,MapReduce 1的系统开始面领着扩展性的瓶颈。在2010年雅虎的一个团队开始设计下一代的MapReduce.由此,YARN(Yet Another Resource Negotiator的缩写或者为 YARN Application Resource Neforiator的缩写)应运而生。


YARN 将 Jobtracker 的职能划分为多个独立的实体,从而改善了“经典的”MapReduce 面临的扩展瓶颈问题。Jobtracker负责作业调度和任务进度监视、追踪任务、重启失败或过慢的任务和进行任务登记,例如维护计数器总数。


YARN 将这两种角色划分为两个独立的守护进程:管理集群上资源使用的资源管理器和管理集群上运行任务生命周期的应用管理器。基本思路是:应用服务器与资源管理器协商集群的计算资源:容器(每个容器都有特定的内存上限),在这些容器上运行特定应用程序的进程容器由集群节点上运行的节点管理器监视,以确保应用程序使用的资源不会超过分配给它的资源。

资源管理器:即resource manager,RM,负责管理所有应用程序计算资源的分配。

应用管理器:即application master,AM,每一个应用程序的AM负责相应的调度和协调。

容器:即containers,YARN为将来的资源隔离而提出的框架,每一个任务对应一个Container,且只能在该container中运行。

节点监视器:即node manager,管理每个节点上的资源和任务,主要有两个作用:定期向RM汇报该节点的资源使用情况和各个container的运行状态;接收并处理AM的任务启动、停止等请求。


与jobtracker不同,应用的每个实例(这里指一个MapReduce作业)有一个专用的应用master(application master),它运行在应用的运行期间。这种方式实际上和最初的Google的MapReduce论文里介绍的方法很相似,该论文描述了master进程如何协调在一组worker上运行的map任务和reduce任务。


如前所述,YARN比MapReduce更具一般性,实际上MapReduce只是YARN应用的一种形式。有很多其他的YARN应用(例如能够在集群中的一组节点上运行脚本的分布式shell)以及其他正在开发的程序。 YARN设计的精妙之处在于不同的YARN应用可以在同一个集群上共存。例如,一个MapReduce应用可以同时作为MPI应用运行,这大大提高了可管理性和集群的利用率。


此外,用户甚至有可能在同一个YARN集群上运行多个不同版本的MapReduce,这使得MapReduce升级过程更容易管理。注意,MapReduce的某些部分(比如作业历史服务器和shuffle处理器)以及YARN本身仍然需要在整个集群上升级。


YARN上的MapReduce比经典的MapReduce包括更多的实体:

  • 提交MapReduce作业的客户端

  • YARN资源管理器(resource manager),负责协调集群上计算资源的分配

  • YARN节点管理器(node manager),负责启动和监视集群中机器上的计算容器(container)

  • MapReduce 应用程序master,负责协调运行MapReduce作业的任务。它和MapReduce任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理

  • 分布式文件系统(一般为HDFS),用来与其他实体见共享作业文件


作业的运行过程如下图所示,下面一一具体描述。

技术分享

技术分享

动图展示流程

  1. 作业提交

    MapReduce 2中的作业提交时使用与MapReduce 1相同的用户API(步骤1)。MapReduce 2实现了ClientProtocol,当 mapreduce.framework.name 设置为yarn时启动。提交的过程与经典的非常相似。从资源管理器(而不是jobtracker)获取新的作业ID,在YARN命名法中它是一个应用程序ID(步骤2)。作业客户端检查作业的输出说明,计算输入分片(虽然有选项yarn.app.mapreduce.am.compute-splits-in-cluster在集群上来产生分片,这可以使具有多个分片的作业从中受益)并将作业资源(包括作业JAR、配置和分片信息)复制到HDFS(步骤3)。最后,通过调用资源管理器上的submitApplication()方法提交作业(步骤4)。          


  2. 作业初始化

    资源管理器收到调用它的 submitApplication() 消息后,便将请求传递给调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动应用程序的master进程(步骤5a 和 5b)


    MapReduce作业的application master 是一个Java应用程序,它的主类是MRAppMaster。它对作业进行初始化:通过创建多个簿记对象以保持对作业进度的跟踪,因为它将接受来自任务的进度和完成报告(步骤6)。接下来,它接受来自共享文件系统的在客户端计算的输入分片(步骤7)。对每一个分片创建一个map任务对象以及由 mapreduce.job.reduces 属性确定的多个reduce任务对象。


    接下来,application master 决定如何运行构成MapReduce作业的各个任务。如果作业很小,就选择在与它同一个JVM上运行任务。


    相对于在一个节点上顺序运行它们,判断在新的容器中分配和运行任务的开销大于并行运行它们的开销时,就会发生这一情况。这不同于MapReduce 1,MapReduce 1 从不在单个tasktracker上运行小作业。这样的作业称为uberized,或者作为uber任务运行。


    uber运行默认对小作业进行优化,不会给每个任务分别神器分配Contianer资源,这些小任务将统一在一个container中按照先执行map任务后执行reduce任务的顺序串执行。


    哪些任务是小任务呢? 默认情况下,小任务就是小鱼10个mapper且只有1个reducer且输入大小小于一个HDFS块的任务。(通过设置mapreduce.job.ubertask.maxmapsmapreduce.job.ubertask.maxreducesmapreduce.job.ubertask.maxbytes 可以改变一个作业的上述值。)将 mapreduce.job.ubertask.enable 设置为 false 也可以完全使uber任务不可用。


    在任何任务运行之前,作业的setup方法为了设置作业的 OutputCommitter 被调用来建立作业的输出目录。在MapReduce 1中,它在一个由 tasktracker 运行的特殊任务中被调用,而在YARN执行框架中,该方法由应用程序master直接调用。


  3. 任务分配

    如果作业不适合作为uber任务运行,那么 application master 就会为该作业中的所有map任务和reduce任务向资源管理器请求容器(步骤8)。附着心跳信息的请求包括每个map任务的数据本地化信息,特别是输入分片所在的主机和相应机架信息。调度器使用这些信息来做调度策略(像jobtracker的调度器一样)。理想情况下,它将任务分配到数据本地化的节点,但如果不可能这样做,调度器就会相对于非本地化的分配有限使用机架本地化的分配。


    请求也为任务指定了内存需求。在默认情况下,map任务和reduce任务都分配到1024MB的内存,但这可以通过 mapreduce.map.memory.mb 和 mapreduce.reduce.memory.mb 来设置。


    内存的分配方式不同于MapReduce 1,后者中tasktrackers 有在集群配置时设置的固定数量的槽,每个任务在一个槽上运行。槽有最大内存分配限制,这对集群是固定的,导致当任务使用较少内存时无法充分利用内存(因为其他等待的任务不能使用这些未使用的内存)以及由于任务不能获取足够内存而导致作业失败。


    在YARN中,资源分为更细的粒度,所以可以避免上述问题。具体而言,应用程序可以请求最小到最大限制范围内的任意最小值倍数的内存容量。默认的内存分配容量是调度器特定的,对于容量调度器,它的默认值最小值是1024MB(由 yarn.sheduler.capacity.minimum-allocation-mb 设置),默认的最大值是10240MB(由 yarn.sheduler.capacity.maximum-allocation-mb 设置)。因此,任务可以通过适当设置 mapreduce.map.memory.mb 和 mapreduce.reduce.memory.mb 来请求1GB到10GB间的任务1GB倍数的内存容量(调度器在需要的时候使用最接近的倍数)。


  4. 任务执行

    一旦资源管理器的调度器为任务分配了容器,application master 就通过与节点管理器通信来启动容器(步骤9a 和 9b)。该任务由主类 YarnChild 的Java应用程序执行,在它运行任务之前,首先将任务需要的资源本地化,包括作业的配置、JAR文件 和 所有来自分布式缓存的文件(步骤10)。最后,运行map任务或reduce任务(步骤11)。


    Streaming 和 Pipes程序以MapReduce 1的方式运行。YarnChild 启动Streaming 或 Pipes进行,并通过分别使用标准的输入/输出或套接字与它们通信,child和子进程在节点管理器上运行,而非tasktracker。


  5. 进度和状态更新

    在 YARN 下运行时,任务每3瞄准通过umbilical接口向 application master 汇报进度和状态(包含计数器),作为作业的汇聚视图(aggregate view)。这个过程如下图所示。相比之下,MapReduce 1 通过tasktracker 到 jobtracker 来实现进度更新。

    技术分享


    客户端每秒钟(通过 mapreduce.client.progressmonitor.pollinterval 设置)查询一次 application master 以接收进度更新,通常都会向用户显示。


    在MapReduce 1中,作业跟踪器的Web UI展示运行作业列表及进度。在 YARN 中个,资源管理器的 Web UI(默认8088端口)展示了正在运行的应用以及连接到的对应 application master,每个 application master 展示MapReduce作业的进度等进一步的细节


  6. 作业完成

    除了向 application master 查询进度外,客户端每5秒钟还通过调用Job的 waitForCompletion() 来检查作业是否完成。查询的间隔可以通过 mapreduce.client.completion.pollinterval 属性进行设置。


    注意,通过 HTTP 回调 (callback)来完成作业也是支持的,就像在 MapReduce 1中一样,然而在MapReduce 2中,回调是由 application master 初始化。


    作业完成后,application master 和任务容器清理其工作状态,OutputCommiter 的作业清理方法会被调用。作业历史服务器保存作业的信息供用户需要时查询。



本文出自 “Professor哥” 博客,转载请与作者联系!

Hadoop之 - 剖析 MapReduce 作业的运行机制(MapReduce 2)