首页 > 代码库 > Hadoop MapReduce链式实践--ChainReducer
Hadoop MapReduce链式实践--ChainReducer
版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0。
场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据:
data1:
A,10 A,11 A,12 A,13 B,21 B,31 B,41 B,51data2:
A,20 A,21 A,22 A,23 B,201 B,301 B,401 B,501最后输出为:
A,23 B,501假如这样的逻辑的mapreduce数据流如下:
假设C组数据比较多,同时假设集群有2个节点,那么这个任务分配2个reducer,且C组数据平均分布到两个reducer中,(这样做是为了效率考虑,如果只有一个reducer,那么当一个节点在运行reducer的时候另外一个节点会处于空闲状态)那么如果在reducer之后,还可以再次做一个reducer,那么不就可以整合数据到一个文件了么,同时还可以再次比较C组数据中,以得到真正比较大的数据。
首先说下,不用上面假设的方式进行操作,那么一般的操作方法。一般有两种方法:其一,直接读出HDFS数据,然后进行整合;其二,新建另外一个Job来进行整合。这两种方法,如果就效率来说的话,可能第一种效率会高点。
考虑到前面提出的mapreduce数据流,以前曾对ChainReducer有点印象,好像可以做这个,所以就拿ChainReducer来试,同时为了学多点知识,也是用了多个Mapper(即使用ChainMapper)。
主程序代码如下:
package chain; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.ChainMapper; import org.apache.hadoop.mapred.lib.ChainReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ChainDriver2 extends Configured implements Tool{ /** * ChainReducer 实战 * 验证多个reducer的整合 * 逻辑:寻找最大值 * @param args */ private String input=null; private String output=null; private String delimiter=null; private int reducer=1; public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new ChainDriver2(),args); } @Override public int run(String[] arg0) throws Exception { configureArgs(arg0); checkArgs(); Configuration conf = getConf(); conf.set("delimiter", delimiter); JobConf job= new JobConf(conf,ChainDriver2.class); ChainMapper.addMapper(job, MaxMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, new JobConf(false)) ; ChainMapper.addMapper(job, MergeMaxMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, new JobConf(false)); ChainReducer.setReducer(job, MaxReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, new JobConf(false)); ChainReducer.addMapper(job, MergeMaxMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, new JobConf(false)); job.setJarByClass(ChainDriver2.class); job.setJobName("ChainReducer test job"); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); /* job.setMapperClass(MaxMapper.class); job.setReducerClass(MaxReducer.class);*/ job.setInputFormat(TextInputFormat.class);; job.setOutputFormat(TextOutputFormat.class); job.setNumReduceTasks(reducer); FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); JobClient.runJob(job); return 0; } /** * check the args */ private void checkArgs() { if(input==null||"".equals(input)){ System.out.println("no input..."); printUsage(); System.exit(-1); } if(output==null||"".equals(output)){ System.out.println("no output..."); printUsage(); System.exit(-1); } if(delimiter==null||"".equals(delimiter)){ System.out.println("no delimiter..."); printUsage(); System.exit(-1); } if(reducer==0){ System.out.println("no reducer..."); printUsage(); System.exit(-1); } } /** * configuration the args * @param args */ private void configureArgs(String[] args) { for(int i=0;i<args.length;i++){ if("-i".equals(args[i])){ input=args[++i]; } if("-o".equals(args[i])){ output=args[++i]; } if("-delimiter".equals(args[i])){ delimiter=args[++i]; } if("-reducer".equals(args[i])){ try { reducer=Integer.parseInt(args[++i]); } catch (Exception e) { reducer=0; } } } } public static void printUsage(){ System.err.println("Usage:"); System.err.println("-i input \t cell data path."); System.err.println("-o output \t output data path."); System.err.println("-delimiter data delimiter , default is blanket ."); System.err.println("-reducer reducer number , default is 1 ."); } }
MaxMapper:
package chain; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MaxMapper extends MapReduceBase implements Mapper<LongWritable ,Text,Text,IntWritable>{ private Logger log = LoggerFactory.getLogger(MaxMapper.class); private String delimiter=null; @Override public void configure(JobConf conf){ delimiter=conf.get("delimiter"); log.info("delimiter:"+delimiter); log.info("This is the begin of MaxMapper"); } @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException { // TODO Auto-generated method stub String[] values= value.toString().split(delimiter); log.info(values[0]+"-->"+values[1]); out.collect(new Text(values[0]), new IntWritable(Integer.parseInt(values[1]))); } public void close(){ log.info("This is the end of MaxMapper"); } }
MaxReducer:
package chain; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MaxReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{ private Logger log = LoggerFactory.getLogger(MaxReducer.class); @Override public void configure(JobConf conf){ log.info("This is the begin of the MaxReducer"); } @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException { // TODO Auto-generated method stub int max=-1; while(values.hasNext()){ int value=http://www.mamicode.com/values.next().get();>
MergeMaxMapper:package chain; import java.io.IOException; //import java.util.ArrayList; //import java.util.HashMap; //import java.util.Map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MergeMaxMapper extends MapReduceBase implements Mapper<Text ,IntWritable,Text,IntWritable>{ private Logger log = LoggerFactory.getLogger(MergeMaxMapper.class); // private Map<Text,ArrayList<IntWritable>> outMap= new HashMap<Text,ArrayList<IntWritable>>(); @Override public void configure(JobConf conf){ log.info("This is the begin of MergeMaxMapper"); } @Override public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException { log.info(key.toString()+"_MergeMaxMapper"+"-->"+value.get()); out.collect(new Text(key.toString()+"_MergeMaxMapper"), value); } @Override public void close(){ log.info("this is the end of MergeMaxMapper"); } }
编程思路如下:原始测试数据data1、data2首先经过MaxMapper(由于两个文件,所以生成了2个map),然后经过MergeMaxMapper,到MaxReducer,最后再次经过MergeMaxMapper。在程序中添加了输出数据的log,可以通过log来查看各个map和reduce的数据流程。
mapper端的log(其中的一个mapper):
2014-05-14 17:23:51,307 INFO [main] chain.MaxMapper: delimiter:, 2014-05-14 17:23:51,307 INFO [main] chain.MaxMapper: This is the begin of MaxMapper 2014-05-14 17:23:51,454 INFO [main] chain.MergeMaxMapper: This is the begin of MergeMaxMapper 2014-05-14 17:23:51,471 INFO [main] chain.MaxMapper: A-->20 2014-05-14 17:23:51,476 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->20 2014-05-14 17:23:51,476 INFO [main] chain.MaxMapper: A-->21 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->21 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: A-->22 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->22 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: A-->23 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->23 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: B-->201 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->201 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: B-->301 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->301 2014-05-14 17:23:51,478 INFO [main] chain.MaxMapper: B-->401 2014-05-14 17:23:51,478 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->401 2014-05-14 17:23:51,478 INFO [main] chain.MaxMapper: B-->501 2014-05-14 17:23:51,478 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->501 2014-05-14 17:23:51,481 INFO [main] chain.MaxMapper: This is the end of MaxMapper 2014-05-14 17:23:51,481 INFO [main] chain.MergeMaxMapper: this is the end of MergeMaxMapper
通过上面log,可以看出,通过ChainMapper添加mapper的方式的mapper的处理顺序为:首先初始化第一个mapper(即调用configure方法);接着初始第二个mapper(调用configure方法);然后开始map函数,map函数针对一条记录,首先采用mapper1进行处理,然后使用mapper2进行处理;最后是关闭阶段,关闭的顺序同样是首先关闭mapper1(调用close方法),然后关闭mapper2。reducer端的log(其中一个reducer)
2014-05-14 17:24:10,171 INFO [main] chain.MergeMaxMapper: This is the begin of MergeMaxMapper 2014-05-14 17:24:10,311 INFO [main] chain.MaxReducer: This is the begin of the MaxReducer 2014-05-14 17:24:10,671 INFO [main] chain.MaxReducer: B_MergeMaxMapper-->501 2014-05-14 17:24:10,672 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper_MergeMaxMapper-->501 2014-05-14 17:24:10,673 INFO [main] chain.MergeMaxMapper: this is the end of MergeMaxMapper 2014-05-14 17:24:10,673 INFO [main] chain.MaxReducer: This is the end of the MaxReducer
通过上面的log可以看出,通过ChainReducer添加mapper的方式,其数据处理顺序为:首先初始化Reducer之后的Mapper,接着初始化Reducer(看configure函数即可知道);然后处理reducer,reducer的输出接着交给mapper处理;最后先关闭Mapper,接着关闭reducer。同时,注意到,reducer后面的mapper也是两个的,即有多少个reducer,就有多少个mapper。
通过实验得到上面的ChainReducer的数据处理流程,且ChainReducer没有addReducer的方法,也即是不能添加reducer了,那么最开始提出的mapreduce数据流程就不能采用这种方式实现了。
最后,前面提出的mapreduce数据流程应该是错的,在reducer out里面C组数据不会被拆分为两个reducer,相同的key只会向同一个reducer传输。这里同样做了个试验,通过对接近90M的数据(只有一个分组A)执行上面的程序,可以看到有2个mapper,2个reducer(此数值为设置值),但是在其中一个reducer中并没有A分组的任何数据,在另外一个reducer中才有数据。其实,不用试验也是可以的,以前看的书上一般都会说相同的key进入同一个reducer中。不过,如果是这样的话,那么这样的数据效率应该不高。
返回最开始提出的场景,最开始提出的问题,如果相同的key只会进入一个reducer中,那么最后的2个数据文件(2个reducer生成2个数据文件)其实里面不会有key冲突的数据,所以在进行后面的操作的时候可以直接读多个文件即可,就像是读一个文件一样。
会产生这样的认知错误,应该是对mapreduce 原理不清楚导致。
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990