首页 > 代码库 > Hadoop那些事儿(三)---MapReduce编程浅析

Hadoop那些事儿(三)---MapReduce编程浅析

1.map和reduce

1.1 mapReduce处理逻辑

在本系列文章的第一篇中,曾对MapReduce原理做过简单的描述,在这里再重述一遍。
首先我们有两个文件word1.txt和word2.txt
其中word1.txt的内容如下:

aaaa
bbbb
cccc
dddd
aaaa

word2.txt的内容如下:

aaaa
cccc
dddd
eeee
aaaa

这里的两个文件很小,我们先假设这两个文件很大,分别为64M和96M的大小,然后我们需要统计文件中每个字符串的数量,那么MapReduce的处理流程如下:
技术分享
Input:最左边是输入的过程,输入了图示的数据。
Split分片:mapreduce会根据输入的文件计算分片,每个分片对应与一个map任务。而分片的过程和HDFS密切相关,比如HDFS的一个block大小为64M,我们输入的两个文件分比为64M,96M,这样的话第一个文件生成一个64M的分片,第二个文件生成一个64M的分片和一个32M的分片(如果有一个小于64M的文件,比如10M的文件,那么这个文件会生成一个单独的10M的分片)
Map:map阶段是由编程人员通过代码来控制的,图中所示的大概内容就是将字符串分割开来,作为键存储在map中,值的位置存储1,表示数量。
shuffle洗牌:洗牌阶段,由于之前生成map中存在很多键相同的map,在洗牌阶段将键相同的进行合并。
Reduce:reduce阶段也是有开发人员通过代码控制,本例中是将键相同的map的value值进行求和,得出最终的map
这样最后输出的数据就是每个字符串出现的次数。

1.2 Hadoop数据类型

Hadoop本身提供了一套可优化网络序列化传输的基本类型

类型 含义
BooleanWritable 标准布尔型数值
ByteWritable 单字节数值
DoubleWritable 双字节数值
FloatWritable 浮点数
IntWritable 整型数
LongWritable 长整型数
Text 使用UTF8格式存储的文本
NullWritable 当中的key或value为空时使用

1.3 Mapper

Mapper类是一个泛型类,四个参数分别指定map函数的输入键,输入值,输出键,输出值
技术分享
Mapper类包含四个方法:
技术分享
setup方法在任务开始时调用一次,一般用来做map前的准备工作。
map承担主要的处理工作,把输入数据拆分为键值对。
cleanup方法则是在任务结束时调用一次,主要负责收尾工作。
run方法确定了setup-map-cleanup的执行模板。
map()方法的输入是一个键和一个值,输出是一个Context实例:
技术分享
先了解到这里,后续我们结合代码来进一步了解Mapper。

1.4 Reducer

Reducer类也是一个泛型类,与Mapper相似,四个参数分别指定map函数的输入键,输入值,输出键,输出值
技术分享
Reducer类也包含四个方法:
技术分享

setup方法在任务开始时调用一次,一般用来做reduce前的准备工作。
reduce承担主要的处理工作,把输入数据拆分为键值对。
cleanup方法则是在任务结束时调用一次,主要负责收尾工作。
run方法确定了setup-reduce-cleanup的执行模板。

注意,Reducer的输入类型必须匹配Mapper的输出类型。

2.代码分析

