首页 > 代码库 > MapReduce实现两表的Join--原理及python和java代码实现
MapReduce实现两表的Join--原理及python和java代码实现
用Hive一句话搞定的,但是有时必须要用mapreduce
方法介绍
1. 概述
在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法。
2. 常见的join方法介绍
假设要进行join的数据分别来自File1和File2.2.1 reduce side join
reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
2.2 map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
2.3 SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
更多关于半连接的介绍,可参考:半连接介绍:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
2.4 reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。
因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
更多关于BloomFilter的介绍,可参考:http://blog.csdn.net/jiaomeng/article/details/1495500
3. 二次排序
在Hadoop中,默认情况下是按照key进行排序,如果要按照value进行排序怎么办?即:对于同一个key,reduce函数接收到的value list是按照value排序的。这种应用需求在join操作中很常见,比如,希望相同的key中,小表对应的value排在前面。有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。
对于buffer and in memory sort,主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进行排序。 这种方法最大的缺点是:可能会造成out of memory。
对于value-to-key conversion,主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调用setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,用户需要自己实现Paritioner,以便只按照key进行数据划分。Hadoop显式的支持二次排序,在Configuration类中有个setGroupingComparatorClass()方法,可用于设置排序group的key值,
reduce-side-join python代码
hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:
使用原生java语言实现Map-reduce程序
- hadoop准备好数据后,将数据传送给java的map程序
- java的map程序将数据处理后,输出O1
- hadoop将O1打散、排序,然后传给不同的reduce机器
- 每个reduce机器将传来的数据传给reduce程序
- reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使用python语言实现Map-reduce程序
- hadoop准备好数据后,将数据传送给java的map程序
- java的map程序将数据处理成“键/值”对,并传送给python的map程序
- python的map程序将数据处理后,将结果传回给java的map程序
- java的map程序将数据输出为O1
- hadoop将O1打散、排序,然后传给不同的reduce机器
- 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
- python的reduce程序将数据处理后,将结果返回给java的reduce程序
- java的reduce程序将数据处理,输出最终数据O2
上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。
hadoop之实现集合join的需求
hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。
比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:
(学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)
数据事例1-学生信息:
学号sno | 姓名name |
01 | name1 |
02 | name2 |
03 | name3 |
04 | name4 |
数据事例2:-学生成绩:
学号sno | 课程号courseno | 成绩grade |
01 | 01 | 80 |
01 | 02 | 90 |
02 | 01 | 82 |
02 | 02 | 95 |
期待的最终输出:
学号sno | 姓名name | 课程courseno | 成绩grade |
01 | name1 | 01 | 80 |
01 | name1 | 02 | 90 |
02 | name2 | 01 | 82 |
02 | name2 | 02 | 95 |
实现join的注意点和易踩坑总结
如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:
- 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
- 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
- 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空
第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。
hadoop实现join操作的思路
具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。
1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;
2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区
3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出
hadoop使用python实现join的map和reduce代码
mapper.py的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | # -*- coding: utf-8 -*- #Mapper.py import os import sys #mapper脚本 def mapper(): #获取当前正在处理的文件的名字,这里我们有两个输入文件 #所以要加以区分 filepath = os.environ["map_input_file"] filename = os.path.split(filepath)[-1] for line in sys.stdin: if line.strip()=="": continue fields = line[:-1].split("\t") sno = fields[0] #以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记 if filename == ‘data_info‘: name = fields[1] #下面的数字‘0‘就是为数据源1加上的统一标记 print ‘\t‘.join((sno,‘0‘,name)) elif filename == ‘data_grade‘: courseno = fields[1] grade = fields[2] #下面的数字‘1‘就是为数据源1加上的统一标记 print ‘\t‘.join((sno,‘1‘,courseno,grade)) if __name__==‘__main__‘: mapper() |
reducer的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | # -*- coding: utf-8 -*- #reducer.py importsys defreducer(): #为了记录和上一个记录的区别,用lastsno记录上个sno lastsno="" forlineinsys.stdin: ifline.strip()=="": continue fields=line[:-1].split("\t") sno=fields[0] ‘‘‘ 处理思路: 遇见当前key与上一条key不同并且label=0,就记录下来name值, 当前key与上一条key相同并且label==1,则将本条数据的courseno、 grade联通上一条记录的name一起输出成最终结果 ‘‘‘ ifsno!=lastsno: name="" #这里没有判断label==1的情况, #因为sno!=lastno,并且label=1表示该条key没有数据源1的数据 iffields[1]=="0": name=fields[2] elifsno==lastno: #这里没有判断label==0的情况, #因为sno==lastno并且label==0表示该条key没有数据源2的数据 iffields[2]=="1": courseno=fields[2] grade=fields[3] ifname: print‘\t‘.join((lastsno,name,courseno,grade)) lastsno=sno if__name__==‘__main__‘: reducer() |
使用shell脚本启动hadoop程序的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | #先删除输出目录 ~/hadoop-client/hadoop/bin/hadoopfs-rmr/hdfs/jointest/output #注意,下面配置中的环境值每个人机器不一样 ~/hadoop-client/hadoop/bin/hadoopstreaming\ -Dmapred.map.tasks=10\ -Dmapred.reduce.tasks=5\ -Dmapred.job.map.capacity=10\ -Dmapred.job.reduce.capacity=5\ -Dmapred.job.name="join--sno_name-sno_courseno_grade"\ -Dnum.key.fields.for.partition=1\ -Dstream.num.map.output.key.fields=2\ -partitionerorg.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner\ -input"/hdfs/jointest/input/*"\ -output"/hdfs/jointest/output"\ -mapper"python26/bin/python26.sh mapper.py"\ -reducer"python26/bin/python26.sh reducer.py"\ -file"mapper.py"\ -file"reducer.py"\ -cacheArchive"/share/python26.tar.gz#python26" #看看运行成功没,若输出0则表示成功了 echo$? |
可以自己手工构造输入输出数据进行测试,本程序是验证过的。
更多需要注意的地方
hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:
影响类型 | 影响的范围 |
key字段数目 | 1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置 |
3、map和reduce脚本中key的获取
4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型
1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;
M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;
M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来
reduce-side-join java代码
数据准备
- create table if not exists m_ys_lab_jointest_a (
- id bigint,
- name string
- )
- row format delimited
- fields terminated by ‘9‘
- lines terminated by ‘10‘
- stored as textfile;
id name 1 北京 2 天津 3 河北 4 山西 5 内蒙古 6 辽宁 7 吉林 8 黑龙江 |
- create table if not exists m_ys_lab_jointest_b (
- id bigint,
- statyear bigint,
- num bigint
- )
- row format delimited
- fields terminated by ‘9‘
- lines terminated by ‘10‘
- stored as textfile;
id statyear num 1 2010 1962 1 2011 2019 2 2010 1299 2 2011 1355 4 2010 3574 4 2011 3593 9 2010 2303 9 2011 2347 |
我们的目的是,以id为key做join操作,得到以下表:
id name statyear num 1 北京 2011 2019 1 北京 2010 1962 2 天津 2011 1355 2 天津 2010 1299 4 山西 2011 3593 4 山西 2010 3574 |
计算模型
代码
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Vector;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.RecordWriter;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.Reporter;
- /**
- * MapReduce实现Join操作
- */
- public class MapRedJoin {
- public static final String DELIMITER = "\u0009"; // 字段分隔符
- // map过程
- public static class MapClass extends MapReduceBase implements
- Mapper<LongWritable, Text, Text, Text> {
- public void configure(JobConf job) {
- super.configure(job);
- }
- public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException, ClassCastException {
- // 获取输入文件的全路径和名称
- String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
- // 获取记录字符串
- String line = value.toString();
- // 抛弃空记录
- if (line == null || line.equals("")) return;
- // 处理来自表A的记录
- if (filePath.contains("m_ys_lab_jointest_a")) {
- String[] values = line.split(DELIMITER); // 按分隔符分割出字段
- if (values.length < 2) return;
- String id = values[0]; // id
- String name = values[1]; // name
- output.collect(new Text(id), new Text("a#"+name));
- }
- // 处理来自表B的记录
- else if (filePath.contains("m_ys_lab_jointest_b")) {
- String[] values = line.split(DELIMITER); // 按分隔符分割出字段
- if (values.length < 3) return;
- String id = values[0]; // id
- String statyear = values[1]; // statyear
- String num = values[2]; //num
- output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
- }
- }
- }
- // reduce过程
- public static class Reduce extends MapReduceBase
- implements Reducer<Text, Text, Text, Text> {
- public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter)
- throws IOException {
- Vector<String> vecA = new Vector<String>(); // 存放来自表A的值
- Vector<String> vecB = new Vector<String>(); // 存放来自表B的值
- while (values.hasNext()) {
- String value = values.next().toString();
- if (value.startsWith("a#")) {
- vecA.add(value.substring(2));
- } else if (value.startsWith("b#")) {
- vecB.add(value.substring(2));
- }
- }
- int sizeA = vecA.size();
- int sizeB = vecB.size();
- // 遍历两个向量
- int i, j;
- for (i = 0; i < sizeA; i ++) {
- for (j = 0; j < sizeB; j ++) {
- output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
- }
- }
- }
- }
- protected void configJob(JobConf conf) {
- conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(Text.class);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
- conf.setOutputFormat(ReportOutFormat.class);
- }
- }
技术细节
所有方法的java代码(巨长)
从别人那转来
- package com.mr.reduceSizeJoin;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- public class CombineValues implements WritableComparable{
- //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
- private Text joinKey;//链接关键字
- private Text flag;//文件来源标志
- private Text secondPart;//除了链接键外的其他部分
- public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey;
- }
- public void setFlag(Text flag) {
- this.flag = flag;
- }
- public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart;
- }
- public Text getFlag() {
- return flag;
- }
- public Text getSecondPart() {
- return secondPart;
- }
- public Text getJoinKey() {
- return joinKey;
- }
- public CombineValues() {
- this.joinKey = new Text();
- this.flag = new Text();
- this.secondPart = new Text();
- }
- @Override
- public void write(DataOutput out) throws IOException {
- this.joinKey.write(out);
- this.flag.write(out);
- this.secondPart.write(out);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- this.secondPart.readFields(in);
- }
- @Override
- public int compareTo(CombineValues o) {
- return this.joinKey.compareTo(o.getJoinKey());
- }
- @Override
- public String toString() {
- // TODO Auto-generated method stub
- return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
- }
- }
- package com.mr.reduceSizeJoin;
- import java.io.IOException;
- import java.util.ArrayList;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- * 用途说明:
- * reudce side join中的left outer join
- * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
- * tb_dim_city.dat文件内容,分隔符为"|":
- * id name orderid city_code is_show
- * 0 其他 9999 9999 0
- * 1 长春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 辽源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------风骚的分割线-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件内容,分隔符为"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- *
- * -------------------------风骚的分割线-------------------------------
- * 结果:
- * 1 长春 1 901 1 1 2G 123
- * 1 长春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
- public static class LeftOutJoinMapper extends Mapper {
- private CombineValues combineValues = new CombineValues();
- private Text flag = new Text();
- private Text joinKey = new Text();
- private Text secondPart = new Text();
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //获得文件输入路径
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- //数据来自tb_dim_city.dat文件,标志即为"0"
- if(pathName.endsWith("tb_dim_city.dat")){
- String[] valueItems = value.toString().split("\\|");
- //过滤格式错误的记录
- if(valueItems.length != 5){
- return;
- }
- flag.set("0");
- joinKey.set(valueItems[0]);
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }//数据来自于tb_user_profiles.dat,标志即为"1"
- else if(pathName.endsWith("tb_user_profiles.dat")){
- String[] valueItems = value.toString().split("\\|");
- //过滤格式错误的记录
- if(valueItems.length != 4){
- return;
- }
- flag.set("1");
- joinKey.set(valueItems[3]);
- secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }
- }
- }
- public static class LeftOutJoinReducer extends Reducer {
- //存储一个分组中的左表信息
- private ArrayList leftTable = new ArrayList();
- //存储一个分组中的右表信息
- private ArrayList rightTable = new ArrayList();
- private Text secondPar = null;
- private Text output = new Text();
- /**
- * 一个分组调用一次reduce函数
- */
- @Override
- protected void reduce(Text key, Iterable value, Context context)
- throws IOException, InterruptedException {
- leftTable.clear();
- rightTable.clear();
- /**
- * 将分组中的元素按照文件分别进行存放
- * 这种方法要注意的问题:
- * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
- * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
- * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
- */
- for(CombineValues cv : value){
- secondPar = new Text(cv.getSecondPart().toString());
- //左表tb_dim_city
- if("0".equals(cv.getFlag().toString().trim())){
- leftTable.add(secondPar);
- }
- //右表tb_user_profiles
- else if("1".equals(cv.getFlag().toString().trim())){
- rightTable.add(secondPar);
- }
- }
- logger.info("tb_dim_city:"+leftTable.toString());
- logger.info("tb_user_profiles:"+rightTable.toString());
- for(Text leftPart : leftTable){
- for(Text rightPart : rightTable){
- output.set(leftPart+ "\t" + rightPart);
- context.write(key, output);
- }
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //获得配置文件对象
- Job job=new Job(conf,"LeftOutJoinMR");
- job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
- FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
- FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
- job.setMapperClass(LeftOutJoinMapper.class);
- job.setReducerClass(LeftOutJoinReducer.class);
- job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式
- //设置map的输出key和value类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineValues.class);
- //设置reduce的输出key和value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- logger.error(e.getMessage());
- }
- }
- }
2、在Map端进行连接。
- package com.mr.mapSideJoin;
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.util.HashMap;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- *
- * 用途说明:
- * Map side join中的left outer join
- * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
- * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":
- * id name orderid city_code is_show
- * 0 其他 9999 9999 0
- * 1 长春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 辽源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------风骚的分割线-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件内容,分隔符为"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- * -------------------------风骚的分割线-------------------------------
- * 结果:
- * 1 长春 1 901 1 1 2G 123
- * 1 长春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class MapSideJoinMain extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
- public static class LeftOutJoinMapper extends Mapper {
- private HashMap city_info = new HashMap();
- private Text outPutKey = new Text();
- private Text outPutValue = http://www.mamicode.com/new Text();
- private String mapInputStr = null;
- private String mapInputSpit[] = null;
- private String city_secondPart = null;
- /**
- * 此方法在每个task开始之前执行,这里主要用作从DistributedCache
- * 中取到tb_dim_city文件,并将里边记录取出放到内存中。
- */
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- BufferedReader br = null;
- //获得当前作业的DistributedCache相关文件
- Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- String cityInfo = null;
- for(Path p : distributePaths){
- if(p.toString().endsWith("tb_dim_city.dat")){
- //读缓存文件,并放到mem中
- br = new BufferedReader(new FileReader(p.toString()));
- while(null!=(cityInfo=br.readLine())){
- String[] cityPart = cityInfo.split("\\|",5);
- if(cityPart.length ==5){
- city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
- }
- }
- }
- }
- }
- /**
- * Map端的实现相当简单,直接判断tb_user_profiles.dat中的
- * cityID是否存在我的map中就ok了,这样就可以实现Map Join了
- */
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //排掉空行
- if(value =http://www.mamicode.com/= null || value.toString().equals("")){
- return;
- }
- mapInputStr = value.toString();
- mapInputSpit = mapInputStr.split("\\|",4);
- //过滤非法记录
- if(mapInputSpit.length != 4){
- return;
- }
- //判断链接字段是否在map中存在
- city_secondPart = city_info.get(mapInputSpit[3]);
- if(city_secondPart != null){
- this.outPutKey.set(mapInputSpit[3]);
- this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
- context.write(outPutKey, outPutValue);
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //获得配置文件对象
- DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件
- Job job=new Job(conf,"MapJoinMR");
- job.setNumReduceTasks(0);
- FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
- FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径
- job.setJarByClass(MapSideJoinMain.class);
- job.setMapperClass(LeftOutJoinMapper.class);
- job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
- //设置map的输出key和value类型
- job.setMapOutputKeyClass(Text.class);
- //设置reduce的输出key和value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new MapSideJoinMain(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- logger.error(e.getMessage());
- }
- }
- }
3、SemiJoin。
- package com.mr.SemiJoin;
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashSet;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- *
- * 用途说明:
- * reudce side join中的left outer join
- * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
- * tb_dim_city.dat文件内容,分隔符为"|":
- * id name orderid city_code is_show
- * 0 其他 9999 9999 0
- * 1 长春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 辽源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------风骚的分割线-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件内容,分隔符为"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- * -------------------------风骚的分割线-------------------------------
- * joinKey.dat内容:
- * city_code
- * 1
- * 2
- * 3
- * 4
- * -------------------------风骚的分割线-------------------------------
- * 结果:
- * 1 长春 1 901 1 1 2G 123
- * 1 长春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class SemiJoin extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);
- public static class SemiJoinMapper extends Mapper {
- private CombineValues combineValues = new CombineValues();
- private HashSet joinKeySet = new HashSet();
- private Text flag = new Text();
- private Text joinKey = new Text();
- private Text secondPart = new Text();
- /**
- * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b
- */
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- BufferedReader br = null;
- //获得当前作业的DistributedCache相关文件
- Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- String joinKeyStr = null;
- for(Path p : distributePaths){
- if(p.toString().endsWith("joinKey.dat")){
- //读缓存文件,并放到mem中
- br = new BufferedReader(new FileReader(p.toString()));
- while(null!=(joinKeyStr=br.readLine())){
- joinKeySet.add(joinKeyStr);
- }
- }
- }
- }
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //获得文件输入路径
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- //数据来自tb_dim_city.dat文件,标志即为"0"
- if(pathName.endsWith("tb_dim_city.dat")){
- String[] valueItems = value.toString().split("\\|");
- //过滤格式错误的记录
- if(valueItems.length != 5){
- return;
- }
- //过滤掉不需要参加join的记录
- if(joinKeySet.contains(valueItems[0])){
- flag.set("0");
- joinKey.set(valueItems[0]);
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }else{
- return ;
- }
- }//数据来自于tb_user_profiles.dat,标志即为"1"
- else if(pathName.endsWith("tb_user_profiles.dat")){
- String[] valueItems = value.toString().split("\\|");
- //过滤格式错误的记录
- if(valueItems.length != 4){
- return;
- }
- //过滤掉不需要参加join的记录
- if(joinKeySet.contains(valueItems[3])){
- flag.set("1");
- joinKey.set(valueItems[3]);
- secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }else{
- return ;
- }
- }
- }
- }
- public static class SemiJoinReducer extends Reducer {
- //存储一个分组中的左表信息
- private ArrayList leftTable = new ArrayList();
- //存储一个分组中的右表信息
- private ArrayList rightTable = new ArrayList();
- private Text secondPar = null;
- private Text output = new Text();
- /**
- * 一个分组调用一次reduce函数
- */
- @Override
- protected void reduce(Text key, Iterable value, Context context)
- throws IOException, InterruptedException {
- leftTable.clear();
- rightTable.clear();
- /**
- * 将分组中的元素按照文件分别进行存放
- * 这种方法要注意的问题:
- * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
- * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
- * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
- */
- for(CombineValues cv : value){
- secondPar = new Text(cv.getSecondPart().toString());
- //左表tb_dim_city
- if("0".equals(cv.getFlag().toString().trim())){
- leftTable.add(secondPar);
- }
- //右表tb_user_profiles
- else if("1".equals(cv.getFlag().toString().trim())){
- rightTable.add(secondPar);
- }
- }
- logger.info("tb_dim_city:"+leftTable.toString());
- logger.info("tb_user_profiles:"+rightTable.toString());
- for(Text leftPart : leftTable){
- for(Text rightPart : rightTable){
- output.set(leftPart+ "\t" + rightPart);
- context.write(key, output);
- }
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //获得配置文件对象
- DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
- Job job=new Job(conf,"LeftOutJoinMR");
- job.setJarByClass(SemiJoin.class);
- FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
- FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
- job.setMapperClass(SemiJoinMapper.class);
- job.setReducerClass(SemiJoinReducer.class);
- job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
- //设置map的输出key和value类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineValues.class);
- //设置reduce的输出key和value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new SemiJoin(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- }
总结
MapReduce实现两表的Join--原理及python和java代码实现