首页 > 代码库 > 【转载】自定义InputFormat

【转载】自定义InputFormat

转自:http://blog.csdn.net/jackydai987/article/details/6226108

系统默认的TextInputFormat.Java

 

[java] view plain copy
 
  1. public class TextInputFormat extends FileInputFormat<LongWritable, Text> {  
  2.   @Override  
  3.   public RecordReader<LongWritable, Text>   
  4.     createRecordReader(InputSplit split,  
  5.                        TaskAttemptContext context) {  
  6.     return new LineRecordReader();//这里是系统实现的的RecordReader按行读取,等会我们就是要改写这个类。  
  7.   }  
  8.   @Override  
  9.   protected boolean isSplitable(JobContext context, Path file) {  
  10.     CompressionCodec codec =   
  11.       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);  
  12.     return codec == null;//而这里通过返回一个null,实际就是关闭了对当前读入文件的划分。  
  13.   }  
  14. }  

 

这个类,没什么说的。接着我们来实现我们的读取类. MyRecordReader

 

[java] view plain copy
 
  1. //我实现的功能比较简单,只要明白了原理,剩下的就看自己发挥了。  
  2. //我们知道系统默认的TextInputFormat读取的key、value分别是偏移和行,而我就简单改下,改成key、value分别是行号和行  
  3. public class MyRecordReader extends RecordReader<LongWritable, Text>{  //这里继承RecordReader来实现我们自己的读取。  
  4.     private static final Log LOG = LogFactory.getLog(MyRecordReader.class);  
  5.        
  6.       private long pos; //记录行号  
  7.       private boolean more;  
  8.       private LineReader in;  
  9.       private int maxLineLength;  
  10.       private LongWritable key = null;  
  11.       private Text value = null;  
  12.       public void initialize(InputSplit genericSplit,  
  13.                              TaskAttemptContext context) throws IOException {  
  14.         pos = 1;  
  15.         more = true;  
  16.         FileSplit split = (FileSplit) genericSplit;//获取split  
  17.         Configuration job = context.getConfiguration();  
  18.         this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",  
  19.                                         Integer.MAX_VALUE);  
  20.          
  21.         final Path file = split.getPath();//得到文件路径  
  22.         // open the file and seek to the start of the split  
  23.         FileSystem fs = file.getFileSystem(job);  
  24.         FSDataInputStream fileIn = fs.open(split.getPath()); //打开文件  
  25.           
  26.         in = new LineReader(fileIn, job);  
  27.       }  
  28.         
  29.       public boolean nextKeyValue() throws IOException {  //这个函数会被MapRunner循环读取<key、value>  
  30.         if (key == null) {  
  31.           key = new LongWritable();  
  32.         }  
  33.         key.set(pos);  //设置key  
  34.         if (value == null) {  
  35.           value = new Text();  
  36.         }  
  37.         int newSize = 0;  
  38.         while (true) {  
  39.           newSize = in.readLine(value, maxLineLength,maxLineLength); //读取一行内容  
  40.           pos++;  //行号自加一  
  41.           if (newSize == 0) {  
  42.             break;  
  43.           }  
  44.            
  45.           if (newSize < maxLineLength) {  
  46.             break;  
  47.           }  
  48.             
  49.           // line too long. try again  
  50.           LOG.info("Skipped line of size " + newSize + " at pos " +   
  51.                    (pos - newSize));  
  52.         }  
  53.         if (newSize == 0) {  
  54.           key = null;  
  55.           value = null;  
  56.           more = false;  
  57.           return false;  
  58.         } else {  
  59.           return true;  
  60.         }  
  61.       }  
  62. //下面的函数都是必须实现的,跟系统默认的差不多,我就不多说了。  
  63.       @Override  
  64.       public LongWritable getCurrentKey() {  
  65.         return key;  
  66.       }  
  67.       @Override  
  68.       public Text getCurrentValue() {  
  69.         return value;  
  70.       }  
  71.       /** 
  72.        * Get the progress within the split 
  73.        */  
  74.       public float getProgress() {  
  75.         if (more) {  
  76.           return 0.0f;  
  77.         } else {  
  78.           return 100;  
  79.         }  
  80.       }  
  81.         
  82.       public synchronized void close() throws IOException {  
  83.         if (in != null) {  
  84.           in.close();   
  85.         }  
  86.       }  
  87. }  

 

有了,自己的MyInputFormat,就可以试试效果了。

 

[java] view plain copy
 
  1. Configuration conf = new Configuration();           Job job = new Job(conf, "helloword");  
  2.               
  3.             job.setJarByClass(helloHadoopV1.class);  
  4.             job.setInputFormatClass(MyInputFormat.class); //这里就是调用我们自己写的InputFormat  
  5.             job.setOutputKeyClass(LongWritable.class);  
  6.             job.setOutputValueClass(Text.class);  
  7.               
  8.             job.setMapperClass(HelloMapper.class);  
  9.             //job.setReducerClass(HelloReducer.class);  //这里不设置Reduce就会按Mapper的中间过程原样输出。  
  10.               
  11.               
  12.             FileInputFormat.setInputPaths(job, new Path(args[1]));  
  13.             FileOutputFormat.setOutputPath(job, new Path(args[2]));  
  14.               
  15.             checkAndDelete(args[2], conf);  
  16.               
  17.             job.waitForCompletion(true);  
技术分享

 

我们上面都自定义的都比较简单,这样才容易懂。

最后再补充一句,上面RecordReader<LongWritable, Text>的<key、value>都是可以自己定义的。但key必须实现WritableComparable类,而value必须实现Writable类。

【转载】自定义InputFormat