首页 > 代码库 > Hadoop的ChainMapper和ChainReducer使用案例(链式处理)

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)

 

  不多说,直接上干货!

     Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项目源自Lucene,自然也借鉴了一些Lucene中的处理方式。

    举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的

 

 

注意:

  1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。 

  2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。

比如:

  Map1 -> Map2 -> Reducer -> Map3 -> Map4

(不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)

技术分享

 

 

任务介绍:

  这个任务需要两步完成:

  1. 对一篇文章进行WordCount

  2. 统计出现次数超过5词的单词

 

WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:

 

 

 

 

Java代码

  1. package hadoop_in_action_exersice;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.Iterator;  
  5. import java.util.StringTokenizer;  
  6.   
  7. import org.apache.hadoop.fs.FileSystem;  
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.IntWritable;  
  10. import org.apache.hadoop.io.LongWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapred.FileInputFormat;  
  13. import org.apache.hadoop.mapred.FileOutputFormat;  
  14. import org.apache.hadoop.mapred.JobClient;  
  15. import org.apache.hadoop.mapred.JobConf;  
  16. import org.apache.hadoop.mapred.MapReduceBase;  
  17. import org.apache.hadoop.mapred.Mapper;  
  18. import org.apache.hadoop.mapred.OutputCollector;  
  19. import org.apache.hadoop.mapred.Reducer;  
  20. import org.apache.hadoop.mapred.Reporter;  
  21. import org.apache.hadoop.mapred.TextInputFormat;  
  22. import org.apache.hadoop.mapred.TextOutputFormat;  
  23.   
  24. public class ChainedJobs {  
  25.   
  26.     public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {  
  27.   
  28.         private final static IntWritable one = new IntWritable(1);  
  29.         public static final int LOW_LIMIT = 5;  
  30.         @Override  
  31.         public void map(LongWritable key, Text value,  
  32.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
  33.                 throws IOException {  
  34.             String line = value.toString();  
  35.             StringTokenizer st = new StringTokenizer(line);  
  36.             while(st.hasMoreTokens())  
  37.                 output.collect(new Text(st.nextToken()), one);  
  38.               
  39.         }  
  40.           
  41.     }  
  42.       
  43.     public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {  
  44.   
  45.         @Override  
  46.         public void reduce(Text key, Iterator<IntWritable> values,  
  47.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
  48.                 throws IOException {  
  49.             int sum = 0;  
  50.             while(values.hasNext()) {  
  51.                 sum += values.next().get();  
  52.             }  
  53.             output.collect(key, new IntWritable(sum));  
  54.         }  
  55.           
  56.     }  
  57.       
  58.       
  59.     public static void main(String[] args) throws IOException {  
  60.           
  61.           
  62.         JobConf conf = new JobConf(ChainedJobs.class);  
  63.         conf.setJobName("wordcount");           //设置一个用户定义的job名称  
  64.         conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  
  65.         conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  
  66.         conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类  
  67.         conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类  
  68.         conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类  
  69.         conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  
  70.         conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  
  71.   
  72.         // Remove output folder before run job(s)  
  73.         FileSystem fs=FileSystem.get(conf);  
  74.         String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";  
  75.         Path op=new Path(outputPath);          
  76.         if (fs.exists(op)) {  
  77.             fs.delete(op, true);  
  78.             System.out.println("存在此输出路径,已删除!!!");  
  79.         }  
  80.           
  81.         FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));  
  82.         FileOutputFormat.setOutputPath(conf, new Path(outputPath));  
  83.         JobClient.runJob(conf);         //运行一个job  
  84.     }  
  85.       
  86. }  

 

     上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。

 

 

为了方便理解,上面的输入的例子如下:

Java代码

  1. accessed    3  
  2. accessible  4  
  3. accomplish  1  
  4. accounting  7  
  5. accurately  1  
  6. acquire 1  
  7. across  1  
  8. actual  1  
  9. actually    1  
  10. add 3  
  11. added   2  
  12. addition    1  
  13. additional  4  

 

 

    old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X 

新的API会方便简洁很多

    下面是增加了一个Mapper 来过滤

Java代码

  1. public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {  
  2.   
  3.     @Override  
  4.     public void map(Text key, IntWritable value,  
  5.             OutputCollector<Text, IntWritable> output, Reporter reporter)  
  6.             throws IOException {  
  7.           
  8.         if(value.get() >= LOW_LIMIT) {  
  9.             output.collect(key, value);  
  10.         }  
  11.           
  12.     }  
  13. }  

 

 

     这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出

所以,目前为止,任务链如下:

TokenizerMapper -> TokenizeReducer -> RangeFilterMapper 

 

    所以我们的main函数改成下面的样子:

Java代码

  1. public static void main(String[] args) throws IOException {  
  2.       
  3.       
  4.     JobConf conf = new JobConf(ChainedJobs.class);  
  5.     conf.setJobName("wordcount");           //设置一个用户定义的job名称  
  6. //        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  
  7. //        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  
  8. //        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类  
  9. //        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类  
  10. //        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类  
  11. //        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  
  12. //        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  
  13.   
  14.     // Step1 : mapper forr word count   
  15.     JobConf wordCountMapper  = new JobConf(false);  
  16.     ChainMapper.addMapper(conf,   
  17.             TokenizeMapper.class,   
  18.             LongWritable.class,     // input key type   
  19.             Text.class,             // input value type  
  20.             Text.class,             // output key type  
  21.             IntWritable.class,      // output value type  
  22.             false,                  //byValue or byRefference 传值还是传引用  
  23.             wordCountMapper);  
  24.       
  25.     // Step2: reducer for word count  
  26.     JobConf wordCountReducer  = new JobConf(false);  
  27.     ChainReducer.setReducer(conf,   
  28.             TokenizeReducer.class,   
  29.             Text.class,   
  30.             IntWritable.class,   
  31.             Text.class,   
  32.             IntWritable.class,   
  33.             false,   
  34.             wordCountReducer);  
  35.       
  36.         // Step3: mapper used as filter  
  37.     JobConf rangeFilterMapper  = new JobConf(false);  
  38.     ChainReducer.addMapper(conf,   
  39.             RangeFilterMapper.class,   
  40.             Text.class,   
  41.             IntWritable.class,   
  42.             Text.class,   
  43.             IntWritable.class,   
  44.             false,   
  45.             rangeFilterMapper);  
  46.       
  47.       
  48.     // Remove output folder before run job(s)  
  49.     FileSystem fs=FileSystem.get(conf);  
  50.     String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";  
  51.     Path op=new Path(outputPath);          
  52.     if (fs.exists(op)) {  
  53.         fs.delete(op, true);  
  54.         System.out.println("存在此输出路径,已删除!!!");  
  55.     }  
  56.       
  57.     FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));  
  58.     FileOutputFormat.setOutputPath(conf, new Path(outputPath));  
  59.     JobClient.runJob(conf);         //运行一个job  
  60. }  

 

 

 

 

 

下面是运行结果的一部分:

Java代码

  1. a   40  
  2. and 26  
  3. are 12  
  4. as  6  
  5. be  7  
  6. been    8  
  7. but 5  
  8. by  5  
  9. can 12  
  10. change  5  
  11. data    5  
  12. files   7  
  13. for 28  
  14. from    5  
  15. has 7  
  16. have    8  
  17. if  6  
  18. in  27  
  19. is  16  
  20. it  13  
  21. more    8  
  22. not 5  
  23. of  23  
  24. on  5  
  25. outputs 5  
  26. see 6  
  27. so  11  
  28. that    11  
  29. the 54  

 

     可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。

 

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)