首页 > 代码库 > MapReduce数据连接

MapReduce数据连接

对于不同文件里的数据,有时候有相应关系,须要进行连接(join),获得一个新的文件以便进行分析。比方有两个输入文件a.txt,b.txt,当中的数据格式分别例如以下

1 a
2 b
3 c
4 d

1 good
2 bad
3 ok
4 hello

须要将其连接成一个新的例如以下的文件:

a good
b bad
c ok
d hello

处理步骤能够分成两步:

1.map阶段,将两个输入文件里的数据进行打散,例如以下:

1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello

2.reduce阶段,进行数据的连接操作,此处数据较简单,仅仅要推断map结果的value的长度是不是1就决定是新的键还是值。

package cn.zhf.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SingleJoin extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		Tool tool = new SingleJoin();
		ToolRunner.run(tool, args);
		print(tool);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = getConf();
		Job job = new Job();
		job.setJarByClass(getClass());
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path("out"),true);
		FileInputFormat.addInputPath(job, new Path("a.txt"));
		FileInputFormat.addInputPath(job, new Path("b.txt"));
		FileOutputFormat.setOutputPath(job,new Path("out"));
		
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.waitForCompletion(true);
		return 0;
	}
	public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String[] str = value.toString().split(" ");
			context.write(new Text(str[0]), new Text(str[1]));
		}
	}

	public static class JoinReducer extends Reducer<Text,Text,Text,Text>{
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			Iterator<Text> iterator = values.iterator();
			Text keyy = new Text();
			Text valuee = new Text();
			while(iterator.hasNext()){
				Text temp = iterator.next();
				if(temp.toString().length() == 1){
					keyy.set(temp);
					valuee.set(iterator.next());
				}else{
					valuee.set(temp);
					keyy.set(iterator.next());
				}
			}
			context.write(keyy, valuee);
		}
	}
	public static void print(Tool tool) throws IOException{
		FileSystem fs = FileSystem.get(tool.getConf());
		Path path = new Path("out/part-r-00000");
		FSDataInputStream fsin = fs.open(path);
		int length = 0;
		byte[] buff = new byte[128];
		while((length = fsin.read(buff,0,128)) != -1)
			System.out.println(new String(buff,0,length));
	}
}

reference:《MapReduce2.0源代码分析及编程实践》

MapReduce数据连接