首页 > 代码库 > MapReduce剖析笔记之一:从WordCount理解MapReduce的几个阶段

MapReduce剖析笔记之一:从WordCount理解MapReduce的几个阶段

WordCount是一个入门的MapReduce程序(从src\examples\org\apache\hadoop\examples粘贴过来的):
package org.apache.hadoop.examples;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {  public static class TokenizerMapper        extends Mapper<Object, Text, Text, IntWritable>{        private final static IntWritable one = new IntWritable(1);    private Text word = new Text();          public void map(Object key, Text value, Context context                    ) throws IOException, InterruptedException {      StringTokenizer itr = new StringTokenizer(value.toString());      while (itr.hasMoreTokens()) {        word.set(itr.nextToken());        context.write(word, one);      }    }  }    public static class IntSumReducer        extends Reducer<Text,IntWritable,Text,IntWritable> {    private IntWritable result = new IntWritable();    public void reduce(Text key, Iterable<IntWritable> values,                        Context context                       ) throws IOException, InterruptedException {      int sum = 0;      for (IntWritable val : values) {        sum += val.get();      }      result.set(sum);      context.write(key, result);    }  }  public static void main(String[] args) throws Exception {    Configuration conf = new Configuration();    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    if (otherArgs.length != 2) {      System.err.println("Usage: wordcount <in> <out>");      System.exit(2);    }    Job job = new Job(conf, "word count");    job.setJarByClass(WordCount.class);    job.setMapperClass(TokenizerMapper.class);    job.setCombinerClass(IntSumReducer.class);    job.setReducerClass(IntSumReducer.class);    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(IntWritable.class);    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    System.exit(job.waitForCompletion(true) ? 0 : 1);  }}
MapReduce即将一个计算任务分为两个阶段:Map、Reduce。为什么要这么分解?
为了理解其含义,我们先不管MapReduce这一套框架,从一个简单的问题来看,如果对于100T的日志文件,需要统计其中出现的"ERROR"这个单词的次数,怎么办?
最简单的方法:单机处理,逐行读入每一行文本,统计并累加,则得到其值。问题是:因为数据量太大,速度太慢,怎么办?自然,多机并行处理就是一个自然的选择。
那么,这个文件怎么切分到多个机器呢?假定有100台机器,可以写一个主程序,将这个100T大文件按照每个机器存储1T的原则,在100台机器上分布存储,再把原来单机上的程序拷贝100份(无需修改)至100台机器上运行得到结果,此时得到的结果只是一个中间结果,最后需要写一个汇总程序,将统计结果进行累加,则完成计算。
将大文件分解后,对单机1T文件计算的过程就相当于Map,而Map的结果就相当于"ERROR"这个单词在本机1T文件中出现的次数,而最后的汇总程序就相当于Reduce,Reduce的输入来源于100台机器。在这个简单的例子中,有100个Map任务,1个Reduce任务。
100台机器计算后的中间结果需要传递到Reduce任务所在机器上,这个过程就是Shuffle,Shuffle这个单词的含义是”洗牌“,也就是将中间结果从Map所在机器传输到Reduce所在机器,在这个过程中,存在网络传输。
此时,我们利用上面的例子已经理解了Map-Shuffle-Reduce的基本含义,在上面的例子中,如果还需要对”WARNING“这个单词进行统计,那么怎么办呢?此时,每个Map任务就不仅需要统计本机1T文件中ERROR的个数,还需要统计WARNING的次数,然后在Reduce程序中分别进行统计。如果需要对所有单词进行统计呢?一个道理,每个Map任务对1T文件中所有单词进行统计计数,然后 Reduce对所有结果进行汇总,得到所有单词在100T大文件中出现的次数。此时,问题可能出现了,因为单词数量可能很多,Reduce用单机处理也可能存在瓶颈了,于是我们需要考虑用多台机器并行计算Reduce,假如用2台机器,因为Reduce只是对单词进行计数累加,所有可以按照这样简单的规则进行:大写字母A-Z开头的单词由Reduce 1累加;小写字母a-z开头的单词由Reduce 2累加。
在这种情况下,100个Map任务执行后的结果,都需要分为两部分,一部分准备送到Reduce 1统计,一部分准备送到Reduce 2统计,这个功能称为Partitioner,即将Map后的结果(比如一个文本文件,记录了各个单词在本机文件出现的次数)分解为两部分(比如两个文本 文件),准备送到两个Reduce任务。
因此,Shuffle在这里就是从100个Map任务和2个Reduce任务之间传输中间结果的过程。
我们继续考虑几个问题:
1、 如果Map后的中间结果数据量较大,Shuffle过程对网络带宽要求较高,因此需要将Map后的结果尽可能减小,这个功能当然可以在Map内自己搞 定,不过MapReduce将这个功能单独拎出来,称为Combiner,即合并,这个Combiner,指的是Map任务后中间结果的合并,相比于 Reduce的最终合并,这里相当于先进行一下局部汇总,减小中间结果,进而减小网络传输量。所以,在上面的例子中,假如Map并不计数,只是记录单词出现这个信息,输出结果是<ERROR,1>,<WARNING,1>,<WARNING,1>.....这样一个Key-Value序列,Combiner可以进行局部汇总,将Key相同的Value进行累加,形成一个新的Key-Value序列:<ERROR,14>,<WARNING,27>,.....,这样就大大减小了Shuffle需要的网络带宽,要知道现在数据中心一般使用千兆以太网,好些的使用万兆以太网,TCP/IP传输的效率不太高。这里Combiner汇总函数实际上可以与Reduce的汇总函数一致,只是输入数据不同。
2、 来自100个Map任务后的结果分别送到两个Reduce任务处理。对于任何一个Reduce任务,输入是一堆<ERROR,14>这样的 Key-Value序列,因为100个Map任务都有可能统计到ERROR的次数,因此这里会先进行一个归并,即将相同单词的归并到一起,形 成<ERROR, <14,36,.....>>,<WARNING,<27,45,...>>这样一个仍然是Key-Value的 序列,14、36、。。。分别表示第1、2、。。。台机器中ERROR的统计次数,这个归并过程在MapReduce中称为Merge。如果merge后 再进行Reduce,那么就只需要统计即可;如果事先没有merge,那么Reduce自己完成这一功能也行,只是两种情况下Reduce的输入Key- Value形式不同。
3、如果要求最后的单词统计结果还要形成字典序怎么办呢?可以自己在 Reduce中进行全排序,也可以100个Map任务中分别进行局部排序,然后将结果发到Reduce任务时,再进行归并排序。这个过程 MapReduce也内建支持,因此不需要用户自己去写排序程序,这个过程在MapReduce中称为Sort。
到这里,我们理解了MapReduce中的几个典型步骤:Map、Sort、Partitioner、 Combiner、Shuffle、Merge、Reduce。MapReduce程序之所以称为MapReduce,就说明Map、Reduce这两个 步骤对于一个并行计算来说几乎是必须的,你总得先分开算吧,所以必须有Map;你总得汇总吧,所以有Reduce。当然,理论上也可以不需要 Reduce,如果Map后就得到你要的结果的话。
Sort对于不需要顺序的程序里没意义(但MapReduce默认做了排序);
Partitioner对于Reduce只有一个的时候没意义,如果有多个Reduce,则需要,至于怎么分,用户可以继承Partitioner标准类,自己实现分解函数。控制中间结果如何传输。MapReduce提供的标准的Partitioner是 一个接口,用户可以自己实现getPartition()函数,MapReduce也提供了几个基本的实现,最典型的HashPartitioner是根 据用户设定的Reduce任务数量(注意,MapReduce中,Map任务的个数最终取决于数据分布,Reduce则是用户直接指定),按照哈希进行计算的:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {  public void configure(JobConf job) {}  /** Use {@link Object#hashCode()} to partition. */  public int getPartition(K2 key, V2 value,                          int numReduceTasks) {    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;  }}
这里,numReduceTasks就是用户设定的Reduce任务数量;
K2 key, V2 value 就是Map计算后的中间结果。
Combiner可以选择性放弃,但考虑到网络带宽,可以自己写相应的函数实现局部合并功能。很多情况下,直接利用Reduce那个程序即可,WordCount这个标准程序里就是这么用的。
Shuffle自然是必须的,不用写,根据Partitioner逻辑,框架自己去执行结果传输。
Merge也不是必须的,可以揉到Reduce里面实现等等也可以。因为这些操作的数据结构都是Key-Value,Reduce的输入只要是一个Key-Value即可,相当灵活。
我们再来看WordCount,这个MapReduce程序中定义了一个类:
  public static class TokenizerMapper        extends Mapper<Object, Text, Text, IntWritable>
