首页 > 代码库 > MapReduce 实现数据join操作

MapReduce 实现数据join操作

前段时间有一个业务需求,要在外网商品(TOPB2C)信息中加入 联营自营 识别的字段。但存在的一个问题是,商品信息  自营联营标示数据是 两份数据;商品信息较大,是存放在hbase中。他们之前唯一的关联是url。所以考虑用urlkey将两者做join,将 联营自营标识 信息加入的商品信息中,最终生成我需要的数据;

 

一,首先展示一下两份数据的demo example

1. 自营联营标识数据(下面开始就叫做unionseller.txt

http://cn.abc.www/product43675,1

http://cn.abc.www/product43710,0

 

(是两列的数据,其中第一列是url也即是join操作要使用的key,后面的0,1是表示自营联营标识,0表示自营,1表示联营)

 

2. 商品信息数据(下面开始叫做shangpin_hbase.txt

ROWKEY=http://cn.abc.www/product43675^_

 

image:image_url@1375896006814=http://*/*.jpg^_

image:is_default_image@1375897937280=0^_

image:thumbnail@1375897937280=**^_

parser:attribute@1375896006814={

......

...... 

uri:url@1375896006814=http://www.baby.com.cn/product-35520^_

^^

(上面是一条商品数据的样例,使用的是spider团队提供的一个jar包从hbase中导出来的,但也是比较标准的模式,可以使用hbase中HBaseDocInputFormat的类进行读取,其中貌似笑脸的那些field separator和line separator实际上是不可见字符,这里主要是为了展示而用可见字符代替;处于隐私原因,将一些内容用*代替)

 

由于商品数据是在hbase里面的,所以首先将商品数据从hbase中dump出来,通过rowkey来访问hbase大致有两种方式:

1. 通过单个row key访问:即按照某个row key键值进行get操作;

2. 通过row key的range进行scan;在范围内进行扫描;

一般来说,scan的方式会快一些。由于商品数据集的数量较为庞大(会有2000多万条url的商品及属性),并且商品是以url(host反转的方式,例如http://cn.abc.www/product43675)作为rowkey进行存储的。所以采用scan的方式从hbase中取出上商品数据集。

 

接下来获取自营联营识别数据,由于自营联营标识在hbase中是没有的,是spider团队在进行商品数据dump的时候计算出来的,所以直接用python写个streaming程序从dump数据中取得url字段和自营联营标识就可以了。较为简单,略过,本文主要来讲解join操作和hbase字段解析的过程。

 

二,现在我们手头上有了shangpin_hbase.txt以及unionseller.txt下面就可以考虑实现join操作的mapreduce了。

用mapreduce实现join比较常见的有两种方法,map端join和reduce端join

1. Mapjoin所针对的场景是,两个要进行join的数据集中,其中一个非常大,另一个非常小,以至于我们可以将小的数据集放到内存中。这样我们可以将小数据集复制多份,每个运行map task的内存中都存在一份(可以使用hash table),这样map task可以只扫描大的数据,对于大数据中的每一条记录,去小数据中找到相应的key的数据,然后连接输出;如果想让小数据集复制到每个map task中,可以使用mapreduce提供的Distributed Cache机制。

?2. Reduce端join是一种比较简单和容易想到的方式,适合的环境也更为普遍。基本思路是在mapper中为每一个记录打上标记,并且使用连接键作为map输出键,使键相同的记录能够被分到同一个reducer中。

??Reduce端join比较简单,但缺点是两个数据集都要经过mapreduce的shuffle过程(里面涉及到写磁盘,归并排序,还有网络传输等)。所以reduce端的join操作,效率往往低些。本文采用简单一些的reduce端join的实现。

 

?1. 首先是mapper编写,由于join操作至少有两份数据集,所以常需要使用mapReduce的MultipleInputs.addInputPath()来添加多个输入文件的路径。也需要两个mapper类来实现对不同数据集的处理,首先来看一下处理unionseller.txt的mapper程序:

?alt

?由于unionseller.txt的每一行是以逗号分割的两列,所以只要用逗号将两者分隔开就好,得到作为连接键的urllst[0]和自营联营识别标识项lst[1]Map的的输出keyTextPair类型,输出valuePut类型。这两个重点说一下:

?(1)TextPair类型

?map的输出即reduce的输入,有相同key的数据会被运送到同一个reduce来处理。为了在reduce端能够区分unionseller.txtshangpin_hbase.txt类型,在TextPair中增加了一个识别字段,unionseller.txt “0”,即new TextPair(lst[0],”0”);而shangpin_hbase.txt字段是”1”,这样在reduce端就可以有依据来区分数据的类型。

?TextPair是一个自定义的数据类型,其包含两个成员变量,firstsecond,两个成员变量都是Hadoop自带的Text类型。自己实现自定义的Key是要有些限制的,原因是:

(aMapper会根据key进行hash操作来决定记录被分配到哪一个reducer中去;

(b)在MapReduce内部机制中,在Mapper以及Reducer阶段都涉及到将数据写到磁盘的IO操作(spill阶段)和根据key对数据进行排序的操作。所以这就要求Mapper中的key是可以比较的,并且keyvalue是都是可以序列化的。Hadoop对自带的数据类型(Text,IntWritable等)都实现了序列化方法和内置的比较方法,但是对于自定义类型,就必须自己去实现相应的接口,并且重写方法。TextPair代码如下:

alt

altalt

由于key需要进行比较并且需要序列化,所以实现了WritableComparable接口,其中compareTo()是实现了比较的方法,而readFieldswrite方法则是用于序列化的,告诉hadoop怎样读和写自定义的数据结构。这里解决了对key比较和序列化的问题,那么mapper对数据进行分发(决定到将记录发送到哪个reduce)的操作该怎样实现,这个需要集成Partitioner类并重写getPartition方法,实现自己的分发策略。

alt

??这里面自己定义了hash方法。

?(2)介绍完了TextPair类,说一下Put类型,说Put类型之前,先看下处理shangpin_hbase.txtmapper实现:

alt

Put类型是Hbase自带的一个类型,由于shangpin_hbase.txt是从Hbase中导出来的,需要Hbase中的HBaseDocInputFormat.class进行解析,解析之后会成为<ImmutableBytesWritable, Put>的键值对,所以Mapperkeyvalue分别为ImmutableBytesWritablePut类型。Put”row”就是这条数据的rowkeyPut本质上是一个<byte[], List<KeyValue>结构的map,其中KeyValue也是Hbase所定义的一个数据类型,KeyValue是将Hbasetimestampfamilyqualifiervalue扁平化存储的数据结构,要细讲就多了,感兴趣的可以看一下hbase的源代码。本文在后面会一个将Put转化为String的代码。处理shangpin_hbase.txtmap输出的textpair的标识是”1”?

??

?2. 两个Mapper的工作介绍完了,接下来就要编写Reducer了。想一想,Reducer的任务比较简单,含有同样key的数据都在一起了。只要根据”0””1”的,取出自营,联营的识别标识,然后将这个塞到商品信息中就好了。

?但这里面还有一个坑,就是shuffle的group过程,其实MapReduce框架中,在Reduce之前有一个group操作,将数据进行分组,同一个分组的数据会在一次reduce函数中被处理。group默认会使用keycompareTo方法来进行分组操作,按照上面TextPaircompareTo方法,url相同的”0””1”数据是分不到一个group里面的。这样从业务逻辑上分析是有问题的,所以我们需要对group的比较方法进行调整,MapReduce框架中也可以自定义group的比较方法:

?alt

?这里我们设置,只有url相同,数据就会被放到同一个group里面。

下面是Reduce的代码以及 将Hbase中Put类型转化为String的方法:

alt

alt

至此,这次join操作就讲完了。附件中有实例代码。其实这个需求实现起来也可以不使用这么多的自定义函数,只不过文章中的实现更有助于了解MapReduce的原理。

 

By the way,用hive实现join是更简单的。。。