首页 > 代码库 > Hadoop 自连接
Hadoop 自连接
Hadoop自连接
实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
结果输出为:
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
代码为:
package com.hadoop.twelve;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SelfJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line=value.toString();
String[]arr =line.split("\t");
if(arr.length==2){
context.write(new Text(arr[1]), new Text("0_"+arr[0]));//left表
context.write(new Text(arr[0]), new Text("1_"+arr[1]));//right表
}
}
}
package com.hadoop.twelve;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
List<String> grandchild = new ArrayList<String>();
List<String> grandparent = new ArrayList<String>();
for (Text temp : values) {
String[] arr =temp.toString().split("_");
if("0".equals(arr[0])){
grandchild.add(arr[1]);
}else if("1".equals(arr[0])){
grandparent.add(arr[1]);
}
}
//这里我也不太明白笛卡尔积就完成了???????求指点!!
for(String gc:grandchild){
for(String gp:grandparent){
context.write(new Text(gc), new Text(gp));
}
}
}
//test测试
public static void main(String[] args) {
List<String> grandchild = new ArrayList<String>();
List<String> grandparent = new ArrayList<String>();
grandchild.add("a");
grandchild.add("b");
grandchild.add("c");
grandchild.add("d");
grandparent.add("a1");
grandparent.add("b1");
grandparent.add("c1");
grandparent.add("d1");
for(String gc:grandchild){
for(String gp:grandparent){
System.out.println(gc+"-------->"+gp);
}
}
}
}
package com.hadoop.twelve;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobMain {
/**
* @param args
*/
public static void main(String[] args)throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration,"self_join_job");
job.setJarByClass(JobMain.class);
job.setMapperClass(SelfJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(SelfJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path path = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(path)){
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);
System.exit(job.waitForCompletion(true)?0:1);
}
}
Hadoop 自连接