
Hadoop Self-Join

Environment: CentOS 6.6, Hadoop 1.2.1

Sample data: the first column is the child and the second is the parent, separated by a space; the job must output every (grandchild, grandparent) pair. For example, Tom's parent is Lucy and Lucy's parents are Mary and Ben, so Tom Mary and Tom Ben should appear in the result.
[grid@hadoop1 ~]$ hadoop fs -cat ./in/genealogy.txt
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma

The program:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SelfJoinMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, new Text("1" + key.toString())); // left table: the parent becomes the key
        context.write(key, new Text("2" + value.toString())); // right table: the child becomes the key
    }
}
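
For every input line the mapper emits two records: the parent as the key with the child tagged "1", and the child as the key with the parent tagged "2". With the sample data, the lines Tom Lucy, Jone Lucy, Lucy Mary and Lucy Ben all produce a record keyed by Lucy, so the grandchild side and the grandparent side meet in a single reduce call:

Tom Lucy   ->  (Lucy, "1Tom")   (Tom, "2Lucy")
Jone Lucy  ->  (Lucy, "1Jone")  (Jone, "2Lucy")
Lucy Mary  ->  (Mary, "1Lucy")  (Lucy, "2Mary")
Lucy Ben   ->  (Ben, "1Lucy")   (Lucy, "2Ben")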

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> childList = new ArrayList<String>(); // children of key, i.e. the grandchildren
        List<String> grandList = new ArrayList<String>(); // parents of key, i.e. the grandparents
        for (Text value : values) {
            if (value.toString().startsWith("1")) { // tag "1": value is a child of key
                childList.add(value.toString().substring(1));
            } else { // tag "2": value is a parent of key
                grandList.add(value.toString().substring(1));
            }
        }
        // Cartesian product: pair every grandchild with every grandparent
        for (String child : childList) {
            for (String grand : grandList) {
                context.write(new Text(child), new Text(grand));
            }
        }
    }
}
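
In the reduce call for key Lucy the values are therefore 1Tom, 1Jone, 2Mary and 2Ben (in arbitrary order). childList becomes [Tom, Jone], grandList becomes [Mary, Ben], and the nested loops write their cross product:

Tom	Mary
Tom	Ben
Jone	Mary
Jone	Ben

When either list is empty (for key Tom, for example, there are no "1" values because nobody lists Tom as a parent), nothing is written for that key.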

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SelfJoin {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SelfJoin <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); //设置分割符
        //conf.set("mapred.jar", "./out/SelfJoin.jar");
        //conf.set("fs.default.name", "hdfs://hadoop1:9000");
        //conf.set("mapred.job.tracker", "hadoop1:9001");

        Job job = new Job(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // split each line into a (child, parent) key/value pair
        job.setJarByClass(SelfJoin.class);
        job.setJobName("SelfJoin");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SelfJoinMapper.class);
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
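
Assuming the three classes above are packaged into a jar named SelfJoin.jar (the jar name and the output directory below are only illustrative), the job can be submitted with the input file and an output directory as its two arguments:

[grid@hadoop1 ~]$ hadoop jar SelfJoin.jar SelfJoin ./in/genealogy.txt ./out/9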

The job output looks like this:

[grid@hadoop1 ~]$ hadoop fs -cat ./out/9/part-r-00000
Tom	Alice
Tom	Jesse
Jone	Alice
Jone	Jesse
Tom	Mary
Tom	Ben
Jone	Mary
Jone	Ben
Philip	Alice
Philip	Jesse
Mark	Alice
Mark	Jesse
