首页 > 代码库 > MapReduce 中的两表 join 实例(二)
MapReduce 中的两表 join 实例(二)
package com.baidu.uilt; import java.io.*; import org.apache.hadoop.io.*; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } public static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) { return cmp; } return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } } static { WritableComparator.define(TextPair.class, new 
Comparator()); } public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).first.compareTo(((TextPair) b).first); } return super.compare(a, b); } } }
package com.baidu.loan;

/***
 *
 * /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6 /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928 /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928 /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
 *
 **/
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import com.baidu.uilt.TextPair;

/**
 * Reduce-side join of two text inputs on their first column.
 *
 * <p>Each mapper emits a {@link TextPair} key made of the join column and a
 * tag ("0" for the unit input, "1" for the iterial input). Records are
 * partitioned and grouped by the join column only, so one {@code reduce} call
 * receives every record sharing a join column. The reducer relies on the tag
 * ordering to see the unit record before the iterial records.
 *
 * <p>Arguments: {@code <unit input path> <iterial input path> <output path>}.
 */
public class LoanIdeainfoJoinIterialByDAILI6 extends Configured implements Tool {

  /** Tag attached to records coming from the unit input. */
  private static final String UNIT_TAG = "0";

  /** Tag attached to records coming from the iterial input. */
  private static final String ITERIAL_TAG = "1";

  /**
   * Emits unit records: lines that do not split into two or more tab-separated
   * fields and that split into exactly four comma-separated fields. The first
   * comma-separated field is the join key; the whole line is the value.
   */
  public static class JoinUnitMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, TextPair, Text> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<TextPair, Text> output,
        Reporter reporter) throws IOException {
      String line = value.toString();
      if (line.split("\t").length >= 2) {
        return; // tab-separated line: not a unit record
      }
      // Split once and reuse the result for both the length check and the key.
      String[] fields = line.split(",");
      if (fields.length == 4) {
        output.collect(new TextPair(fields[0], UNIT_TAG), value);
      }
    }
  }

  /**
   * Emits iterial records: lines with more than four tab-separated fields.
   * The first tab-separated field is the join key; the whole line is the value.
   */
  public static class JoinIterialMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, TextPair, Text> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<TextPair, Text> output,
        Reporter reporter) throws IOException {
      // Split once and reuse the result for both the length check and the key.
      String[] fields = value.toString().split("\t");
      if (fields.length > 4) {
        output.collect(new TextPair(fields[0], ITERIAL_TAG), value);
      }
    }
  }

  /**
   * Joins the unit record of a group with every following record of that
   * group, emitting (unit record, other record) pairs.
   *
   * <p>Groups whose first key is not tagged as a unit record have no unit
   * side to join against; they are skipped and counted instead of having an
   * iterial record mistaken for the unit record.
   */
  public static class JoinReducer extends MapReduceBase
      implements Reducer<TextPair, Text, Text, Text> {

    @Override
    public void reduce(TextPair key, Iterator<Text> values, OutputCollector<Text, Text> output,
        Reporter reporter) throws IOException {
      // The key handed to reduce belongs to the first record of the group. If its tag is
      // not the unit tag, no unit record exists for this join key.
      if (!UNIT_TAG.equals(key.getSecond().toString())) {
        reporter.incrCounter("Join", "GroupsWithoutUnitRecord", 1);
        return;
      }
      // Copy the first value: the framework may reuse the Text instance for later values.
      Text unitRecord = new Text(values.next());
      // NOTE(review): if several unit records share a join key, the later ones are
      // emitted here as if they were iterial records - confirm the unit input is unique.
      while (values.hasNext()) {
        output.collect(unitRecord, values.next());
      }
    }
  }

  /** Partitions on the join column only, so both tags of a key reach the same reducer. */
  public static class KeyPartitioner implements Partitioner<TextPair, Text> {

    @Override
    public void configure(JobConf job) {
      // No configuration needed.
    }

    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
      // Mask the sign bit so the modulo result is never negative.
      return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  /**
   * Configures and runs the join job.
   *
   * @param args unit input path, iterial input path, output path
   * @return 0 when the job has run, -1 when the argument count is wrong
   * @throws Exception if job submission or execution fails
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.println("Usage: " + getClass().getSimpleName()
          + " <unit input path> <iterial input path> <output path>");
      return -1;
    }

    String strPathUnit = args[0];
    String strPathIterial = args[1];
    Path outputPath = new Path(args[2]);

    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Join record with station name");

    MultipleInputs.addInputPath(conf, new Path(strPathUnit), TextInputFormat.class,
        JoinUnitMapper.class);
    MultipleInputs.addInputPath(conf, new Path(strPathIterial), TextInputFormat.class,
        JoinIterialMapper.class);
    FileOutputFormat.setOutputPath(conf, outputPath);

    conf.setPartitionerClass(KeyPartitioner.class);
    // Group reducer input by the join column only, ignoring the tag.
    conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class);
    conf.setMapOutputKeyClass(TextPair.class);
    conf.setReducerClass(JoinReducer.class);
    conf.setOutputKeyClass(Text.class);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new LoanIdeainfoJoinIterialByDAILI6(), args);
    System.exit(exitCode);
  }
}
本文出自 “梦朝思夕” 博客,请务必保留此出处http://qiangmzsx.blog.51cto.com/2052549/1560553
MapReduce 中的两表 join 实例(二)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。