而Mapper是Hadoop中的一个接口,其定义为:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {    /**    * Maps a single input key/value pair into an intermediate key/value pair.   *    * <p>Output pairs need not be of the same types as input pairs.  A given    * input pair may map to zero or many output pairs.  Output pairs are    * collected with calls to    * {@link OutputCollector#collect(Object,Object)}.</p>   *   * <p>Applications can use the {@link Reporter} provided to report progress    * or just indicate that they are alive. In scenarios where the application    * takes an insignificant amount of time to process individual key/value    * pairs, this is crucial since the framework might assume that the task has    * timed-out and kill that task. The other way of avoiding this is to set    * <a href="http://www.mamicode.com/{@docRoot}/../mapred-default.html#mapred.task.timeout">   * mapred.task.timeout</a> to a high-enough value (or even zero for no    * time-outs).</p>   *    * @param key the input key.   * @param value the input value.   * @param output collects mapped keys and values.   * @param reporter facility to report progress.   */  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)  throws IOException;}
因此,Mapper里面并没有规定输入输出的类型是什么,只要是KeyValue的即可,K1、V1、K2、V2是什么由用户指定,反正只是实现K1、V1到K2、V2的映射即可。

在WordCount中实现了继承于Mapper<Object, Text, Text, IntWritable>的一个TokenizerMapper类,实现了map函数:map(Object key, Text value, Context context ) ;

