[Repost] Custom InputFormat
Source: http://blog.csdn.net/jackydai987/article/details/6226108
The system default TextInputFormat.java:
- public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
- @Override
- public RecordReader<LongWritable, Text>
- createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new LineRecordReader(); // the built-in RecordReader, which reads line by line; this is the class we are about to rewrite
- }
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
- return codec == null; // true (splittable) only when no compression codec matches the file; compressed files are not split
- }
- }
There is not much to say about this class. Next we implement our own reader class, MyRecordReader.
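- // What I implement here is deliberately simple; once the principle is clear, the rest is up to you.
- // The default TextInputFormat produces key = byte offset and value = line. I change it slightly so that key = line number and value = line.
- public class MyRecordReader extends RecordReader<LongWritable, Text> { // extend RecordReader to implement our own reading logic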
- private static final Log LOG = LogFactory.getLog(MyRecordReader.class);
- private long pos; // line number of the next line to be read
- private boolean more;
- private LineReader in;
- private int maxLineLength;
- private LongWritable key = null;
- private Text value = null;
- public void initialize(InputSplit genericSplit,
- TaskAttemptContext context) throws IOException {
- pos = 1;
- more = true;
- FileSplit split = (FileSplit) genericSplit; // the split assigned to this reader
- Configuration job = context.getConfiguration();
- this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
- Integer.MAX_VALUE);
- final Path file = split.getPath(); // path of the file backing this split
- // open the file; note that no seek to split.getStart() is done, so reading always begins at byte 0
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath()); // open the file
- in = new LineReader(fileIn, job);
- }
- @Override
- public boolean nextKeyValue() throws IOException { // called in a loop by the framework (Mapper.run) to fetch each <key, value>
- if (key == null) {
- key = new LongWritable();
- }
- if (value == null) {
- value = new Text();
- }
- int newSize = 0;
- while (true) {
- newSize = in.readLine(value, maxLineLength, maxLineLength); // read one line
- if (newSize == 0) {
- break; // end of file
- }
- key.set(pos); // key is the number of the line just read
- pos++; // advance the line number
- if (newSize < maxLineLength) {
- break;
- }
- // line too long. try again
- LOG.info("Skipped line of size " + newSize + " at line " + key.get());
- }
- if (newSize == 0) {
- key = null;
- value = null;
- more = false;
- return false;
- } else {
- return true;
- }
- }
- // The methods below must all be implemented; they are much the same as the defaults, so I will not go into them.
- @Override
- public LongWritable getCurrentKey() {
- return key;
- }
- @Override
- public Text getCurrentValue() {
- return value;
- }
- /**
- * Get the progress within the split
- */
- @Override
- public float getProgress() { // progress is reported as a fraction between 0.0 and 1.0
- if (more) {
- return 0.0f;
- } else {
- return 1.0f;
- }
- }
- public synchronized void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
- }
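To make the difference between the two key schemes concrete, here is a minimal sketch that runs without Hadoop. It is only an illustration under stated assumptions: the class `KeySchemes` and its methods are hypothetical names, lines are assumed to end in a single `\n`, and plain strings stand in for `LongWritable` and `Text`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Hadoop or of the original post.
class KeySchemes {
    /** Keys each line by the byte offset at which it starts (the TextInputFormat scheme). */
    static List<String> byOffset(String text) {
        List<String> records = new ArrayList<>();
        long offset = 0;
        for (String line : text.split("\n")) {
            records.add(offset + "\t" + line);
            // +1 accounts for the '\n' terminator that follows each line
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    /** Keys each line by its 1-based line number (the MyRecordReader scheme). */
    static List<String> byLineNumber(String text) {
        List<String> records = new ArrayList<>();
        long lineNumber = 1;
        for (String line : text.split("\n")) {
            records.add(lineNumber + "\t" + line);
            lineNumber++;
        }
        return records;
    }

    public static void main(String[] args) {
        String text = "hello\nhadoop\ninput format\n";
        System.out.println(byOffset(text));     // keys 0, 6, 13
        System.out.println(byLineNumber(text)); // keys 1, 2, 3
    }
}
```

The offset key can be computed independently inside any split, whereas a line number depends on every line that came before it in the file.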
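With the reader done, we wrap it in our own MyInputFormat and try it out. The MyInputFormat class itself is not shown here; presumably it extends FileInputFormat<LongWritable, Text> and returns a MyRecordReader from createRecordReader, just as TextInputFormat returns a LineRecordReader above.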
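One caveat is worth spelling out. MyRecordReader opens the file with fs.open and never seeks to the start of its split, and FileInputFormat's default isSplitable returns true. If MyInputFormat keeps that default (an assumption, since the class is not shown), a file large enough to be cut into several splits would be read in full by every map task. The following Hadoop-free simulation sketches the effect; `SplitSimulation` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; it models the behaviour, it does not call Hadoop.
class SplitSimulation {
    /** Mimics a reader that ignores the split's start offset and numbers lines from 1. */
    static List<String> readWholeFile(String[] lines) {
        List<String> records = new ArrayList<>();
        long pos = 1;
        for (String line : lines) {
            records.add(pos + "\t" + line);
            pos++;
        }
        return records;
    }

    /** Runs one such reader per split and concatenates what the map tasks would receive. */
    static List<String> runJob(String[] lines, int numSplits) {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            all.addAll(readWholeFile(lines));
        }
        return all;
    }

    public static void main(String[] args) {
        String[] lines = {"a", "b", "c"};
        System.out.println(runJob(lines, 1)); // each record once
        System.out.println(runJob(lines, 2)); // each record twice
    }
}
```

Overriding isSplitable in MyInputFormat to return false keeps each file in a single split, which is also what makes a per-file line number meaningful.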
- Configuration conf = new Configuration();
- Job job = new Job(conf, "helloword");
- job.setJarByClass(helloHadoopV1.class);
- job.setInputFormatClass(MyInputFormat.class); // this is where our own InputFormat is plugged in
- job.setOutputKeyClass(LongWritable.class);
- job.setOutputValueClass(Text.class);
- job.setMapperClass(HelloMapper.class);
- //job.setReducerClass(HelloReducer.class); // with no Reducer set, the Mapper's intermediate output is written out as-is
- FileInputFormat.setInputPaths(job, new Path(args[1]));
- FileOutputFormat.setOutputPath(job, new Path(args[2]));
- checkAndDelete(args[2], conf); // helper not shown in this post; presumably it deletes the output path if it already exists
- job.waitForCompletion(true);
Everything we customized above is deliberately simple, so that it is easy to follow.
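One last point: the <key, value> types in RecordReader<LongWritable, Text> above can be any types you define yourself. The key type must implement the WritableComparable interface, and the value type must implement the Writable interface.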
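As a rough sketch of what such a custom key could look like: the block below declares local stand-in interfaces that mirror the shape of Hadoop's Writable and WritableComparable, so that it compiles without Hadoop on the classpath. Real code would import them from org.apache.hadoop.io instead. `FileLineKey` and `roundTrip` are hypothetical names invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Stand-ins mirroring org.apache.hadoop.io.Writable / WritableComparable (assumption: Hadoop is absent).
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface WritableComparable<T> extends Writable, Comparable<T> {}

/** Hypothetical key: (file name, line number), ordered by file name, then line number. */
class FileLineKey implements WritableComparable<FileLineKey> {
    private String fileName = "";
    private long lineNumber;

    FileLineKey() {} // no-arg constructor, since the framework creates key instances reflectively

    FileLineKey(String fileName, long lineNumber) {
        this.fileName = fileName;
        this.lineNumber = lineNumber;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(fileName);
        out.writeLong(lineNumber);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read back in the same order they were written
        fileName = in.readUTF();
        lineNumber = in.readLong();
    }

    @Override
    public int compareTo(FileLineKey other) {
        int byName = fileName.compareTo(other.fileName);
        return byName != 0 ? byName : Long.compare(lineNumber, other.lineNumber);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof FileLineKey
                && fileName.equals(((FileLineKey) o).fileName)
                && lineNumber == ((FileLineKey) o).lineNumber;
    }

    @Override
    public int hashCode() {
        return Objects.hash(fileName, lineNumber);
    }

    /** Serializes the key and reads it back, as would happen between map and reduce. */
    static FileLineKey roundTrip(FileLineKey key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            key.write(new DataOutputStream(buffer));
            FileLineKey copy = new FileLineKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FileLineKey key = new FileLineKey("a.txt", 7);
        System.out.println(roundTrip(key).equals(key));
    }
}
```

A value type only needs the write and readFields pair. A key additionally needs compareTo because keys are sorted before they reach the reducer, and consistent equals and hashCode matter because the default partitioner relies on hashCode.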