首页 > 代码库 > MR中简单实现自定义的输入输出格式

MR中简单实现自定义的输入输出格式

import java.io.DataOutput;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.Reducer.Context;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class TestCombine extends Configured implements Tool {	private static class ProvinceMapper extends			Mapper<Object, Text, Text, Text> {		@Override		protected void map(Object key, Text value, Context context)				throws IOException, InterruptedException {			System.out.println("value : " + value + " Context " + context);			context.write(value, value);		}	}	private static class ProvinceReducer extends			Reducer<Text, Text, Text, Text> {		@Override		protected void reduce(Text key, Iterable<Text> values, Context context)				throws IOException, InterruptedException {			for (Text va : values) {			    System.out.println("reduce " + key);				context.write(key, key);			}		}	}		 // 输入格式     static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {  	    @SuppressWarnings({ "unchecked", "rawtypes" })  	    @Override  	    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {  	        return new CombineFileRecordReader((CombineFileSplit)split, context, CombineLineRecordReader.class);  	    }  	}  		 static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {  	    private CombineFileSplit split;  	    private TaskAttemptContext context;  	    private int index;  	    private RecordReader<K, V> rr;  	  	    @SuppressWarnings("unchecked")  	    public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {  	        this.index = index;	        this.split = (CombineFileSplit) split;  	        this.context = context;  	  	        this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());  	    }  	  	    @SuppressWarnings("unchecked")  	    @Override  	    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {  	        this.split = (CombineFileSplit) curSplit;  	        this.context = curContext;  	  	        if (null == rr) {  	            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());  	        }  	  	        FileSplit fileSplit = new FileSplit(this.split.getPath(index),  	                this.split.getOffset(index), this.split.getLength(index),  	                this.split.getLocations());  	          	        this.rr.initialize(fileSplit, this.context);  	    }  	  	    @Override  	    public float getProgress() throws IOException, InterruptedException {  	        return rr.getProgress();  	    }  	  	    @Override  	    public void close() throws IOException {  	        if (null != rr) {  	            rr.close();  	            rr = null;  	        }  	    }  	  	    @Override  	    public K getCurrentKey()  	    throws IOException, InterruptedException {  	        return rr.getCurrentKey();  	    }  	  	    @Override  	    public V getCurrentValue()  	    throws IOException, InterruptedException {  	        return rr.getCurrentValue();  	    }  	  	    @Override  	    public boolean nextKeyValue() throws IOException, InterruptedException {  	        return rr.nextKeyValue();  	    }  	}  		// 输出格式	 static class MyOutputFormat extends FileOutputFormat<Text, Text>{		@Override		public RecordWriter<Text, Text> getRecordWriter(				TaskAttemptContext job) throws IOException, InterruptedException {			return new MyRecordWriter(job);		}	}	  public static class  MyRecordWriter extends RecordWriter<Text, Text> {				private Map<String, FSDataOutputStream> outputMap = null;		private static final String LINESEPARATOR = "\n";		private FileSystem fs;		private JobContext job;				public MyRecordWriter(JobContext job) throws IOException {			this.outputMap = new HashMap<String, FSDataOutputStream>();			this.job = job;			this.fs = FileSystem.get(job.getConfiguration());		}				// 参考 MultipleOutputs		public void write(Text key, Text value) throws IOException {			String k = key.toString();			if(k.isEmpty())				return;			FSDataOutputStream out = outputMap.get(k);			if(out==null) {				if(k.isEmpty())					System.out.println(value.toString());				Path outputPath = new Path(FileOutputFormat.getOutputPath(job), k);				if(!fs.exists(outputPath))					out = fs.create(outputPath);				else					return;				outputMap.put(k, out);			}			out.write(value.getBytes());			out.write(LINESEPARATOR.getBytes());		}		@Override		public void close(TaskAttemptContext context) throws IOException,				InterruptedException {			for(FSDataOutputStream out : outputMap.values()) {				out.close();			}		}	}		public int run(String[] args) throws Exception {		Configuration conf = new Configuration();				Job job = new Job(conf);		job.setJobName("TestCombine");		job.setJarByClass(TestCombine.class);		job.setMapperClass(ProvinceMapper.class);		job.setReducerClass(ProvinceReducer.class);				//job.setInputFormatClass(CombineSequenceFileInputFormat.class);		job.setOutputFormatClass(MyOutputFormat.class);				job.setOutputKeyClass(Text.class);		job.setOutputValueClass(Text.class);				String inpath = "/home/hadoop/tmp/combine";		String outpath = "/home/hadoop/tmp/combineout";		Path p = new Path(outpath);				FileSystem fs = FileSystem.get(conf);		if (fs.exists(p)){			fs.delete(p);		}		FileInputFormat.addInputPaths(job, inpath);		FileOutputFormat.setOutputPath(job, p);		return job.waitForCompletion(true) ? 0 : 1;	} 	public static void main(String[] args) throws Exception {		int ret = ToolRunner.run(new TestCombine(), args);		System.exit(ret);	} } 

 

MR中简单实现自定义的输入输出格式