首页 > 代码库 > Mapreduce 学习

Mapreduce 学习

以WordCount为例

在类WordCount中实现两个内部静态类(Map,Reduce)

 

1、Map

public static class Map extends MapReduceBase implements            

Mapper<Object, Text, Text, IntWritable> {        

    private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

        public void map(Object key, Text value,  OutputCollector<Text, IntWritable> output, Reporter reporter)    throws IOException {            

   String line = value.toString();

        StringTokenizer tr= new StringTokenizer(line);  

        while (tr.hasMoreTokens()) {

                word.set(tr.nextToken());   

                output.collect(word, one);            

     }        

  }    

}

Map类主要实现了map方法,map方法中的参数类型与接口Mapper中的参数类型一一对应,这里我们没有使用reporter

算法中StringTokenizer是JAVA中用来分割字符串的一个类,默认的分类标记是制表符、空格、回车以及分类,

如果需要除这些之外的分类方式比如",",则可以哄一下方式将","这种划分规则加进来

StringTokenizer tr= new StringTokenizer(line,",");  

分析算法步骤

在这里对数据进行了切割,将切割的数据以key-value的形式输出

2、Reduce

public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

Reduce类主要实现reduce方法,这里接口Reducer参数中的数据类型与方法reduce中的参数类型相对应

分析算法步骤

这个过程对拥有相同key的数据进行了统计,这里使用的是统计总个数的时候使用sum+=values.next().get(),个人认为也可以直接使用sum +=1。

 

举一反三

1、统计下面数据中a,b的平均值

  a 2

      a 3

  b 20

      a 3

  b 100

     分析:这里使用StringTokenizer切割之后数据呈现 a 2 a 3 b 20 a 3 b 100的形式,可以考虑将数据以key-value(比如(a,2))形式传出来,map方法的设计就出来了

  平均值需要知道所有a的和以及a的个数,因此在迭代value的时候求和并计数就可以了

  Map类:

     private Text word = new Text();

     private IntWritable val= new IntWritable();

        public void map(Object key, Text value,  OutputCollector<Text, IntWritable> output, Reporter reporter)    throws IOException {            

   String line = value.toString();

        StringTokenizer tr= new StringTokenizer(line);  

        while (tr.hasMoreTokens()) {

                word.set(tr.nextToken());   

             val.set(integer.parseint(tr.nextToken());

                 output.collect(word, val);            

     }        

  }    

  Reduce类

  public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, FloatWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, FloatWritable> output, Reporter reporter)
                throws IOException {
            float sum = 0;

          int count= 0;
            while (values.hasNext()) {
                sum += values.next().get();

       count ++;
            }
            output.collect(key, new FloatWritable(sum/count));
        }
    }

2、对下列数据中的第二列数据进行处理

  a,2 ,3

      a,3,6

  b,5,20

      a ,3,23

  b,53,25

分析:数据的形式还是很有规律的,每三个一组,无非在在对切割后的数据做迭代的过程中有选择的传送数据,跟WordCount的思想还是一样

目前第一个举一反三的例子还没进行测试,有时间还得放到集群上测试测试

 

Mapreduce 学习