首页 > 代码库 > 【原创】MapReduce计数器

【原创】MapReduce计数器

MapReduce框架内置了一些计数器的支持,当然,我们也可以设置自己的计数器用来满足一些特殊的要求。

其实计数器可以用来完成很多事,关键要看你如何用,例如你想知道map输入数据的指定记录特定的信息有多少可以设置计数,还有,在MR执行过程中,一些特定事件的发生次数也可以记录。使用计数器的莫大好处在于整个计数的过程只需要再map阶段就可以完成,而且也可以不做任何输出,可以快速的得到自己想要的一些计数结果。但并不是计数器可以设置为无限多,因为计数器过多会影响JT的效率,甚至可能被自定义的分析程序拖垮。

  • 计数器原理

计数器的信息是存储再JobTracker中的内存中的,TaskTracker执行任务时会对设定的信息进行计数,按照既定的条件对计数器进行累加,并聚合汇报给JT。JT在工作完成的时候做整体聚合。

  • 程序实例

首先需要定义个枚举类:

package zebra.shlgao.counters;public enum MyCounter {    CounterA,CounterB}

然后在MR程序中分别计数不同Counter的数量:

package zebra.shlgao.counters;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TestCounter {    public static class CounterMapper extends Mapper<Object, Text, Text, Text>{                @Override        protected void map(Object key, Text value,Context context)                throws IOException, InterruptedException {            // TODO Auto-generated method stub            String txt = value.toString();            if (txt.contains("java")){                context.getCounter(MyCounter.CounterA).increment(1);            }else{                context.getCounter(MyCounter.CounterB).increment(2);            }//            context.write(new Text(key), value);        }            }    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException{        Configuration conf = new Configuration();        Job job = new Job(conf, "testCounter");        job.setJarByClass(TestCounter.class);        job.setMapperClass(CounterMapper.class);//        job.setNumReduceTasks(0);        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:19000/testdir/file22m"));        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:19000/testdir/file22mResult"));        int finish = job.waitForCompletion(true) ? 0 : 1 ;        FileSystem fs  =  FileSystem.get(URI.create("hdfs://localhost:19000/testdir/file22mResult"),conf);        fs.delete(new Path("hdfs://localhost:19000/testdir/file22mResult"),true);//删除空的输出路径        System.exit(finish);    }}

由于这里是快速计数,所以可以不必做任何输出,但是在配置Job的时候必须定义输出路径,所以可以在最后将空的输出路径删除。

【原创】MapReduce计数器