首页 > 代码库 > Hadoop阅读笔记(二)——利用MapReduce求平均数和去重
Hadoop阅读笔记(二)——利用MapReduce求平均数和去重
前言:圣诞节来了,我怎么能虚度光阴呢?!依稀记得,那一年,大家互赠贺卡,短短几行字,字字融化在心里;那一年,大家在水果市场,寻找那些最能代表自己心意的苹果香蕉梨,摸着冰冷的水果外皮,内心早已滚烫。这一年……我在博客园-_-#,希望用dt的代码燃烧脑细胞,温暖小心窝。
上篇《Hadoop阅读笔记(一)——强大的MapReduce》主要介绍了MapReduce的在大数据集上处理的优势以及运行机制,通过专利数据编写Demo加深了对于MapReduce中输入输出数据结构的细节理解。有了理论上的指导,仍需手捧我的hadoop圣经——Hadoop实战2,继续走完未走过的路(有种怪怪的感觉,我一直在原地踏步)。
正文:实践是检验真理的唯一标准,不知道这是谁说的,但是作为码农,倒也是实用受用的座右铭。除却大牛们能够在阅读高深理论时new一个并发线程,眼睛所到之处,已然可以理清program的精髓所在,好似一台计算机扫描了所看到的代码一般(有点夸张^_^)。作为普罗大众的搬砖者来说,还是通过一些实例来加深对于理论的认识。
今天主要是通过以下两个例子:求平均成绩、去重来加深对MapReduce的理解。
1.如何用MapReduce求平均成绩——WordCount的加强版
在谈平均成绩之前我们回顾下属性的Hadoop HelloWorld程序——WordCount,其主要是统计数据集中各个单词出现的次数。因为次数没有多少之分,如果将这里的次数换成分数就将字数统计问题转化成了求每个个体的总成绩的问题,再加上一步(总成绩/学科数)运算就是这里要讨论的求平均数的问题了。在笔者看来,MapReduce是一种编程思维,它引导码农们如何将一个亟待解决的问题转换为一个MapReduce过程:map阶段输入什么、map过程执行什么、map阶段输出什么、reduce阶段输入什么、执行什么、输出什么。能够将以上几个点弄清楚整明白,一个MapReduce程序就会跃然纸上。这里:
Map: 指定格式的数据集(如"张三 60")——输入数据
执行每条记录的分割操作以key-value写入上下文context中——执行功能
得到指定键值对类型的输出(如"(new Text(张三),new IntWritable(60))")——输出结果
Reduce: map的输出——输入数据
求出单个个体的总成绩后再除以该个体课程数目——执行功能
得到指定键值对类型的输入——输出结果
鉴于上面的map和reduce过程,我们可以得到如下的代码:
1 public class Test1214 { 2 3 public static class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> { 4 public void map(LongWritable key, Text value, Context context){ 5 String line = value.toString(); 6 System.out.println("该行数据为:" + line); 7 StringTokenizer token = new StringTokenizer(line,"\t"); 8 String nameT = token.nextToken(); 9 int score = Integer.parseInt(token.nextToken());10 Text name = new Text(nameT);11 try {12 context.write(name, new IntWritable(score));13 } catch (IOException e) {14 e.printStackTrace();15 } catch (InterruptedException e) {16 e.printStackTrace();17 }18 }19 }20 21 public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable>{22 public void reduce(Text key, Iterable<IntWritable> value, Context context){23 int sum = 0;24 int count =0;25 for(IntWritable score : value){26 sum += score.get();27 count++;28 System.out.println("第" + count + "个数值为:" + score.get());29 }30 IntWritable avg = new IntWritable(sum/count);31 try {32 context.write(key, avg);33 } catch (IOException e) {34 e.printStackTrace();35 } catch (InterruptedException e) {36 e.printStackTrace();37 }38 }39 }40 /**41 * @param args42 * @throws IOException 43 * @throws ClassNotFoundException 44 * @throws InterruptedException 45 */46 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {47 48 Configuration conf = new Configuration();49 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();50 if (otherArgs.length != 2) {51 System.err.println("Usage: wordcount <in> <out>");52 System.exit(2);53 }54 Job job = new Job(conf, "Test1214");55 56 job.setJarByClass(Test1214.class);57 job.setMapperClass(MapperClass.class);58 job.setCombinerClass(ReducerClass.class);59 job.setReducerClass(ReducerClass.class);60 job.setOutputKeyClass(Text.class);61 job.setOutputValueClass(IntWritable.class);62 63 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));64 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));65 System.exit(job.waitForCompletion(true) ? 0 : 1);66 System.out.println("end");67 }68 69 }
数据集:这里的数据是码农我自己手工创建的,主要是想看看mapreduce的运行过程,所以就创建了两个文件,当然这里面的成绩也就没有什么是否符合正态分布的考虑了……
数据中设定有A-K共11个学生,共16门课程,具体数据如下:
NameScore1.txt:
A 55B 65C 44D 87E 66F 90G 70H 59I 61J 58K 40A 45B 62C 64D 77E 36F 50G 80H 69I 71J 70K 49A 51B 64C 74D 37E 76F 80G 50H 51I 81J 68K 80A 85B 55C 49D 67E 69F 50G 80H 79I 81J 68K 80A 35B 55C 40D 47E 60F 72G 76H 79I 68J 78K 50A 65B 45C 74D 57E 56F 50G 60H 59I 61J 58K 60A 85B 45C 74D 67E 86F 70G 50H 79I 81J 78K 60A 50B 69C 40D 89E 69F 95G 75H 59I 60J 59K 45
NameScore2.txt:
A 65B 75C 64D 67E 86F 70G 90H 79I 81J 78K 60A 65B 82C 84D 97E 66F 70G 80H 89I 91J 90K 69A 71B 84C 94D 67E 96F 80G 70H 71I 81J 98K 80A 85B 75C 69D 87E 89F 80G 70H 99I 81J 88K 60A 65B 75C 60D 67E 80F 92G 76H 79I 68J 78K 70A 85B 85C 74D 87E 76F 60G 60H 79I 81J 78K 80A 85B 65C 74D 67E 86F 70G 70H 79I 81J 78K 60A 70B 69C 60D 89E 69F 95G 75H 59I 60J 79K 65
其执行过程中控制台打印的信息为:
14/12/14 17:05:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=14/12/14 17:05:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 214/12/14 17:05:27 INFO mapred.JobClient: Running job: job_local_000114/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 214/12/14 17:05:27 INFO mapred.MapTask: io.sort.mb = 10014/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680该行数据为:A 55该行数据为:B 65该行数据为:C 44该行数据为:D 87该行数据为:E 66该行数据为:F 90该行数据为:G 70该行数据为:H 59该行数据为:I 61该行数据为:J 58该行数据为:K 40该行数据为:A 45该行数据为:B 62该行数据为:C 64该行数据为:D 77该行数据为:E 36该行数据为:F 50该行数据为:G 80该行数据为:H 69该行数据为:I 71该行数据为:J 70该行数据为:K 49该行数据为:A 51该行数据为:B 64该行数据为:C 74该行数据为:D 37该行数据为:E 76该行数据为:F 80该行数据为:G 50该行数据为:H 51该行数据为:I 81该行数据为:J 68该行数据为:K 80该行数据为:A 85该行数据为:B 55该行数据为:C 49该行数据为:D 67该行数据为:E 69该行数据为:F 50该行数据为:G 80该行数据为:H 79该行数据为:I 81该行数据为:J 68该行数据为:K 80该行数据为:A 35该行数据为:B 55该行数据为:C 40该行数据为:D 47该行数据为:E 60该行数据为:F 72该行数据为:G 76该行数据为:H 79该行数据为:I 68该行数据为:J 78该行数据为:K 50该行数据为:A 65该行数据为:B 45该行数据为:C 74该行数据为:D 57该行数据为:E 56该行数据为:F 50该行数据为:G 60该行数据为:H 59该行数据为:I 61该行数据为:J 58该行数据为:K 60该行数据为:A 85该行数据为:B 45该行数据为:C 74该行数据为:D 67该行数据为:E 86该行数据为:F 70该行数据为:G 50该行数据为:H 79该行数据为:I 81该行数据为:J 78该行数据为:K 60该行数据为:A 50该行数据为:B 69该行数据为:C 40该行数据为:D 89该行数据为:E 69该行数据为:F 95该行数据为:G 75该行数据为:H 59该行数据为:I 60该行数据为:J 59该行数据为:K 4514/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output第1个数值为:55第2个数值为:45第3个数值为:51第4个数值为:85第5个数值为:35第6个数值为:65第7个数值为:85第8个数值为:50第1个数值为:45第2个数值为:64第3个数值为:65第4个数值为:45第5个数值为:55第6个数值为:69第7个数值为:62第8个数值为:55第1个数值为:64第2个数值为:49第3个数值为:44第4个数值为:74第5个数值为:74第6个数值为:40第7个数值为:40第8个数值为:74第1个数值为:67第2个数值为:67第3个数值为:77第4个数值为:37第5个数值为:87第6个数值为:57第7个数值为:89第8个数值为:47第1个数值为:36第2个数值为:66第3个数值为:76第4个数值为:86第5个数值为:69第6个数值为:69第7个数值为:60第8个数值为:56第1个数值为:90第2个数值为:95第3个数值为:70第4个数值为:50第5个数值为:80第6个数值为:50第7个数值为:50第8个数值为:72第1个数值为:60第2个数值为:76第3个数值为:50第4个数值为:50第5个数值为:80第6个数值为:70第7个数值为:75第8个数值为:80第1个数值为:59第2个数值为:69第3个数值为:51第4个数值为:79第5个数值为:59第6个数值为:79第7个数值为:59第8个数值为:79第1个数值为:60第2个数值为:61第3个数值为:81第4个数值为:81第5个数值为:61第6个数值为:71第7个数值为:68第8个数值为:81第1个数值为:58第2个数值为:59第3个数值为:78第4个数值为:68第5个数值为:78第6个数值为:68第7个数值为:70第8个数值为:58第1个数值为:40第2个数值为:50第3个数值为:49第4个数值为:60第5个数值为:60第6个数值为:45第7个数值为:80第8个数值为:8014/12/14 17:05:28 INFO mapred.MapTask: Finished spill 014/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting14/12/14 17:05:28 INFO mapred.LocalJobRunner: 14/12/14 17:05:28 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0‘ done.14/12/14 17:05:28 INFO mapred.MapTask: io.sort.mb = 10014/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680该行数据为:A 65该行数据为:B 75该行数据为:C 64该行数据为:D 67该行数据为:E 86该行数据为:F 70该行数据为:G 90该行数据为:H 79该行数据为:I 81该行数据为:J 78该行数据为:K 60该行数据为:A 65该行数据为:B 82该行数据为:C 84该行数据为:D 97该行数据为:E 66该行数据为:F 70该行数据为:G 80该行数据为:H 89该行数据为:I 91该行数据为:J 90该行数据为:K 69该行数据为:A 71该行数据为:B 84该行数据为:C 94该行数据为:D 67该行数据为:E 96该行数据为:F 80该行数据为:G 70该行数据为:H 71该行数据为:I 81该行数据为:J 98该行数据为:K 80该行数据为:A 85该行数据为:B 75该行数据为:C 69该行数据为:D 87该行数据为:E 89该行数据为:F 80该行数据为:G 70该行数据为:H 99该行数据为:I 81该行数据为:J 88该行数据为:K 60该行数据为:A 65该行数据为:B 75该行数据为:C 60该行数据为:D 67该行数据为:E 80该行数据为:F 92该行数据为:G 76该行数据为:H 79该行数据为:I 68该行数据为:J 78该行数据为:K 70该行数据为:A 85该行数据为:B 85该行数据为:C 74该行数据为:D 87该行数据为:E 76该行数据为:F 60该行数据为:G 60该行数据为:H 79该行数据为:I 81该行数据为:J 78该行数据为:K 80该行数据为:A 85该行数据为:B 65该行数据为:C 74该行数据为:D 67该行数据为:E 86该行数据为:F 70该行数据为:G 70该行数据为:H 79该行数据为:I 81该行数据为:J 78该行数据为:K 60该行数据为:A 70该行数据为:B 69该行数据为:C 60该行数据为:D 89该行数据为:E 69该行数据为:F 95该行数据为:G 75该行数据为:H 59该行数据为:I 60该行数据为:J 79该行数据为:K 6514/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output第1个数值为:65第2个数值为:65第3个数值为:71第4个数值为:85第5个数值为:65第6个数值为:85第7个数值为:85第8个数值为:70第1个数值为:65第2个数值为:84第3个数值为:75第4个数值为:85第5个数值为:75第6个数值为:69第7个数值为:82第8个数值为:75第1个数值为:84第2个数值为:69第3个数值为:64第4个数值为:74第5个数值为:94第6个数值为:60第7个数值为:60第8个数值为:74第1个数值为:67第2个数值为:87第3个数值为:97第4个数值为:67第5个数值为:67第6个数值为:87第7个数值为:89第8个数值为:67第1个数值为:66第2个数值为:86第3个数值为:96第4个数值为:86第5个数值为:89第6个数值为:69第7个数值为:80第8个数值为:76第1个数值为:70第2个数值为:95第3个数值为:70第4个数值为:70第5个数值为:80第6个数值为:60第7个数值为:80第8个数值为:92第1个数值为:60第2个数值为:76第3个数值为:70第4个数值为:70第5个数值为:80第6个数值为:90第7个数值为:75第8个数值为:70第1个数值为:79第2个数值为:89第3个数值为:71第4个数值为:99第5个数值为:59第6个数值为:79第7个数值为:79第8个数值为:79第1个数值为:60第2个数值为:81第3个数值为:81第4个数值为:81第5个数值为:81第6个数值为:91第7个数值为:68第8个数值为:81第1个数值为:78第2个数值为:79第3个数值为:78第4个数值为:88第5个数值为:78第6个数值为:98第7个数值为:90第8个数值为:78第1个数值为:60第2个数值为:70第3个数值为:69第4个数值为:60第5个数值为:80第6个数值为:65第7个数值为:60第8个数值为:8014/12/14 17:05:28 INFO mapred.MapTask: Finished spill 014/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting14/12/14 17:05:28 INFO mapred.LocalJobRunner: 14/12/14 17:05:28 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000001_0‘ done.14/12/14 17:05:28 INFO mapred.LocalJobRunner: 14/12/14 17:05:28 INFO mapred.Merger: Merging 2 sorted segments14/12/14 17:05:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes14/12/14 17:05:28 INFO mapred.LocalJobRunner: 第1个数值为:58第2个数值为:73第1个数值为:76第2个数值为:57第1个数值为:57第2个数值为:72第1个数值为:78第2个数值为:66第1个数值为:64第2个数值为:81第1个数值为:77第2个数值为:69第1个数值为:67第2个数值为:73第1个数值为:79第2个数值为:66第1个数值为:70第2个数值为:78第1个数值为:83第2个数值为:67第1个数值为:58第2个数值为:6814/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting14/12/14 17:05:28 INFO mapred.LocalJobRunner: 14/12/14 17:05:28 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now14/12/14 17:05:28 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://hadoop:9000/user/hadoop/output414/12/14 17:05:28 INFO mapred.LocalJobRunner: reduce > reduce14/12/14 17:05:28 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0‘ done.14/12/14 17:05:28 INFO mapred.JobClient: map 100% reduce 100%14/12/14 17:05:28 INFO mapred.JobClient: Job complete: job_local_000114/12/14 17:05:28 INFO mapred.JobClient: Counters: 1414/12/14 17:05:28 INFO mapred.JobClient: FileSystemCounters14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_READ=5057314/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_READ=263014/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_WRITTEN=10304614/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=5514/12/14 17:05:28 INFO mapred.JobClient: Map-Reduce Framework14/12/14 17:05:28 INFO mapred.JobClient: Reduce input groups=1114/12/14 17:05:28 INFO mapred.JobClient: Combine output records=2214/12/14 17:05:28 INFO mapred.JobClient: Map input records=17614/12/14 17:05:28 INFO mapred.JobClient: Reduce shuffle bytes=014/12/14 17:05:28 INFO mapred.JobClient: Reduce output records=1114/12/14 17:05:28 INFO mapred.JobClient: Spilled Records=4414/12/14 17:05:28 INFO mapred.JobClient: Map output bytes=105614/12/14 17:05:28 INFO mapred.JobClient: Combine input records=17614/12/14 17:05:28 INFO mapred.JobClient: Map output records=17614/12/14 17:05:28 INFO mapred.JobClient: Reduce input records=22最终结果为:A 65B 66C 64D 72E 72F 73G 70H 72I 74J 75K 63
为了更清晰从将要执行的控制台中看到map和reduce过程的执行都进行了那些操作,我们在其中打印了相关信息,这里有自己的两点疑惑需要拿出来闹闹:
(1).这里我写的程序和书中不一样,没有增加StringTokenizer token = new StringTokenizer(line,"line")这行,事实上我加上去后会出现错误,我的理解是,因为默认格式是TextInputFormat即已经将文件中的文本按照行标示进行分割,即输入给map方法的已经是以一行为单位的记录,所以这里不需要以“\n”进行分割了。书中的做法应该是假定将整个文件拿过来,统一处理,但事实上这里默认的TextInputFormat已经完成了前期工作。(如果执迷不悟这样处理的话,距离来说NameScore1.txt中第一行是“A 55”整个以“\n”分割后就是一个整体了,再以“\t”就无法分割了。)
(2).从执行过程打印的信息,起初让我有些疑惑,因为从信息来看,似乎是:NameScore1.txt被分割并以每行记录进入map过程,当执行到该文件的最后一行记录时,从打印信息我们似乎看到的是紧接着就去执行reduce过程了,后面的NameScore2.txt也是如此,当两个文件都分别执行了map和reduce时似乎又执行了一次reduce操作。那么事实是不是如此,如果真是这样,这与先前所看到的理论中提到当map执行完后再执行reduce是否有冲突。
通过查看代码我们发现
job.setMapperClass(MapperClass.class);
job.setCombinerClass(ReducerClass.class);
job.setReducerClass(ReducerClass.class);
是的,没错,在这里我们发现了端倪,其真正执行过程是:先执行map,这就是过程信息中打印了每条成绩记录,后面执行的reduce其实是介于map和reduce之间的combiner操作,那么这个Combiner类又为何物,通过神奇的API我们可以发现Combine其实就是一次reduce过程,而且这里明确写了combiner和reduce过程都是使用ReducerClass类,从而就出现了这里为什么对于单个文件都会先执行map然后在reduce,再后来是两个map执行后,才执行的真正的reduce。
2.去重——阉割版的WordCount
相比于前面的求平均值例子需要添加一些逻辑代码来说,这里的去重更像是阉割版的WordCount。
如果你还是用传统的思维在考量一个去重的程序需要多少次的判断,如果你还不了解什么是真正的map和reduce。Hadoop中的去重问题被你整复杂了。要知道,当一个map执行完后会对执行的数据进行一个排序,比如按照字母先后顺序;后面会进入combine阶段,这阶段主要是针对key-value中有相同的key就合并;再到reduce阶段,通过迭代器遍历前一阶段合并的各个元素,得到最终的输出结果。
对于去重来说,我们不在乎一个元素到底出现了几次,只要知道这个元素确实出现了,并能够再最后显示出来就行了,通过map和combiner,我们最终得到的key-value对中的key都是不一样的,也就是说在完成合并的同时就是我们所需要的去重操作(是不是有点绕)。最终reduce输出的就是具有唯一性的去重的元素集合。我们还是按照理清map和reduce的思路来看待这个去重问题:
map: 数据中的一行记录如"(安徽 jack)"——输入数据
直接以key-value的方式写入上下文对象context(这里的value并不是我们关心的对象,可以为空)——执行功能
得到指定键值对类型的输出如"(new Text(安徽),new Text(""))"——输出结果
reduce: map的输出——输入数据
直接以key-value的方式写入上下文对象context(同样,value并不是我们关心的对象)——执行功能
得到指定键值对类型的输入——输出结果
鉴于以上对于map和reduce的理解,代码如下:
1 package org.apache.mapreduce; 2 3 import java.io.IOException; 4 import java.util.Collection; 5 import java.util.Iterator; 6 import java.util.StringTokenizer; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.Path;10 import org.apache.hadoop.io.IntWritable;11 import org.apache.hadoop.io.LongWritable;12 import org.apache.hadoop.io.Text;13 import org.apache.hadoop.mapred.TextInputFormat;14 import org.apache.hadoop.mapreduce.Job;15 import org.apache.hadoop.mapreduce.Mapper;16 import org.apache.hadoop.mapreduce.Reducer;17 import org.apache.hadoop.util.GenericOptionsParser;18 import org.apache.mapreduce.Test1123.MapperClass;19 import org.apache.mapreduce.Test1123.ReducerClass;20 21 public class Test1215 {22 23 public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {24 public void map(LongWritable key, Text value, Context context){25 26 try {27 context.write(value, new Text(""));28 System.out.println("value:" + value);29 } catch (IOException e) {30 e.printStackTrace();31 } catch (InterruptedException e) {32 e.printStackTrace();33 }34 }35 }36 37 public static class ReducerClass extends Reducer<Text, Text, Text, Text>{38 public void reduce(Text key, Iterable<Text> value, Context context){39 40 try {41 context.write(key, new Text(""));42 System.out.println("key:"+key);43 } catch (IOException e) {44 e.printStackTrace();45 } catch (InterruptedException e) {46 e.printStackTrace();47 }48 }49 }50 /**51 * @param args52 * @throws IOException 53 * @throws ClassNotFoundException 54 * @throws InterruptedException 55 */56 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {57 58 Configuration conf = new Configuration();59 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();60 if (otherArgs.length != 2) {61 System.err.println("Usage: wordcount <in> <out>");62 System.exit(2);63 }64 Job job = new Job(conf, "Test1214");65 66 job.setJarByClass(Test1215.class);67 job.setMapperClass(MapperClass.class);68 job.setCombinerClass(ReducerClass.class);69 job.setReducerClass(ReducerClass.class);70 job.setOutputKeyClass(Text.class);71 job.setOutputValueClass(Text.class);72 73 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));74 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));75 System.exit(job.waitForCompletion(true) ? 0 : 1);76 System.out.println("end");77 }78 79 }
数据集:手动创建两个文件,每个文件内都有重复元素,两个文件内也有重复元素,具体如下:
repeat1.txt:
安徽 jack江苏 jim江西 lucy广东 david上海 smith安徽 jack江苏 jim北京 yemener
repeat2.txt
江西 lucy安徽 jack上海 hanmei北京 yemener新疆 afanti黑龙江 lily福建 tom安徽 jack
通过运行,我们发现控制台打印的信息如下:
14/12/15 21:57:07 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=14/12/15 21:57:07 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 214/12/15 21:57:07 INFO mapred.JobClient: Running job: job_local_000114/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 214/12/15 21:57:07 INFO mapred.MapTask: io.sort.mb = 10014/12/15 21:57:07 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/15 21:57:07 INFO mapred.MapTask: record buffer = 262144/327680value:安徽 jackvalue:江苏 jimvalue:江西 lucyvalue:广东 davidvalue:上海 smithvalue:安徽 jackvalue:江苏 jimvalue:北京 yemener14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map outputkey:上海 smithkey:北京 yemenerkey:安徽 jackkey:广东 davidkey:江苏 jimkey:江西 lucy14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 014/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting14/12/15 21:57:08 INFO mapred.LocalJobRunner: 14/12/15 21:57:08 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0‘ done.14/12/15 21:57:08 INFO mapred.MapTask: io.sort.mb = 10014/12/15 21:57:08 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/15 21:57:08 INFO mapred.MapTask: record buffer = 262144/327680value:江西 lucyvalue:安徽 jackvalue:上海 hanmeivalue:北京 yemenervalue:新疆 afantivalue:黑龙江 lilyvalue:福建 tomvalue:安徽 jack14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map outputkey:上海 hanmeikey:北京 yemenerkey:安徽 jackkey:新疆 afantikey:江西 lucykey:福建 tomkey:黑龙江 lily14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 014/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting14/12/15 21:57:08 INFO mapred.LocalJobRunner: 14/12/15 21:57:08 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000001_0‘ done.14/12/15 21:57:08 INFO mapred.LocalJobRunner: 14/12/15 21:57:08 INFO mapred.Merger: Merging 2 sorted segments14/12/15 21:57:08 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 212 bytes14/12/15 21:57:08 INFO mapred.LocalJobRunner: key:上海 hanmeikey:上海 smithkey:北京 yemenerkey:安徽 jackkey:广东 davidkey:新疆 afantikey:江苏 jimkey:江西 lucykey:福建 tomkey:黑龙江 lily14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting14/12/15 21:57:08 INFO mapred.LocalJobRunner: 14/12/15 21:57:08 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now14/12/15 21:57:08 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://hadoop:9000/user/hadoop/output514/12/15 21:57:08 INFO mapred.LocalJobRunner: reduce > reduce14/12/15 21:57:08 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0‘ done.14/12/15 21:57:08 INFO mapred.JobClient: map 100% reduce 100%14/12/15 21:57:08 INFO mapred.JobClient: Job complete: job_local_000114/12/15 21:57:08 INFO mapred.JobClient: Counters: 1414/12/15 21:57:08 INFO mapred.JobClient: FileSystemCounters14/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_READ=5058414/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_READ=50714/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_WRITTEN=10299714/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=14014/12/15 21:57:08 INFO mapred.JobClient: Map-Reduce Framework14/12/15 21:57:08 INFO mapred.JobClient: Reduce input groups=1014/12/15 21:57:08 INFO mapred.JobClient: Combine output records=1314/12/15 21:57:08 INFO mapred.JobClient: Map input records=1614/12/15 21:57:08 INFO mapred.JobClient: Reduce shuffle bytes=014/12/15 21:57:08 INFO mapred.JobClient: Reduce output records=1014/12/15 21:57:08 INFO mapred.JobClient: Spilled Records=2614/12/15 21:57:08 INFO mapred.JobClient: Map output bytes=22014/12/15 21:57:08 INFO mapred.JobClient: Combine input records=1614/12/15 21:57:08 INFO mapred.JobClient: Map output records=1614/12/15 21:57:08 INFO mapred.JobClient: Reduce input records=13
基于以上两个例子的分析,让我们了解map是怎么一回事,reduce又做了什么,在map和reduce之间还有那些猫腻,整个mapreduce是如何一次次的帮助我们完成我们想要的逻辑,当然这里为了方便用的是小数据集,事实上在大数据集上解决这样的问题更能凸显mapreduce高大上和救世主的形象。
真心觉得Doug Cutting很牛,如何写出这样的框架,低头一想,前面的路还很长。今天就到这,觉得有用,记得点赞哦,你的到来是对我最大的鼓舞^_^
本文链接:《Hadoop阅读笔记(二)——利用MapReduce求平均数和去重》http://www.cnblogs.com/bigdataZJ/p/hadoopreading2.html
Hadoop阅读笔记(二)——利用MapReduce求平均数和去重