Hadoop Self-Join
Environment: CentOS 6.6, Hadoop 1.2.1
Sample data: the first column is the child and the second column is the parent, separated by a space. The task is to output every (grandchild, grandparent) pair.
[grid@hadoop1 ~]$ hadoop fs -cat ./in/genealogy.txt
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
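The join works by reading the table twice: once treating the parent as the join key (tagged "1"), once treating the child as the join key (tagged "2"). To make that concrete before the full program, here is a minimal standalone sketch (illustration only, not part of the original program) that simulates the mapper's tagging for two sample records:

import java.util.Arrays;
import java.util.List;

public class SelfJoinTrace {
    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[]{"Tom", "Lucy"},   // child, parent
                new String[]{"Lucy", "Mary"});
        for (String[] r : records) {
            String child = r[0], parent = r[1];
            // Left copy of the table: parent becomes the key, child tagged "1"
            System.out.println(parent + "\t" + "1" + child);
            // Right copy of the table: child becomes the key, parent tagged "2"
            System.out.println(child + "\t" + "2" + parent);
        }
        // At the reducer, key "Lucy" receives both "1Tom" (her child) and
        // "2Mary" (her parent), so their cross product yields the
        // grandchild/grandparent pair (Tom, Mary).
    }
}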
The program:
SelfJoinMapper.java:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SelfJoinMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, new Text("1" + key.toString())); // left table: parent as the key, child tagged "1"
        context.write(key, new Text("2" + value.toString())); // right table: child as the key, parent tagged "2"
    }
}

SelfJoinReducer.java:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> childList = new ArrayList<String>(); // children of this key: grandchild candidates
        List<String> grandList = new ArrayList<String>(); // parents of this key: grandparent candidates
        for (Text value : values) {
            if (value.toString().startsWith("1")) {
                childList.add(value.toString().substring(1));
            } else {
                grandList.add(value.toString().substring(1));
            }
        }
        // Cross product: pair every child of the key with every parent of the key
        for (String child : childList) {
            for (String grand : grandList) {
                context.write(new Text(child), new Text(grand));
            }
        }
    }
}

SelfJoin.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SelfJoin {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SelfJoin <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); // set the key/value separator (default is tab)
        //conf.set("mapred.jar", "./out/SelfJoin.jar");
        //conf.set("fs.default.name", "hdfs://hadoop1:9000");
        //conf.set("mapred.job.tracker", "hadoop1:9001");
        Job job = new Job(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // set the InputFormat
        job.setJarByClass(SelfJoin.class);
        job.setJobName("SelfJoin");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SelfJoinMapper.class);
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
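The original post does not show how the job was submitted; a typical invocation would look like the following, where the jar name SelfJoin.jar is an assumption and the input/output paths follow from the commands shown above and below:

[grid@hadoop1 ~]$ hadoop jar SelfJoin.jar SelfJoin ./in/genealogy.txt ./out/9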
The program output is as follows:
[grid@hadoop1 ~]$ hadoop fs -cat ./out/9/part-r-00000
Tom	Alice
Tom	Jesse
Jone	Alice
Jone	Jesse
Tom	Mary
Tom	Ben
Jone	Mary
Jone	Ben
Philip	Alice
Philip	Jesse
Mark	Alice
Mark	Jesse