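Hadoop Reading Notes (11): Partitioning in MapReduce
Hadoop Reading Notes series: http://blog.csdn.net/caicongyang/article/category/2166855
1. Partitioning
A partitioner defines the algorithm that assigns each map output key to a group (partition). setNumReduceTasks sets the number of reduce tasks, and each partition is processed by one reduce task.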
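As a rough illustration of what "grouping algorithm" means here, the sketch below contrasts two rules in plain Java, without any Hadoop dependency. The first mirrors the formula used by Hadoop's HashPartitioner (mask off the sign bit, then take the remainder); the second is the length-based rule used by the partitioner in this post. The class name PartitionSketch, both method names, and the 11-digit sample key are assumptions made for illustration. String.hashCode() stands in for Text.hashCode(), so the hash results will not match what Hadoop computes.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {

    // Same formula as Hadoop's HashPartitioner, applied here to a String key
    // (assumption: Hadoop hashes the Text key, so actual values differ).
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Length-based rule from this post: 11-character keys go to
    // partition 0, every other key goes to partition 1.
    static int lengthPartition(String key) {
        return key.length() == 11 ? 0 : 1;
    }

    public static void main(String[] args) {
        // "13800000000" is a hypothetical phone number; "84138413" is the
        // key that appears in the error message in the notes section.
        List<String> keys = Arrays.asList("13800000000", "84138413");
        for (String key : keys) {
            System.out.println(key
                    + " -> hash partition " + hashPartition(key, 2)
                    + ", length partition " + lengthPartition(key));
        }
    }
}
```

With two reduce tasks, the length-based rule sends all 11-character keys to the first output file and everything else to the second, whereas the hash rule spreads keys without regard to their meaning.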
2. Code
KpiApp.java
package cmd;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <p>
 * Title: KpiApp.java Package mapReduce
 * </p>
 * <p>
 * Description: This demo totals the traffic for each phone number in a given
 * traffic log file and uses a Partitioner to group the output. If
 * setNumReduceTasks is greater than 1, the code must be packaged as a jar.
 * <p>
 *
 * @author Tom.Cai
 * @created 2014-11-25 10:23:33 PM
 * @version V1.0
 */
public class KpiApp extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Propagate the job result as the process exit code.
        System.exit(ToolRunner.run(new KpiApp(), args));
    }

    @Override
    public int run(String[] arg0) throws Exception {
        String INPUT_PATH = arg0[0];
        String OUT_PATH = arg0[1];

        // Use the configuration prepared by ToolRunner so that generic
        // command-line options are honored.
        Configuration conf = getConf();

        // Delete the output directory if it already exists; otherwise the job fails.
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        Job job = new Job(conf, KpiApp.class.getSimpleName());
        // Lets Hadoop locate the jar that contains this class when run on a cluster.
        job.setJarByClass(KpiApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(KpiMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWite.class);

        // Custom partitioner with two partitions, hence two reduce tasks.
        job.setPartitionerClass(kpiPartitioner.class);
        job.setNumReduceTasks(2);

        job.setReducerClass(KpiReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWite.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Return a non-zero status if the job fails.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class KpiMapper extends Mapper<LongWritable, Text, Text, KpiWite> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is tab-separated: field 1 is the phone number,
            // fields 6 to 9 are the four traffic counters.
            String[] splited = value.toString().split("\t");
            String num = splited[1];
            KpiWite kpi = new KpiWite(splited[6], splited[7], splited[8], splited[9]);
            context.write(new Text(num), kpi);
        }
    }

    static class KpiReducer extends Reducer<Text, KpiWite, Text, KpiWite> {
        @Override
        protected void reduce(Text key, Iterable<KpiWite> value, Context context) throws IOException, InterruptedException {
            // Sum the four counters over all records of the same phone number.
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWite kpi : value) {
                upPackNum += kpi.upPackNum;
                downPackNum += kpi.downPackNum;
                upPayLoad += kpi.upPayLoad;
                downPayLoad += kpi.downPayLoad;
            }
            context.write(key, new KpiWite(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)));
        }
    }

    static class kpiPartitioner extends HashPartitioner<Text, KpiWite> {
        @Override
        public int getPartition(Text key, KpiWite value, int numReduceTasks) {
            // 11-character keys (mobile numbers) go to partition 0, all others
            // to partition 1. This requires at least two reduce tasks.
            return (key.toString().length() == 11) ? 0 : 1;
        }
    }
}

class KpiWite implements Writable {
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    // Hadoop needs a no-argument constructor to deserialize the value.
    public KpiWite() {
    }

    public KpiWite(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum);
        this.upPayLoad = Long.parseLong(upPayLoad);
        this.downPayLoad = Long.parseLong(downPayLoad);
    }

    // Fields must be read in the same order in which write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    // TextOutputFormat writes values via toString(); without this override
    // the output file would contain the default object identity string.
    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}
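The custom value class only works if write and readFields handle the four fields in exactly the same order. The following is a minimal sketch of that round trip using only java.io, with no Hadoop dependency. The class names WritableRoundTrip and Record are assumptions for illustration; Record stands in for KpiWite and does not implement Hadoop's Writable interface. The sample values are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableRoundTrip {

    // Hypothetical stand-in for KpiWite with the same four fields.
    public static class Record {
        public long upPackNum;
        public long downPackNum;
        public long upPayLoad;
        public long downPayLoad;

        public void write(DataOutput out) throws IOException {
            out.writeLong(upPackNum);
            out.writeLong(downPackNum);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
        }

        // Reads the fields in the same order in which write() emitted them.
        public void readFields(DataInput in) throws IOException {
            upPackNum = in.readLong();
            downPackNum = in.readLong();
            upPayLoad = in.readLong();
            downPayLoad = in.readLong();
        }
    }

    // Serializes a record to bytes and reads it back into a fresh instance.
    public static Record roundTrip(Record original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            Record copy = new Record();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Record record = new Record();
        record.upPackNum = 24;      // made-up sample values
        record.downPackNum = 27;
        record.upPayLoad = 2481;
        record.downPayLoad = 24681;
        Record copy = roundTrip(record);
        System.out.println(copy.upPackNum + "\t" + copy.downPackNum + "\t"
                + copy.upPayLoad + "\t" + copy.downPayLoad);
    }
}
```

If readFields read the fields in a different order than write emitted them, the copy would silently carry swapped counters, which is why the two methods are kept as mirror images.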
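3. Notes:
When a Partitioner is used and setNumReduceTasks is greater than 1, the code must be packaged as a jar and run on the cluster. The likely cause is visible in the trace: the job ran under LocalJobRunner, which in Hadoop 1.x caps a job at one reduce task, so partition 1 returned by kpiPartitioner falls outside the valid range.
Otherwise the following error is reported: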
java.io.IOException: Illegal partition for 84138413 (1)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at mapReduce.KpiApp$KpiMapper.map(KpiApp.java:75)
    at mapReduce.KpiApp$KpiMapper.map(KpiApp.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
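The (1) in the message is the partition number returned for key 84138413. The sketch below, in plain Java without Hadoop, assumes the framework accepts a partition only if it lies between 0 and numReduceTasks - 1, and shows why a fixed return value of 1 fails when only one reduce task is available. The class name SafePartitionSketch and its methods are hypothetical. The modulo guard is one possible defensive variant, not part of the original code.

```java
public class SafePartitionSketch {

    // Rule from the post: 11-character keys -> 0, all other keys -> 1.
    static int rawPartition(String key) {
        return key.length() == 11 ? 0 : 1;
    }

    // A partition is legal only if 0 <= partition < numReduceTasks.
    static boolean isLegal(int partition, int numReduceTasks) {
        return partition >= 0 && partition < numReduceTasks;
    }

    // Defensive variant (an assumption, not the author's code): fold the
    // result into the available range so a single reducer still works.
    static int safePartition(String key, int numReduceTasks) {
        return rawPartition(key) % numReduceTasks;
    }

    public static void main(String[] args) {
        String key = "84138413"; // key taken from the error message
        for (int reducers = 1; reducers <= 2; reducers++) {
            int raw = rawPartition(key);
            System.out.println("reducers=" + reducers
                    + " raw=" + raw + " legal=" + isLegal(raw, reducers)
                    + " safe=" + safePartition(key, reducers));
        }
    }
}
```

With the guard, a run that ends up with a single reduce task sends every key to partition 0 instead of failing, at the cost of losing the split into two output files.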
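Discussion and shared learning are welcome!
Take whatever is useful to you!
Recording and sharing helps us all grow. You are welcome to read my other posts; my blog: http://blog.csdn.net/caicongyang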