TokenizerMapper中,输入的Key-Value是<Object, Text>,输出是<Text, IntWritable>,在WordCount程序里,K1代表一行文本的起始位置,V1代表这一行文本;

K2代表单词,V2代表"1",用于后面的累和。

同样,在MapReduce中,Reducer也是一个接口,其声明为:
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {    /**    * <i>Reduces</i> values for a given key.     *    * <p>The framework calls this method for each    * <code>&lt;key, (list of values)></code> pair in the grouped inputs.   * Output values must be of the same type as input values.  Input keys must    * not be altered. The framework will <b>reuse</b> the key and value objects   * that are passed into the reduce, therefore the application should clone   * the objects they want to keep a copy of. In many cases, all values are    * combined into zero or one value.   * </p>   *      * <p>Output pairs are collected with calls to     * {@link OutputCollector#collect(Object,Object)}.</p>   *   * <p>Applications can use the {@link Reporter} provided to report progress    * or just indicate that they are alive. In scenarios where the application    * takes an insignificant amount of time to process individual key/value    * pairs, this is crucial since the framework might assume that the task has    * timed-out and kill that task. The other way of avoiding this is to set    * <a href="http://www.mamicode.com/{@docRoot}/../mapred-default.html#mapred.task.timeout">   * mapred.task.timeout</a> to a high-enough value (or even zero for no    * time-outs).</p>   *    * @param key the key.   * @param values the list of values to reduce.   * @param output to collect keys and combined values.   * @param reporter facility to report progress.   */  void reduce(K2 key, Iterator<V2> values,              OutputCollector<K3, V3> output, Reporter reporter)    throws IOException;}
Reducer的输入为K2, V2(这个对应于Mapper输出的经过Shuffle到达Reducer端的K2,V2,), 输出为K3, V3。

在WordCount中,K2为单词,V2为1这个固定值(或者为局部出现次数,取决于是否有Combiner);K3还是单词,V3就是累和值。

而WordCount里存在继承于Reducer<Text, IntWritable, Text, IntWritable>的IntSumReducer类,完成单词计数累加功能。

对于Combiner,实际上MapReduce没有Combiner这个基类(WordCount自然也没有实现),从任务的提交函数来看:
  public void setCombinerClass(Class<? extends Reducer> cls                               ) throws IllegalStateException {    ensureState(JobState.DEFINE);    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);  }
可以看出,Combiner使用的类实际上符合Reducer。两者是一样的。

再来看任务提交代码:
 1     Job job = new Job(conf, "word count"); 2     job.setJarByClass(WordCount.class); 3     job.setMapperClass(TokenizerMapper.class); 4     job.setCombinerClass(IntSumReducer.class); 5     job.setReducerClass(IntSumReducer.class); 6     job.setOutputKeyClass(Text.class); 7     job.setOutputValueClass(IntWritable.class); 8     FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 9     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));10     System.exit(job.waitForCompletion(true) ? 0 : 1);
