Using SequenceFile as MapReduce Input
Converting raw data to a block-compressed SequenceFile
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzoCodec;

public class ToSeqFile extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf);
        job.setJarByClass(getClass());
        FileSystem fs = FileSystem.get(conf);

        FileInputFormat.setInputPaths(job, "/home/hadoop/tmp/tmplzo.txt");
        Path outDir = new Path("/home/hadoop/tmp/tmplzo.out");
        // Remove a stale output directory so the job can be re-run.
        fs.delete(outDir, true);
        FileOutputFormat.setOutputPath(job, outDir);

        // Map-only job: the default identity mapper passes each record through unchanged.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Write the output as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Enable output compression
        SequenceFileOutputFormat.setCompressOutput(job, true);
        // Compress with LZO
        SequenceFileOutputFormat.setOutputCompressorClass(job, LzoCodec.class);
        // Compress at BLOCK granularity (whole blocks of records, not individual records)
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToSeqFile(), args);
        System.exit(res);
    }
}
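To sanity-check the conversion, the SequenceFile can be read back outside MapReduce with SequenceFile.Reader. A minimal sketch, not from the original post: it assumes the output directory of the job above, and part-m-00000 is an illustrative part-file name (list the directory for the actual one). Reading an LZO-compressed file also requires hadoop-lzo and its native library on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // The map-only job above writes part files under this directory;
        // "part-m-00000" is a hypothetical example of the first one.
        Path path = new Path("/home/hadoop/tmp/tmplzo.out/part-m-00000");
        // The reader detects the codec and compression type from the file header.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            LongWritable key = new LongWritable(); // byte offset from TextInputFormat
            Text value = new Text();               // the original text line
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}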
Processing the compressed SequenceFile with MapReduce
import java.io.IOException;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class compress extends Configured implements Tool {

    private static final Log log = LogFactory.getLog(compress.class);

    private static class ProvinceMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the line as both key and value.
            context.write(value, value);
        }
    }

    private static class ProvinceReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Write the key once per grouped value; the values themselves are ignored.
            for (Text va : values) {
                context.write(key, key);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new compress(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        log.info("Job starting...");
        long beg = System.currentTimeMillis();

        Configuration conf = new Configuration();
        // Register the LZO codecs so the SequenceFile blocks written above can be decompressed.
        conf.set("io.compression.codecs",
                "org.apache.hadoop.io.compress.DefaultCodec,"
                + "org.apache.hadoop.io.compress.GzipCodec,"
                + "com.hadoop.compression.lzo.LzopCodec");
        conf.set("io.compression.codec.lzo.class", "com.hadoop.compression.lzo.LzoCodec");
        // Compress intermediate map output with Snappy.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compression.codec", SnappyCodec.class, CompressionCodec.class);
        // Final output compression stays off; this codec only takes effect if
        // mapreduce.output.fileoutputformat.compress is also set to true.
        conf.setClass("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class, CompressionCodec.class);

        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 2) {
            System.err.println("Usage: compress <in> <out>");
            System.exit(1);
        }

        Job job = new Job(conf, "compress");
        job.setJarByClass(compress.class);
        job.setMapperClass(ProvinceMapper.class);
        job.setReducerClass(ProvinceReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Read the block-compressed SequenceFile directly; the input format
        // handles decompression transparently.
        job.setInputFormatClass(SequenceFileInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(argArray[0]));
        FileOutputFormat.setOutputPath(job, new Path(argArray[1]));

        // Remove a stale output directory so the job can be re-run.
        String uri = argArray[1];
        Path path = new Path(uri);
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        int result = job.waitForCompletion(true) ? 0 : 1;
        System.out.println("Elapsed: " + (System.currentTimeMillis() - beg) + " ms");
        return result;
    }
}
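Both LZO and Snappy are JNI-backed codecs, so tasks fail at startup if the native libraries are missing. A small pre-flight sketch, not from the original post (CodecCheck and the codec list are illustrative), that resolves codecs by file extension the same way the MapReduce input path does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.NativeCodeLoader;

public class CodecCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same registration mechanism the job uses; LzopCodec comes from hadoop-lzo.
        conf.set("io.compression.codecs",
                "org.apache.hadoop.io.compress.DefaultCodec,"
                + "org.apache.hadoop.io.compress.SnappyCodec,"
                + "com.hadoop.compression.lzo.LzopCodec");
        // Snappy and LZO need libhadoop / liblzo2 on java.library.path.
        System.out.println("native hadoop library loaded: " + NativeCodeLoader.isNativeCodeLoaded());
        // Extension-based lookup: .lzo maps to LzopCodec, .snappy to SnappyCodec.
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec lzo = factory.getCodec(new Path("sample.lzo"));
        CompressionCodec snappy = factory.getCodec(new Path("sample.snappy"));
        System.out.println("lzo codec resolved: " + (lzo != null));
        System.out.println("snappy codec resolved: " + (snappy != null));
    }
}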
Test results
Plain text input, 544 MB, no compression anywhere:
Elapsed: 73805 ms
SequenceFile input (LZO block compression, Snappy-compressed intermediate map output):
Elapsed: 44207 ms
The compressed pipeline is roughly 1.7x faster on this input.