
Hadoop Multi-Table Join

Environment: CentOS 6.6, Hadoop 1.2.1

Sample data:
[grid@hadoop1 ~]$ hadoop fs -cat ./in/7/dept.txt
        10 ACCOUNTING     NEW YORK
        20 RESEARCH       DALLAS
        30 SALES          CHICAGO
        40 OPERATIONS     BOSTON
[grid@hadoop1 ~]$ hadoop fs -cat ./in/7/emp.txt
      7369 SMITH      CLERK           7902 17-12月-80            800                    20
      7499 ALLEN      SALESMAN        7698 20-2月 -81           1600        300         30
      7521 WARD       SALESMAN        7698 22-2月 -81           1250        500         30
      7566 JONES      MANAGER         7839 02-4月 -81           2975                    20
      7654 MARTIN     SALESMAN        7698 28-9月 -81           1250       1400         30
      7698 BLAKE      MANAGER         7839 01-5月 -81           2850                    30
      7782 CLARK      MANAGER         7839 09-6月 -81           2450                    10
      7839 KING       PRESIDENT            17-11月-81           5000                    10
      7844 TURNER     SALESMAN        7698 08-9月 -81           1500          0         30
      7900 JAMES      CLERK           7698 03-12月-81            950                    30
      7902 FORD       ANALYST         7566 03-12月-81           3000                    20
      7934 MILLER     CLERK           7782 23-1月 -82           1300                    10

In the dept file, the first column is the department number, the second the department name, and the third the city the department is in.
In the emp file, the sixth column is the employee's salary and the eighth the employee's department number.
Required output: department name, employee count, average salary.

A multi-table join works much like a single-table self-join: the join column becomes the key, the column of interest becomes the value, and extra information prepended to each value marks which table (left or right) the record came from.
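
For example, for department 30 the map phase emits tagged key/value pairs like the following (a worked illustration using the sample data above), which the shuffle then groups under the shared key:

	30	A1600	(ALLEN's salary, from emp.txt)
	30	A1250	(WARD's salary, from emp.txt)
	30	BSALES	(department name, from dept.txt)
	...

The reducer thus receives every salary and the department name for key 30 in a single Iterable.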
The program:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.length() > 80) { // emp lines are long, dept lines are short
			String deptno = line.substring(87, 89);      // department number (join key)
			String sal = line.substring(62, 67).trim();  // salary
			context.write(new Text(deptno), new Text("A" + sal)); // "A" tags the emp side
		} else {
			String deptno = line.substring(8, 10);        // department number (join key)
			String dname = line.substring(11, 25).trim(); // department name
			context.write(new Text(deptno), new Text("B" + dname)); // "B" tags the dept side
		}
	}
}
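
The fixed substring offsets assume the exact fixed-width layout shown above; a blank line or a line shorter than expected would make substring throw. A minimal defensive variant (a sketch, not part of the original program; the class name SafeJoinMapper is illustrative) skips such lines instead:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SafeJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.trim().isEmpty()) {
			return; // ignore blank lines
		}
		if (line.length() > 80) {
			if (line.length() < 89) {
				return; // truncated emp record: skip rather than throw
			}
			String deptno = line.substring(87, 89).trim();
			String sal = line.substring(62, 67).trim();
			context.write(new Text(deptno), new Text("A" + sal));
		} else if (line.length() >= 25) { // well-formed dept record
			String deptno = line.substring(8, 10).trim();
			String dname = line.substring(11, 25).trim();
			context.write(new Text(deptno), new Text("B" + dname));
		}
	}
}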

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class JoinReducer extends Reducer<Text, Text, Text, Text> {
	@Override
	public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		int total = 0, count = 0;
		String dname = null;
		for (Text value : values) {
			if (value.toString().startsWith("A")) { // emp record: accumulate salary
				total += Integer.parseInt(value.toString().substring(1));
				count++;
			} else { // dept record: remember the department name
				dname = value.toString().substring(1);
			}
		}
		String avg = (count == 0 ? "--" : ("" + total / count)); // integer division
		context.write(new Text(dname), new Text(count + "\t" + avg));
	}
}
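
Note that the reducer may see the tagged values for a key in any order; for key 30 it might iterate over A1600, BSALES, A1250, A1250, A2850, A1500, A950. The single pass still works because the "A"/"B" tags, not the ordering, decide how each value is handled, and dname only needs to be captured once. (If two dept records ever shared a number, the last "B" value seen would silently win.)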

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class Join {
	public static void main(String[] args) throws Exception {
		if (args.length != 3) {
			System.err.println("Usage: One <input path> <input path> <output path>");
			System.exit(-1);
		}
		Job job = new Job();
		job.setJarByClass(Join.class);
		job.setJobName("Join");
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileInputFormat.addInputPath(job, new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
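
To run the job (a hypothetical invocation; the jar name join.jar is illustrative, and the two input paths can be given in either order, since the mapper tells the tables apart by line length rather than by file):

[grid@hadoop1 ~]$ hadoop jar join.jar Join ./in/7/dept.txt ./in/7/emp.txt ./out/7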

View the output:

[grid@hadoop1 ~]$ hadoop fs -cat ./out/7/part-r-00000
ACCOUNTING	3	2916
RESEARCH	3	2258
SALES	6	1566
OPERATIONS	0	--
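
The averages come from integer division, so they are truncated: for ACCOUNTING, (2450 + 5000 + 1300) / 3 = 8750 / 3 = 2916. OPERATIONS shows a count of 0 and "--" because its dept record supplies the name even though no emp record shares its key.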

