首页 > 代码库 > MapReduce编程小结
MapReduce编程小结
(1)key-value在map端比较简单:每个输入分片(InputSplit)都会交由一个MapTask处理,而分片由InputFormat(一般是FileInputFormat)负责划分,分片大小默认等于HDFS块大小(Hadoop 1.x为64M,Hadoop 2.x起为128M),
每个MapTask会调用N次map函数,具体是多少次呢?
这由job.setInputFormatClass(...)所指定的InputFormat(准确地说是它提供的RecordReader)决定,默认是TextInputFormat.class。TextInputFormat以一行为解析单位(key为该行在文件中的字节偏移量,value为该行内容),每一行对应一次map函数调用。
(2)key-value在reduce端比较复杂:reduce函数的第二个参数是Iterable<?>对象,对应<key,list{value1,value2...}>,每个这样的输入对应一次reduce函数调用,
也就是说,一次reduce函数调用将会处理一个key,多个value,
(3)而这个<key,list{value1,value2...}>输入是如何来的呢?
在shuffle阶段,MapReduce框架会按key对map输出进行排序(Text、LongWritable等预定义类型自带比较器,自定义key则需实现WritableComparable),
并将来自不同MapTask的相同key归并到一起,形成<key,list{value1,value2...}>作为reduce函数的输入。
(4)前面说过MapTask的个数由分片数决定,那ReduceTask的个数又由什么决定呢?
map函数每输出一条记录(即每次调用context.write),框架都会调用一次Partitioner的getPartition函数(默认是HashPartitioner)来计算该记录的分区号;map输出写入本地磁盘时按分区号组织,以便各个ReduceTask拉取属于自己的那个分区,
而getPartition函数的参数numReduceTasks就是ReduceTask的个数,它由job.setNumReduceTasks决定,默认值为1;
因此若不设置此参数,默认的HashPartitioner对所有记录都返回0,也就只有一个ReduceTask。换言之,ReduceTask的个数由setNumReduceTasks决定,分区函数只决定每条记录去往哪一个ReduceTask。
(5)说完了分区,再来说分组。分区在map端确定,针对的是每一条map输出记录;分组则发生在reduce端,作用于从多个MapTask拉取并归并排序后的数据。只要同一组的key总是被分到同一个分区(如下例中分区和分组都只依据first),每个组就一定完整地落在某一个分区之内,即“组属于区”。
分组会影响什么呢?
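(6)当map端的输出key是自定义的NewK2,并且自定义了compareTo时:排序仍然由NewK2的compareTo(或通过job.setSortComparatorClass指定的比较器)完成;设置分组类后,
分组类MyGroupingComparator的compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)只负责判断排好序的相邻key是否属于同一组,比较结果为0的连续key会被合并进同一次reduce调用,
从而得到<key,list{value1,value2...}>。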
附上一个例子:
package examples; import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class GroupApp { static final String INPUT_PATH = "hdfs://192.168.2.100:9000/hello"; static final String OUTPUT_PATH = "hdfs://192.168.2.100:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); final Path outPath = new Path(OUTPUT_PATH); if(fileSystem.exists(outPath)) { fileSystem.delete(outPath, true); } final Job job = new Job(conf, GroupApp.class.getSimpleName()); job.setJarByClass(GroupApp.class); FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(NewK2.class); job.setMapOutputValueClass(LongWritable.class); job.setPartitionerClass(MyPartitoner.class); job.setNumReduceTasks(3); job.setGroupingComparatorClass(MyGroupingComparator.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, outPath); job.waitForCompletion(true); } static class MyPartitoner extends HashPartitioner<NewK2, LongWritable> { public int 
getPartition(NewK2 key, LongWritable value, int numReduceTasks) { System.out.println("the getPartition() is called..."); if(key.first == 1) { return 0 % numReduceTasks; } else if(key.first == 2) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; } } } static class NewK2 implements WritableComparable<NewK2> { Long first = 0L; Long second = 0L; public NewK2(){} public NewK2(long first, long second) { this.first = first; this.second = second; } public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } public void readFields(DataInput in) throws IOException { first = in.readLong(); second = in.readLong(); } public int compareTo(NewK2 o) { System.out.println("the compareTo() is called..."); final long minus = this.first - o.first; if(minus != 0) { return (int)minus; } return (int) (this.second - o.second); } } static class MyGroupingComparator implements RawComparator<NewK2> { public int compare(NewK2 o1, NewK2 o2) { // System.out.println("the compare() is called..."); return (int) (o1.first - o2.first); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { System.out.println("the compare() is called..."); return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); } } static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> { protected void map(LongWritable k1, Text v1, Context ctx) throws IOException, InterruptedException { final String[] splited = v1.toString().split("\t"); System.out.println("the map() is called..."); NewK2 k2 = new NewK2(Integer.parseInt(splited[0]), Integer.parseInt(splited[1])); LongWritable v2 = new LongWritable(Long.parseLong((splited[1]))); ctx.write(k2, v2);// System.out.println("the real map output...");// System.out.println("<"+k2.first+","+v2+">"); } } static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> { long v3 = 0; protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context ctx) throws 
IOException, InterruptedException { System.out.println("the reduce() is called..."); for(LongWritable secend : v2s) { v3 = secend.get(); System.out.println("<"+k2.first+","+k2.second+">, "+v3+""); } System.out.println("--------------------------------------------"); System.out.println("the real reduce output..."); System.out.println("<"+k2.first+","+v3+">"); ctx.write(new LongWritable(k2.first), new LongWritable(v3)); System.out.println("--------------------------------------------"); } }}
MapReduce编程小结