Hadoop Diary Day 15: Comparing the Old and New MapReduce APIs
In Hadoop 0.x, the MapReduce classes generally live in the org.apache.hadoop.mapred package; in Hadoop 1.x they live in org.apache.hadoop.mapreduce. The complete word-count program written against the old API is shown in Code 1.1.
package old;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import mapreduce.WordCountApp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * In Hadoop 1.x the classes generally live in the mapreduce package;
 * in Hadoop 0.x they live in the mapred package.
 */
public class OldAPP {
    static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
    static final String OUT_PATH = "hdfs://hadoop:9000/out";

    /**
     * Changes from the new API:
     * 1. Use JobConf instead of Job.
     * 2. The classes come from the mapred package instead of mapreduce.
     * 3. Submit the job with JobClient.runJob(job) instead of job.waitForCompletion(true).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final JobConf job = new JobConf(conf, WordCountApp.class);

        // 1.1 Specify where the input files are read from.
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // Specify how the input is parsed into key/value pairs (one pair per line).
        //job.setInputFormat(TextInputFormat.class);
        // 1.2 Specify the custom map class.
        job.setMapperClass(MyMapper.class);
        // Map output <k,v> types; can be omitted when <k3,v3> matches <k2,v2>.
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);
        // 1.3 Partitioning.
        //job.setPartitionerClass(HashPartitioner.class);
        // Run a single reduce task.
        //job.setNumReduceTasks(1);
        // 1.4 TODO Sorting and grouping.
        // 1.5 TODO Combining.

        // 2.2 Specify the custom reduce class.
        job.setReducerClass(MyReducer.class);
        // Specify the reduce output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 2.3 Specify where the output is written.
        FileOutputFormat.setOutputPath(job, outPath);
        // Specify the output format class.
        //job.setOutputFormat(TextOutputFormat.class);

        // Submit the job to the JobTracker.
        JobClient.runJob(job);
    }

    /**
     * New API: extends Mapper
     * Old API: extends MapReduceBase implements Mapper
     */
    static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        public void map(LongWritable k1, Text v1,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            final String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                collector.collect(new Text(word), new LongWritable(1));
            }
        }
    }

    static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text k2, Iterator<LongWritable> v2s,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            long times = 0L;
            while (v2s.hasNext()) {
                final long temp = v2s.next().get();
                times += temp;
            }
            collector.collect(k2, new LongWritable(times));
        }
    }
}
Code 1.1
1. Differences in the custom Mapper class
In the new API, a mapper extends the class org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. In the old API, it extends org.apache.hadoop.mapred.MapReduceBase and implements the interface org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>. In the new API, the third parameter of the overridden map method is a Context; in the old API, the third and fourth parameters are an OutputCollector and a Reporter. The new API's Context merges the functionality of those two classes, which makes it simpler to use. In both cases each input record is parsed into a key/value pair, and the map method is called once per pair. A custom Mapper written against the old API is shown in Code 1.2.
/**
 * New API: extends Mapper
 * Old API: extends MapReduceBase implements Mapper
 */
static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable k1, Text v1,
            OutputCollector<Text, LongWritable> collector, Reporter reporter)
            throws IOException {
        final String[] splited = v1.toString().split("\t");
        for (String word : splited) {
            collector.collect(new Text(word), new LongWritable(1));
        }
    }
}
Code 1.2
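For comparison, here is a minimal sketch of the same mapper written against the new API. The class name NewApiMapper is illustrative and does not appear in the original code; the logic mirrors MyMapper, with Context taking over the roles of both OutputCollector and Reporter.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewApiMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Context replaces the OutputCollector and Reporter parameters of the old API.
        final String[] splited = value.toString().split("\t");
        for (String word : splited) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}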
2. Differences in the custom Reducer class
In the new API, a reducer extends the class org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. In the old API, it extends org.apache.hadoop.mapred.MapReduceBase and implements the interface org.apache.hadoop.mapred.Reducer<K1, V1, K2, V2>. In the new API, the second parameter of the overridden reduce method is a java.lang.Iterable<VALUEIN>; in the old API it is a java.util.Iterator<V2>. The former can be traversed with an enhanced for loop, while the latter must be traversed with a while loop. In the new API, the third parameter of the overridden reduce method is a Context; in the old API, the third and fourth parameters are an OutputCollector and a Reporter, whose functionality the new Context again merges, making it simpler to use. A custom Reducer written against the old API is shown in Code 2.1.
static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text k2, Iterator<LongWritable> v2s,
            OutputCollector<Text, LongWritable> collector, Reporter reporter)
            throws IOException {
        long times = 0L;
        while (v2s.hasNext()) {
            final long temp = v2s.next().get();
            times += temp;
        }
        collector.collect(k2, new LongWritable(times));
    }
}
Code 2.1
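For comparison, a minimal sketch of the same reducer written against the new API; the class name NewApiReducer is illustrative. Note the Iterable parameter, which allows an enhanced for loop, and the Context in place of OutputCollector and Reporter.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewApiReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long times = 0L;
        // Iterable<VALUEIN> can be traversed with an enhanced for loop
        // instead of the old API's while (iterator.hasNext()) pattern.
        for (LongWritable value : values) {
            times += value.get();
        }
        context.write(key, new LongWritable(times));
    }
}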
3. Differences in the driver code (the main method)
In the new API, the driver code is built around the org.apache.hadoop.mapreduce.Job class, which manages the job configuration; calling its waitForCompletion(boolean) method submits the job to the JobTracker for execution. In the old API, the driver code is built around org.apache.hadoop.mapred.JobConf, constructed with JobConf(Configuration, Class), which manages the configuration, and the job is submitted with the runJob(JobConf) method of org.apache.hadoop.mapred.JobClient. In other words, the new API merges the roles of JobConf and JobClient into Job, which makes it more convenient to call.
The method names on JobConf and Job are almost identical; what differs is the types of the arguments they accept. The setXXX(...) methods of the new API's Job class require classes from org.apache.hadoop.mapreduce and its subpackages, while the setXXX(...) methods of the old API's JobConf class require classes from org.apache.hadoop.mapred and its subpackages. A driver main method written against the old API is shown in Code 3.1.
package old;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import mapreduce.WordCountApp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.HashPartitioner;

/**
 * In Hadoop 1.x the classes generally live in the mapreduce package;
 * in Hadoop 0.x they live in the mapred package.
 */
public class OldAPP {
    static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
    static final String OUT_PATH = "hdfs://hadoop:9000/out";

    /**
     * Changes from the new API:
     * 1. Use JobConf instead of Job.
     * 2. The classes come from the mapred package instead of mapreduce.
     * 3. Submit the job with JobClient.runJob(job) instead of job.waitForCompletion(true).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final JobConf job = new JobConf(conf, WordCountApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);  // 1.1 Specify where the input is read from.
        job.setMapperClass(MyMapper.class);              // 1.2 Specify the custom map class.
        job.setMapOutputKeyClass(Text.class);            // Map output <k,v> types; can be omitted when <k3,v3> matches <k2,v2>.
        job.setMapOutputValueClass(LongWritable.class);
        job.setPartitionerClass(HashPartitioner.class);  // 1.3 Partitioning.
        job.setNumReduceTasks(1);                        // Run a single reduce task.
        job.setReducerClass(MyReducer.class);            // 2.2 Specify the custom reduce class.
        job.setOutputKeyClass(Text.class);               // Specify the reduce output types.
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);    // 2.3 Specify where the output is written.
        JobClient.runJob(job);                           // Submit the job to the JobTracker.
    }

    /**
     * New API: extends Mapper
     * Old API: extends MapReduceBase implements Mapper
     */
    static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        public void map(LongWritable k1, Text v1,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            final String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                collector.collect(new Text(word), new LongWritable(1));
            }
        }
    }

    static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text k2, Iterator<LongWritable> v2s,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            long times = 0L;
            while (v2s.hasNext()) {
                final long temp = v2s.next().get();
                times += temp;
            }
            collector.collect(k2, new LongWritable(times));
        }
    }
}
Code 3.1
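For comparison, a minimal sketch of the driver written against the new API in the Hadoop 1.x style. The class name NewAPP is illustrative, the Job(Configuration, String) constructor is the 1.x-era form (later versions prefer Job.getInstance), and NewApiMapper/NewApiReducer refer to the sketches shown earlier; the input and output paths are reused from the old-API example.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewAPP {
    static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
    static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        // Job replaces JobConf: it holds the configuration and also submits the job.
        Job job = new Job(conf, NewAPP.class.getSimpleName());
        job.setJarByClass(NewAPP.class);

        // Note that FileInputFormat/FileOutputFormat now come from
        // org.apache.hadoop.mapreduce.lib, not org.apache.hadoop.mapred.
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setMapperClass(NewApiMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(NewApiReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);

        // waitForCompletion replaces JobClient.runJob from the old API.
        job.waitForCompletion(true);
    }
}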