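[hadoop] Hadoop Source Code Analysis: Text
Text is one of Hadoop's Writable classes. It defines one of Hadoop's data types, a UTF-8 encoded string, together with the operations on it. Its class documentation reads: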
This class stores text using standard UTF8 encoding. It provides methods to serialize, deserialize, and compare texts at byte level. The type of length is integer and is serialized using zero-compressed format.
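To make the serialized form concrete, here is a minimal, JDK-only sketch of a record that stores a length prefix followed by the UTF-8 bytes. The class VIntTextSketch and all of its methods are hypothetical names of my own. The length encoding is modeled on my understanding of Hadoop's zero-compressed integers and handles non-negative values only; the authoritative byte layout is whatever WritableUtils.writeVInt actually produces, so treat this as an illustration rather than a drop-in replacement.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class VIntTextSketch {
  /** Writes a non-negative length: one byte for 0..127, otherwise a marker byte plus a big-endian payload. */
  static void writeLength(DataOutput out, int n) throws IOException {
    if (n < 0) throw new IllegalArgumentException("negative length");
    if (n <= 127) {
      out.writeByte(n);
      return;
    }
    int payload = 0;                       // number of bytes needed for n
    for (int tmp = n; tmp != 0; tmp >>>= 8) payload++;
    out.writeByte(-112 - payload);         // marker byte encodes the payload size
    for (int i = payload - 1; i >= 0; i--) {
      out.writeByte((n >>> (8 * i)) & 0xFF);
    }
  }

  /** Reads a length written by writeLength. */
  static int readLength(DataInput in) throws IOException {
    byte first = in.readByte();
    if (first >= 0) return first;          // small value stored directly
    int payload = -112 - first;
    if (payload < 1 || payload > 4) throw new IOException("unsupported marker " + first);
    int n = 0;
    for (int i = 0; i < payload; i++) n = (n << 8) | (in.readByte() & 0xFF);
    return n;
  }

  /** Length prefix followed by the UTF-8 bytes of s. */
  static byte[] serialize(String s) {
    try {
      byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      writeLength(out, utf8.length);
      out.write(utf8, 0, utf8.length);
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);   // not expected for in-memory streams
    }
  }

  static String deserialize(byte[] record) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
      byte[] utf8 = new byte[readLength(in)];
      in.readFully(utf8);
      return new String(utf8, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] record = serialize("hello");
    System.out.println(record.length + " bytes, length prefix = " + record[0]);
    System.out.println(deserialize(record));
  }
}
```

The point of the zero-compressed prefix is that short strings, which are the common case for keys, pay only one byte for their length instead of a fixed four.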
In addition, it provides methods for string traversal without converting the byte array to a string. It also includes utilities for serializing/deserializing a string, encoding/decoding a string, checking if a byte array contains valid UTF8 code, and calculating the length of an encoded string.
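The idea of traversing a string without converting the byte array can be sketched with the JDK alone. The class Utf8TraversalSketch below is a hypothetical illustration, not Hadoop code: it walks a UTF-8 byte array, uses the lead byte to decide how long each sequence is, and assembles the code point with bit masks. It assumes the input is well-formed UTF-8 and does no validation beyond rejecting a trailing byte in lead position.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class Utf8TraversalSketch {
  /** Total bytes in the sequence that starts with lead byte b (0..255), or -1 for a trailing byte. */
  static int sequenceLength(int b) {
    if (b < 0x80) return 1;   // 0xxxxxxx: ASCII
    if (b < 0xC0) return -1;  // 10xxxxxx: trailing byte, not a valid start
    if (b < 0xE0) return 2;   // 110xxxxx
    if (b < 0xF0) return 3;   // 1110xxxx
    return 4;                 // 11110xxx (assumes well-formed input)
  }

  /** Decodes the code point starting at pos, or returns -1 if pos points at a trailing byte. */
  static int codePointAt(byte[] utf8, int pos) {
    int lead = utf8[pos] & 0xFF;
    int len = sequenceLength(lead);
    if (len < 0) return -1;
    if (len == 1) return lead;
    int cp = lead & (0x7F >> len);          // keep only the payload bits of the lead byte
    for (int i = 1; i < len; i++) {
      cp = (cp << 6) | (utf8[pos + i] & 0x3F);
    }
    return cp;
  }

  /** Collects every code point in the first length bytes. */
  static List<Integer> codePoints(byte[] utf8, int length) {
    List<Integer> out = new ArrayList<>();
    int pos = 0;
    while (pos < length) {
      int len = sequenceLength(utf8[pos] & 0xFF);
      if (len < 0) throw new IllegalArgumentException("trailing byte at " + pos);
      out.add(codePointAt(utf8, pos));
      pos += len;
    }
    return out;
  }

  public static void main(String[] args) {
    // 'a', e with acute accent, euro sign, and one character outside the BMP
    byte[] utf8 = "a\u00e9\u20ac\uD83D\uDE00".getBytes(StandardCharsets.UTF_8);
    for (int cp : codePoints(utf8, utf8.length)) {
      System.out.println("U+" + Integer.toHexString(cp).toUpperCase());
    }
  }
}
```

Positions here are byte offsets, not character indexes, which is also how charAt and find in Text measure them.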
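As the Writable class hierarchy diagram above shows, the vast majority of Hadoop's data types implement the Writable and WritableComparable interfaces, so we analyze these two interfaces first and then work from the top down.
The Writable interface is defined as follows: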
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
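void write(DataOutput out) throws IOException
/* Serializes the fields of this object and writes the resulting bytes to the output stream out.
   Parameter: out - the output stream that receives the serialized bytes. */
void readFields(DataInput in) throws IOException
/* Deserializes the bytes read from the input stream in and stores the values in the fields of this object.
   Parameter: in - the input stream from which the bytes are read. */
DataInput and DataOutput are the basic binary input and output interfaces in java.io. They declare the methods for reading and writing Java primitive values and byte arrays, and classes such as DataInputStream and DataOutputStream implement them. I will analyze these two interfaces in a separate article.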
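As a small illustration of the contract, the sketch below round-trips an object through a byte array. To keep it runnable without Hadoop on the classpath, it declares a local Writable interface with the same two methods as the one shown above; the class WritableSketch, the Point type, and the toBytes/fromBytes helpers are hypothetical names that exist only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableSketch {
  /** Local stand-in with the same two methods as org.apache.hadoop.io.Writable. */
  interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
  }

  /** Hypothetical value type with two int fields. */
  static final class Point implements Writable {
    int x;
    int y;

    Point() {}                       // needed so an empty instance can be filled by readFields
    Point(int x, int y) { this.x = x; this.y = y; }

    @Override public void write(DataOutput out) throws IOException {
      out.writeInt(x);               // fields are written in a fixed order...
      out.writeInt(y);
    }

    @Override public void readFields(DataInput in) throws IOException {
      x = in.readInt();              // ...and must be read back in the same order
      y = in.readInt();
    }
  }

  /** Serializes w into a fresh byte array. */
  static byte[] toBytes(Writable w) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      w.write(new DataOutputStream(buf));
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);   // not expected for in-memory streams
    }
  }

  /** Overwrites the fields of w with the values stored in bytes. */
  static void fromBytes(Writable w, byte[] bytes) {
    try {
      w.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] bytes = toBytes(new Point(3, -7));
    Point copy = new Point();
    fromBytes(copy, bytes);
    System.out.println(bytes.length + " bytes -> (" + copy.x + ", " + copy.y + ")");
  }
}
```

Note that readFields fills an existing object instead of returning a new one, which is what allows one instance to be reused for many records.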
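That completes the walkthrough of the Writable interface. All of this can be learned from the API documentation; I wrote it out only because I wanted to understand the Writable classes in detail, and writing it down makes things clearer.
The WritableComparable interface is defined as follows:
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
At first glance WritableComparable has no methods, but in fact all of its methods are inherited. The Writable interface was analyzed above, so WritableComparable has the following two methods: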
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
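It also inherits a method from Comparable. Comparable is an interface in java.lang, and it declares only one method:
int compareTo(T other);
/* Compares this object with the specified object other for order.
   Parameter: other - the object to compare with.
   Returns: a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object. */
In short, a class that implements WritableComparable is both writable (serializable) and comparable.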
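A short sketch of what "writable and comparable" means in practice: the key type below can be serialized and can also be sorted by its natural order. As before, the two interfaces are declared locally so the example runs without Hadoop; WritableComparableSketch, Temperature, and sortedTenths are hypothetical names chosen for illustration.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class WritableComparableSketch {
  /** Local stand-ins that mirror the two Hadoop interfaces shown above. */
  interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
  }

  interface WritableComparable<T> extends Writable, Comparable<T> {
  }

  /** Hypothetical key type: a temperature stored in tenths of a degree. */
  static final class Temperature implements WritableComparable<Temperature> {
    int tenths;

    Temperature(int tenths) { this.tenths = tenths; }

    @Override public void write(DataOutput out) throws IOException {
      out.writeInt(tenths);
    }

    @Override public void readFields(DataInput in) throws IOException {
      tenths = in.readInt();
    }

    @Override public int compareTo(Temperature other) {
      // negative, zero, or positive as this is less than, equal to, or greater than other
      return Integer.compare(tenths, other.tenths);
    }
  }

  /** Wraps the values in keys, sorts the keys by compareTo, and returns the sorted values. */
  static List<Integer> sortedTenths(int... values) {
    List<Temperature> keys = new ArrayList<>();
    for (int v : values) keys.add(new Temperature(v));
    Collections.sort(keys);          // uses Temperature.compareTo
    List<Integer> out = new ArrayList<>();
    for (Temperature t : keys) out.add(t.tenths);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(sortedTenths(215, -30, 0));
  }
}
```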
Now let us analyze the basic class Text. It is declared as follows:
public class Text extends BinaryComparable implements WritableComparable<BinaryComparable>
The complete source of the class is:
package org.apache.hadoop.io;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/** This class stores text using standard UTF8 encoding. It provides methods
 * to serialize, deserialize, and compare texts at byte level. The type of
 * length is integer and is serialized using zero-compressed format. <p>In
 * addition, it provides methods for string traversal without converting the
 * byte array to a string. <p>Also includes utilities for
 * serializing/deserialing a string, coding/decoding a string, checking if a
 * byte array contains valid UTF8 code, calculating the length of an encoded
 * string.
 */
public class Text extends BinaryComparable
    implements WritableComparable<BinaryComparable> {
  private static final Log LOG= LogFactory.getLog(Text.class);

  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
    new ThreadLocal<CharsetEncoder>() {
      protected CharsetEncoder initialValue() {
        return Charset.forName("UTF-8").newEncoder().
               onMalformedInput(CodingErrorAction.REPORT).
               onUnmappableCharacter(CodingErrorAction.REPORT);
      }
    };

  private static ThreadLocal<CharsetDecoder> DECODER_FACTORY =
    new ThreadLocal<CharsetDecoder>() {
      protected CharsetDecoder initialValue() {
        return Charset.forName("UTF-8").newDecoder().
               onMalformedInput(CodingErrorAction.REPORT).
               onUnmappableCharacter(CodingErrorAction.REPORT);
      }
    };

  private static final byte [] EMPTY_BYTES = new byte[0];

  private byte[] bytes;
  private int length;

  public Text() {
    bytes = EMPTY_BYTES;
  }

  /** Construct from a string.
   */
  public Text(String string) {
    set(string);
  }

  /** Construct from another text. */
  public Text(Text utf8) {
    set(utf8);
  }

  /** Construct from a byte array.
   */
  public Text(byte[] utf8) {
    set(utf8);
  }

  /**
   * Returns the raw bytes; however, only data up to {@link #getLength()} is
   * valid.
   */
  public byte[] getBytes() {
    return bytes;
  }

  /** Returns the number of bytes in the byte array */
  public int getLength() {
    return length;
  }

  /**
   * Returns the Unicode Scalar Value (32-bit integer value)
   * for the character at <code>position</code>. Note that this
   * method avoids using the converter or doing String instatiation
   * @return the Unicode scalar value at position or -1
   *         if the position is invalid or points to a
   *         trailing byte
   */
  public int charAt(int position) {
    if (position > this.length) return -1; // too long
    if (position < 0) return -1; // duh.

    ByteBuffer bb = (ByteBuffer)ByteBuffer.wrap(bytes).position(position);
    return bytesToCodePoint(bb.slice());
  }

  public int find(String what) {
    return find(what, 0);
  }

  /**
   * Finds any occurence of <code>what</code> in the backing
   * buffer, starting as position <code>start</code>. The starting
   * position is measured in bytes and the return value is in
   * terms of byte position in the buffer. The backing buffer is
   * not converted to a string for this operation.
   * @return byte position of the first occurence of the search
   *         string in the UTF-8 buffer or -1 if not found
   */
  public int find(String what, int start) {
    try {
      ByteBuffer src = ByteBuffer.wrap(this.bytes,0,this.length);
      ByteBuffer tgt = encode(what);
      byte b = tgt.get();
      src.position(start);

      while (src.hasRemaining()) {
        if (b == src.get()) { // matching first byte
          src.mark(); // save position in loop
          tgt.mark(); // save position in target
          boolean found = true;
          int pos = src.position()-1;
          while (tgt.hasRemaining()) {
            if (!src.hasRemaining()) { // src expired first
              tgt.reset();
              src.reset();
              found = false;
              break;
            }
            if (!(tgt.get() == src.get())) {
              tgt.reset();
              src.reset();
              found = false;
              break; // no match
            }
          }
          if (found) return pos;
        }
      }
      return -1; // not found
    } catch (CharacterCodingException e) {
      // can't get here
      e.printStackTrace();
      return -1;
    }
  }

  /** Set to contain the contents of a string.
   */
  public void set(String string) {
    try {
      ByteBuffer bb = encode(string, true);
      bytes = bb.array();
      length = bb.limit();
    }catch(CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
  }

  /** Set to a utf8 byte array
   */
  public void set(byte[] utf8) {
    set(utf8, 0, utf8.length);
  }

  /** copy a text. */
  public void set(Text other) {
    set(other.getBytes(), 0, other.getLength());
  }

  /**
   * Set the Text to range of bytes
   * @param utf8 the data to copy from
   * @param start the first position of the new string
   * @param len the number of bytes of the new string
   */
  public void set(byte[] utf8, int start, int len) {
    setCapacity(len, false);
    System.arraycopy(utf8, start, bytes, 0, len);
    this.length = len;
  }

  /**
   * Append a range of bytes to the end of the given text
   * @param utf8 the data to copy from
   * @param start the first position to append from utf8
   * @param len the number of bytes to append
   */
  public void append(byte[] utf8, int start, int len) {
    setCapacity(length + len, true);
    System.arraycopy(utf8, start, bytes, length, len);
    length += len;
  }

  /**
   * Clear the string to empty.
   */
  public void clear() {
    length = 0;
  }

  /*
   * Sets the capacity of this Text object to <em>at least</em>
   * <code>len</code> bytes. If the current buffer is longer,
   * then the capacity and existing content of the buffer are
   * unchanged. If <code>len</code> is larger
   * than the current capacity, the Text object's capacity is
   * increased to match.
   * @param len the number of bytes we need
   * @param keepData should the old data be kept
   */
  private void setCapacity(int len, boolean keepData) {
    if (bytes == null || bytes.length < len) {
      byte[] newBytes = new byte[len];
      if (bytes != null && keepData) {
        System.arraycopy(bytes, 0, newBytes, 0, length);
      }
      bytes = newBytes;
    }
  }

  /**
   * Convert text back to string
   * @see java.lang.Object#toString()
   */
  public String toString() {
    try {
      return decode(bytes, 0, length);
    } catch (CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
  }

  /** deserialize
   */
  public void readFields(DataInput in) throws IOException {
    int newLength = WritableUtils.readVInt(in);
    setCapacity(newLength, false);
    in.readFully(bytes, 0, newLength);
    length = newLength;
  }

  /** Skips over one Text in the input. */
  public static void skip(DataInput in) throws IOException {
    int length = WritableUtils.readVInt(in);
    WritableUtils.skipFully(in, length);
  }

  /** serialize
   * write this object to out
   * length uses zero-compressed encoding
   * @see Writable#write(DataOutput)
   */
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, length);
    out.write(bytes, 0, length);
  }

  /** Returns true iff <code>o</code> is a Text with the same contents. */
  public boolean equals(Object o) {
    if (o instanceof Text)
      return super.equals(o);
    return false;
  }

  public int hashCode() {
    return super.hashCode();
  }

  /** A WritableComparator optimized for Text keys. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(Text.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
    }
  }

  static {
    // register this comparator
    WritableComparator.define(Text.class, new Comparator());
  }

  /// STATIC UTILITIES FROM HERE DOWN
  /**
   * Converts the provided byte array to a String using the
   * UTF-8 encoding. If the input is malformed,
   * replace by a default value.
   */
  public static String decode(byte[] utf8) throws CharacterCodingException {
    return decode(ByteBuffer.wrap(utf8), true);
  }

  public static String decode(byte[] utf8, int start, int length)
    throws CharacterCodingException {
    return decode(ByteBuffer.wrap(utf8, start, length), true);
  }

  /**
   * Converts the provided byte array to a String using the
   * UTF-8 encoding. If <code>replace</code> is true, then
   * malformed input is replaced with the
   * substitution character, which is U+FFFD. Otherwise the
   * method throws a MalformedInputException.
   */
  public static String decode(byte[] utf8, int start, int length, boolean replace)
    throws CharacterCodingException {
    return decode(ByteBuffer.wrap(utf8, start, length), replace);
  }

  private static String decode(ByteBuffer utf8, boolean replace)
    throws CharacterCodingException {
    CharsetDecoder decoder = DECODER_FACTORY.get();
    if (replace) {
      decoder.onMalformedInput(
          java.nio.charset.CodingErrorAction.REPLACE);
      decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
    }
    String str = decoder.decode(utf8).toString();
    // set decoder back to its default value: REPORT
    if (replace) {
      decoder.onMalformedInput(CodingErrorAction.REPORT);
      decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
    }
    return str;
  }

  /**
   * Converts the provided String to bytes using the
   * UTF-8 encoding. If the input is malformed,
   * invalid chars are replaced by a default value.
   * @return ByteBuffer: bytes stores at ByteBuffer.array()
   *                     and length is ByteBuffer.limit()
   */

  public static ByteBuffer encode(String string)
    throws CharacterCodingException {
    return encode(string, true);
  }

  /**
   * Converts the provided String to bytes using the
   * UTF-8 encoding. If <code>replace</code> is true, then
   * malformed input is replaced with the
   * substitution character, which is U+FFFD. Otherwise the
   * method throws a MalformedInputException.
   * @return ByteBuffer: bytes stores at ByteBuffer.array()
   *                     and length is ByteBuffer.limit()
   */
  public static ByteBuffer encode(String string, boolean replace)
    throws CharacterCodingException {
    CharsetEncoder encoder = ENCODER_FACTORY.get();
    if (replace) {
      encoder.onMalformedInput(CodingErrorAction.REPLACE);
      encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
    }
    ByteBuffer bytes =
      encoder.encode(CharBuffer.wrap(string.toCharArray()));
    if (replace) {
      encoder.onMalformedInput(CodingErrorAction.REPORT);
      encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
    }
    return bytes;
  }

  /** Read a UTF8 encoded string from in
   */
  public static String readString(DataInput in) throws IOException {
    int length = WritableUtils.readVInt(in);
    byte [] bytes = new byte[length];
    in.readFully(bytes, 0, length);
    return decode(bytes);
  }

  /** Write a UTF8 encoded string to out
   */
  public static int writeString(DataOutput out, String s) throws IOException {
    ByteBuffer bytes = encode(s);
    int length = bytes.limit();
    WritableUtils.writeVInt(out, length);
    out.write(bytes.array(), 0, length);
    return length;
  }

  ////// states for validateUTF8

  private static final int LEAD_BYTE = 0;

  private static final int TRAIL_BYTE_1 = 1;

  private static final int TRAIL_BYTE = 2;

  /**
   * Check if a byte array contains valid utf-8
   * @param utf8 byte array
   * @throws MalformedInputException if the byte array contains invalid utf-8
   */
  public static void validateUTF8(byte[] utf8) throws MalformedInputException {
    validateUTF8(utf8, 0, utf8.length);
  }

  /**
   * Check to see if a byte array is valid utf-8
   * @param utf8 the array of bytes
   * @param start the offset of the first byte in the array
   * @param len the length of the byte sequence
   * @throws MalformedInputException if the byte array contains invalid bytes
   */
  public static void validateUTF8(byte[] utf8, int start, int len)
    throws MalformedInputException {
    int count = start;
    int leadByte = 0;
    int length = 0;
    int state = LEAD_BYTE;
    while (count < start+len) {
      int aByte = ((int) utf8[count] & 0xFF);

      switch (state) {
      case LEAD_BYTE:
        leadByte = aByte;
        length = bytesFromUTF8[aByte];

        switch (length) {
        case 0: // check for ASCII
          if (leadByte > 0x7F)
            throw new MalformedInputException(count);
          break;
        case 1:
          if (leadByte < 0xC2 || leadByte > 0xDF)
            throw new MalformedInputException(count);
          state = TRAIL_BYTE_1;
          break;
        case 2:
          if (leadByte < 0xE0 || leadByte > 0xEF)
            throw new MalformedInputException(count);
          state = TRAIL_BYTE_1;
          break;
        case 3:
          if (leadByte < 0xF0 || leadByte > 0xF4)
            throw new MalformedInputException(count);
          state = TRAIL_BYTE_1;
          break;
        default:
          // too long! Longest valid UTF-8 is 4 bytes (lead + three)
          // or if < 0 we got a trail byte in the lead byte position
          throw new MalformedInputException(count);
        } // switch (length)
        break;

      case TRAIL_BYTE_1:
        if (leadByte == 0xF0 && aByte < 0x90)
          throw new MalformedInputException(count);
        if (leadByte == 0xF4 && aByte > 0x8F)
          throw new MalformedInputException(count);
        if (leadByte == 0xE0 && aByte < 0xA0)
          throw new MalformedInputException(count);
        if (leadByte == 0xED && aByte > 0x9F)
          throw new MalformedInputException(count);
        // falls through to regular trail-byte test!!
      case TRAIL_BYTE:
        if (aByte < 0x80 || aByte > 0xBF)
          throw new MalformedInputException(count);
        if (--length == 0) {
          state = LEAD_BYTE;
        } else {
          state = TRAIL_BYTE;
        }
        break;
      } // switch (state)
      count++;
    }
  }

  /**
   * Magic numbers for UTF-8. These are the number of bytes
   * that <em>follow</em> a given lead byte. Trailing bytes
   * have the value -1. The values 4 and 5 are presented in
   * this table, even though valid UTF-8 cannot include the
   * five and six byte sequences.
   */
  static final int[] bytesFromUTF8 =
  { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0,
    // trail bytes
    -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
    -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
    -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
    -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
    1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3,
    3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5 };

  /**
   * Returns the next code point at the current position in
   * the buffer. The buffer's position will be incremented.
   * Any mark set on this buffer will be changed by this method!
   */
  public static int bytesToCodePoint(ByteBuffer bytes) {
    bytes.mark();
    byte b = bytes.get();
    bytes.reset();
    int extraBytesToRead = bytesFromUTF8[(b & 0xFF)];
    if (extraBytesToRead < 0) return -1; // trailing byte!
    int ch = 0;

    switch (extraBytesToRead) {
    case 5: ch += (bytes.get() & 0xFF); ch <<= 6; /* remember, illegal UTF-8 */
    case 4: ch += (bytes.get() & 0xFF); ch <<= 6; /* remember, illegal UTF-8 */
    case 3: ch += (bytes.get() & 0xFF); ch <<= 6;
    case 2: ch += (bytes.get() & 0xFF); ch <<= 6;
    case 1: ch += (bytes.get() & 0xFF); ch <<= 6;
    case 0: ch += (bytes.get() & 0xFF);
    }
    ch -= offsetsFromUTF8[extraBytesToRead];

    return ch;
  }


  static final int offsetsFromUTF8[] =
  { 0x00000000, 0x00003080,
    0x000E2080, 0x03C82080, 0xFA082080, 0x82082080 };

  /**
   * For the given string, returns the number of UTF-8 bytes
   * required to encode the string.
   * @param string text to encode
   * @return number of UTF-8 bytes required to encode
   */
  public static int utf8Length(String string) {
    CharacterIterator iter = new StringCharacterIterator(string);
    char ch = iter.first();
    int size = 0;
    while (ch != CharacterIterator.DONE) {
      if ((ch >= 0xD800) && (ch < 0xDC00)) {
        // surrogate pair?
        char trail = iter.next();
        if ((trail > 0xDBFF) && (trail < 0xE000)) {
          // valid pair
          size += 4;
        } else {
          // invalid pair
          size += 3;
          iter.previous(); // rewind one
        }
      } else if (ch < 0x80) {
        size++;
      } else if (ch < 0x800) {
        size += 2;
      } else {
        // ch < 0x10000, that is, the largest char value
        size += 3;
      }
      ch = iter.next();
    }
    return size;
  }
}
Text extends the BinaryComparable base class and implements the WritableComparable<BinaryComparable> interface.
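One consequence of the listing above is easy to miss: set(byte[], int, int) and readFields go through setCapacity, which only grows the backing array and never shrinks it, so getBytes() can return more bytes than are currently valid. The sketch below reproduces just that buffer logic in a hypothetical class, ReusableBytesSketch, written for this article; it is a simplified model of the behavior, not the Text class itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ReusableBytesSketch {
  private byte[] bytes = new byte[0];
  private int length;

  /** Copies the range into the backing array, allocating only when the array is too small. */
  void set(byte[] src, int start, int len) {
    if (bytes.length < len) {
      bytes = new byte[len];
    }
    System.arraycopy(src, start, bytes, 0, len);
    length = len;
  }

  /** Raw backing array; only the first getLength() bytes are valid. */
  byte[] getBytes() { return bytes; }

  int getLength() { return length; }

  /** Safe copy that contains exactly the valid bytes. */
  byte[] copyBytes() { return Arrays.copyOf(bytes, length); }

  @Override public String toString() {
    return new String(bytes, 0, length, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    ReusableBytesSketch t = new ReusableBytesSketch();
    byte[] first = "hadoop".getBytes(StandardCharsets.UTF_8);
    byte[] second = "pig".getBytes(StandardCharsets.UTF_8);
    t.set(first, 0, first.length);
    t.set(second, 0, second.length);   // reuses the 6-byte array

    System.out.println("valid text:  " + t);
    System.out.println("raw buffer:  " + new String(t.getBytes(), StandardCharsets.UTF_8));
    System.out.println("copy length: " + t.copyBytes().length);
  }
}
```

When working with the real class, the same rule applies: always pair getBytes() with getLength(), for example new String(text.getBytes(), 0, text.getLength(), "UTF-8"), instead of using the whole array.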
WritableComparable was covered above, so now let us look at the BinaryComparable base class, which is defined as follows:
package org.apache.hadoop.io;

public abstract class BinaryComparable implements Comparable<BinaryComparable> {
  public abstract int getLength();
  public abstract byte[] getBytes();

  public int compareTo(BinaryComparable other) {
    if (this == other)
      return 0;
    return WritableComparator.compareBytes(getBytes(), 0, getLength(),
             other.getBytes(), 0, other.getLength());
  }

  public int compareTo(byte[] other, int off, int len) {
    return WritableComparator.compareBytes(getBytes(), 0, getLength(),
             other, off, len);
  }

  public boolean equals(Object other) {
    if (!(other instanceof BinaryComparable))
      return false;
    BinaryComparable that = (BinaryComparable)other;
    if (this.getLength() != that.getLength())
      return false;
    return this.compareTo(that) == 0;
  }

  public int hashCode() {
    return WritableComparator.hashBytes(getBytes(), getLength());
  }

}
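BinaryComparable is an abstract class whose main purpose is to let two objects be compared directly on their byte representations.
In it, the call
WritableComparator.compareBytes(getBytes(), 0, getLength(), other.getBytes(), 0, other.getLength());
compares the two byte sequences in lexicographic order and returns the result,
while
WritableComparator.hashBytes(getBytes(), getLength());
returns the hash code of the byte sequence.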
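The source of WritableComparator is not shown in this article, so the following is only a sketch of what a lexicographic byte comparison and a byte-array hash of this kind can look like. The class ByteOrderSketch is a hypothetical name; treating bytes as unsigned values and using a 31-based polynomial hash are my assumptions about the behavior, and both should be checked against the actual WritableComparator source.

```java
import java.nio.charset.StandardCharsets;

class ByteOrderSketch {
  /** Lexicographic comparison of two byte ranges, treating each byte as an unsigned value (0..255). */
  static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int n = Math.min(l1, l2);
    for (int i = 0; i < n; i++) {
      int a = b1[s1 + i] & 0xFF;
      int b = b2[s2 + i] & 0xFF;
      if (a != b) {
        return a - b;                // first differing byte decides the order
      }
    }
    return l1 - l2;                  // common prefix: the shorter range sorts first
  }

  /** Polynomial hash over the first length bytes (assumed formula, see the note above). */
  static int hashBytes(byte[] bytes, int length) {
    int hash = 1;
    for (int i = 0; i < length; i++) {
      hash = 31 * hash + bytes[i];
    }
    return hash;
  }

  public static void main(String[] args) {
    byte[] abc = "abc".getBytes(StandardCharsets.UTF_8);
    byte[] abd = "abd".getBytes(StandardCharsets.UTF_8);
    byte[] ab = "ab".getBytes(StandardCharsets.UTF_8);

    System.out.println(Integer.signum(compareBytes(abc, 0, abc.length, abd, 0, abd.length)));
    System.out.println(Integer.signum(compareBytes(ab, 0, ab.length, abc, 0, abc.length)));
    System.out.println(hashBytes(abc, abc.length) == hashBytes(abc.clone(), abc.length));
  }
}
```

Because both functions look only at the first getLength() bytes, two objects with equal valid bytes compare as equal and hash identically even if their backing arrays have different capacities, which keeps equals and hashCode in BinaryComparable consistent with each other.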
Now let us take an overall look at the methods of Text:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/** This class stores text using standard UTF8 encoding. It provides methods
 * to serialize, deserialize, and compare texts at byte level. The type of
 * length is integer and is serialized using zero-compressed format. <p>In
 * addition, it provides methods for string traversal without converting the
 * byte array to a string. <p>Also includes utilities for
 * serializing/deserialing a string, coding/decoding a string, checking if a
 * byte array contains valid UTF8 code, calculating the length of an encoded
 * string.
 */
public class Text extends BinaryComparable
    implements WritableComparable<BinaryComparable> {
  private static final Log LOG= LogFactory.getLog(Text.class);

  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
    new ThreadLocal<CharsetEncoder>() {
      protected CharsetEncoder initialValue() {
        return Charset.forName("UTF-8").newEncoder().
               onMalformedInput(CodingErrorAction.REPORT).
               onUnmappableCharacter(CodingErrorAction.REPORT);
      }
    };

  private static ThreadLocal<CharsetDecoder> DECODER_FACTORY =
    new ThreadLocal<CharsetDecoder>() {
      protected CharsetDecoder initialValue() {
        return Charset.forName("UTF-8").newDecoder().
               onMalformedInput(CodingErrorAction.REPORT).
               onUnmappableCharacter(CodingErrorAction.REPORT);
      }
    };

  private static final byte [] EMPTY_BYTES = new byte[0];

  private byte[] bytes;
  private int length;

  public Text() {
    bytes = EMPTY_BYTES;
  }

  /** Construct from a string.
   */
  public Text(String string) {
    set(string);
  }

  /** Construct from another text. */
  public Text(Text utf8) {
    set(utf8);
  }

  /** Construct from a byte array.
   */
  public Text(byte[] utf8) {
    set(utf8);
  }

  /**
   * Returns the raw bytes; however, only data up to {@link #getLength()} is
   * valid.
   */
  public byte[] getBytes() {
    return bytes;
  }

  /** Returns the number of bytes in the byte array */
  public int getLength() {
    return length;
  }

  /**
   * Returns the Unicode Scalar Value (32-bit integer value)
   * for the character at <code>position</code>. Note that this
   * method avoids using the converter or doing String instatiation
   * @return the Unicode scalar value at position or -1
   *         if the position is invalid or points to a
   *         trailing byte
   */
  public int charAt(int position) {
    if (position > this.length) return -1; // too long
    if (position < 0) return -1; // duh.

    ByteBuffer bb = (ByteBuffer)ByteBuffer.wrap(bytes).position(position);
    return bytesToCodePoint(bb.slice());
  }

  public int find(String what) {
    return find(what, 0);
  }

  /**
   * Finds any occurence of <code>what</code> in the backing
   * buffer, starting as position <code>start</code>. The starting
   * position is measured in bytes and the return value is in
   * terms of byte position in the buffer. The backing buffer is
   * not converted to a string for this operation.
   * @return byte position of the first occurence of the search
   *         string in the UTF-8 buffer or -1 if not found
   */
  public int find(String what, int start) {
    try {
      ByteBuffer src = ByteBuffer.wrap(this.bytes,0,this.length);
      ByteBuffer tgt = encode(what);
      byte b = tgt.get();
      src.position(start);

      while (src.hasRemaining()) {
        if (b == src.get()) { // matching first byte
          src.mark(); // save position in loop
          tgt.mark(); // save position in target
          boolean found = true;
          int pos = src.position()-1;
          while (tgt.hasRemaining()) {
            if (!src.hasRemaining()) { // src expired first
              tgt.reset();
              src.reset();
              found = false;
              break;
            }
            if (!(tgt.get() == src.get())) {
              tgt.reset();
              src.reset();
              found = false;
              break; // no match
            }
          }
          if (found) return pos;
        }
      }
      return -1; // not found
    } catch (CharacterCodingException e) {
      // can't get here
      e.printStackTrace();
      return -1;
    }
  }

  /** Set to contain the contents of a string.
   */
  public void set(String string) {
    try {
      ByteBuffer bb = encode(string, true);
      bytes = bb.array();
      length = bb.limit();
    }catch(CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
  }

  /** Set to a utf8 byte array
   */
  public void set(byte[] utf8) {
    set(utf8, 0, utf8.length);
  }

  /** copy a text. */
  public void set(Text other) {
    set(other.getBytes(), 0, other.getLength());
  }

  /**
   * Set the Text to range of bytes
   * @param utf8 the data to copy from
   * @param start the first position of the new string
   * @param len the number of bytes of the new string
   */
  public void set(byte[] utf8, int start, int len) {
    setCapacity(len, false);
    System.arraycopy(utf8, start, bytes, 0, len);
    this.length = len;
  }

  /**
   * Append a range of bytes to the end of the given text
   * @param utf8 the data to copy from
   * @param start the first position to append from utf8
   * @param len the number of bytes to append
   */
  public void append(byte[] utf8, int start, int len) {
    setCapacity(length + len, true);
    System.arraycopy(utf8, start, bytes, length, len);
    length += len;
  }

  /**
   * Clear the string to empty.
   */
  public void clear() {
    length = 0;
  }

  /*
   * Sets the capacity of this Text object to <em>at least</em>
   * <code>len</code> bytes. If the current buffer is longer,
   * then the capacity and existing content of the buffer are
   * unchanged. If <code>len</code> is larger
   * than the current capacity, the Text object's capacity is
   * increased to match.
   * @param len the number of bytes we need
   * @param keepData should the old data be kept
   */
  private void setCapacity(int len, boolean keepData) {
    if (bytes == null || bytes.length < len) {
      byte[] newBytes = new byte[len];
      if (bytes != null && keepData) {
        System.arraycopy(bytes, 0, newBytes, 0, length);
      }
      bytes = newBytes;
    }
  }

  /**
   * Convert text back to string
   * @see java.lang.Object#toString()
   */
  public String toString() {
    try {
      return decode(bytes, 0, length);
    } catch (CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
  }

  /** deserialize
   */
  public void readFields(DataInput in) throws IOException {
    int newLength = WritableUtils.readVInt(in);
    setCapacity(newLength, false);
    in.readFully(bytes, 0, newLength);
    length = newLength;
  }

  /** Skips over one Text in the input. */
  public static void skip(DataInput in) throws IOException {
    int length = WritableUtils.readVInt(in);
    WritableUtils.skipFully(in, length);
  }

  /** serialize
   * write this object to out
   * length uses zero-compressed encoding
   * @see Writable#write(DataOutput)
   */
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, length);
    out.write(bytes, 0, length);
  }

  /** Returns true iff <code>o</code> is a Text with the same contents. */
  public boolean equals(Object o) {
    if (o instanceof Text)
      return super.equals(o);
    return false;
  }

  public int hashCode() {
    return super.hashCode();
  }

  /** A WritableComparator optimized for Text keys. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(Text.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
    }
  }

  static {
    // register this comparator
    WritableComparator.define(Text.class, new Comparator());
  }

  /// STATIC UTILITIES FROM HERE DOWN
  /**
   * Converts the provided byte array to a String using the
   * UTF-8 encoding. If the input is malformed,
   * replace by a default value.
   */
  public static String decode(byte[] utf8) throws CharacterCodingException {
    return decode(ByteBuffer.wrap(utf8), true);
  }

  public static String decode(byte[] utf8, int start, int length)
    throws CharacterCodingException {
    return decode(ByteBuffer.wrap(utf8, start, length), true);
  }

  /**
   * Converts the provided byte array to a String using the
   * UTF-8 encoding. If <code>replace</code> is true, then
   * malformed input is replaced with the
   * substitution character, which is U+FFFD. Otherwise the
   * method throws a MalformedInputException.
   */
  public static String decode(byte[] utf8, int start, int length, boolean replace)
    throws CharacterCodingException {
    return decode(ByteBuffer.wrap(utf8, start, length), replace);
  }

  private static String decode(ByteBuffer utf8, boolean replace)
    throws CharacterCodingException {
    CharsetDecoder decoder = DECODER_FACTORY.get();
    if (replace) {
      decoder.onMalformedInput(
          java.nio.charset.CodingErrorAction.REPLACE);
      decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
    }
    String str = decoder.decode(utf8).toString();
    // set decoder back to its default value: REPORT
    if (replace) {
      decoder.onMalformedInput(CodingErrorAction.REPORT);
      decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
    }
    return str;
  }

  /**
   * Converts the provided String to bytes using the
   * UTF-8 encoding. If the input is malformed,
   * invalid chars are replaced by a default value.
   * @return ByteBuffer: bytes stores at ByteBuffer.array()
   *                     and length is ByteBuffer.limit()
   */

  public static ByteBuffer encode(String string)
    throws CharacterCodingException {
    return encode(string, true);
  }

  /**
   * Converts the provided String to bytes using the
   * UTF-8 encoding. If <code>replace</code> is true, then
   * malformed input is replaced with the
   * substitution character, which is U+FFFD. Otherwise the
   * method throws a MalformedInputException.
   * @return ByteBuffer: bytes stores at ByteBuffer.array()
   *                     and length is ByteBuffer.limit()
   */
  public static ByteBuffer encode(String string, boolean replace)
    throws CharacterCodingException {
    CharsetEncoder encoder = ENCODER_FACTORY.get();
    if (replace) {
      encoder.onMalformedInput(CodingErrorAction.REPLACE);
      encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
    }
    ByteBuffer bytes =
      encoder.encode(CharBuffer.wrap(string.toCharArray()));
    if (replace) {
      encoder.onMalformedInput(CodingErrorAction.REPORT);
      encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
    }
    return bytes;
  }

  /** Read a UTF8 encoded string from in
   */
  public static String readString(DataInput in) throws IOException {
    int length = WritableUtils.readVInt(in);
    byte [] bytes = new byte[length];
    in.readFully(bytes, 0, length);
    return decode(bytes);
  }

  /** Write a UTF8 encoded string to out
   */
  public static int writeString(DataOutput out, String s) throws IOException {
    ByteBuffer bytes = encode(s);
    int length = bytes.limit();
    WritableUtils.writeVInt(out, length);
    out.write(bytes.array(), 0, length);
    return length;
  }

  ////// states for validateUTF8

  private static final int LEAD_BYTE = 0;

  private static final int TRAIL_BYTE_1 = 1;

  private static final int TRAIL_BYTE = 2;

  /**
   * Check if a byte array contains valid utf-8
   * @param utf8 byte array
   * @throws MalformedInputException if the byte array contains invalid utf-8
   */
  public static void validateUTF8(byte[] utf8) throws MalformedInputException {
    validateUTF8(utf8, 0, utf8.length);
  }

  /**
   * Check to see if a byte array is valid utf-8
   * @param utf8 the array of bytes
   * @param start the offset of the first byte in the array
   * @param len the length of the byte sequence
   * @throws MalformedInputException if the byte array contains invalid bytes
   */
  public static void validateUTF8(byte[] utf8, int start, int len)
    throws MalformedInputException {
    int count = start;
    int leadByte = 0;
    int length = 0;
    int state = LEAD_BYTE;
    while (count < start+len) {
      int aByte = ((int) utf8[count] & 0xFF);

      switch (state) {
      case LEAD_BYTE:
        leadByte = aByte;
        length = bytesFromUTF8[aByte];

        switch (length) {
        case 0: // check for ASCII
          if (leadByte > 0x7F)
            throw new MalformedInputException(count);
          break;
        case 1:
          if (leadByte < 0xC2 || leadByte > 0xDF)
            throw new MalformedInputException(count);
          state = TRAIL_BYTE_1;
          break;
        case 2:
          if (leadByte < 0xE0 || leadByte > 0xEF)
            throw new MalformedInputException(count);
          state = TRAIL_BYTE_1;
          break;
        case 3:
          if (leadByte < 0xF0 || leadByte > 0xF4)
            throw new MalformedInputException(count);
          state = TRAIL_BYTE_1;
          break;
        default:
          // too long! Longest valid UTF-8 is 4 bytes (lead + three)
          // or if < 0 we got a trail byte in the lead byte position
          throw new MalformedInputException(count);
        } // switch (length)
        break;

      case TRAIL_BYTE_1:
        if (leadByte == 0xF0 && aByte < 0x90)
          throw new MalformedInputException(count);
        if (leadByte == 0xF4 && aByte > 0x8F)
          throw new MalformedInputException(count);
        if (leadByte == 0xE0 && aByte < 0xA0)
          throw new MalformedInputException(count);
        if (leadByte == 0xED && aByte > 0x9F)
          throw new MalformedInputException(count);
        // falls through to regular trail-byte test!!
      case TRAIL_BYTE:
        if (aByte < 0x80 || aByte > 0xBF)
          throw new MalformedInputException(count);
        if (--length == 0) {
          state = LEAD_BYTE;
        } else {
          state = TRAIL_BYTE;
        }
        break;
      } // switch (state)
      count++;
    }
  }

  /**
   * Magic numbers for UTF-8. These are the number of bytes
   * that <em>follow</em> a given lead byte. Trailing bytes
   * have the value -1.
The values 4 and 5 are presented in509 * this table, even though valid UTF-8 cannot include the510 * five and six byte sequences.511 */512 static final int[] bytesFromUTF8 =513 { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,514 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,515 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,516 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,517 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,518 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,519 0, 0, 0, 0, 0, 0, 0,520 // trail bytes521 -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,522 -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,523 -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,524 -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,525 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,526 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3,527 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5 };528 529 /**530 * Returns the next code point at the current position in531 * the buffer. 
The buffer‘s position will be incremented.532 * Any mark set on this buffer will be changed by this method!533 */534 public static int bytesToCodePoint(ByteBuffer bytes) {535 bytes.mark();536 byte b = bytes.get();537 bytes.reset();538 int extraBytesToRead = bytesFromUTF8[(b & 0xFF)];539 if (extraBytesToRead < 0) return -1; // trailing byte!540 int ch = 0;541 542 switch (extraBytesToRead) {543 case 5: ch += (bytes.get() & 0xFF); ch <<= 6; /* remember, illegal UTF-8 */544 case 4: ch += (bytes.get() & 0xFF); ch <<= 6; /* remember, illegal UTF-8 */545 case 3: ch += (bytes.get() & 0xFF); ch <<= 6;546 case 2: ch += (bytes.get() & 0xFF); ch <<= 6;547 case 1: ch += (bytes.get() & 0xFF); ch <<= 6;548 case 0: ch += (bytes.get() & 0xFF);549 }550 ch -= offsetsFromUTF8[extraBytesToRead];551 552 return ch;553 }554 555 556 static final int offsetsFromUTF8[] =557 { 0x00000000, 0x00003080,558 0x000E2080, 0x03C82080, 0xFA082080, 0x82082080 };559 560 /**561 * For the given string, returns the number of UTF-8 bytes562 * required to encode the string.563 * @param string text to encode564 * @return number of UTF-8 bytes required to encode565 */566 public static int utf8Length(String string) {567 CharacterIterator iter = new StringCharacterIterator(string);568 char ch = iter.first();569 int size = 0;570 while (ch != CharacterIterator.DONE) {571 if ((ch >= 0xD800) && (ch < 0xDC00)) {572 // surrogate pair?573 char trail = iter.next();574 if ((trail > 0xDBFF) && (trail < 0xE000)) {575 // valid pair576 size += 4;577 } else {578 // invalid pair579 size += 3;580 iter.previous(); // rewind one581 }582 } else if (ch < 0x80) {583 size++;584 } else if (ch < 0x800) {585 size += 2;586 } else {587 // ch < 0x10000, that is, the largest char value588 size += 3;589 }590 ch = iter.next();591 }592 return size;593 }594 }
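The private `decode(ByteBuffer, boolean replace)` method in the listing switches a `CharsetDecoder` between `CodingErrorAction.REPLACE` and `CodingErrorAction.REPORT`. The sketch below reproduces that behaviour using only the JDK, so it can be run without Hadoop on the classpath. The class `DecodeSketch` and its two helpers are hypothetical names made up for illustration; unlike `Text`, it creates a fresh decoder on each call instead of caching one in a `ThreadLocal`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Hadoop.
class DecodeSketch {

    // Lenient decoding: malformed bytes become U+FFFD, as with replace == true.
    static String decodeLenient(byte[] utf8) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        try {
            return decoder.decode(ByteBuffer.wrap(utf8)).toString();
        } catch (CharacterCodingException e) {
            // Not expected when both actions are REPLACE.
            throw new IllegalStateException(e);
        }
    }

    // Strict decoding: malformed bytes raise an exception, as with replace == false.
    static boolean isValidUtf8(byte[] utf8) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            decoder.decode(ByteBuffer.wrap(utf8));
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] good = "h\u00e9llo".getBytes(StandardCharsets.UTF_8);
        byte[] bad = {'a', (byte) 0xFF, 'b'}; // 0xFF never occurs in valid UTF-8

        System.out.println(isValidUtf8(good));                       // true
        System.out.println(isValidUtf8(bad));                        // false
        System.out.println(decodeLenient(bad).equals("a\uFFFDb"));   // true
    }
}
```

The public `decode(byte[])` overloads in the listing pass `replace = true`, so they behave like `decodeLenient` here, while `validateUTF8` plays a role similar to `isValidUtf8` but is implemented as a hand-written state machine rather than through a decoder.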
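Text is a Writable for UTF-8 sequences and can generally be regarded as the Writable counterpart of java.lang.String. Note that Text does not use Java's modified UTF-8, the variant that DataInput.readUTF and DataOutput.writeUTF produce. As the class Javadoc quoted above states, it stores standard UTF-8. When serialized, the byte length is written first as a zero-compressed variable-length integer (see writeString and readString above), followed by the UTF-8 bytes themselves.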
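The class Javadoc quoted at the top of this article says that Text stores standard UTF-8. To make the difference from Java's modified UTF-8 concrete, here is a minimal JDK-only sketch that encodes the same strings both ways. The class `Utf8VariantsSketch` and its helper names are hypothetical; `modifiedUtf8` simply strips the two-byte length prefix that `DataOutputStream.writeUTF` writes in front of the payload.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class; not part of Hadoop.
class Utf8VariantsSketch {

    // Standard UTF-8, the encoding the Text Javadoc describes.
    static byte[] standardUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Modified UTF-8 payload as written by DataOutput.writeUTF,
    // without its leading two-byte length field.
    static byte[] modifiedUtf8(String s) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(s);
            out.flush();
            byte[] all = buf.toByteArray();
            return Arrays.copyOfRange(all, 2, all.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String nul = "\u0000";          // U+0000
        String emoji = "\uD83D\uDE00";  // U+1F600 as a surrogate pair

        // U+0000: one byte in standard UTF-8, two bytes (C0 80) in modified UTF-8.
        System.out.println(standardUtf8(nul).length + " vs " + modifiedUtf8(nul).length);

        // U+1F600: four bytes in standard UTF-8, six bytes in modified UTF-8,
        // because each surrogate is encoded separately as three bytes.
        System.out.println(standardUtf8(emoji).length + " vs " + modifiedUtf8(emoji).length);

        // Plain ASCII is identical in both encodings.
        System.out.println(Arrays.equals(standardUtf8("abc"), modifiedUtf8("abc")));
    }
}
```

The two encodings agree for most text and differ only for U+0000 and for characters outside the Basic Multilingual Plane, which is why byte-level tools that expect standard UTF-8 can misread data written with `writeUTF`.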
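This article is licensed under the Creative Commons Attribution-NonCommercial 3.0 license. You are welcome to repost and adapt it, provided the author's byline, 林羽飞扬, is retained. For inquiries, please send me a message.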