首页 > 代码库 > MapReduce 学习4 ---- 自定义分区、自定义排序、自定义组分

MapReduce 学习4 ---- 自定义分区、自定义排序、自定义组分

1. map任务处理

1.3 对输出的key、value进行分区。

分区的目的指的是把相同分类的<k,v>交给同一个reducer任务处理。

 

public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{		static HashMap<String,Integer> map = null;		static{			map = new HashMap<String,Integer>();			map.put("gz1", 0);			map.put("gz2", 0);			map.put("sz1", 1);			map.put("sz2", 1);		}		/**		 * 这里是对mapper任务输出的<k2,v2>进行操作		 * getPartition函数返回多少的值,就会有多少个reducer任务		 * 		 * “gz1”与“gz2”的返回的都是0,所以与分发到同一个reducer任务上,但是k2的值不一样		 * 所以分组就是		 * <gz1,123>		 * <gz2,234>		 * 然后出现在不同reduce函数上		 */		@Override		public int getPartition(Text key, LongWritable value, int numPartitions) {						return (Integer)map.get(key.toString()).intValue();		}		}


//设置分区
        wcjob.setPartitionerClass(MyPartitioner.class);

 

自定义排序,排序是根据k2来进行排序的,k2就需要自己进行自定义类型

 private static class MyNewKey implements WritableComparable<MyNewKey> {        long firstNum;        long secondNum;        public MyNewKey() {        }        public MyNewKey(long first, long second) {            firstNum = first;            secondNum = second;        }        @Override        public void write(DataOutput out) throws IOException {            out.writeLong(firstNum);            out.writeLong(secondNum);        }        @Override        public void readFields(DataInput in) throws IOException {            firstNum = in.readLong();            secondNum = in.readLong();        }        /*         * 当key进行排序时会调用以下这个compreTo方法         */        @Override        public int compareTo(MyNewKey anotherKey) {            long min = firstNum - anotherKey.firstNum;            if (min != 0) {                // 说明第一列不相等,则返回两数之间小的数                return (int) min;            } else {                return (int) (secondNum - anotherKey.secondNum);            }        }    }

 

 

自定义分组

为了针对新的key类型作分组,我们也需要自定义一下分组规则:

(1)编写一个新的分组比较类型用于我们的分组:

 private static class MyGroupingComparator implements            RawComparator<MyNewKey> {        /*         * 基本分组规则:按第一列firstNum进行分组         */        @Override        public int compare(MyNewKey key1, MyNewKey key2) {            return (int) (key1.firstNum - key2.firstNum);        }        /*         * @param b1 表示第一个参与比较的字节数组         *          * @param s1 表示第一个参与比较的字节数组的起始位置         *          * @param l1 表示第一个参与比较的字节数组的偏移量         *          * @param b2 表示第二个参与比较的字节数组         *          * @param s2 表示第二个参与比较的字节数组的起始位置         *          * @param l2 表示第二个参与比较的字节数组的偏移量         */        @Override        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);        }    }

 从代码中我们可以知道,我们自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,而RawComparator接口又实现了Comparator接口,下面看看这两个接口的定义:

  首先是RawComparator接口的定义:

public interface RawComparator<T> extends Comparator<T> {  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);}

 其次是Comparator接口的定义:

public interface Comparator<T> {    int compare(T o1, T o2);    boolean equals(Object obj);}

 

MyGroupingComparator中分别对这两个接口中的定义进行了实现,RawComparator中的compare()方法是基于字节的比较,Comparator中的compare()方法是基于对象的比较。

  在基于字节的比较方法中,有六个参数,一下子眼花了:

Params:

* @param arg0 表示第一个参与比较的字节数组
* @param arg1 表示第一个参与比较的字节数组的起始位置
* @param arg2 表示第一个参与比较的字节数组的偏移量

* @param arg3 表示第二个参与比较的字节数组
* @param arg4 表示第二个参与比较的字节数组的起始位置
* @param arg5 表示第二个参与比较的字节数组的偏移量

 

由于在MyNewKey中有两个long类型,每个long类型又占8个字节。这里因为比较的是第一列数字,所以读取的偏移量为8字节。

  (2)添加对分组规则的设置:

// 设置自定义分组规则   job.setGroupingComparatorClass(MyGroupingComparator.class);

 

MapReduce 学习4 ---- 自定义分区、自定义排序、自定义组分