首页 > 代码库 > Hadoop MapReduce纵表转横表

Hadoop MapReduce纵表转横表

输入数据如下:以\t分隔

0-3岁育儿百科 书        23
0-5v液位传感器  5
0-5轴承 2
0-6个月奶粉     23
0-6个月奶粉c2c报告      23
0-6个月奶粉在线购物排名 23
0-6个月奶粉市场前景     23
0-6个月配方奶粉 23
0.001g电子天平  5
0.01t化铝炉     2
0.01吨熔铝合金炉        2
0.03吨化镁炉    25
0.03吨电磁炉    11
其中左侧是搜索词,右侧是类别,可看成是数据库中的纵表,现需要将输入转成横表,即 类名\t语句1\t语句2...,这样的格式。

MapReduce最适合做这样的事情了。因为经常用到,记录一下。Hive表中的数据要转成横表的时候,单独写个MR来处理就很方便了。

package seg;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author zhf 
 * @email zhf.thu@gmail.com
 * @version 创建时间:2014年8月24日 上午9:56:45
 */
public class Vertical2Horizontal extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Vertical2Horizontal(), args);
		System.exit(exitCode);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		String[] args = new GenericOptionsParser(arg0).getRemainingArgs();
		if(args.length != 2){
			System.out.println("Usage:seg.Horizontal2Vertical <input> <output>");
			System.exit(1);
		}
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1])))
			fs.delete(new Path(args[1]),true);
		Job job = new Job(conf);
		job.setJarByClass(getClass());
		job.setMapperClass(HVMapper.class);
		job.setReducerClass(HVReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0:1;
	}

	public static class HVMapper extends Mapper<LongWritable,Text,Text,Text>{
		private Text text = new Text();
		private Text clazz = new Text();
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String line = value.toString();
			String params[] = line.split("\t");
			text.set(params[0]);
			clazz.set(params[1]);
			context.write(clazz,text);
		}
	}
	
	public static class HVReducer extends Reducer<Text,Text,Text,Text>{
		private Text result = new Text();
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			String tmp = "";
			for(Text val : values){
				tmp += val + "\t";
			}
			result.set(tmp.trim());
			context.write(key, result);
		}
	}

}

输出:

1       莱舍万 服装美学 莱芜劳保服装    南京羽绒服特卖会        螃蟹的秘密品牌内衣店    螃蟹的秘密内衣专卖店




Hadoop MapReduce纵表转横表