首页 > 代码库 > hadoop二次排序

hadoop二次排序

import java.io.DataInput;import java.io.DataOutput;import java.io.File;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DataInputBuffer;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.io.WritableUtils;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.Lz4Codec;//import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.io.compress.SnappyCodec;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.util.GenericOptionsParser;import com.hadoop.compression.lzo.LzoCodec;// 002484 18.29// 600879 12.89public class SecondSotrStr {public static class StrPair implements WritableComparable<StrPair> {private Text first;private Text second;private Text third;private Text fourth;public StrPair() {set(new Text(), new Text(), new Text(), new Text());}public void set(Text left, Text right, Text third, Text fourth) {this.first = left;this.second = right;this.third = third;this.fourth = fourth;}public Text getFirst() {return first;}public Text getSecond() {return second;}public Text getThird() {return third;}public Text getFourth() {return fourth;}@Overridepublic String toString() {return first + "\t" + second + "\t" + third + "\t" + fourth;}@Overridepublic void readFields(DataInput in) throws IOException {first.readFields(in);second.readFields(in);third.readFields(in);fourth.readFields(in);}@Overridepublic void write(DataOutput out) throws IOException {first.write(out);second.write(out);third.write(out);fourth.write(out);}@Overridepublic int hashCode() {return first.hashCode() * 157 + second.hashCode() * 10+ third.hashCode();}@Overridepublic boolean equals(Object right) {if (right instanceof StrPair) {StrPair r = (StrPair) right;return first.equals(r.first) && second.equals(r.second)&& third.equals(r.third) && fourth.equals(r.fourth);} else {return false;}}/** A Comparator that compares serialized StrPair. */public static class Comparator extends WritableComparator {private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();public Comparator() {super(StrPair.class);}// 排序比较器,数据全部存在byte数组public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,int l2) {// 二进制数组读取/** try { //System.out.println("--" + b1[s1]); Integer firstL1 =* WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);* //String str = readSt // System.out.println("firstL1 = " +* firstL1); } catch (IOException e) { // TODO Auto-generated* catch block e.printStackTrace(); }*/// int intvalue = http://www.mamicode.com/readInt(b1, s1);"third = " +* third);*/System.out.println("l1 = " + l1);return compareBytes(b1, s1, l1, b2, s2, l2);/*try {int firstl1 = WritableUtils.decodeVIntSize(b1[s1])+ readVInt(b1, s1);int firstl2 = WritableUtils.decodeVIntSize(b2[s2])+ readVInt(b2, s2);int cmp = TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2,firstl2);if (cmp != 0)return cmp;int firstl12 = WritableUtils.decodeVIntSize(b1[s1 + firstl1])+ readVInt(b1 , s1 + firstl1);int firstl22 = WritableUtils.decodeVIntSize(b2[s2 + firstl2])+ readVInt(b2, s2 + firstl2);cmp = TEXT_COMPARATOR.compare(b1, s1 + firstl1, firstl12, b2, s2 + firstl2,firstl22);if (cmp != 0) return cmp;int firstl13 = WritableUtils.decodeVIntSize(b1[s1+ firstl1 + firstl12])+ readVInt(b1 , s1 + firstl1 + firstl22);int firstl23 = WritableUtils.decodeVIntSize(b2[s2 + firstl2 + firstl22])+ readVInt(b2, s2 + firstl2 + firstl22);cmp = TEXT_COMPARATOR.compare(b1, s1+ firstl1 + firstl12, firstl13, b2, s2 + firstl2 + firstl22,firstl23);//if (cmp != 0)return cmp;return TEXT_COMPARATOR.compare(b1, s1 + firstl1, l1- firstl1, b2, s2 + firstl2, l1 - firstl2);} catch (IOException e) {throw new IllegalArgumentException(e);}*/}}static { // register this comparatorWritableComparator.define(StrPair.class, new Comparator());}// @Overridepublic int compareTo(StrPair o) {/** if (first != o.first) { return first* < o.first ? -1 : 1; } else if (second* != o.second) { return second <* o.second ? -1 : 1; }// else if (third* != o.third) { // return third <* o.third ? -1 : 1;}* * return 0;*/return 0;}}/*** Partition based on the first part of the pair.*/public static class FirstPartitioner extends Partitioner<StrPair, Text> {@Override//public int getPartition(StrPair key, Text value, int numPartitions) {return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;}}/*** Compare only the first part of the pair, so that reduce is called once* for each value of the first part.*/// 调用这里public static class FirstGroupingComparator implementsRawComparator<StrPair> {@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,b2, s2, Integer.SIZE / 8);}@Overridepublic int compare(StrPair o1, StrPair o2) {System.out.println("-----group2-----");Text l = o1.getFirst();Text r = o2.getFirst();return l.equals(r) ? 0 : 1;// return l == r ? 0 : (l < r ? -1 : 1);}}/*** Read two integers from each line and generate a key, value pair as* ((left, right), right).*/public static class MapClass extendsMapper<LongWritable, Text, StrPair, NullWritable> {private final StrPair key = new StrPair();private final IntWritable value = http://www.mamicode.com/new IntWritable();"value" + inValue.toString());StringTokenizer itr = new StringTokenizer(inValue.toString());if (itr.hasMoreTokens()) {left.set((itr.nextToken()));if (itr.hasMoreTokens()) {right.set(itr.nextToken());if (itr.hasMoreTokens()) {third.set(itr.nextToken());if (itr.hasMoreTokens()) {fourth.set(itr.nextToken());}}}key.set(left, right, third, fourth);// value.set(right);context.write(key, NullWritable.get());}}}/*** A reducer class that just emits the sum of the input values.*/public static class Reduce extendsReducer<StrPair, NullWritable, Text, NullWritable> {private static final Text SEPARATOR = new Text("------------------------------------------------");private final Text first = new Text();@Overridepublic void reduce(StrPair key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {// Text outkey = new Text(key.to);// context.write(SEPARATOR, null);// first.set(Integer.toString(key.getFirst()));// System.out.println("key1 " + key );for (NullWritable value : values) {System.out.println("key2 " + key);context.write(new Text(key.toString()), NullWritable.get());}}}private static boolean flag;public static boolean deleteFile(String sPath) {flag = false;File file = new File(sPath);// 路径为文件且不为空则进行删除if (file.isFile() && file.exists()) {file.delete();flag = true;}return flag;}public static boolean deleteDirectory(String sPath) {// 如果sPath不以文件分隔符结尾,自动添加文件分隔符if (!sPath.endsWith(File.separator)) {sPath = sPath + File.separator;}File dirFile = new File(sPath);// 如果dir对应的文件不存在,或者不是一个目录,则退出if (!dirFile.exists() || !dirFile.isDirectory()) {return false;}flag = true;// 删除文件夹下的所有文件(包括子目录)File[] files = dirFile.listFiles();for (int i = 0; i < files.length; i++) {// 删除子文件if (files[i].isFile()) {flag = deleteFile(files[i].getAbsolutePath());if (!flag)break;} // 删除子目录else {flag = deleteDirectory(files[i].getAbsolutePath());if (!flag)break;}}if (!flag)return false;// 删除当前目录if (dirFile.delete()) {return true;} else {return false;}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/** conf.setBoolean("mapreduce.map.output.compress", true);* //conf.setBoolean("mapreduce.output.fileoutputformat.compress",* false);* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",* GzipCodec.class, CompressionCodec.class);*/// gzip/** conf.setBoolean("mapreduce.map.output.compress", true);* conf.setClass("mapreduce.map.output.compression.codec",* GzipCodec.class, CompressionCodec.class);* conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",* GzipCodec.class, CompressionCodec.class);*/conf.set("mapreduce.map.log.level", "DEBUG");// snappy/** conf.setBoolean("mapreduce.map.output.compress", true);* conf.setClass("mapreduce.map.output.compression.codec",* SnappyCodec.class, CompressionCodec.class);* conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",* SnappyCodec.class, CompressionCodec.class);*/String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: SecondSotrStr <in> <out>");System.exit(2);}Path outputDir = new Path(otherArgs[1]);FileSystem fs = FileSystem.get(conf);if (fs.exists(outputDir)) {fs.delete(outputDir, true);}Job job = new Job(conf, "secondary sort");job.setJarByClass(SecondSotrStr.class);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);/** conf.setBoolean("mapred.output.compress", true); //* conf.setClass("mapred.output.compression.codec", GzipCodec.class,* CompressionCodec.class);* conf.setClass("mapred.output.compression.codec", SnappyCodec.class,* CompressionCodec.class);* * conf.setBoolean("reduce.output.compress", true); //* conf.setClass("mapred.output.compression.codec", GzipCodec.class,* CompressionCodec.class);* conf.setClass("reduce.output.compression.codec", SnappyCodec.class,* CompressionCodec.class);* * /* conf.setBoolean("mapreduce.output.compress", true);* conf.setClass("mapreduce.output.compression.codec", GzipCodec.class,* CompressionCodec.class);*/// group and partition by the first int in the pairjob.setPartitionerClass(FirstPartitioner.class);job.setGroupingComparatorClass(FirstGroupingComparator.class);// the map output is StrPair, IntWritablejob.setMapOutputKeyClass(StrPair.class);job.setMapOutputValueClass(NullWritable.class);// the reduce output is Text, IntWritablejob.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// lzo/** conf.setBoolean("mapreduce.map.output.compress", true);* conf.setClass("mapreduce.map.output.compression.codec",* LzoCodec.class, CompressionCodec.class);* conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",* LzoCodec.class, CompressionCodec.class);*/// 块压缩// job.setOutputFormatClass(SequenceFileOutputFormat.class);conf.set("mapred.output.compression.type", "BLOCK");FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

  

hadoop二次排序