首页 > 代码库 > 【Hadoop】Hadoop MR 自定义分组 Partition机制
【Hadoop】Hadoop MR 自定义分组 Partition机制
1、概念
2、Hadoop默认分组机制--所有的Key分到一个组,一个Reduce任务处理
3、代码示例
FlowBean
package com.ares.hadoop.mr.flowgroup;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Writable bean carrying per-phone traffic counters: upstream flow,
 * downstream flow, and their sum.
 *
 * <p>Hadoop serialization contract: the field order written in
 * {@link #write(DataOutput)} must exactly match the order read in
 * {@link #readFields(DataInput)}.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public FlowBean() {
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Deserializes fields in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Serializes fields; order must stay in sync with {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Tab-separated record used as the reducer's output line. */
    @Override
    public String toString() {
        return "" + phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Orders beans by total flow, descending.
     *
     * <p>Fix: the original returned only -1 or 1 and never 0, violating the
     * compareTo contract (sgn(a.compareTo(b)) must equal -sgn(b.compareTo(a));
     * two beans with equal sumFlow each reported the other as smaller).
     * {@link Long#compare(long, long)} with swapped operands gives a correct
     * descending order and returns 0 on ties.
     */
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), sumFlow);
    }
}
FlowGroup
package com.ares.hadoop.mr.flowgroup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.exception.LineException;

/**
 * MapReduce driver that aggregates per-phone traffic and routes records to
 * reducers with a custom {@link FlowGroupPartition} keyed on the phone prefix.
 *
 * <p>Usage: {@code FlowGroup <unused> <inputPath> <outputPath>} — note that
 * {@link #run(String[])} reads the input path from {@code args[1]} and the
 * output path from {@code args[2]}; {@code args[0]} is currently unused.
 */
public class FlowGroup extends Configured implements Tool {

    private static final Logger LOGGER = Logger.getLogger(FlowGroup.class);

    /** Counter for input lines skipped due to malformed content. */
    enum Counter {
        LINESKIP
    }

    /**
     * Parses each log line into (phoneNum, FlowBean). Lines with a wrong field
     * count or non-numeric flow values are counted under LINESKIP and dropped.
     */
    public static class FlowGroupMapper extends
            Mapper<LongWritable, Text, Text, FlowBean> {

        private String line;
        private int length;
        // Fix: the original used typographic quotes (‘\t‘), which is not valid
        // Java and fails to compile; a plain char literal is intended.
        private static final char SEPARATOR = '\t';
        private String phoneNum;
        private long upFlow;
        private long downFlow;

        // Reused across map() calls to avoid per-record allocation.
        private Text text = new Text();
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            String errMsg;
            try {
                line = value.toString();
                String[] fields = StringUtils.split(line, SEPARATOR);
                length = fields.length;
                if (length != 11) {
                    throw new LineException(key.get() + ", " + line
                            + " LENGTH INVALID, IGNORE...");
                }

                phoneNum = fields[1];
                upFlow = Long.parseLong(fields[length - 3]);
                downFlow = Long.parseLong(fields[length - 2]);

                text.set(phoneNum);
                flowBean.setPhoneNum(phoneNum);
                flowBean.setUpFlow(upFlow);
                flowBean.setDownFlow(downFlow);

                context.write(text, flowBean);
            } catch (LineException e) {
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (NumberFormatException e) {
                errMsg = key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...";
                LOGGER.error(errMsg);
                System.out.println(errMsg);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (Exception e) {
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
        }
    }

    /**
     * Sums up/down flow per phone number and emits one FlowBean per key
     * (the bean's toString() produces the tab-separated output line).
     */
    public static class FlowGroupReducer extends
            Reducer<Text, FlowBean, FlowBean, NullWritable> {

        // Reused output bean to avoid per-group allocation.
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            long upFlowCounter = 0;
            long downFlowCounter = 0;

            for (FlowBean bean : values) {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }

            flowBean.setPhoneNum(key.toString());
            flowBean.setUpFlow(upFlowCounter);
            flowBean.setDownFlow(downFlowCounter);
            flowBean.setSumFlow(upFlowCounter + downFlowCounter);

            context.write(flowBean, NullWritable.get());
        }
    }

    /**
     * Configures and submits the job.
     *
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        String errMsg = "FlowGroup: TEST STARTED...";
        LOGGER.debug(errMsg);
        System.out.println(errMsg);

        Configuration conf = new Configuration();
        // For Eclipse JVM debug:
        // conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("FlowGroup");

        // JOB MAPPER & REDUCER
        job.setJarByClass(FlowGroup.class);
        job.setMapperClass(FlowGroupMapper.class);
        job.setReducerClass(FlowGroupReducer.class);

        // JOB PARTITION
        job.setPartitionerClass(FlowGroupPartition.class);

        // Five reduce tasks: one per known prefix (135-138) plus the
        // catch-all bucket used by FlowGroupPartition.
        job.setNumReduceTasks(5);

        // REDUCE output types
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        // MAP output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // JOB INPUT & OUTPUT PATH (args[1] = input, args[2] = output)
        FileInputFormat.setInputPaths(job, args[1]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            errMsg = "FlowGroup: TEST SUCCESSFULLY...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 0;
        } else {
            errMsg = "FlowGroup: TEST FAILED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            String errMsg = "FlowGroup: ARGUMENTS ERROR";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            System.exit(-1);
        }

        int result = ToolRunner.run(new Configuration(), new FlowGroup(), args);
        System.exit(result);
    }
}
FlowGroupPartition
package com.ares.hadoop.mr.flowgroup;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions records by the first three characters of the key (the phone
 * number prefix): prefixes 135-138 map to partitions 0-3, and every other
 * prefix falls into the catch-all partition 4. The job must therefore run
 * with at least 5 reduce tasks.
 *
 * <p>NOTE(review): assumes every key is at least 3 characters long —
 * substring(0, 3) throws on shorter keys; confirm against mapper output.
 */
public class FlowGroupPartition<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static final HashMap<String, Integer> GROUP_MAP =
            new HashMap<String, Integer>();

    static {
        GROUP_MAP.put("135", 0);
        GROUP_MAP.put("136", 1);
        GROUP_MAP.put("137", 2);
        GROUP_MAP.put("138", 3);
    }

    /**
     * Returns the partition index for the key's 3-character prefix,
     * or 4 when the prefix is not a known group.
     */
    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Fix: single map lookup on this per-record hot path; the original
        // called groupMap.get(...) twice for every record.
        Integer partition = GROUP_MAP.get(key.toString().substring(0, 3));
        return (partition == null) ? 4 : partition;
    }
}
【Hadoop】Hadoop MR 自定义分组 Partition机制
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。