首页 > 代码库 > 【转载】Hadoop自定义RecordReader

【转载】Hadoop自定义RecordReader

转自:http://www.linuxidc.com/Linux/2012-04/57831.htm

系统默认的LineRecordReader是按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。

现在要更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。

那么我们需要重写InputFormat和RecordReader,因为RecordReader是在InputFormat中调用的,当然重写RecordReader才是重点!

下面看代码InputFormat的重写:

  1. public class chDicInputFormat extends FileInputFormat<Text,Text>  
  2.     implements JobConfigurable{  
  3.     private CompressionCodecFactory compressionCodecs = null;  
  4.     public void configure(JobConf conf) {  
  5.         compressionCodecs = new CompressionCodecFactory(conf);  
  6.     }  
  7.     /** 
  8.      * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理 
  9.      * 
  10.      * @param fs 
  11.      * @param file 
  12.      * 
  13.      * @return false 
  14.      */  
  15.     protected boolean isSplitable(FileSystem fs, Path file) {  
  16.     //  CompressionCodec codec = compressionCodecs.getCode(file);   
  17.         return false;//以文件为单位,每个单位作为一个split,即使单个文件的大小超过了64M,也就是Hadoop一个块得大小,也不进行分片   
  18.     }  
  19.   
  20.     public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,  
  21.                             JobConf job, Reporter reporter) throws IOException{  
  22.         reporter.setStatus(genericSplit.toString());  
  23.         return new chDicRecordReader(job,(FileSplit)genericSplit);  
  24.     }  
  25.   
  26. }  

下面来看RecordReader的重写:

  1. public class chDicRecordReader implements RecordReader<Text,Text> {  
  2.     private static final Log LOG = LogFactory.getLog(chDicRecordReader.class.getName());  
  3.     private CompressionCodecFactory compressionCodecs = null;  
  4.     private long start;  
  5.     private long pos;  
  6.     private long end;  
  7.     private byte[] buffer;  
  8.     private String keyName;  
  9.     private FSDataInputStream fileIn;  
  10.       
  11.     public chDicRecordReader(Configuration job,FileSplit split) throws IOException{  
  12.         start = split.getStart(); //从中可以看出每个文件是作为一个split的   
  13.         end = split.getLength() + start;  
  14.         final Path path = split.getPath();  
  15.         keyName = path.toString();  
  16.         LOG.info("filename in hdfs is : " + keyName);  
  17.         final FileSystem fs = path.getFileSystem(job);  
  18.         fileIn = fs.open(path);  
  19.         fileIn.seek(start);  
  20.         buffer = new byte[(int)(end - start)];  
  21.         this.pos = start;  
  22.   
  23.     }  
  24.   
  25.     public Text createKey() {  
  26.         return new Text();  
  27.     }  
  28.   
  29.     public Text createValue() {  
  30.         return new Text();  
  31.     }  
  32.   
  33.     public long getPos() throws IOException{  
  34.         return pos;  
  35.     }  
  36.   
  37.     public float getProgress() {  
  38.         if (start == end) {  
  39.             return 0.0f;  
  40.         } else {  
  41.             return Math.min(1.0f, (pos - start) / (float)(end - start));  
  42.         }  
  43.     }  
  44.   
  45.         public boolean next(Text key, Text value) throws IOException{  
  46.         while(pos < end) {  
  47.             key.set(keyName);  
  48.             value.clear();  
  49.             fileIn.readFully(pos,buffer);  
  50.             value.set(buffer);  
  51.     //      LOG.info("---内容: " + value.toString());   
  52.             pos += buffer.length;  
  53.             LOG.info("end is : " + end  + " pos is : " + pos);  
  54.             return true;  
  55.         }  
  56.         return false;  
  57.     }  
  58.   
  59.     public void close() throws IOException{  
  60.         if(fileIn != null) {  
  61.             fileIn.close();  
  62.         }  
  63.           
  64.     }  
  65.   
  66. }  

通过上面的代码,然后再在main函数中设置InputFormat对应的类,就可以使用这种新的读入格式了。

【转载】Hadoop自定义RecordReader