Overriding FileInputFormat in Hadoop to Process Integers Stored in Binary Format
I recently started working with MapReduce and noticed that most examples online process plain text, so they simply read their input with the default TextInputFormat. That class covers many text-processing scenarios, but it is no longer suitable when the input consists of structured records stored in binary form.
This article uses a simple application as an example: counting the frequency of integers stored in a binary file. Other applications, such as sorting, can be built on the same foundation. The main difficulty is handling the input data. Following "Hadoop: The Definitive Guide" (3rd edition), we need to subclass FileInputFormat and implement the following three methods:
class MyInputFormat extends FileInputFormat<Type1, Type2> {
    /*
     * Can the current file be split? Return "true" to allow splitting,
     * "false" to process the whole file as a single split.
     */
    protected boolean isSplitable(JobContext context, Path path) {
    }

    /*
     * The MapReduce client calls this method to obtain all splits, which are then
     * sent to the MapReduce server. Note that a split does not contain the actual
     * data, only metadata describing it: the path of the file the split belongs to,
     * the split's starting offset within that file, its length, and the list of hosts
     * holding the underlying data. Implementing this method is mostly a matter of
     * filling in that information.
     */
    public List<InputSplit> getSplits(JobContext context) throws IOException {
    }

    /*
     * The RecordReader produces the key-value pairs passed to the map function. It is
     * given two arguments: a split and the task context. The Mapper's run() method
     * shows how the framework drives it:
     *
     * public void run(Context context) throws IOException, InterruptedException {
     *     setup(context);
     *     // nextKeyValue() produces the next key-value pair; it returns false once
     *     // the current split has been fully consumed, which ends the loop.
     *     while (context.nextKeyValue()) {
     *         map(context.getCurrentKey(), context.getCurrentValue(), context);
     *     }
     *     cleanup(context);
     * }
     */
    public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
    }
}
The RecordReader subclass must implement the following methods:
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
    /* Close the input stream. */
    public void close() {}

    /* Report how much of the split has been processed. */
    public float getProgress() {}

    /* Return the current key. */
    public LongWritable getCurrentKey() throws IOException, InterruptedException {}

    /* Return the current value. */
    public IntWritable getCurrentValue() throws IOException, InterruptedException {}

    /*
     * Initialization: open the input stream and use the split metadata to set the
     * starting offset, the length, and so on.
     */
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {}

    /* Produce the next key-value pair. */
    public boolean nextKeyValue() throws IOException, InterruptedException {}
}
The code for the three files follows. First, BinInputFormat.java:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> {

    private static final double SPLIT_SLOP = 1.1;

    /*
     * Can the current file be split? Return "true" to allow splitting,
     * "false" to process the whole file as a single split.
     */
    protected boolean isSplitable(JobContext context, Path path) {
        return true;
    }

    /*
     * The MapReduce client calls this method to obtain all splits. A split does not
     * contain the actual data, only metadata describing it: the file path, the
     * split's starting offset, its length, and the hosts holding the underlying data.
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        Configuration conf = job.getConfiguration();
        List<InputSplit> splits = new ArrayList<InputSplit>();
        long minSplitSize = conf.getLong("mapred.min.split.size", 1);
        // No upper bound unless explicitly set, so splitSize falls back to the block size.
        long maxSplitSize = conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
        long blockSize = conf.getLong("dfs.block.size", 1);
        long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));

        FileSystem fs = FileSystem.get(conf);
        String path = conf.get(INPUT_DIR);
        FileStatus[] files = fs.listStatus(new Path(path));

        for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
            FileStatus file = files[fileIndex];
            System.out.println("input file: " + file.getPath().toString());
            long length = file.getLen();
            FileSystem fsin = file.getPath().getFileSystem(conf);
            BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(job, file.getPath())) {
                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(file.getPath(), length - bytesRemaining,
                            splitSize, blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }
                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(file.getPath(), length - bytesRemaining,
                            bytesRemaining, blkLocations[blkLocations.length - 1].getHosts()));
                }
            } else if (length != 0) {
                splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
            } else {
                // Create an empty hosts array for zero-length files.
                splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
            }
        }
        return splits;
    }

    /*
     * Create the RecordReader that produces the key-value pairs passed to the map
     * function (see the Mapper.run() walkthrough above).
     */
    public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        BinRecordReader reader = new BinRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
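Since getSplits() computes the split size as max(minSplitSize, min(maxSplitSize, blockSize)), the split size can be tuned from the driver before the job is submitted. A minimal sketch follows; the 4 MB and 64 MB values are arbitrary choices for illustration (keeping them multiples of 4 avoids a record straddling a split boundary), and the key names are simply the older "mapred.*" names read by the code above.

    Configuration conf = new Configuration();
    // Keys read by BinInputFormat.getSplits() above.
    conf.setLong("mapred.min.split.size", 4L * 1024 * 1024);   // lower bound: 4 MB
    conf.setLong("mapred.max.split.size", 64L * 1024 * 1024);  // upper bound: 64 MB
    // Effective split size = max(min.split.size, min(max.split.size, dfs.block.size))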
Next, BinRecordReader.java:
package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Reads fixed-size binary records (4-byte integers) from a file split:
 * the key is the record's byte offset in the file, the value is the integer itself.
 */
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
    private FSDataInputStream inputStream = null;
    private long start, end, pos;
    private Configuration conf = null;
    private FileSplit fileSplit = null;
    private LongWritable key = new LongWritable();
    private IntWritable value = new IntWritable();
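    /*
     * The method bodies below are a minimal sketch of one possible implementation.
     * Assumptions: each record is a 4-byte integer read with DataInput.readInt()
     * (big-endian), the key is the record's byte offset, and split boundaries fall
     * on 4-byte boundaries (true for the default block size).
     */

    /* Open the file and seek to the start of this split. */
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        conf = context.getConfiguration();
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        pos = start;
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        inputStream = fs.open(path);
        inputStream.seek(start);
    }

    /* Read the next 4-byte integer; return false once the split is exhausted. */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (pos + 4 > end) {
            return false;
        }
        key.set(pos);
        value.set(inputStream.readInt());
        pos += 4;
        return true;
    }

    /* Return the current key (byte offset of the record in the file). */
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /* Return the current value (the integer just read). */
    public IntWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /* Fraction of the split processed so far. */
    public float getProgress() throws IOException, InterruptedException {
        if (end == start) {
            return 1.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    /* Close the underlying stream. */
    public void close() throws IOException {
        if (inputStream != null) {
            inputStream.close();
        }
    }
}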
Finally, the driver, IntCount.java:

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class IntCount {

    public static class TokenizerMapper
            extends Mapper<LongWritable, IntWritable, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text intNum = new Text();

        public void map(LongWritable key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            intNum.set(Integer.toString(value.get()));
            context.write(intNum, one);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] newArgs = new String[]{
                "hdfs://localhost:9000/read",
                "hdfs://localhost:9000/data/wc_output21"};
        String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: intcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "IntCount");
        job.setJarByClass(IntCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        // Use the custom input format defined above.
        job.setInputFormatClass(BinInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Next, a short C program generates a test file of integers stored in binary format:

#include <stdio.h>

int main() {
    FILE *fp = fopen("tmpfile", "wb");
    int i, j;
    for (i = 0; i < 10; i++) {
        for (j = 0; j < 10; j++)
            fwrite(&j, sizeof(int), 1, fp);
    }
    fclose(fp);
    return 0;
}
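One caveat worth flagging as an assumption rather than something from the original setup: fwrite() on a typical x86 machine writes the integers in little-endian byte order, while DataInput.readInt(), used in the reader sketch above, reads big-endian, so the counted keys may come out byte-swapped. If that matters, the test file can be generated from Java instead, since DataOutputStream.writeInt() writes big-endian and matches the reader exactly. The class name GenBinFile below is just for illustration.

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Writes the same 100 integers (ten copies of 0..9) as the C program,
// but in big-endian order, matching DataInput.readInt().
public class GenBinFile {
    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new FileOutputStream("tmpfile"));
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 10; j++) {
                out.writeInt(j);
            }
        }
        out.close();
    }
}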
Copy the generated file to /read/ on HDFS, then run the IntCount MapReduce job and check the output.