首页 > 代码库 > Hadoop大数据处理读书笔记

Hadoop大数据处理读书笔记

几个关键性的概念

  1. 云计算:是指利用大量计算节点构成的可动态调整的虚拟化计算资源,通过并行化和分布式计算技术,实现业务质量可控的大数据处理的计算技术。
  2. NameNode:是HDFS系统中的管理者,它负责管理文件系统的命名空间,维护文件系统的文件树以及所有的文件和目录的元数据。这些信息存储在NameNode维护的两个本地磁盘文件:命名空间镜像文件和编辑日志文件。同时,NameNode中还保存了每个文件与数据块所在的DataNode的对应关系,这些信息被用于其他功能组件查找所需文件资源的数据服务器。
  3. SecondNameNode:与NameNode保持通信,按照一定时间间隔保存文件系统元数据的快照。当NameNode发生故障时,系统管理者可以通过手工配置的形式将保存的元数据快照恢复到重新启动的NameNode中,降低数据丢失的风险。
  4. DataNode:是HDFS中保存数据的节点。HDFS中的数据通常被分割为多个数据块,以冗余备份的形式存储在多个DataNode中。DataNode定期向NameNode报告其存储的数据块列表,以备使用者通过直接访问DataNode获得相应的数据。
  5. JobClient。JobClient是基于MapReduce接口库编写的客户端程序,负责提交MapReduce作业。
  6. JobTracker。是应用于MapReduce模块之间的控制协调者,它负责协调MapReduce作业的执行。当一个MapReduce作业提交到集群中,JobTracker负责确定后续执行计划,包括需要处理哪些文件、分配任务的Map和Reduce执行节点、监控任务的执行、重新分配失败的任务等。每个Hadoop集群中只有一个JobTracker。
  7. TaskTracker。TaskTracker负责执行由JobTracker分配的任务,每个TaskTracker可以启动一个或多个Map或Reduce任务。同时,TaskTracker和JobTracker间通过心跳(HeartBeat)机制保持通信,以维持整个集群的运行状态。
  8. MapTask,ReduceTask。是由TaskTracker启动的负责具体执行Map任务和Reduce任务的程序。
MapReduce任务的执行过程

  1. MapReduce程序启动一个JobClient实例以开启整个MapReduce作业(Job)
  2. JobClient通过getNewJobId()接口向JobTracker发出请求,以获得一个新的作业ID。
  3. JobClient根据作业请求指定的输入文件计算数据块的划分,并将完成作业需要的资源,包括JAR文件、配置文件、数据块,存放到HDFS中属于JobTracker的以作业ID命名的目录下,一些(如JAR文件)可能会以冗余备份的形式存放在多个节点上。
  4. 完成上述准备后,JobClient通过调用JobTracker的submitJob()接口提交此作业。
  5. JobTracker将提交的作业放入一个作业队列中等待进行作业调度以完成作业初始化工作。作业初始化主要是创建一个代表此作业的运行对象,作业运行对象中封装了作业包含的任务和任务运行状态记录信息用于后续跟踪相关任务的状态和执行进度。
  6. JobTracker还需要从HDFS文件系统中取出JobClient放好的输入数据,并根据输入数据创建对应数量的Map任务,同时根据JobConf配置文件中定义的数量生成Map任务。
  7. 在TaskTracker和JobTracker间通过心跳机制维持通信,TaskTracker发送的心跳消息中包含了当前是否可执行新的任务的信息,根据这个信息,JobTracker将Map任务和Reduce任务分配到空闲的TaskTracker节点。
  8. 被分配了任务的TaskTracker从HDFS文件系统中取出所需的文件,包括JAR程序文件和任务对应的数据文件,从存入本地磁盘,并启动一个TaskRunner程序实例准备运行任务。
  9. TaskRunner在一个新的Java虚拟机中根据任务类型创建出MapTask或ReduceTask进行运算。在新的Java虚拟机中运行MapTask和ReduceTask的原因是避免这些任务的运行异常影响TaskTracker的正常运行。MapTask和ReduceTask会定时与TaskRunner进行通信报告进度,直到任务完成。
