首页 > 代码库 > MR中简单实现自定义的输入输出格式
MR中简单实现自定义的输入输出格式
import java.io.DataOutput;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.Reducer.Context;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class TestCombine extends Configured implements Tool { private static class ProvinceMapper extends Mapper<Object, Text, Text, Text> { @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { System.out.println("value : " + value + " Context " + context); context.write(value, value); } } private static class ProvinceReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text va : values) 
{ System.out.println("reduce " + key); context.write(key, key); } } } // 输入格式 static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { return new CombineFileRecordReader((CombineFileSplit)split, context, CombineLineRecordReader.class); } } static class CombineLineRecordReader<K, V> extends RecordReader<K, V> { private CombineFileSplit split; private TaskAttemptContext context; private int index; private RecordReader<K, V> rr; @SuppressWarnings("unchecked") public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException { this.index = index; this.split = (CombineFileSplit) split; this.context = context; this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration()); } @SuppressWarnings("unchecked") @Override public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException { this.split = (CombineFileSplit) curSplit; this.context = curContext; if (null == rr) { rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration()); } FileSplit fileSplit = new FileSplit(this.split.getPath(index), this.split.getOffset(index), this.split.getLength(index), this.split.getLocations()); this.rr.initialize(fileSplit, this.context); } @Override public float getProgress() throws IOException, InterruptedException { return rr.getProgress(); } @Override public void close() throws IOException { if (null != rr) { rr.close(); rr = null; } } @Override public K getCurrentKey() throws IOException, InterruptedException { return rr.getCurrentKey(); } @Override public V getCurrentValue() throws IOException, InterruptedException { return rr.getCurrentValue(); } @Override public boolean nextKeyValue() 
throws IOException, InterruptedException { return rr.nextKeyValue(); } } // 输出格式 static class MyOutputFormat extends FileOutputFormat<Text, Text>{ @Override public RecordWriter<Text, Text> getRecordWriter( TaskAttemptContext job) throws IOException, InterruptedException { return new MyRecordWriter(job); } } public static class MyRecordWriter extends RecordWriter<Text, Text> { private Map<String, FSDataOutputStream> outputMap = null; private static final String LINESEPARATOR = "\n"; private FileSystem fs; private JobContext job; public MyRecordWriter(JobContext job) throws IOException { this.outputMap = new HashMap<String, FSDataOutputStream>(); this.job = job; this.fs = FileSystem.get(job.getConfiguration()); } // 参考 MultipleOutputs public void write(Text key, Text value) throws IOException { String k = key.toString(); if(k.isEmpty()) return; FSDataOutputStream out = outputMap.get(k); if(out==null) { if(k.isEmpty()) System.out.println(value.toString()); Path outputPath = new Path(FileOutputFormat.getOutputPath(job), k); if(!fs.exists(outputPath)) out = fs.create(outputPath); else return; outputMap.put(k, out); } out.write(value.getBytes()); out.write(LINESEPARATOR.getBytes()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { for(FSDataOutputStream out : outputMap.values()) { out.close(); } } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJobName("TestCombine"); job.setJarByClass(TestCombine.class); job.setMapperClass(ProvinceMapper.class); job.setReducerClass(ProvinceReducer.class); //job.setInputFormatClass(CombineSequenceFileInputFormat.class); job.setOutputFormatClass(MyOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String inpath = "/home/hadoop/tmp/combine"; String outpath = "/home/hadoop/tmp/combineout"; Path p = new Path(outpath); FileSystem fs = FileSystem.get(conf); if (fs.exists(p)){ 
fs.delete(p); } FileInputFormat.addInputPaths(job, inpath); FileOutputFormat.setOutputPath(job, p); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new TestCombine(), args); System.exit(ret); } }
MR中简单实现自定义的输入输出格式
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。若内容有误或涉及侵权,可进行投诉(投诉/举报),工作人员会在5个工作日内联系你;一经查实,本站将立刻删除涉嫌侵权内容。