首页 > 代码库 > Hadoop中Writable类之四

Hadoop中Writable类之四

1.定制Writable类型

Hadoop中有一套Writable实现,例如:IntWritable、Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型。

定制分以下几步:

  • 需要实现WritableComparable接口,因为Writable常常作为健值对出现,而在MapReduce中,中间有个排序很重要,因此,Hadoop中就让Writable实现了WritableComparable
  • 需要实现WritableComparable的write()、readFields()、compareTo()方法
  • 需要重写java.lang.Object中的hashCode()、equals()、toString()方法。

由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法

如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法

 

Example:

CreateWritable.java

  1 package cn.roboson.writable;  2   3 import java.io.DataInput;  4 import java.io.DataOutput;  5 import java.io.IOException;  6   7 import org.apache.hadoop.io.Text;  8 import org.apache.hadoop.io.WritableComparable;  9 /** 10  * 1.定制一个Writable类型,里面包含两个Text 11  * 2.和IntWritable等原有的Writable类似,它需要实现WritableComparable 12  * 3.由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法 13  * 4.如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法 14  * @author roboson 15  * 16  */ 17  18 public class CreateWritable implements WritableComparable<CreateWritable>{ 19      20     private Text first; 21     private Text second; 22      23     //构造方法,这个是必须要的 24     public CreateWritable(){ 25          26     } 27      28     public CreateWritable(String first,String second){ 29         set(new Text(first), new Text(second)); 30     } 31      32     public CreateWritable(Text first,Text second){ 33         set(first,second); 34     } 35      36      37      38     public Text getFirst() { 39         return first; 40     } 41  42     public void setFirst(Text first) { 43         this.first = first; 44     } 45  46     public Text getSecond() { 47         return second; 48     } 49  50     public void setSecond(Text second) { 51         this.second = second; 52     } 53  54     public void set(Text first,Text second){ 55         this.first=first; 56         this.second=second; 57     } 58      59  60     @Override 61     public void readFields(DataInput in) throws IOException { 62         // TODO Auto-generated method stub 63         first.readFields(in); 64         second.readFields(in); 65     } 66  67     @Override 68     public void write(DataOutput out) throws IOException { 69         // TODO Auto-generated method stub 70         first.write(out); 71         second.write(out); 72     } 73  74     @Override 75     public int compareTo(CreateWritable other) { 76         // TODO Auto-generated method stub 77         int cmp=first.compareTo(other.getFirst()); 78         if(cmp!=0){ 79             return cmp; 80         } 81         return second.compareTo(other.getSecond()); 82     } 83      84     @Override 85     public int hashCode() { 86         // TODO Auto-generated method stub 87         return first.hashCode()*163 + second.hashCode(); 88     } 89      90     @Override 91     public String toString() { 92         // TODO Auto-generated method stub 93         return first+"\t"+second; 94     } 95      96     @Override 97     public boolean equals(Object obj) { 98         // TODO Auto-generated method stub 99         if(obj instanceof CreateWritable){100             CreateWritable create = (CreateWritable) obj;101             return first.equals(create.getFirst()) && second.equals(create.getSecond());102         }103         return false;104     }105 106 }

 

Writable06.java

 1 package cn.roboson.writable; 2  3 import org.apache.hadoop.io.Text; 4  5 public class Writable06 { 6      7     public static void main(String[] args) { 8         CreateWritable createWritable01 = new CreateWritable("Hadoop", "roboson"); 9         CreateWritable createWritable02 = new CreateWritable(new Text("Hadoop"), new Text("roboson"));10         11         //重写了equals()方法,相比叫first 和 second两者都相同的时候,返回true12         System.out.println(createWritable01.equals(createWritable02));13         14         //重写了compareTo()方法,先比较first,再比较second,返回0:相等 -1:first<second 1:first>second15         System.out.println(createWritable01.compareTo(createWritable02));16         17         //重写了toString()方法,返回 first +"\t"+second18         System.out.println(createWritable01.toString());19     }20 }

 

运行结果:

 

 

2.为速度实现一个RawComparator

关于Comparator方面的知识,可以参考我的博文《Hadoop中WritableComparable 和 comparator》,Comparator中,有一个方法是compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2),该方法直接比较的是序列化,不必先将序列化数据流转化为对象,再进行比较,所以,与上面的compareTo()方法,相比,其更高效!其实,我们查看HadoopAPI,就可以发现,基本类型的Writable类都实现有了Comparator作为其内部类:

好,再看看IntWritable.Comparator这个类,如下图所示,发现它是一个静态类,好好观察它的结构:

 

通过上面的,我们可以发现,要为一个Writable实现一个Comparator,安装Hadoop的格式来,需要注意以下几点:

  • 将其在内部实现,作为内部类
  • 是一个静态的内部类
  • 注册,以便可以通过WritableComparator直接创建

 至于注册方面的知识,可以查看我的博客《Hadoop中Comparator原理》

Example:给上面自定义的CreateWritable类实现一个Comparator,也就是CreateWritable.Comparator类

CreateWritable.java

  1 package cn.roboson.writable;  2   3 import java.io.DataInput;  4 import java.io.DataOutput;  5 import java.io.IOException;  6 import java.util.Comparator;  7   8 import org.apache.hadoop.io.Text;  9 import org.apache.hadoop.io.WritableComparable; 10 import org.apache.hadoop.io.WritableComparator; 11 import org.apache.hadoop.io.WritableUtils; 12 /** 13  * 1.给定制的CreateWritable类实现一个Comparator,CreateWritable.Comparator 14  * @author roboson 15  * 16  */ 17  18 public class CreateWritable implements WritableComparable<CreateWritable>{ 19      20     private Text first; 21     private Text second; 22      23     //构造方法,这个是必须要的 24     public CreateWritable(){ 25          26     } 27      28     public CreateWritable(String first,String second){ 29         set(new Text(first), new Text(second)); 30     } 31      32     public CreateWritable(Text first,Text second){ 33         set(first,second); 34     } 35      36      37      38     public Text getFirst() { 39         return first; 40     } 41  42     public void setFirst(Text first) { 43         this.first = first; 44     } 45  46     public Text getSecond() { 47         return second; 48     } 49  50     public void setSecond(Text second) { 51         this.second = second; 52     } 53  54     public void set(Text first,Text second){ 55         this.first=first; 56         this.second=second; 57     } 58      59  60     @Override 61     public void readFields(DataInput in) throws IOException { 62         // TODO Auto-generated method stub 63         first.readFields(in); 64         second.readFields(in); 65     } 66  67     @Override 68     public void write(DataOutput out) throws IOException { 69         // TODO Auto-generated method stub 70         System.out.println(first); 71         first.write(out); 72         second.write(out); 73     } 74  75     @Override 76     public int compareTo(CreateWritable other) { 77         // TODO Auto-generated method stub 78         int cmp=first.compareTo(other.getFirst()); 79         if(cmp!=0){ 80             return cmp; 81         } 82         return second.compareTo(other.getSecond()); 83     } 84      85     @Override 86     public int hashCode() { 87         // TODO Auto-generated method stub 88         return first.hashCode()*163 + second.hashCode(); 89     } 90      91     @Override 92     public String toString() { 93         // TODO Auto-generated method stub 94         return first+"\t"+second; 95     } 96      97     @Override 98     public boolean equals(Object obj) { 99         // TODO Auto-generated method stub100         if(obj instanceof CreateWritable){101             CreateWritable create = (CreateWritable) obj;102             return first.equals(create.getFirst()) && second.equals(create.getSecond());103         }104         return false;105     }106     107     108     //创建静态内部类:CreateWritable.Comparator109     public static class Comparator extends WritableComparator{110 111         public Comparator() {112             super(CreateWritable.class);113             // TODO Auto-generated constructor stub114         }115         private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();116         117         @Override118         public int compare(byte[] b1, int s1, int l1, byte[] b2,119                 int s2, int l2) {120             // TODO Auto-generated method stub121             int firstL1 = 0,firstL2 = 0;122             try {123                 firstL1 = WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1, s1);124                 firstL2 = WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2);125                 int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);126                 if(cmp !=0){127                     return cmp;128                 }129             } catch (IOException e) {130                 // TODO Auto-generated catch block131                 e.printStackTrace();132             }133             return TEXT_COMPARATOR.compare(b1, s1+firstL1, l1-firstL1, b2, s2+firstL2, l2-firstL2);134         }135         136     }137     static{138         WritableComparator.define(CreateWritable.class,new Comparator());139     }140 }

 

Writable07.java

 1 package cn.roboson.writable; 2  3 import java.io.ByteArrayOutputStream; 4 import java.io.DataOutputStream; 5 import java.io.IOException; 6  7 import org.apache.hadoop.io.RawComparator; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.Writable;10 import org.apache.hadoop.io.WritableComparator;11 12 public class Writable07 {13     14     public static void main(String[] args) throws IOException {15         16         CreateWritable create01 = new CreateWritable(new Text("Hadoop"), new Text("1"));17         CreateWritable create02 = new CreateWritable(new Text("Hadoop"), new Text("2"));18         byte[] b1 = serialize(create01);19         byte[] b2 = serialize(create02);20         RawComparator<CreateWritable> comparator = WritableComparator.get(CreateWritable.class);21         int cmp =comparator.compare(b1, 0, b1.length, b2, 0, b2.length);22         if(cmp ==0){23             System.out.println("create01 == create02");24         }else if(cmp ==-1){25             System.out.println("create01<create02");26         }else{27             System.out.println("create01>create02");28         }29     }30     31     public static byte[] serialize(Writable writable) throws IOException{32         ByteArrayOutputStream out = new ByteArrayOutputStream();33         DataOutputStream dataOut = new DataOutputStream(out);34         writable.write(dataOut);35         return out.toByteArray();36         37     }38 }

 

运行结果:

分析:

在上面的CreateWritable的内部类Comparator中,需要实现一个方法compare()方法,那么compare该如何实现,才能够对序列化数据流进行比较。

CreateWritable是由两个Text对象组成的(Text first, Text second),而Text对象的二进制表示,是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传递给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。

也就是说Text对象序列化后,也是由两部分组成,一部分是记录本身有多少个字节,另一部分就是它自己的字节数!

记录它本身有多少个字节所占用的字节长度:WritableUtils.decodeVintSize()方法返回一个整数,代表它的字节长度。

它自己本身的字节数:WritableComparator的readVInt()方法返回一个整数,代表它本身的字节长度。

这样就方便了,相比叫first的序列化数据流,根据结果,判断,看是否需要再比较second的序列化数据流。

 

Hadoop中Writable类之四