Hadoop (5) - Going Deeper with MapReduce, the Workhorse of Distributed Computing
1. Partitioner: Partitioner is the base class for all partitioners; to customize how map output is routed to reducers, extend this class and implement getPartition, as in the example below.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// DataInfo is the custom Writable (tel, upPayLoad, downPayLoad) defined elsewhere in this project
public class DataCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataInfo.class);

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataInfo.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Plug in the custom partitioner; the number of reduce tasks is passed in as args[2]
        job.setPartitionerClass(DCPartitioner.class);
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        job.waitForCompletion(true);
    }

    // Map: parse one line of traffic data and emit <phone number, DataInfo>
    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo> {
        private Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String tel = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataInfo dataInfo = new DataInfo(tel, up, down);
            k.set(tel);
            context.write(k, dataInfo);
        }
    }

    // Reduce: sum the upstream and downstream traffic for each phone number
    public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo> {
        @Override
        protected void reduce(Text key, Iterable<DataInfo> values, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            for (DataInfo d : values) {
                up_sum += d.getUpPayLoad();
                down_sum += d.getDownPayLoad();
            }
            DataInfo dataInfo = new DataInfo("", up_sum, down_sum);
            context.write(key, dataInfo);
        }
    }

    // Partition by phone-number prefix; unknown prefixes go to partition 0
    public static class DCPartitioner extends Partitioner<Text, DataInfo> {
        private static Map<String, Integer> provider = new HashMap<String, Integer>();
        static {
            provider.put("138", 1);
            provider.put("139", 1);
            provider.put("152", 2);
            provider.put("153", 2);
            provider.put("182", 3);
            provider.put("183", 3);
        }

        @Override
        public int getPartition(Text key, DataInfo value, int numPartitions) {
            // In practice this mapping could be read from a database or from configuration
            String tel_sub = key.toString().substring(0, 3);
            Integer count = provider.get(tel_sub);
            if (count == null) {
                count = 0;
            }
            return count;
        }
    }
}
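Because DCPartitioner can return partition numbers 0 through 3, the job should be started with at least four reduce tasks (args[2] >= 4). With fewer reducers, keys routed to a non-existent partition make the map tasks fail; with exactly one reducer, the custom partitioner is bypassed and everything lands in the single output file. A hypothetical invocation (the jar name and HDFS paths are placeholders):

hadoop jar datacount.jar DataCount /data/flow /data/flow_out 4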
2. Sorting and Grouping
Sorting in the map and reduce phases compares only k2; v2 takes no part in the comparison. To make v2 participate in the ordering, wrap k2 and v2 together in a new class and use that class as k2, so that its fields enter the comparison.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// A composite key: serialized via write/readFields, ordered by compareTo
public class InfoBean implements WritableComparable<InfoBean> {

    private String account;
    private double income;
    private double expenses;
    private double surplus;

    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = income - expenses;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }

    // Sort by income first; when incomes are equal, sort by expenses
    @Override
    public int compareTo(InfoBean o) {
        if (this.income == o.getIncome()) {
            return this.expenses > o.getExpenses() ? 1 : -1;
        }
        return this.income > o.getIncome() ? 1 : -1;
    }

    @Override
    public String toString() {
        return income + "\t" + expenses + "\t" + surplus;
    }

    public String getAccount() { return account; }
    public double getIncome() { return income; }
    public double getExpenses() { return expenses; }
    public double getSurplus() { return surplus; }
    // setters omitted
}
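To actually make the value fields drive the sort, the mapper emits InfoBean itself as k2. The following is a minimal sketch under assumptions not in the original (tab-separated input of account, income, expenses; class names SortMapper/SortReducer; driver configuration and imports omitted):

// Minimal sketch: using InfoBean as the map output key so income/expenses drive the sort.
// Assumes hypothetical tab-separated input lines: account \t income \t expenses.
public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
    private InfoBean bean = new InfoBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        bean.set(fields[0], Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
        // InfoBean is k2, so the framework orders records by its compareTo (income, then expenses)
        context.write(bean, NullWritable.get());
    }
}

public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
    @Override
    protected void reduce(InfoBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive already sorted; write them out with the account as k3
        context.write(new Text(key.getAccount()), key);
    }
}

With a single reduce task this yields one globally sorted output file; with several, each reducer's output is sorted only within its own partition.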
3. Combiner Programming
Each map task may produce a large amount of output. The combiner's job is to merge that output locally on the map side first, reducing the amount of data transferred to the reducers. At its core, a combiner merges records with the same key locally; it works like a local reduce. Without a combiner, all aggregation is left to the reducers and efficiency suffers; with one, each map that finishes aggregates its own output locally, which speeds up the job.
Note: the combiner's output becomes the reducer's input. For the combiner to remain pluggable, adding it must never change the final result. A combiner should therefore only be used when the reducer's input key/value types match its output key/value types and partial aggregation does not affect the final result, e.g. summation or taking a maximum.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the jar
        job.setJarByClass(InverseIndex.class);

        // Mapper settings
        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0])); // e.g. words.txt

        // Reducer settings
        job.setReducerClass(IndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Combiner runs on the map side to pre-aggregate counts per (word, file)
        job.setCombinerClass(IndexCombiner.class);

        // Submit the job
        job.waitForCompletion(true);
    }

    // Map: emit <"word->fileName", "1"> for every word in the split
    public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(" ");
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            Path path = inputSplit.getPath();
            String name = path.getName();
            for (String f : fields) {
                k.set(f + "->" + name);
                v.set("1");
                context.write(k, v);
            }
        }
    }

    // Combiner: sum the counts per (word, file) and re-key by word only,
    // emitting <"word", "fileName->count">
    public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String[] fields = key.toString().split("->");
            long sum = 0;
            for (Text t : values) {
                sum += Long.parseLong(t.toString());
            }
            k.set(fields[0]);
            v.set(fields[1] + "->" + sum);
            context.write(k, v);
        }
    }

    // Reduce: concatenate the per-file counts for each word into one index line
    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String value = "";
            for (Text t : values) {
                value += t.toString() + " ";
            }
            v.set(value.trim());
            context.write(key, v);
        }
    }
}
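As a hypothetical illustration of the data flow, suppose the input consists of two files: a.txt containing "hello tom hello" and b.txt containing "hello jerry". The mappers emit ("hello->a.txt", "1") twice and ("hello->b.txt", "1") once; the combiner collapses these on the map side into ("hello", "a.txt->2") and ("hello", "b.txt->1"); the reducer then concatenates the per-file counts into a single inverted-index line such as "hello    a.txt->2 b.txt->1".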
4. Shuffle
Each map task has a circular memory buffer that holds its output, 100 MB by default (io.sort.mb). Once the buffer reaches the spill threshold of 0.8 (io.sort.spill.percent), a background thread spills the contents to a new spill file created under the directory specified by mapred.local.dir.
Before the data is written to disk it is partitioned and sorted; if a combiner is configured, it runs on the sorted data. After the last record has been written, all spill files are merged into a single file that is both partitioned and sorted.
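These map-side buffer settings can also be overridden in the job driver. A minimal sketch, assuming the classic (pre-YARN) property names quoted above; newer releases rename them (e.g. mapreduce.task.io.sort.mb):

// Minimal sketch: tuning the map-side sort buffer and spill threshold in the driver
Configuration conf = new Configuration();
conf.setInt("io.sort.mb", 200);                // in-memory sort buffer size in MB (default 100)
conf.setFloat("io.sort.spill.percent", 0.80f); // spill when the buffer is 80% full (default 0.8)
Job job = Job.getInstance(conf);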
The reducers fetch the partitions of the map output over HTTP, and the TaskTracker runs a reduce task for each partition. In the copy phase the map output is copied into the reducer's memory or onto its disk; a reducer begins copying as soon as a map task completes. The sort phase then merges the map outputs, after which the reduce phase runs.