首页 > 代码库 > SecondarySort 原理
SecondarySort 原理
定义IntPair 以及 IntPair(first,second)的compareto,先比較first的大小,再比較second的大小
定义FirstPartitioner是为了让partition的时候依照IntPair的first来做为选择reduce的根据
定义FirstGroupingComparator是为了:《Pro Hadoop》,里面有一部分内容详解了这个问题,看后最终明确了,和大家分享一下。reduce方法每次是读一条记录,读到对应的key,可是处理value集合时,处理完当前记录的values后,还会推断下一条记录是不是和当前的key是不是同一个组,假设是的话,会继续读取这些记录的值,而这个记录也会被觉得已经处理了,直到记录不是当前组,这次reduce调用才结束,这样一次reduce调用就会处理掉一个组中的全部记录,而不不过一条了。
以下是从hadoop里取出的源码,能够再理解下:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.examples; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; /** * This is an example Hadoop Map/Reduce application. * It reads the text input files that must contain two integers per a line. * The output is sorted by the first and second number and grouped on the * first number. * * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort * <i>in-dir</i> <i>out-dir</i> */ public class SecondarySort { /** * Define a pair of integers that are writable. * They are serialized in a byte comparable format. */ public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; /** * Set the left and right values. */ public void set(int left, int right) { first = left; second = right; } public IntPair(){} public IntPair(int left,int right){ set(left, right); } public int getFirst() { return first; } public int getSecond() { return second; } /** * Read the two integers. * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1 */ @Override public void readFields(DataInput in) throws IOException { first = in.readInt() + Integer.MIN_VALUE; second = in.readInt() + Integer.MIN_VALUE; } @Override public void write(DataOutput out) throws IOException { out.writeInt(first - Integer.MIN_VALUE); out.writeInt(second - Integer.MIN_VALUE); } @Override public int hashCode() { return first * 157 + second; } @Override public boolean equals(Object right) { if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; } else { return false; } } /** A Comparator that compares serialized IntPair. */ public static class Comparator extends WritableComparator { public Comparator() { super(IntPair.class); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return compareBytes(b1, s1, l1, b2, s2, l2); } } static { // register this comparator WritableComparator.define(IntPair.class, new Comparator()); } @Override public int compareTo(IntPair o) { if (first != o.first) { return first < o.first ? -1 : 1; } else if (second != o.second) { return second < o.second ? -1 : 1; } else { return 0; } } } /** * Partition based on the first part of the pair. */ public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{ @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } } /** * Compare only the first part of the pair, so that reduce is called once * for each value of the first part. */ public static class FirstGroupingComparator implements RawComparator<IntPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } } /** * Read two integers from each line and generate a key, value pair * as ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = http://www.mamicode.com/new IntWritable();>
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。