http://www.aboutyun.com/thread-8927-1-1.html
Mapreduce在hadoop中是一个比較难以的概念。以下须要用心看,然后自己就能总结出来了。
概括: combine和partition都是函数。中间的步骤应该仅仅有shuffle!
1.combine combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,能够自己定义的。 combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一个新的<key2,value2>.将新的<key2,value2>作为输入到reduce函数中 这个value2亦可称之为values,由于有多个。这个合并的目的是为了降低网络传输。
详细实现是由Combine类。 实现combine函数,该类的主要功能是合并同样的key键。通过job.setCombinerClass()方法设置。默觉得null,不合并中间结果。实现map函数 详细调用:(下图是调用reduce,合并map的个数)
难点:不知道这个reduce和mapreduce中的reduce差别是什么? 以下简单说一下:后面慢慢琢磨: 在mapreduce中。map多,reduce少。 在reduce中因为数据量比較多。所以干脆。我们先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。
这里举个样例: map与reduce的样例 map理解为销售人员,reduce理解为销售经理。 每一个人(map)仅仅管销售,赚了多少钱销售人员不统计。也就是说这个销售人员没有Combine,那么这个销售经理就累垮了。由于每一个人都没有统计,它须要统计全部人员卖了多少件。赚钱了多少钱。 这样是不行的。所以销售经理(reduce)为了减轻压力,每一个人(map)都必须统计自己卖了多少钱,赚了多少钱(Combine),然后经理所做的事情就是统计每一个人统计之后的结果。这样经理就轻松多了。所以Combine在map所做的事情。减轻了reduce的事情。 (这就是为什么说map的Combine干reduce的事情。相信你应该明确了)
public static void main(String[] args)throws IOException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(Mapper.class); job.setCombinerClass(reduce.class); job.setPartitionerClass(HashPartitioner.class); job.setReducerClass(Reducer.class); job.setOutputFormatClass(TextOutFormat.class); } }
2.partition partition是切割map每一个节点的结果,依照key分别映射给不同的reduce。也是能够自己定义的。这里事实上能够理解归类。 我们对于错综复杂的数据归类。比方在动物园里有牛羊鸡鸭鹅。他们都是混在一起的。可是到了晚上他们就各自牛回牛棚。羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。仅仅只是在敲代码的时候,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也能够自己定义。
HashPartitioner是mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。得到当前的目的reducer。 以下在看该怎样自己定义,该怎样调用:(以下便是自己定义了一个Partition函数。红字部分是算法的核心,也就是分区的核心)
public static class Partition extends Partitioner<IntWritable, IntWritable> { @Override public int getPartition(IntWritable key, IntWritable value, int numPartitions) { int Maxnumber = 65223; int bound = Maxnumber / numPartitions + 1; int keynumber = key.get(); for (int i = 0; i < numPartitions; i++) { if (keynumber < bound * i && keynumber >= bound * (i - 1)) { return i - 1; } } return 0; }
} 那么我们该怎样调用:(以下调用之后,你的分区函数就生效了)
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = new Job(conf, "sort"); job.setJarByClass(Sort.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setPartitionerClass(Partition.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in"); FileOutputFormat .setOutputPath(job, new Path("/home/asheng/hadoop/out")); job.waitForCompletion(true); } }
3.shuffle
shuffle就是map和reduce之间的过程。包括了两端的combine和partition。它比較难以理解,由于我们摸不着。看不到它。它仅仅是理论存在的。并且确实存在,它属于mapreduce的框架。编程的时候。我们用不到它,它属于mapreduce框架。具体能够看通过实例让你真正明确mapreduce---填空式、分布(切割)编程。
3.1shuffle的作用是 Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后。通过OutputFormat,进行输出 shuffle阶段的主要函数是fetchOutputs(),这个函数的功能就是将map阶段的输出,copy到reduce 节点本地。
|