第1行创建一个Job对象,Job是MapReduce中提供的一个任务类,其声明为:
public class Job extends JobContext {    public static enum JobState {DEFINE, RUNNING};  private JobState state = JobState.DEFINE;  private JobClient jobClient;  private RunningJob info;.......
之后,设置该任务的运行类,也就是WordCount这个类;

然后设置Map、Combiner、Reduce三个实现类;

之后,设置输出Key和Value的类,这两个类表明了MapReduce任务完毕后的结果。

Key即单词,为一个Text对象,Text是Hadoop提供的一个可以序列化的文本类;

Value为计数,为一个IntWritable对象,IntWritable是Hadoop提供的一个可以序列化的整数类。

之所以不用普通的String和int,是因为输出Key、 Value需要写入HDFS,因此Key和Value都要可写,这种可写能力在Hadoop中使用一个接口Writable表示,其实就相当于序列化,换 句话说,Key、Value必须得有可序列化的能力。Writable的声明为:
public interface Writable {  /**    * Serialize the fields of this object to <code>out</code>.   *    * @param out <code>DataOuput</code> to serialize this object into.   * @throws IOException   */  void write(DataOutput out) throws IOException;  /**    * Deserialize the fields of this object from <code>in</code>.     *    * <p>For efficiency, implementations should attempt to re-use storage in the    * existing object where possible.</p>   *    * @param in <code>DataInput</code> to deseriablize this object from.   * @throws IOException   */  void readFields(DataInput in) throws IOException;}
在第8、9行,还设置了要计算的文件在HDFS中的路径,设定好这些配置和参数后,执行任务提交:job.waitForCompletion(true)
waitForCompletion是Job类中实现的一个方法:
  public boolean waitForCompletion(boolean verbose                                   ) throws IOException, InterruptedException,                                            ClassNotFoundException {    if (state == JobState.DEFINE) {      submit();    }    if (verbose) {      jobClient.monitorAndPrintJob(conf, info);    } else {      info.waitForCompletion();    }    return isSuccessful();  }
即执行submit函数:
  public void submit() throws IOException, InterruptedException,                               ClassNotFoundException {    ensureState(JobState.DEFINE);    setUseNewAPI();        // Connect to the JobTracker and submit the job    connect();    info = jobClient.submitJobInternal(conf);    super.setJobID(info.getID());    state = JobState.RUNNING;   }  
其中,调用jobClient对象的submitJobInternal方法进行任务提交。jobClient是 JobClient对象,在执行connect()的时候即创建出来:
  private void connect() throws IOException, InterruptedException {    ugi.doAs(new PrivilegedExceptionAction<Object>() {      public Object run() throws IOException {        jobClient = new JobClient((JobConf) getConfiguration());            return null;      }    });  }
创建JobClient的参数是这个任务的配置信息,JobClient是MapReduce任务的客户端部分,主要用于提交任务等等。而具体的任务提交在submitJobInternal方法中实现,关于submitJobInternal的具体实现,包括MapReduce的任务执行流程,较为复杂,留作下一节描述。

关于MapReduce的这一流程,我们也可以看出一些特点:

1、 Map任务之间是不通信的,这与传统的MPI(Message Passing Interface)存在本质区别,这就要求划分后的子任务具有独立性。这个要求一方面限制了MapReduce的应用场合,但另一方面对于任务执行出错 后的处理十分方便,比如执行某个Map任务的机器挂掉了,可以不管其他Map任务,重新在另一台机器上执行一遍即可。因为底层的数据在HDFS里面,有3 份备份,所以数据冗余搭配上Map的重执行这一能力,可以将集群计算的容错性相比MPI而言大大增强。后续博文会对MPI进行剖析,也会对 MapReduce与传统高性能计算中的并行计算框架进行比较。

2、Map任务的分配与数据 的分布关系十分密切,对于上面的例子,这个100T的大文件分布在多台机器上,MapReduce框架会根据文件的实际存储位置分配Map任务,这一过程 需要对HDFS有好的理解,在后续博文中会对HDFS中进行剖析。到时候,能更好滴理解MapReduce框架。因为两者是搭配起来使用的。

3、 MapReduce的输入数据来自于HDFS,输出结果也写到HDFS。如果一个任务很复杂,需要分成很多个MapReduce阶段,那么就需要来来回回 地从磁盘中搬移数据的过程,速度很慢,后续博文会对Spark这一内存计算框架进行剖析,到时候,能更好滴理解MapReduce性能。

4、MapReduce的输入数据和输出结果也可以来自于HBase,HBase本身搭建于HDFS之上(理论上也可以搭建于其他文件系统),这种应用场合大多需要MapReduce处理一些海量结构化数据。后续博文会对HBase进行剖析。