
Hadoop Advanced Programming (II) --- Custom Input and Output Formats

Hadoop ships with a fairly rich set of data input and output formats that cover most designs, but in some cases you need to define your own.

The data input format describes the input specification of a MapReduce job. The framework relies on it to validate the input (for example, checking the input directories), to split the input files into InputSplits, and to supply a RecordReader that reads records from each split and turns them into the key/value pairs consumed by the Map phase. Hadoop provides many input formats, such as TextInputFormat and KeyValueTextInputFormat, and each one has a matching RecordReader, such as LineRecordReader and KeyValueLineRecordReader. To write a custom input format you mainly implement createRecordReader() and getSplits() in InputFormat, and in the RecordReader you implement getCurrentKey(), getCurrentValue(), nextKeyValue(), initialize(), getProgress() and close().

For example:

package com.rpc.nefu;

import java.io.IOException;   
import org.apache.hadoop.fs.FSDataInputStream;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.InputSplit;    
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.util.LineReader;  
import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
 
//A custom input format extends the FileInputFormat base class
public class ZInputFormat extends FileInputFormat<IntWritable,IntWritable>{  
          
        @Override  // return the custom RecordReader for this input format
        public RecordReader<IntWritable, IntWritable> createRecordReader(  
                InputSplit split, TaskAttemptContext context)  
                throws IOException, InterruptedException {  
            return new ZRecordReader();                                                  
        }  
  
        // The custom RecordReader used by this input format
        public static class ZRecordReader extends RecordReader<IntWritable,IntWritable>  
        {  
            //data  
            private LineReader in;        // input stream for reading lines from the split
            private boolean more = true;  // whether more records remain
              
            private IntWritable key = null;  
            private IntWritable value = null;
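
            // ---- The original listing is truncated at this point. ----
            // What follows is a hypothetical completion (not from the original post) of the
            // remaining RecordReader methods, built only on the fields and imports declared above.
            // It assumes each input line holds a single integer and that the files are not split
            // (e.g. isSplitable() is overridden to return false), so the key is a running line
            // number and the value is the parsed integer.
            private int lineCounter = 0;   // running line number, used as the key

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) split;
                Path path = fileSplit.getPath();
                FileSystem fs = path.getFileSystem(context.getConfiguration());
                FSDataInputStream fileIn = fs.open(path);
                in = new LineReader(fileIn, context.getConfiguration());
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                Text lineText = new Text();
                int bytesRead = in.readLine(lineText);
                // skip blank lines (e.g. a trailing newline at the end of the file)
                while (bytesRead > 0 && lineText.toString().trim().isEmpty()) {
                    bytesRead = in.readLine(lineText);
                }
                if (bytesRead == 0) {       // nothing left to read
                    more = false;
                    return false;
                }
                if (key == null)   { key = new IntWritable(); }
                if (value == null) { value = new IntWritable(); }
                key.set(lineCounter++);
                value.set(Integer.parseInt(lineText.toString().trim()));
                return true;
            }

            @Override
            public IntWritable getCurrentKey()   { return key; }

            @Override
            public IntWritable getCurrentValue() { return value; }

            @Override
            public float getProgress() { return more ? 0.0f : 1.0f; }

            @Override
            public void close() throws IOException {
                if (in != null) { in.close(); }
            }
        }
}

A second example prefixes each record's key with the name of the file the line came from (useful, for example, when building an inverted index). Instead of reading the stream itself, it wraps the built-in LineRecordReader:
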
package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FileNameLocInputFormat extends FileInputFormat<Text, Text>{

	@Override
	public RecordReader<Text, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		return new FileNameLocRecordReader();
	}
	// RecordReader that wraps the built-in LineRecordReader and prefixes each key
	// with the name of the file the split belongs to
	public static class FileNameLocRecordReader extends RecordReader<Text, Text> {

		String fileName;                                 // name of the file backing this split
		LineRecordReader line = new LineRecordReader();  // delegate that does the actual reading
		/**
		 * ......
		 */ 

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			// the key has the form "(fileName@byteOffset)"
			return new Text("(" + fileName + "@" + line.getCurrentKey() + ")");
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			// the value is the line text itself, unchanged
			return line.getCurrentValue();
		}

		

		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			line.initialize(split, context);
			FileSplit inputSplit = (FileSplit) split;
			fileName = inputSplit.getPath().getName();
		}

		@Override
		public void close() throws IOException {
			// close the wrapped LineRecordReader
			line.close();
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			// report the progress of the wrapped reader
			return line.getProgress();
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			// advance the wrapped reader; returning false here would produce no records at all
			return line.nextKeyValue();
		}
	}
}
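
For completeness, a job registers this input format with setInputFormatClass(). The driver below is only a sketch and is not part of the original post; the class name, the paths taken from args, and the use of the default (identity) mapper and reducer are assumptions:

package reverseIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: every map() call now receives key = "(fileName@offset)" and value = the line text
public class FileNameLocDriver {
	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration(), "file-name-loc-demo");
		job.setJarByClass(FileNameLocDriver.class);
		job.setInputFormatClass(FileNameLocInputFormat.class);  // plug in the custom input format
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
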
Hadoop also ships with many built-in output formats and RecordWriters. An output format validates the output specification (for example, that the output directory does not already exist) and writes the job's result data.

A custom output format:

public static class AlphaOutputFormat extends multiformat<Text, IntWritable> {

		@Override
		protected String generateFileNameForKeyValue(Text key,
				IntWritable value, Configuration conf) {
			// route each record to a file named after the first letter of its key
			// (a.txt ... z.txt); anything else goes to other.txt
			char c = key.toString().toLowerCase().charAt(0);
			if (c >= 'a' && c <= 'z') {
				return c + ".txt";
			} else {
				return "other.txt";
			}
		}

	}

// register the custom output format with the job
		job.setOutputFormatClass(AlphaOutputFormat.class);

The multiformat base class that AlphaOutputFormat extends is shown below. It keeps one RecordWriter per output file name and routes each record according to generateFileNameForKeyValue():

package com.rpc.nefu;
import java.io.DataOutputStream;  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Iterator;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FSDataOutputStream;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.io.WritableComparable;  
import org.apache.hadoop.io.compress.CompressionCodec;  
import org.apache.hadoop.io.compress.GzipCodec;  
import org.apache.hadoop.mapreduce.OutputCommitter;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.ReflectionUtils;  

public abstract class multiformat<K extends WritableComparable<?>, V extends Writable>  
        extends FileOutputFormat<K, V> {  
    private MultiRecordWriter writer = null;  
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,  
            InterruptedException {  
        if (writer == null) {  
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));  
        }  
        return writer;  
    }  
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {  
        Path workPath = null;  
        OutputCommitter committer = super.getOutputCommitter(conf);  
        if (committer instanceof FileOutputCommitter) {  
            workPath = ((FileOutputCommitter) committer).getWorkPath();  
        } else {  
            Path outputPath = super.getOutputPath(conf);  
            if (outputPath == null) {  
                throw new IOException("Undefined job output-path");  
            }  
            workPath = outputPath;  
        }  
        return workPath;  
    }  
    /** Determine the output file name (including extension) from the key, value and conf */  
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);  
    public class MultiRecordWriter extends RecordWriter<K, V> {  
        /** cache of RecordWriters, one per output file name */  
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;  
        private TaskAttemptContext job = null;  
        /** output directory for this task attempt */  
        private Path workPath = null;  
        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {  
            super();  
            this.job = job;  
            this.workPath = workPath;  
            recordWriters = new HashMap<String, RecordWriter<K, V>>();  
        }  
        @Override  
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {  
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();  
            while (values.hasNext()) {  
                values.next().close(context);  
            }  
            this.recordWriters.clear();  
        }  
        @Override  
        public void write(K key, V value) throws IOException, InterruptedException {  
            // determine which output file this record belongs to  
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());  
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);  
            if (rw == null) {  
                rw = getBaseRecordWriter(job, baseName);  
                this.recordWriters.put(baseName, rw);  
            }  
            rw.write(key, value);  
        }  
        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}  
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)  
                throws IOException, InterruptedException {  
            Configuration conf = job.getConfiguration();  
            boolean isCompressed = getCompressOutput(job);  
            String keyValueSeparator = ",";  
            RecordWriter<K, V> recordWriter = null;  
            if (isCompressed) {  
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,  
                        GzipCodec.class);  
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);  
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());  
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
                recordWriter = new lineRecordWrite<K, V>(new DataOutputStream(codec  
                        .createOutputStream(fileOut)), keyValueSeparator);  
            } else {  
                Path file = new Path(workPath, baseName);  
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
                recordWriter = new lineRecordWrite<K, V>(fileOut, keyValueSeparator);  
            }  
            return recordWriter;  
        }  
    }  
}  
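
The getBaseRecordWriter() method above relies on a lineRecordWrite helper class that the original post does not include; it plays the same role as the LineRecordWriter nested inside Hadoop's TextOutputFormat, which is not visible outside that class. A minimal sketch of such a helper is shown below; the field names and encoding details are assumptions, not the original author's code. It is written as a top-level class in the same package so that the unqualified reference in multiformat resolves:

package com.rpc.nefu;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Minimal line-oriented RecordWriter, modeled on the LineRecordWriter in TextOutputFormat.
// Illustrative sketch only.
public class lineRecordWrite<K, V> extends RecordWriter<K, V> {
    private final DataOutputStream out;
    private final byte[] keyValueSeparator;
    private final byte[] newline = "\n".getBytes(StandardCharsets.UTF_8);

    public lineRecordWrite(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        // write "key<separator>value\n"; toString() keeps the sketch short,
        // Text keys/values could instead be written as raw bytes
        out.write(key.toString().getBytes(StandardCharsets.UTF_8));
        out.write(keyValueSeparator);
        out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        out.write(newline);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
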



