首页 > 代码库 > Hadoop基础-07-MapReduce原理、序列化和源码分析

Hadoop基础-07-MapReduce原理、序列化和源码分析

1. MapReduce原理

1.1.          MapReduce概述

(1)MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

(2)MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。

(3)在Hadoop 中,map 函数 位 于 内 置 类 org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT, VALUEOUT>中,reduce 函数位于内置类 org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT,VALUEOUT>中。 我们要做的就是覆盖 map 函数和 reduce 函数。

对于 Hadoop 的 map 函数和reduce 函数,处理的数据是键值对,也就是说 map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。


在Mapper 类,有四个泛型,分别是 KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个 KEYIN、 VALUEIN 指的是map 函数输入的参数 key、 value 的类型;后面两个 KEYOUT、VALUEOUT 指的是 map 函数输出的 key、value的类型。

输入参数 key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次 map 函数。在这里,map 函数没有处理输入的 key、value,直接通过 context.write(…)方法输出了,输出的 key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要根据业务逻辑覆盖的。

查看 Reducer 类,也有四个泛型,同理,分别指的是 reduce 函数输入的 key、value类型,和输出的 key、value 类型。看一下reduce 函数定义,如下图所示:


reduce 函数的形参 key、value 的类型是KEYIN、VALUEIN。要注意这里的value是存在于java.lang.Iterable<VALUEIN>中的,这是一个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的 key,通过调用context.write(…)输出了,这里输出的类型是 KEYOUT、VALUEOUT。通常会根据业务逻辑覆盖 reduce 函数的实现。

1.2.          MapReduce执行过程


MapReduce 运行的时候,会通过 Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收 Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到 HDFS 的文件中。


1.3.          MapReduce原理及执行步骤

MapReduce原理图如下所示:

 

简单理解如下所示:

(1)Map任务处理


l  读取输入文件内容,解析成key、value对,对输入文件的每一行,解析成key、value对。每一个键值调用一次map函数;

l  覆盖map函数,对输入的key、value处理,转换成新的key、value输出;

l  对输出的key、value进行分区;

l  对不同分区的数据,按照key进行排序、分组;相同key的value放到一个集合中;

l  对分组后的数据进行规约。

l  Mapper执行1-6个步骤得到最终的结果



(2)Reduce任务处理


l  对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点;对多个map任务的输出进行合并、排序;

l  覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出;

l  把reduce的输出保存到文件中。

l  Reduce执行4-6个步骤对Mapper最终的结果执行4-6个步骤进一步处理


将上述步骤融入到原理图中后如下所示:


(3)键值对编号



对于 Mapper 任务输入的键值对,定义为key1 和 value1。在 map 方法中处理后,输出的键值对,定义为 key2 和 value2。reduce 方法接收 key2 和 value2,处理后,输出 key3 和 value3。在下文讨论键值对时,可能把 key1 和 value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和 value3 简写为<k3,v3>。

1.4.          单词计数

package mavshuang.mapreduce;
 
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
public class WordCountDemo {
       private static final String INPUT_PATH = "hdfs://hadoop0:9000/hello";
       private static final String OUTPUT_PATH = "hdfs://hadoop0:9000/out";
 
       public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
              final Configuration configuration = new Configuration();
              FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), configuration);
              // 判断文件是否存在,如果存在则删除
              if (fileSystem.exists(new Path(OUTPUT_PATH))) {
                     fileSystem.delete(new Path(OUTPUT_PATH), true);
              }
              String jobName = WordCountDemo.class.getSimpleName();
              Job job = new Job(configuration, jobName);
              // 1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
              // 设置job执行作业时输入文件的路径
              FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
              // 设置把输入文件处理成键值对的类
              job.setInputFormatClass(TextInputFormat.class);
              // 1.2 覆盖map函数,对输入的key、value处理,转换成新的key、value输出。
              // 省略的条件是map输出的<k,v>与reduce输出的<k,v>格式相同
              // 设置自定义的MyMapper类
              job.setMapperClass(MyMapper.class);
              // 设置map方法输出的k2,v2值得类型
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(LongWritable.class);
 
              // 1.3 对输出的key、value进行分区。
              // 设置对k2分区的类
              job.setPartitionerClass(HashPartitioner.class);
              // 设置运行的Reducer任务的数量
              job.setNumReduceTasks(1);
 
              // 1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
 
              // 1.5 (可选)分组后的数据进行归约。
 
              // 2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。对多个map任务的输出进行合并、排序。
              // 设置自定义的MyReducer类
              job.setReducerClass(MyReducer.class);
 
              // 2.2 覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出。
              // 设置reduce方法输出的k3,v3值得类型
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(LongWritable.class);
 
              // 2.3 把reduce的输出保存到文件中。
              // 设置job执行作业时的输出路径
              FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
              // 设置把输出文件处理成键值对的类
              job.setOutputFormatClass(TextOutputFormat.class);
 
              // 把job提交给JobTracker执行,等待执行结果返回
              job.waitForCompletion(true);
 
       }
 
       // KEYIN:表示每一行的偏移量
       // VALUEIN:表示每一行的内容
       // KEYOUT:表示每一行中的每个单词
       // VALUEOUT:表示每一行中每个单词的出现次数,常量为1
       // 继承Mapper类实现map方法
       static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
              // 源文件会被解析成2个键值对,分别为<0,hello you >,<10,hello mavs>
              // 每个<k,v>都调用一次函数
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                     final String[] splited = value.toString().split("\t");
                     for (String word : splited) {
                            final Text k2 = new Text(word);
                            final LongWritable v2 = new LongWritable(1);
                            context.write(k2, v2);
                     }
              }
       }
 
       // KEYIN:表示整个文件中的不同单词
       // VALUEIN:表示整个文件中的不同单词出现的次数
       // KEYOUT:表示整个文件中的不同单词
       // VALUEOUT:表示整个文件中的不同单词出现的总次数
       // 继承Reducer类实现reduce方法
       static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
              // reduce会被调用3次,分别是<hello,{1,1}>、<mavs,{1}>、<you,{1}>
              protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
                     Long count = 0L;// 定义成Java类型,便于操作
                     for (LongWritable times : v2s) {
                            count += times.get();
                     }
                     final LongWritable v3 = new LongWritable(count);
                     context.write(k2, v3);
              }
       }
}

 