接下来我们来看一下上一篇文章用到的测试代码:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    //继承mapper接口,设置map的输入类型为<Object,Text>
    //输出类型为<Text,IntWritable>
    public static class Map extends Mapper<Object,Text,Text,IntWritable>{
        //one表示单词出现一次
        private static IntWritable one = new IntWritable(1);
        //word存储切下的单词
        private Text word = new Text();
        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
            //对输入的行切词
            StringTokenizer st = new StringTokenizer(value.toString());
            while(st.hasMoreTokens()){
                word.set(st.nextToken());//切下的单词存入word
                context.write(word, one);
            }
        }
    }
    //继承reducer接口,设置reduce的输入类型<Text,IntWritable>
    //输出类型为<Text,IntWritable>
    public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
        //result记录单词的频数
        private static IntWritable result = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
            int sum = 0;
            //对获取的<key,value-list>计算value的和
            for(IntWritable val:values){
                sum += val.get();
            }
            //将频数设置到result
            result.set(sum);
            //收集结果
            context.write(key, result);
        }
    }
    /**
     * @param args
     */
    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/count_in","hdfs://localhost:9000/user/hadoop/output/count_out"};
        //检查运行命令
        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length != 2){
            System.err.println("Usage WordCount <int> <out>");
            System.exit(2);
        }
        //配置作业名
        Job job = new Job(conf,"word count");
        //配置作业各个类
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

WordCount类可以分为三部分,Map,Reduce和main三部分,Map和Reduce都是静态内部类。
Map类继承与Mapper类,四个参数表示其输入键类型为Object,输入值为文本,输出键为文本,输出值为整型数。
通过执行Map操作后,我们希望得到的结果是图1中第三列mapping列的值,即将数据拆分后存储到map中,每个字符串的数量均存储为1.
在代码中定义了一个整型类型的变量one,值为1,用来作为map的值。
map方法的前两个参数分别为输入的键和值,通过下面的代码先将text格式的字段转为java的String类型。

StringTokenizer st = new StringTokenizer(value.toString());

StringTokenizer 根据自定义字符为分界符对字符串进行拆分并将结果集封装提供对应的遍历方法,有如下构造方法:
技术分享
str为要拆分的字符串,delim为界定符,当不指定delim时,将默认以空格进行拆分。

有如下方法:
技术分享
其中hasMoreTokens方法用来判断是否还有分隔符。
使用context的write方法将数据进行记录。

Reduce类继承于Reducer类,Reducer类是一个泛型类,四个参数分别表示输入键,输入值,输出键,输出值。其中输入键和输入值与Map类的输出键,输出值保持一致。
当数据到达reduce时,数据已经经过了洗牌,即键相同的数据进行了合并,所以reduce方法的key为键,values是一个迭代器,存储着该键对应的所有值,然后在方法体中对该键对应的值得数量进行了统计。
如果我们在map方法中分别写一句System.out.println(“map”)和System.out.println(“reduce”),就会发现map方法和reduce方法都不止被执行了一次。

main方法来控制任务的执行。
要知道,使用MapReduce框架时,我们仅仅只是填写map和reduce部分的代码,其他的都交给mapreduce框架来处理,所以我们至少需要告诉mapreduce框架应该怎么执行,main方法中的代码做的就是这个操作。
首先我们需要初始化Configuration类,使用MapReduce之前一定要初始化Configuration,该类主要用来读取hdfs和Mapreduce的配置信息。
args设置输入文件和输出文件的位置,这里指向hdfs,输出文件的文件夹可以不存在,运行后会在指定目录下自动生成,输出文件一定不能存在,在运行前要将上一次运行生成的输出文件删除掉。
在上面的代码中我们是通过下面的代码来配置的:

conf.set("mapred.job.tracker", "localhost:9001");

我们也可以将该信息添加到xml文件中来配置,如下图:
技术分享
代码修改为:
技术分享
接下来的if部分用来判断是否有两个参数都指定了。
再往下就是配置作业。首先创建一个Job类,然后装载需要的各个类,从上到下分别为:程序类(我们编写的java文件的类名,这里是WordCount),Mapper类(继承了Mapper类的内部类,这里是Map),
Combiner和Reducer类都指向继承于Reducer的内部类Reduce.
再往下两行:

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

指定了输出数据的键和值的类型,也是数据存储到hdfs结果文件中的类型。
下面的代码用来创建输入文件和输出文件:

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

最后一行代码表示执行成功后退出程序。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    Hadoop那些事儿(三)---MapReduce编程浅析