首页 > 代码库 > Hadoop Day3

Hadoop Day3

Hadoop Day3

  1. MapReduce原理(****理解***)
  • 思考:怎样解决海量数据的计算?

技术分享

 

  • MapReduce概述

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

 

  • MapReduce由两个阶段组成:
      • map():任务分解
      • reduce():结果汇总

这两个函数的形参是key、value对,表示函数的输入信息。

  • 数据类型与格式

 

map、reduce键值对格式:

函数

输入键值对

输出键值对

Map()

<K1,V1>

<K2,V2>

Reduce()

<K2,{V2}>

<k3,V3>

 

 

 

 

 

 

 

 

 

2.MapReduce执行过程概要描述***必须理解掌握****

  • 分析MapReduce总体执行过程(****理解***

为了更简单的理解MapReducer的执行过程,先对它来个总体的过程图如下:

技术分享

 

概要说明:

MapReduce运行的时候,通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的map方法,处理数据,最后输出。Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法reduce,最后输出到HDFS的文件中。

 

  • Mapper任务的执行过程(***理解**** 

每个Mapper任务是一个java进程,它会读取HDFS中的文件,解析成很多的键值对,经过我们覆盖的map方法处理后,转换为很多的键值对再输出。整个Mapper任务的处理过程又可以分为以下几个阶段,如图所示:

技术分享

 

 

 

 

  1. 第一阶段是把输入文件照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件两个,一个是32MB,一个是72MB,那么小的文件是一个输入片,大文件会分为两个数据块64MB和8MB,一共产生三个输入片。每一个输入片由一个Mapper进程处理。这里的个输入片,会个Mapper进程处理。

 

  1. 第二阶段是对输入片中的记录照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。”是每一行的起始位置(单位是字节),“值”是本行的文本内容

 

  1. 第阶段是调用Mapper类中的map方法。第二阶段中解析出来的每一个键值对,调用一次map方法。如果1000个键值对,就会调用1000次map方法。每一次调用map方法会输出零个或者多个键值对。

 

  1. 第四阶段是照一定的规则对第三阶段输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只一个Reducer任务。

 

  1. 第五阶段是对每个分区中的键值对进行排序。首先,照键进行排序,对于键相同的键值对,照值进行排序。比如个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果第六阶段,那么进入第六阶段;如果没,直接输出到本地的linux文件中。

 

  1. 第六阶段是对数据进行归约处理,也就是reduce处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linux文件中。本阶段默认是没的,需要用户自己增加这一阶段的代码。

 

  • Reducer任务的执行过程(***理解****

每个Reducer任务是一个java进程。

Reducer任务接收Mapper任务的输出,归约处理后写入到HDFS中,可以分为如下图所示的几个阶段。

 

技术分享

 

 

 

 

  1. 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会很多,因此Reducer会复制多个Mapper的输出。

 

  1. 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。

 

  1. 第阶段是对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。

 

注意:在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数。

  1. MapReduce例子

例子说明:统计hdfs文件系统中的words.txt单词个数。

vim words.txt

hadoop fs –put /root/words.txt /

hadoop fs –ls /

  • 导入相关的jar

导入${HADOOP-HOME}/share/hadoop/mapreduce红色中部分的包。

技术分享

 

 

和导入${HADOOP-HOME}/share/hadoop/mapreduce/lib/*

技术分享

 

在客户端创建

  • 创建WcMaper.class

public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

       @Override

       protected void map(LongWritable key, Text value,

                       Mapper<LongWritable, Text, Text, LongWritable>.Context context)

                       throws IOException, InterruptedException {

               //接收数据

               String line = value.toString();

               //切分数据

               String [] words = line.split(" ");

               //循环

               for(String w : words){

                       //出现一次,记个1,输出

                       context.write(new Text(w), new LongWritable(1));

               }

       }

}

 

  • 创建WcReducer 

 

public class WcReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

       @Override

       protected void reduce(Text key, Iterable<LongWritable> v2s,

                       Reducer<Text, LongWritable, Text, LongWritable>.Context context)

                       throws IOException, InterruptedException {

                       //定义一个计算器

                       long counter = 0;

                       //循环v2s

                       for (LongWritable v : v2s) {

                               counter +=v.get();

                       }

                       //输出

                       context.write(key, new LongWritable(counter));

       }

}

 

  • 创建WordCount 

 

public class WordCount {

       public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

               

               Job job = Job.getInstance(new Configuration());                            

    //注意:加载main方法所在的类

job.setJarByClass(WordCount.class);

               //设置MapperReducer的类

               job.setMapperClass(WcMapper.class);

               job.setReducerClass(WcReducer.class);

 

               FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.0.2:9000/words.txt"));

               FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.2:9000/output"));

//设置输入和输出的相关属性

               job.setOutputKeyClass(Text.class);

               job.setOutputValueClass(LongWritable.class);

               

               job.waitForCompletion(true);

       }

 

}

  • 打包成jar file 上传到服务器运行测试。

在项目中点击右键“export...”,如下图所示

技术分享

 

点击“next”,jar file导出的位置。

技术分享

 

点击“next”再点击“next”main函数的类后点ok ,完毕!

 

技术分享

 

 

wc.jar上传到服务器测试。

scp –r /root/wcRunner.jar 192.168.174.21:/root/

技术分享

 

删除最后两行

hadoop jar /root/wcRunner.jar

hadoop fs –ls /

也可以直接在本地模式下运行,需要把yarn下的所jar包导入工程,

 

技术分享

 

需在输入,输出里加入前缀“file://

 

  • Hadoop的运行模式

Hadoop的运行模式以下种:
牋(1独立模式(即本地模式(standalone或local mode
牋牋? 无需运行任何守护进程(daemon,所程序都在单个JVM上执行。由于在本机模式下测试和调试MapReduce程序较为方便,因此,这种模式适宜用在开发阶段。
牋(2伪分布模式(pseudo-distributed model
牋牋? Hadoop守护进程运行在本地机器上,模拟一个小规模的集群。
牋(3全分布模式(fully distributed model
牋牋? Hadoop守护进程运行在一个集群上,此模式以后会详细介绍。
在特定模式下运行Hadoop需要关注两个因素:正确设置属性和启动Hadoop守护进程。

下表列举了配置各种模式所需要的最小属性集合:

 

组件名称 

属性名称 

独立模式 

伪分布模式 

全分布模式 

Common 

fs.default.name 

file:///(默认 

hdfs://localhost/ 

hdfs://namenode/ 

HDFS 

dfs.replication 

N/A 

3(默认 

MapReduce 

mapred.job.tracker 

local(默认 

localhost:8021 

namenode:8021 

 

牋?在独立模式下,将使用本地文件系统和本地MapReduce作用运行期;在分布式模式下,将启动HDFS和MapReduce守护进程。

 

 

 

4.深入MapReduce执行过程细节源码分析

  • MapReduce流程图解

技术分享

 

 

 

MR执行流程

       (1).客户端提交一个mrjar包给JobClient(提交方式:hadoop jar ...)

       (2).JobClient通过RPCJobTracker进行通信,返回一个存放jar包的地址(HDFSjobId

       (3).clientjar包写入到HDFS当中(path = hdfs上的地址 + jobId)

       (4).开始提交任务(任务的描述信息,不是jar, 包括jobidjar存放的位置,配置信息等等)

       (5).JobTracker进行初始化任务

       (6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask

       (7).TaskTracker通过心跳机制领取任务(任务的描述信息

       (8).下载所需的jar,配置文件等

       (9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTaskReducerTask

       (10).将结果写入到HDFS当中

  • 细节图解

技术分享

 

 

  • MapReduce过程各个角色的作用:

备注:

  • JobTracker
  1. 负责接收用户提交的作业,负责启动、跟踪任务执行。
  2. JobSubmissionProtocolJobClientJobTracker通信的接口。
  3. InterTrackerProtocolTaskTrackerJobTracker通信的接口。
  • TaskTracker:负责执行任务。
  • JobClient
  1. 是用户作业与JobTracker交互的主要接口。
  2. 负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。

 

 

  • jobClient:提交作业
  • 提交作业之前,需要对作业进行配置
  1. 编写自己的MR程序
  2. 配置作业,包括输入输出路径等等
  • 提交作业

配置完成后,通过JobClient提交

  • 具体功能
  1. JobTracker通信得到一个jar的存储路径和JobId
  2. 输入输出路径检查
  3. job jar拷贝到的HDFS
  4. 计算输入分片,将分片信息写入到job.split
  5. job.xml
  6. 真正提交作业
  • JobTracker:初始化作业,分配作业,TaskTracker与其进行通信,协调监控整个作业
  • 初始化作业
  1. 客户端提交作业后,JobTracker会将作业加入到队列,然后进行调度,默认是FIFO方式
  2. 具体功能
  1. 作业初始化主要是指JobInProgress中完成的
  2. 读取分片信息
  3. 创建task包括MapReduce任创建task包括MapReduce任务
  4. 创建TaskInProgress执行task,包括map任务和reduce任务
  • 任务分配

 

  • TaskTracker:定期与JobTracker通信,执行MapReduce任务
  • 任务分配
  1. TaskTrackerJobTracker之间的通信和任务分配是通过心跳机制实现的
  2. TaskTracker会主动定期向JobTracker发送心态信息,询问是否任务要做,如果,就会申请到任务。
  • 任务执行
  1. 如果TaskTracker拿到任务,会将所的信息拷贝到本地,包括代码、配置、分片信息等
  2. TaskTracker中的localizeJob()方法会被调用进行本地化,拷贝job.jarjobconfjob.xml到本地
  3. TaskTracker调用launchTaskForJob()方法加载启动任务
  4. MapTaskRunnerReduceTaskRunner分别启动java child进程来执行相应的任务
  • 状态更新
  1. Task会定期向TaskTraker汇报执行情况
  2. TaskTracker会定期收集所在集群上的所Task的信息,并向JobTracker汇报
  3. JobTracker会根据所TaskTracker汇报上来的信息进行汇总

 

  • HDFS:保存作业的数据、配置、jar包、结果
    • 作业完成
  1. JobTracker是在接收到最后一个任务完成后,才将任务标记为成功
  2. 将数结果据写入到HDFS
    • 错误处理
  • JobTracker失败

存在单点故障,hadoop2.0解决了这个问题

  • TraskTracker失败
  1. TraskTracker崩溃了会停止向JobTracker发送心跳信息。
  2. JobTracker会将TraskTracker从等待的任务池中移除,并将该任务转移到其他的地方执行
  3. JobTrackerTaskTracker加入到黑名单中
  • Task失败
  1. 任务失败,会向TraskTracker抛出异常
  2. 任务挂起

 

 

 

 

 

 

 

 

 

 

 

 

 

思考题与练习

  1. MapReduce框架的结构是什么
  2. Map在整个MapReduce框架中作用是什么
  3. Reduce在整个MapReduce框架中作用是什么

 

<style>body { margin: 15px 15px 15px 15px; background-color: #d7c6da } hr { color: #000000 } body,table { font-size: 9pt; font-family: "宋体"; font-style: normal; font-weight: normal; color: #000000; text-decoration: none } a.rvts1,span.rvts1 { color: #0000ff; text-decoration: underline } span.rvts2 { font-size: 10pt; font-family: "Times New Roman", "Times", serif } span.rvts3 { font-size: 24pt; font-family: "????"; font-weight: bold } span.rvts4 { font-size: 16pt; font-family: "Arial", "Helvetica", sans-serif; font-weight: bold } span.rvts5 { font-size: 16pt; font-family: "????"; font-weight: bold } span.rvts6 { font-size: 16pt; font-family: "????"; font-weight: bold; color: #ff0000 } span.rvts7 { font-size: 16pt; font-family: "Times New Roman", "Times", serif; font-weight: bold; color: #ff0000 } span.rvts8 { font-size: 12pt; font-family: "????" } span.rvts9 { font-size: 16pt; font-family: "Arial", "Helvetica", sans-serif; font-weight: bold; color: #ff0000 } span.rvts10 { font-size: 10pt; font-family: "????"; font-weight: bold } span.rvts11 { font-size: 10pt; font-family: "????" } span.rvts12 { font-size: 10pt; font-family: "????"; color: #ff0000 } span.rvts13 { font-size: 16pt; font-family: "Times New Roman", "Times", serif; font-weight: bold } span.rvts14 { font-size: 10pt; font-family: "????"; color: #ff0000; background-color: #d9d9d9 } span.rvts15 { font-size: 10pt; font-family: "????"; font-weight: bold; color: #ff0000 } span.rvts16 { font-size: 10pt; font-family: "????"; font-weight: bold; color: #000000 } span.rvts17 { font-size: 10pt; font-family: "Times New Roman", "Times", serif; font-weight: bold; color: #000000 } span.rvts18 { font-size: 10pt; font-family: "Times New Roman", "Times", serif; color: #ff0000 } span.rvts19 { font-size: 12pt; font-family: "????"; font-weight: bold; color: #ff0000 } span.rvts20 { font-size: 12pt; font-family: "Times New Roman", "Times", serif; font-weight: bold; color: #ff0000 } span.rvts21 { font-size: 10pt; font-family: "Times New Roman", "Times", serif; font-weight: bold } span.rvts22 { font-size: 12pt; font-weight: bold; color: #000000 } p,ul,ol { text-align: left; text-indent: 0px; padding: 0px 0px 0px 0px; margin: 0px 0px 0px 0px } .rvps1 { text-align: center } .rvps2 { margin: 104px 0px 0px 0px } .rvps3 { text-align: justify } .rvps4 { text-align: center; margin: 7px 0px 7px 0px } .rvps5 { text-align: justify; line-height: 1.72; page-break-inside: avoid; page-break-after: avoid; margin: 17px 0px 17px 0px } .rvps6 { text-align: justify; text-indent: -28px; line-height: 1.72; page-break-inside: avoid; page-break-after: avoid; margin: 17px 0px 17px 28px } .rvps7 { text-align: justify; margin: 0px 0px 0px 28px } .rvps8 { text-align: justify; text-indent: -28px; margin: 0px 0px 0px 56px } .rvps9 { text-align: justify; text-indent: -28px; margin: 0px 0px 0px 84px } .rvps10 { text-align: justify; text-indent: 28px; margin: 0px 0px 0px 28px } .rvps11 { text-align: justify; text-indent: 28px } .rvps12 { text-indent: 28px; line-height: 24px; background: #ffffff; margin: 15px 0px 5px 0px } .rvps13 { margin: 7px 0px 7px 0px } .rvps14 { text-align: justify; text-indent: -28px; line-height: 1.72; page-break-inside: avoid; page-break-after: avoid; margin: 17px 0px 17px 56px } .rvps15 { text-align: justify; text-indent: 28px; margin: 0px 0px 0px 56px } .rvps16 { text-align: justify; text-indent: -28px; margin: 0px 0px 0px 112px } .rvps17 { text-align: justify; text-indent: 32px; margin: 0px 0px 0px 112px } .rvps18 { text-align: justify; text-indent: -28px; margin: 0px 0px 0px 84px } .rvps19 { text-align: justify; text-indent: 28px; margin: 0px 0px 0px 84px } .rvps20 { text-align: justify; text-indent: 28px; margin: 0px 0px 0px 112px } .rvps21 { text-align: justify; margin: 0px 0px 0px 84px } .rvps22 { text-align: justify; text-indent: -28px; margin: 0px 0px 0px 112px } .rvps23 { text-align: justify; text-indent: -28px; margin: 0px 0px 0px 140px } .rvps24 { text-align: justify; text-indent: 43px; line-height: 1.72; page-break-inside: avoid; page-break-after: avoid; margin: 17px 0px 17px 0px } .rvps25 { text-align: center; margin: 104px 0px 0px 0px }</style>

Hadoop Day3