上述结果错误原因在于:hello文件,单词与单词之间应该使用“Tab”键来区分,而不是空格键;修改后的结果如下所示:

 



1.5.          分析上述代码执行过程

(1)JobTracker

负责接收用户提交的作业,负责启动、跟踪任务的执行。

JobSubmissionProtocol是JobClient与JobTracker通信的接口;

InterTrackerProtocol是TaskTracker与JobTracker通信的接口;

(2)TaskTracker

负责执行任务。

(3)JobClient

是用户作业与JobTracker交互的主要接口;

负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。

(4)MapReduce驱动默认的设置


以上代码是从上述代码中截取出来的,其中一些设置未按照MapReduce中的默认设置,MapReduce中的默认设置如下所示:

 

(5)上述代码分析执行过程

(6)总结图解

 

2. 序列化

2.1.序列化概念

(1)序列化(Serialization)是指把结构化对象转换为字节流;

(2)反序列化(Deserialization)是序列化的逆过程,即把字节流转回结构化对象;

(3)Java序列化是指java.io.Serializable接口。

2.2.Hadoop序列化特点

(1)紧凑

         高效使用存储空间。

(2)快速

         读写数据的额外开销小。

(3)可扩展

         可透明地读取老格式的数据。

(5)互操作

支持多语言的交互。

 

2.3.Hadoop序列化的作用

主要有两大作用:进程间通信和永久存储;

Hadoop节点间通信如下图所示:

 

3. Writable接口以及常用的Writable实现类

3.1. Writable接口

         根据DataInput和DataOutput实现的简单的、有效的序列化对象。


         MapReduce的任意key和value都必须实现Writable接口并实现write()和readFields()方法:

write 是把每个对象序列化到输出流

readFields是把输入流字节反序列化

         MapReduce的任意key都必须实现WritableComparable接口:


3.2. 常用的Writable实现类


Java基本类型与Hadoop中类型之间的转换如下图所示:

 

3.3.基于文件的存储结构

SequenceFile 无序存储

MapFile 会对key建立索引文件,value按key顺序存储

基于MapFile的结构有:

ArrayFile 像我们使用的数组一样,key值为序列化的数字

SetFile 他只有key,value为不可变的数据

BloomMapFile 在 MapFile 的基础上增加了一个 /bloom 文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。

 

3.4.自定义WritableKpi


package mavshuang.mapreduce;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class KpiDemo {
       public static final String INPUT_PATH = "hdfs://hadoop0:9000/kpi";
       public static final String OUT_PATH = "hdfs://hadoop0:9000/out";
 
       public static void main(String[] args) throws Exception {
              final Configuration configuration = new Configuration();
              //final FileSystem fileSystem = FileSystem.get(configuration);//这是导致出错的原因
              final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
              // 判断输出的路径是否已经存在
              if (fileSystem.exists(new Path(OUT_PATH))) {
                     fileSystem.delete(new Path(OUT_PATH), true);
              }
             
              final String jobName = KpiDemo.class.getSimpleName();
             
              final Job job = new Job(configuration,jobName);
             
              FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
             
              job.setMapperClass(MyMapper.class);
             
              job.setReducerClass(MyReducer.class);
             
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(KpiWritable.class);
             
              FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
             
              job.waitForCompletion(true);
       }
 
       static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
 
              protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws java.io.IOException ,InterruptedException {
                     final String[] splited = value.toString().split("\t");
                     final String phoneNo = splited[1];
                     final Text k2 = new Text(phoneNo);
                    
                     final KpiWritable v2 = new KpiWritable(splited[6], splited[7], splited[8], splited[9]);
                    
                     context.write(k2, v2);
              };
       }
 
       static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
 
              protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
                     long upPackNum = 0L;
                     long downPackNum = 0L;
                     long upPayLoad = 0L;
                     long downPayLoad = 0L;
                    
                     for (KpiWritable kpiWritable : v2s) {
                            upPackNum += kpiWritable.upPackNum;
                            downPackNum += kpiWritable.downPackNum;
                            upPayLoad += kpiWritable.upPayLoad;
                            downPayLoad += kpiWritable.downPayLoad;
                     }
                    
                     final KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
                    
                     context.write(k2, v3);
              };
       }
}
 
