首页 > 代码库 > Hadoop基础-07-MapReduce原理、序列化和源码分析
Hadoop基础-07-MapReduce原理、序列化和源码分析
1. MapReduce原理
1.1. MapReduce概述
(1)MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
(2)MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。
(3)在Hadoop 中,map 函数 位 于 内 置 类 org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT, VALUEOUT>中,reduce 函数位于内置类 org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT,VALUEOUT>中。 我们要做的就是覆盖 map 函数和 reduce 函数。
对于 Hadoop 的 map 函数和reduce 函数,处理的数据是键值对,也就是说 map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。
在Mapper 类,有四个泛型,分别是 KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个 KEYIN、 VALUEIN 指的是map 函数输入的参数 key、 value 的类型;后面两个 KEYOUT、VALUEOUT 指的是 map 函数输出的 key、value的类型。
输入参数 key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次 map 函数。在这里,map 函数没有处理输入的 key、value,直接通过 context.write(…)方法输出了,输出的 key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要根据业务逻辑覆盖的。
查看 Reducer 类,也有四个泛型,同理,分别指的是 reduce 函数输入的 key、value类型,和输出的 key、value 类型。看一下reduce 函数定义,如下图所示:
reduce 函数的形参 key、value 的类型是KEYIN、VALUEIN。要注意这里的value是存在于java.lang.Iterable<VALUEIN>中的,这是一个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的 key,通过调用context.write(…)输出了,这里输出的类型是 KEYOUT、VALUEOUT。通常会根据业务逻辑覆盖 reduce 函数的实现。
1.2. MapReduce执行过程
MapReduce 运行的时候,会通过 Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收 Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到 HDFS 的文件中。
1.3. MapReduce原理及执行步骤
MapReduce原理图如下所示:
简单理解如下所示:
(1)Map任务处理
l 读取输入文件内容,解析成key、value对,对输入文件的每一行,解析成key、value对。每一个键值调用一次map函数;
l 覆盖map函数,对输入的key、value处理,转换成新的key、value输出;
l 对输出的key、value进行分区;
l 对不同分区的数据,按照key进行排序、分组;相同key的value放到一个集合中;
l 对分组后的数据进行规约。
l Mapper执行1-6个步骤得到最终的结果
(2)Reduce任务处理
l 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点;对多个map任务的输出进行合并、排序;
l 覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出;
l 把reduce的输出保存到文件中。
l Reduce执行4-6个步骤对Mapper最终的结果执行4-6个步骤进一步处理
将上述步骤融入到原理图中后如下所示:
(3)键值对编号
对于 Mapper 任务输入的键值对,定义为key1 和 value1。在 map 方法中处理后,输出的键值对,定义为 key2 和 value2。reduce 方法接收 key2 和 value2,处理后,输出 key3 和 value3。在下文讨论键值对时,可能把 key1 和 value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和 value3 简写为<k3,v3>。
1.4. 单词计数
package mavshuang.mapreduce; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class WordCountDemo { private static final String INPUT_PATH = "hdfs://hadoop0:9000/hello"; private static final String OUTPUT_PATH = "hdfs://hadoop0:9000/out"; public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { final Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), configuration); // 判断文件是否存在,如果存在则删除 if (fileSystem.exists(new Path(OUTPUT_PATH))) { fileSystem.delete(new Path(OUTPUT_PATH), true); } String jobName = WordCountDemo.class.getSimpleName(); Job job = new Job(configuration, jobName); // 1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。 // 设置job执行作业时输入文件的路径 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 设置把输入文件处理成键值对的类 job.setInputFormatClass(TextInputFormat.class); // 1.2 覆盖map函数,对输入的key、value处理,转换成新的key、value输出。 // 省略的条件是map输出的<k,v>与reduce输出的<k,v>格式相同 // 设置自定义的MyMapper类 job.setMapperClass(MyMapper.class); // 设置map方法输出的k2,v2值得类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.3 对输出的key、value进行分区。 // 设置对k2分区的类 job.setPartitionerClass(HashPartitioner.class); // 设置运行的Reducer任务的数量 job.setNumReduceTasks(1); // 1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。 // 1.5 (可选)分组后的数据进行归约。 // 2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。对多个map任务的输出进行合并、排序。 // 设置自定义的MyReducer类 job.setReducerClass(MyReducer.class); // 2.2 覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出。 // 设置reduce方法输出的k3,v3值得类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 2.3 把reduce的输出保存到文件中。 // 设置job执行作业时的输出路径 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 设置把输出文件处理成键值对的类 job.setOutputFormatClass(TextOutputFormat.class); // 把job提交给JobTracker执行,等待执行结果返回 job.waitForCompletion(true); } // KEYIN:表示每一行的偏移量 // VALUEIN:表示每一行的内容 // KEYOUT:表示每一行中的每个单词 // VALUEOUT:表示每一行中每个单词的出现次数,常量为1 // 继承Mapper类实现map方法 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { // 源文件会被解析成2个键值对,分别为<0,hello you >,<10,hello mavs> // 每个<k,v>都调用一次函数 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { final String[] splited = value.toString().split("\t"); for (String word : splited) { final Text k2 = new Text(word); final LongWritable v2 = new LongWritable(1); context.write(k2, v2); } } } // KEYIN:表示整个文件中的不同单词 // VALUEIN:表示整个文件中的不同单词出现的次数 // KEYOUT:表示整个文件中的不同单词 // VALUEOUT:表示整个文件中的不同单词出现的总次数 // 继承Reducer类实现reduce方法 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { // reduce会被调用3次,分别是<hello,{1,1}>、<mavs,{1}>、<you,{1}> protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { Long count = 0L;// 定义成Java类型,便于操作 for (LongWritable times : v2s) { count += times.get(); } final LongWritable v3 = new LongWritable(count); context.write(k2, v3); } } }
上述结果错误原因在于:hello文件,单词与单词之间应该使用“Tab”键来区分,而不是空格键;修改后的结果如下所示:
1.5. 分析上述代码执行过程
(1)JobTracker
负责接收用户提交的作业,负责启动、跟踪任务的执行。
JobSubmissionProtocol是JobClient与JobTracker通信的接口;
InterTrackerProtocol是TaskTracker与JobTracker通信的接口;
(2)TaskTracker
负责执行任务。
(3)JobClient
是用户作业与JobTracker交互的主要接口;
负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。
(4)MapReduce驱动默认的设置
以上代码是从上述代码中截取出来的,其中一些设置未按照MapReduce中的默认设置,MapReduce中的默认设置如下所示:
(5)上述代码分析执行过程
(6)总结图解
2. 序列化
2.1.序列化概念
(1)序列化(Serialization)是指把结构化对象转换为字节流;
(2)反序列化(Deserialization)是序列化的逆过程,即把字节流转回结构化对象;
(3)Java序列化是指java.io.Serializable接口。
2.2.Hadoop序列化特点
(1)紧凑
高效使用存储空间。
(2)快速
读写数据的额外开销小。
(3)可扩展
可透明地读取老格式的数据。
(5)互操作
支持多语言的交互。
2.3.Hadoop序列化的作用
主要有两大作用:进程间通信和永久存储;
Hadoop节点间通信如下图所示:
3. Writable接口以及常用的Writable实现类
3.1. Writable接口
根据DataInput和DataOutput实现的简单的、有效的序列化对象。
MapReduce的任意key和value都必须实现Writable接口并实现write()和readFields()方法:
write 是把每个对象序列化到输出流
readFields是把输入流字节反序列化
MapReduce的任意key都必须实现WritableComparable接口:
3.2. 常用的Writable实现类
Java基本类型与Hadoop中类型之间的转换如下图所示:
3.3.基于文件的存储结构
SequenceFile 无序存储
MapFile 会对key建立索引文件,value按key顺序存储
基于MapFile的结构有:
ArrayFile 像我们使用的数组一样,key值为序列化的数字
SetFile 他只有key,value为不可变的数据
BloomMapFile 在 MapFile 的基础上增加了一个 /bloom 文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。
3.4.自定义WritableKpi
package mavshuang.mapreduce; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class KpiDemo { public static final String INPUT_PATH = "hdfs://hadoop0:9000/kpi"; public static final String OUT_PATH = "hdfs://hadoop0:9000/out"; public static void main(String[] args) throws Exception { final Configuration configuration = new Configuration(); //final FileSystem fileSystem = FileSystem.get(configuration);//这是导致出错的原因 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); // 判断输出的路径是否已经存在 if (fileSystem.exists(new Path(OUT_PATH))) { fileSystem.delete(new Path(OUT_PATH), true); } final String jobName = KpiDemo.class.getSimpleName(); final Job job = new Job(configuration,jobName); FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.waitForCompletion(true); } static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> { protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws java.io.IOException ,InterruptedException { final String[] splited = value.toString().split("\t"); final String phoneNo = splited[1]; final Text k2 = new Text(phoneNo); final KpiWritable v2 = new KpiWritable(splited[6], splited[7], splited[8], splited[9]); context.write(k2, v2); }; } static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> { protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; for (KpiWritable kpiWritable : v2s) { upPackNum += kpiWritable.upPackNum; downPackNum += kpiWritable.downPackNum; upPayLoad += kpiWritable.upPayLoad; downPayLoad += kpiWritable.downPayLoad; } final KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad); context.write(k2, v3); }; } } class KpiWritable implements Writable { long upPackNum;// 上传数据包数,单位是个 long downPackNum;// 下载数据包数,单位是个 long upPayLoad;// 上行总流量,单位是bytes long downPayLoad;// 下行总流量,单位是bytes public KpiWritable() { } public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) { this.upPackNum = upPackNum; this.downPackNum = downPackNum; this.upPayLoad = upPayLoad; this.downPayLoad = downPayLoad; } public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) { this(Long.parseLong(upPackNum), Long.parseLong(downPackNum), Long.parseLong(upPayLoad), Long.parseLong(downPayLoad)); } public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } public void write(DataOutput out) throws IOException { out.writeLong(this.upPackNum); out.writeLong(this.downPackNum); out.writeLong(this.upPayLoad); out.writeLong(this.downPayLoad); } public String toString() { return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; } }
出现如下错误:
//final FileSystem fileSystem = FileSystem.get(configuration);//这是导致出错的原因 |
正确的运行结果如下所示:
查看运行后生成的文件:
4.MapReduce的执行过程源码分析
4.1.InputFormat
InputFormat负责处理MR的输入部分,有三个作用:
l 验证作业的输入是否规范,即验证输入信息的合法性,包括输入路径是否存在等;
l 把输入文件切分成InputSpilt,即把HDFS中的文件按照一定规则拆分成InputSpilt,每个InputSpilt由一个Mapper执行;
l 提供RecordReader,把InputSpilt中的每一行解析出来供map函数处理。
4.2. FileInputFormat
FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为Job输入的所有文件,并实现了对输入文件计算splits的方法;而获取记录的方法是由TextInputFormat进行实现的。
在上图中,第247行计算minSize,是提供给后面计算使用的,其中getFormatMinSplitSize()方法的值是1,getMinSplitSize(job)方法的值由配置参数mapred.min.split.size指定的,默认是1,所以minSize的值就是1。第248行计算maxSize,也是同后面使用的,值由mapred.max.split.size指定,默认值是Long的最大值。在第252行files列表中存放的是输入文件,可以有多个。从第253行开始,循环处理每一个输入文件;第254行是获取文件路径,L256获取文件长度,L257获取文件块位置。如果文件非空,并且文件允许被分割为输入块,从而进入L258中。
从上图看出输入块size由三个因素决定,分别为blockSize, minSize,maxSize,根据之前的数值,输入分片的默认Size就是文件块Size。
4.3.InputSplit
l 在执行MapReduce之前,原始数据被分割成若干spilt,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会一次处理每一个记录;
l FileInputFormat只划分比HDFS block大的文件;如果一个文件的大小比block小,将不会被划分,这是Hadoop处理大文件的效率比处理很多小文件效率高的原因;
l 当Hadoop处理很多小文件时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个spilt并分配一个Map任务,从而导致效率低下。
4.4.TextInputFormat
l TextInputFormat是默认处理类,处理普通文本文件;文件中每一行作为一个记录,将每一行在文件中的起始偏移量作为key,每一行的内容作为value,默认以\n或者回车键作为一行记录;
l TextInputFormat继承FileInputFormat。
InputFormat类的层次结构:
4.5.其他输入类
CombineFileInputFormat:处理小文件;
KeyValueTextInputFormat:当输入数据的每一行是两列,并且使用Tab键分离的时候;
NLineInputFormat:控制在每个split中数据的行数;
SequenceFileInputFormat:当输入文件格式是sequencefile的时候要使用SequenceFileInputFormat作为输入。
4.6.自定义输入格式
继承FileInputFormat基类;
重写getSplits(JobContextjob)方法。
4.7.Hadoop的输出
TextOutputFormat:默认的输出格式,key和value中间值使用Tab键隔开;
在上图中发现,当文本输出的时候使用UTF-8编码,从L47中可看出,划分行的符号是”\n”,从L65中可以看出,输出的键值对默认是以制表符(Tab)分割的。
SequenceFileOutputFormat:将key和value以sequenceFile格式输出;
SequenceFileAsBinaryOutputFormat:将key和value以原始二进制的格式输出;
MapFileOutputFormat:将key和value写入MapFile中,由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。
MultipleOutputFormat:默认情况下一个reducer会产生一个输出,此类可以实现一个reducer产生多个输出。
Hadoop基础-07-MapReduce原理、序列化和源码分析