基于云计算的大数据处理架构


基于云计算的大数据处理技术的应用
百度主要应用成果
HCE(Hadoop c++ Extension)——基于C++的MapReduce运行环境(解决了Java对内存管理的低效)
HDFS2——分布式NameNode实现(解决了NameNode压力大问题)
HDFS的透明压缩存储(解决HDFS系统中文件存储占用磁盘空间问题)
DISQL(Distributed SQL)——大数据分析语言
阿里巴巴
腾讯
华为
中国移动
三、MapReduce计算模式
MapReduce原理
Map:<k1,v1>→[<k2,v2>]
Reduce:<k2,[v2]>→[<k3,v3>]
注:<...>代表关键字/值数据对,[...]代表列表。
MapReduce处理过程图

MapReduce工作机制

  1. 作业提交:用户编写MapReduce程序创建新的JobClient实例(步骤1.1)。JobCl实例创建后,向JobTracker请求获得一个新的Jobld,用于标识本次MapReduce作业(步骤1.2)。然后JobClient检査本次作业指定的输入数据和输出目录是否正确。在检查无误后,JobClient将运行作业需要的相关资源,包括本次作业相关的配置文件、输入数据分片的数量,以及包含Mapper 和 Redllcer类的 JAR文件存入分布式文件存储系统中(步骤1.3),其中JAR文件将以多个备份的形式存放。完成以上工作后,JobClient向JobTracker发出作业提交请求 (步骤1.4)。
  2. 作业初始化:作为系统主控节点,JobTracker会收到多个JobClient发出的作业请求,因此JobTracker实现了一个队列机制处理多个请求。收到的请求会放入一个内部队列,由作业调度器进行调度。JobTracker为作业进行初始化工作(步骤2.1)。初始化的内容是创建一个代表此作业的JoblnProgress实例,用 于后续跟踪和调度此作业。JobTracker要从分布式文件存储系统中取出JobClient存放的输入数据分片信息(步骤2.2),以决定需要创建的Map任务数量,并创建对应的一批TaskInProgress实例用于监控和调度Map任务。而需要创建的Reduce任务数量和对应的TaskInProgress实例由配置文件中的参数决定。
  3. 任务分配:MapReduce框架中的任务分配机制是采用"拉"(pull)的机制实现的。 在任务分配之前,负责执行Map任务或Reduce任务的TaskTracker节点均已经启动。 TaskTracker —直通过RPC向JobTracker发送心跳消息询问有没有任务可做(步骤3)。如果JobTracker的作业队列不为空,则TaskTracker发送的心跳消息将会获得JobTracker给它派发的任务。由于TaskTracker节点的计算能力(由内核数量和内存大小决定)是有限的, 因此每个TaskTracker节点可运行Map任务和Reduce任务的数量也是有限的,即每个 TaskTracker有两个固定数量的任务槽,分别对应Map任务和Reduce任务。在进行任务分 配时,JobTracker优先填满TaskTracker的Map任务槽,即只要有空闲Map任务槽就分配 一个Map任务,Map任务槽满了后才分配Reduce任务。
  4.  Map任务执行:在MapTaskTracker节点收到JobTracker分配的Map任务后, 将执行一系列操作以执行此任务。首先,创建一个Tasklnprogress对象实例以调度和监控任务。然后将作业的JAR文件和作业的相关参数配置文件从分布式文件存储系统中取出,并复制到本地工作目录下 (JAR文件中的内容需经过解压)(步骤4.1)。 完成这些准备工作后,TaskTracker新建一个TaskRunner实例来运行此Map任务(步骤4.2 )。 TaskRunner将启动一个单独的JVM, 并在其中启动MapTask执行用户指定的map函数(步骤4.3)。使用单独的JVM运行MapTask的原因是为了避免MapTask的异常影响 TaskTracker的正常运行。MapTask计算获得的数据,定期存入缓存中(步骤4.4), 并在缓存满的情况下存入本地磁盡中(步骤 4.5)。在任务执行时,MapTask定时与 TaskTracker通信报告任务进度(步骤4.6),直到任务全部完成,此时所有的计算结果会存入本地磁盘中。
  5. Reduce任务执行:在部分Map任务执行完成后,JobTracker即将按照上面第3步同样的机制开始分配Reduce任务到Reduce TaskTracker节点中(步骤5.1)。与Map任务启动过程类似,Reduce TaskTracker同样会生成在单独JVM中的ReduceTask以执行用户指定的Reduce函数(步骤5.2、步骤5.3)。同时ReduceTask会开始从对应的Map TaskTracker 节点中远程下载中间结果的数据文件(步骤5 4)。直到此时,Reduce任务还没有真正开始执行,而仅仅是做好运行环境和数据的准备工作。只有当所有Map任务执行完成后, JobTracker才会通知所有Reduce TaskTracker节点开始Reduce任务的执行。同样, ReduceTask定时与TaskTracker通信报告任务进度,直到任务全部完成(步骤5.6)。
  6. 作业完成:在Reduce阶段执行过程中,每个ReduceTask会将计算结果输出到分布式文件存储系统中的临时文件(步骤5.5)。ReduceTask完成时,这些临时文件会合并为一个最终输出结果文件。JobTracker在收到作业包含的全部任务的完成通知后(通过每个TaskTraeker与JobTracker间的心眺消息),会将此作业的状态设置为"完成"。当此后的JobCient的第一个状态轮询请求到达时,将会获知此作业已经完成(步骤6.1),于是JobClient会通知用户程序整个作业完成并显示必要的信息(步骤6.2)。
