首页 > 代码库 > Hadoop 自连接

Hadoop 自连接

                                           Hadoop自连接

实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。

    child        parent 


Tom        Lucy


Tom        Jack 


Jone        Lucy


Jone        Jack


Lucy        Mary


Lucy        Ben


Jack        Alice


Jack        Jesse


Terry        Alice


Terry        Jesse


Philip        Terry


Philip        Alma


Mark        Terry


Mark        Alma



结果输出为:

Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse


代码为:


package com.hadoop.twelve;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class SelfJoinMapper extends Mapper<LongWritable, Text, Text, Text> {


@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line=value.toString();
String[]arr =line.split("\t");
if(arr.length==2){
context.write(new Text(arr[1]), new Text("0_"+arr[0]));//left表
context.write(new Text(arr[0]), new Text("1_"+arr[1]));//right表
}
}


}



package com.hadoop.twelve;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {


@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
List<String> grandchild = new ArrayList<String>();
List<String> grandparent = new ArrayList<String>();
for (Text temp : values) {
String[] arr =temp.toString().split("_");
if("0".equals(arr[0])){
grandchild.add(arr[1]);
}else if("1".equals(arr[0])){
grandparent.add(arr[1]);
}
}

              //这里我也不太明白笛卡尔积就完成了???????求指点!!
for(String gc:grandchild){
for(String gp:grandparent){
context.write(new Text(gc), new Text(gp));
}
}


}

    //test测试
public static void main(String[] args) {
List<String> grandchild = new ArrayList<String>();
List<String> grandparent = new ArrayList<String>();
grandchild.add("a");
grandchild.add("b");
grandchild.add("c");
grandchild.add("d");

grandparent.add("a1");
grandparent.add("b1");
grandparent.add("c1");
grandparent.add("d1");

for(String gc:grandchild){
for(String gp:grandparent){
System.out.println(gc+"-------->"+gp);
}
}
}

}


package com.hadoop.twelve;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class JobMain {


/**
* @param args
*/
public static void main(String[] args)throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration,"self_join_job");
job.setJarByClass(JobMain.class);

job.setMapperClass(SelfJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);


job.setReducerClass(SelfJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);


FileInputFormat.addInputPath(job, new Path(args[0]));
Path path = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(path)){
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);

System.exit(job.waitForCompletion(true)?0:1);


}


}


Hadoop 自连接