
Using SequenceFile Input in MapReduce

Converting raw data to a block-compressed SequenceFile

 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzoCodec;

public class ToSeqFile extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());
        FileSystem fs = FileSystem.get(conf);

        FileInputFormat.setInputPaths(job, "/home/hadoop/tmp/tmplzo.txt");
        Path outDir = new Path("/home/hadoop/tmp/tmplzo.out");
        fs.delete(outDir, true);
        FileOutputFormat.setOutputPath(job, outDir);

        // Map-only job with the default identity mapper: the TextInputFormat
        // pairs (LongWritable byte offset, Text line) pass straight through.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // Write the output as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Enable output compression
        SequenceFileOutputFormat.setCompressOutput(job, true);
        // Compress with LZO
        SequenceFileOutputFormat.setOutputCompressorClass(job, LzoCodec.class);
        // Compress at BLOCK granularity
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToSeqFile(), args);
        System.exit(res);
    }
}
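To verify the conversion, the output can be read back with SequenceFile.Reader, which transparently decompresses BLOCK-compressed records (the LZO codec and its native library must still be on the classpath). A minimal sketch, assuming the job above wrote a part file named part-m-00000 under the output directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed part-file name; map-only jobs typically write part-m-00000.
        Path path = new Path("/home/hadoop/tmp/tmplzo.out/part-m-00000");
        SequenceFile.Reader reader =
                new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            // Key/value types match what ToSeqFile wrote:
            // LongWritable byte offset, Text line.
            LongWritable key = new LongWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}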

 

Processing the compressed SequenceFile with MapReduce

import java.io.IOException;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class compress extends Configured implements Tool {

    private static final Log log = LogFactory.getLog(compress.class);

    private static class ProvinceMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The SequenceFile key (a LongWritable offset) is ignored;
            // the value is emitted as both key and value.
            context.write(value, value);
        }
    }

    private static class ProvinceReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text va : values) {
                context.write(key, key);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new compress(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        log.info("Job starting...");
        long beg = System.currentTimeMillis();
        int result = 0;

        Configuration conf = new Configuration();
        // Register the codecs so compressed input can be decoded.
        conf.set("io.compression.codecs",
                "org.apache.hadoop.io.compress.DefaultCodec,"
                        + "org.apache.hadoop.io.compress.GzipCodec,"
                        + "com.hadoop.compression.lzo.LzopCodec");
        conf.set("io.compression.codec.lzo.class",
                "com.hadoop.compression.lzo.LzoCodec");

        // Compress intermediate map output with Snappy.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compression.codec",
                SnappyCodec.class, CompressionCodec.class);
        // conf.setBoolean("mapreduce.output.fileoutputformat.compress", true); // compress the final output
        conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
                SnappyCodec.class, CompressionCodec.class);

        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 2) {
            System.err.println("Usage: compress <in> <out>");
            System.exit(1);
        }

        // Note: the Hadoop source tree contains several Job.java files; this one is
        // hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/
        // src/main/java/org/apache/hadoop/mapreduce/Job.java
        Job job = Job.getInstance(conf, "compress");
        job.setJarByClass(compress.class);
        job.setMapperClass(ProvinceMapper.class);
        job.setReducerClass(ProvinceReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Read the block-compressed SequenceFile produced by ToSeqFile.
        // Alternative for LZO-indexed plain text: com.hadoop.mapreduce.LzoTextInputFormat.
        job.setInputFormatClass(SequenceFileInputFormat.class);

        // Knobs used while testing split sizes and speculative execution:
        // FileInputFormat.setMinInputSplitSize(job, 150 * 1024 * 1024);
        // FileInputFormat.setMaxInputSplitSize(job, 10000);
        // job.setSpeculativeExecution(false);

        FileInputFormat.addInputPath(job, new Path(argArray[0]));
        FileOutputFormat.setOutputPath(job, new Path(argArray[1]));

        // Remove the output directory if it already exists.
        String uri = argArray[1];
        Path path = new Path(uri);
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        result = job.waitForCompletion(true) ? 0 : 1;

        long elapsed = System.currentTimeMillis() - beg;
        System.out.println("Elapsed (ms): " + elapsed);
        return result;
    }
}
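For quick tests of this job, a small block-compressed SequenceFile can also be written directly with SequenceFile.createWriter instead of running the conversion job over the full input. A minimal sketch; the path is hypothetical, and DefaultCodec is used here so no native LZO library is required (swap in LzoCodec if hadoop-lzo is installed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class MakeTestSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/test.seq"); // hypothetical test path
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                // BLOCK compression as in ToSeqFile; DefaultCodec avoids
                // the native LZO dependency for local testing.
                SequenceFile.Writer.compression(CompressionType.BLOCK,
                        new DefaultCodec()));
        try {
            for (long i = 0; i < 100; i++) {
                writer.append(new LongWritable(i), new Text("record-" + i));
            }
        } finally {
            writer.close();
        }
    }
}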

 

Test results

Input file size: 544 MB (no compression at all)
Elapsed: 73805 ms

Using SequenceFile input (BLOCK-compressed with LZO, intermediate map output compressed with Snappy)

Elapsed: 44207 ms

 