作业调度:
  1. 先进先出调度器:默认的调度器,是支持优先级的先进先出调度器。不支持资源抢占。
  2. 公平调度器。公平调度器的设计目标是支持系统的所有用户可以公平的共享集群的计算能力。
  3. 能力调度器。采用多队列的形式组织集群中的计算资源,这些队列可以采用层次结构连接在一起。
  4. 自定义的调度器
异常处理:
  • 任务异常(两种状态:失败和终止。任务出现异常后,TaskTracker会将此任务的失败信息报告给JobTracker,JobTracker会分配新的节点执行此任务。)
失败:
  1. 用户编写的map或redeuce函数中的代码不正确
  2. 任务所在的JVM出现运行异常,这通常也是某种代码异常导致的。
  3. 任务进度更新超时。
终止:
  1. 备份任务时,MapReduce框架为了避免某个未失败但执行缓慢的任务影响整个作业的执行速度而设计了备份任务机制。
  2. 当TaskTracker出现异常无法运行时。
  3. 用户通过命令行或Web页面手工终止或取消执行任务时。
  • TaskTracker异常(如果JobTracker超过最大时间间隔没有收到TaskTracker的心跳消息时,则认为TaskTracker出现了异常。已完成的任务会正常返回,未完成的任务则重新分配TaskTracker节点重新执行。为了避免TaskTracker异常反复出现,MapReduce框架设定了黑名单机制。
  • JobTracker异常(目前还没有能应对JobTracker异常的机制)
MapReduce应用开发流程

多个MapReduce过程的组合模式
  • 作业链(按顺序执行)——JobClient.runJob
JobClient.runJob(config1);
JobClient.runJob(config2);
  • 作业图(DAG)——jobControl
jobMapReduce3.addDependingJob(jobMapReduce1);
jobMapReduce3.addDependingJob(jobMapReduce2);
jobControl.addJob(jobMapReduce1);
jobControl.addJob(jobMapReduce2);
jobControl.addJob(jobMapReduce3);
jobControl.run();
  • Map/Reduce链——ChainMaper和ChainReducer
  1. 计数(Counting)——计算每条记录某个属性的函数表达式值,例如求和、平均值等。
  2. 分类(Classfication)——在给定一计算函数的情况下,将通过此计算函数计算后饿到结果值相同的实体放在一起(或进行后续处理)
  3. 过滤处理(Filtering)——将符合某个条件的记录取出,或者进行格式转换。
  4. 排序(Sorting)——将记录按照一定规则排序后输出。
  5. 去重计数(Distinct Counting)——在计算某几个属性组合后去掉相同的重复组合后,求其中某一属性的统计值
  6. 相关计数(Cross-Correlation)——计算数组中记录以一定条件要求成对出现的次数或概率。
MapReduce算法实践
  1. 最短路径算法(Dijkstra算法)
  2. 反向索引算法
  3. PageRank算法
MapReduce性能调优


map函数在执行时,输出数据首先是保存在缓存中,这个缓存的默认大小是100MB,由参数io.sort.mb来控制。当缓存使用量达到一定比例时,缓存中的数据将写入磁盘中,这个比例是由参数io.sort.spill.percent控制。缓存中的数据每次输出到磁盘时会生成一个临时文件,多个临时文件合并后生成一个map输出文件,参数io.sort.factor制定最多可以有多少个临时文件被合并到输出文件中。性能调优参数:
参数类型默认值说明
Map阶段   
io.sort.mbint100map输出的缓存大小,单位为MB
io.sort.spill.percentfloat0.8map输出缓存占用超过此比例将开始写入磁盘
io.sort.factorint10合并多个临时输出文件的数量,可增大
min.num.spills.for.combineint3输出临时文件达到此数量时会执行一次combine操作
tasktracker.http.threadsint40tasktracker可用于输出map文件的http线程数
Reduce阶段   
mapred.reduce.parallel.copiesint5可读取多个map输出的线程数
mapred.reduce.copy.backoffint300reduce读取map输出的失败超时时间,s为单位
io.sort.factorint10处理之前合并输入文件的最大数量
mapred.job.shuffle.input.buffer.percentfloat0.7存储map输出数据的缓存占整个内存的比例
mapred.job.shuffle.merge.percentfloat0.6存储map输出数据的缓存的占用比例阀值,超过则存入磁盘
mapred.inmem.merge.thresholdint1000当map输出文件超过此数量时,进行合并并存入磁盘
mapred.job.reduce.input.buffer.percentfloat0.0在reduce节点的内存中保持map输出数据的缓冲占整个内存的百分比,增大可以减少磁盘读写
mapred.child.java.optsint200map或reduce任务可使用的内存大小,默认为200MB,可适当增大
io.file.buffer.sizeint4096进行磁盘I/O操作的是缓存大小,默认4kb,可提高为64kb或128kb

使用Combiner减少数据传输
启用数据压缩(DefaultCodec:zlib;GzipCodec:Gzip;BZip2Codec:bzip2)
参数类型默认值说明
mapred.compress.map.outputBooleanfalse是否启用map输出压缩
mapred.map.output.compression.codecClass nameorg.apache.hadoop.io.compress.DefaultCodecmap输出压缩类
使用预测执行功能:在任务执行过程中,hadoop会检测所有任务的进度和完成情况,当出现某个任务执行进程远慢于整个系统平均进度时,hadoop会将在另一个节点上启动一个相同的备份任务,并与原始任务并行执行。当原始任务和备份任务的其中一个完成时,另一个任务被终止。
参数类型默认值说明
mapred.map.tasks.speculative.executionBooleantrue是否启用map任务的预测执行机制
mapred.reduce.tasks.speculative.executionBooleantrue是否启用reduce任务的预测执行机制

重用JVM:默认设置,每个JVM只可以单独运行一个Task进程,其主要目的是避免某个任务的崩溃影响其他任务或整个TaskTracker的正常执行。但MapReduce框架也可以允许一个JVM运行多个任务。设置参数mapred.job.reuse.jvm.num.tasks,或调用JobConf类的setNumTasksToExecutePerJvm接口,当设置为-1时,任务数量将没有限制。对于一些map函数初始化简单却执行频繁的作业,可以考虑。

分布式文件系统应满足:

  1. 使用大量低成本构建的分布式运行环境
  2. 可以应对大量并发用户访问
  3. 能够处理超乎寻常的文件大小
  4. 提供足够大的系统吞吐量

数据读取过程


  1. client生成一个HDFS类库中的DistributedFileSystem对象实例,并使用此实例的 open()接口打开一个文件。 
  2. DistributedFileSystem通过RPC向NameNode发出请求,以获得文件相关的数据块位置信息。NameNode将包含此文件相关数据块所在的 DataNode 地址,经过与 Client 相关的距离(参见4.4.2节性能优化)进行排序后,返回给DistributedFilesystem。
  3. DistributedFilesystem在获得数据块相关信息后,生成一个FSDatalnputStream对象实例返回给client。 此实例封装了一个DFSInputStream对象,负责存储数据块信息及 DataNode地址信息,并负责后续的文件内容读取处理。 
  4. Client向FSDataInputStream发出读取数据的read()调用。 
  5. 在收到read()调用后,FSDatalnputStream封装的DFSInputstream选择第一个数据块的最近的DataNode, 并读取相应的数据信息,返回给client。在数据块读取完成后, DFSInputStream负责关闭到相应DataNode的链接。
  6. DFSInputStream将持续选择后续数据块的最近DataNode节点,并读取数据返回给client, 直到最后一个数据块读取完成。
  7. 当client读取完所有数据后,将调用FSDatalnputstream的close()接口结束本次文件读取操作。
  • 当读取某个DataNode出现故障时,DFSInputStream将选取下一个包含此数据块的最近的DataNode。
数据写入过程
  1. client生成一个HDFS类库中的DistributedFileSystem对象实例,并使用此实例的 create()接口打开创建一个文件。 
  2. DistributedFileSystem通过RPC向NameNode发出创建文件请求,NameNode在确认此文件没有重名文件,且Client有写入权限后,在命名空间中创建此文件的对应记录。在此过程中如果出现异常,NameNode将返回IOException。
  3. DistHbutedFilesystem在获得NameNode的成功返回后,生成一个FSDataoutputStream对象实例返回给client。此实例封装了一个DFSOutputstream对象,负责后续的文件内容写入处理。 
  4. Client向FSDataInputStream发出写入数据的write()调用及需要写入文件的数据。 DFSOutputStream在收到数据后会将数据拆分后放入一个数据队列。 
  5. Datastreamer负责从数据队列中不断取出数据,准备写入DataNode中。但在写入 之前,DataStreamer需要从NameNode请求分配一些存放数据的数据块信息以及适合存放这些数据块的DataNode地址。
  6. 对于每个数据块,NameNode会分配若干个DataNode以复制存储数据块,例如要将数据块2存入3个DataNode节点。Datastreamer会将数据块写入第一个DataNode, 这个DataNode会将数据传给第二个(步骤6.1),第二个传给第三个(步骤6.2),以完成整 个DataNode链的数据写入。
  7. 每个DataNode完成写入后,会向Datastreamer报告已完成(步骤7、步骤7.1和 步骤7.2),同时向NameNode报告自己完成了一个数据块的写人(步骤7.3)。步骤6和步骤7会循环执行,直到所有数据块写入完成.
  8. 当Client完成所有数据写入后,将调用FSDataInputStream的close()接口结束本次文件写入操作。
  • 当某个DataNode出现故障写入失败,那么故障节点将从DataNode链中删除,NameNode会分配另一个DataNode完成此数据块的写入,只要有一个写入成功,本次操作也被视为完成。
基于命令行的文件管理
  • hadoop fs -cmd<args>
通过API操作文件
  1. JAVA
  2. C/C++
  3. HTTP

HDFS性能优化

  1. 调整数据块尺寸
  2. 规划网络与节点
  3. 调整服务队列数量
  4. 预留磁盘空间
  5. 存储平衡(start-balancer.sh)
  6. 根据节点功能优化磁盘配置
  7. 其他参数
HDFS小文件存储问题

(未完待续)