class KpiWritable implements Writable {
       long upPackNum;// 上传数据包数,单位是个
       long downPackNum;// 下载数据包数,单位是个
       long upPayLoad;// 上行总流量,单位是bytes
       long downPayLoad;// 下行总流量,单位是bytes
 
       public KpiWritable() {
       }
 
       public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
              this.upPackNum = upPackNum;
              this.downPackNum = downPackNum;
              this.upPayLoad = upPayLoad;
              this.downPayLoad = downPayLoad;
       }
 
       public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
              this(Long.parseLong(upPackNum), Long.parseLong(downPackNum), Long.parseLong(upPayLoad), Long.parseLong(downPayLoad));
       }
 
       public void readFields(DataInput in) throws IOException {
              this.upPackNum = in.readLong();
              this.downPackNum = in.readLong();
              this.upPayLoad = in.readLong();
              this.downPayLoad = in.readLong();
       }
 
       public void write(DataOutput out) throws IOException {
              out.writeLong(this.upPackNum);
              out.writeLong(this.downPackNum);
              out.writeLong(this.upPayLoad);
              out.writeLong(this.downPayLoad);
       }
 
       public String toString() {
              return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
       }
 
}

 出现如下错误:


//final FileSystem fileSystem = FileSystem.get(configuration);//这是导致出错的原因

 

正确的运行结果如下所示:


查看运行后生成的文件:

4.MapReduce的执行过程源码分析

4.1.InputFormat


InputFormat负责处理MR的输入部分,有三个作用:

l  验证作业的输入是否规范,即验证输入信息的合法性,包括输入路径是否存在等;

l  把输入文件切分成InputSpilt,即把HDFS中的文件按照一定规则拆分成InputSpilt,每个InputSpilt由一个Mapper执行;

l  提供RecordReader,把InputSpilt中的每一行解析出来供map函数处理。

 

4.2. FileInputFormat

         FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为Job输入的所有文件,并实现了对输入文件计算splits的方法;而获取记录的方法是由TextInputFormat进行实现的。

         在上图中,第247行计算minSize,是提供给后面计算使用的,其中getFormatMinSplitSize()方法的值是1,getMinSplitSize(job)方法的值由配置参数mapred.min.split.size指定的,默认是1,所以minSize的值就是1。第248行计算maxSize,也是同后面使用的,值由mapred.max.split.size指定,默认值是Long的最大值。在第252行files列表中存放的是输入文件,可以有多个。从第253行开始,循环处理每一个输入文件;第254行是获取文件路径,L256获取文件长度,L257获取文件块位置。如果文件非空,并且文件允许被分割为输入块,从而进入L258中。



          从上图看出输入块size由三个因素决定,分别为blockSize, minSize,maxSize,根据之前的数值,输入分片的默认Size就是文件块Size。

4.3.InputSplit

l  在执行MapReduce之前,原始数据被分割成若干spilt,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会一次处理每一个记录;

l  FileInputFormat只划分比HDFS block大的文件;如果一个文件的大小比block小,将不会被划分,这是Hadoop处理大文件的效率比处理很多小文件效率高的原因;

l  当Hadoop处理很多小文件时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个spilt并分配一个Map任务,从而导致效率低下。

4.4.TextInputFormat

l  TextInputFormat是默认处理类,处理普通文本文件;文件中每一行作为一个记录,将每一行在文件中的起始偏移量作为key,每一行的内容作为value,默认以\n或者回车键作为一行记录;

l  TextInputFormat继承FileInputFormat。


InputFormat类的层次结构:


4.5.其他输入类

CombineFileInputFormat:处理小文件;

KeyValueTextInputFormat:当输入数据的每一行是两列,并且使用Tab键分离的时候;

NLineInputFormat:控制在每个split中数据的行数;

SequenceFileInputFormat:当输入文件格式是sequencefile的时候要使用SequenceFileInputFormat作为输入。

4.6.自定义输入格式

继承FileInputFormat基类;

重写getSplits(JobContextjob)方法。

4.7.Hadoop的输出

TextOutputFormat:默认的输出格式,key和value中间值使用Tab键隔开;

         在上图中发现,当文本输出的时候使用UTF-8编码,从L47中可看出,划分行的符号是”\n”,从L65中可以看出,输出的键值对默认是以制表符(Tab)分割的。

SequenceFileOutputFormat:将key和value以sequenceFile格式输出;

SequenceFileAsBinaryOutputFormat:将key和value以原始二进制的格式输出;

MapFileOutputFormat:将key和value写入MapFile中,由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。

MultipleOutputFormat:默认情况下一个reducer会产生一个输出,此类可以实现一个reducer产生多个输出。

 

Hadoop基础-07-MapReduce原理、序列化和源码分析