首页 > 代码库 > Hadoop的Reduce Join+BloomFilter实现表链接

Hadoop的Reduce Join+BloomFilter实现表链接

适用于场景

连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有MAP端过滤掉不需要JOIN的数据,这样传到REDUCE的数据量减少,减少了网络传及磁盘IO。

缺点

Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间。并且,最终的JOIN在REDUCE端还要进行比对,所以对最终结果无影响。

 

 

下面我们先来简单了解下什么是布隆过滤器?

Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。如文章标题所述,本文只是做简单介绍,属于科普文章。

应用场景
在正式介绍Bloom Filter算法之前,先来看看什么时候需要用到Bloom Filter算法。
1. HTTP缓存服务器、Web爬虫等
主要工作是判断一条URL是否在现有的URL集合之中(可以认为这里的数据量级上亿)。
对于HTTP缓存服务器,当本地局域网中的PC发起一条HTTP请求时,缓存服务器会先查看一下这个URL是否已经存在于缓存之中,如果存在的话就没有必要去原始的服务器拉取数据了(为了简单起见,我们假设数据没有发生变化),这样既能节省流量,还能加快访问速度,以提高用户体验。
对于Web爬虫,要判断当前正在处理的网页是否已经处理过了,同样需要当前URL是否存在于已经处理过的URL列表之中。

2. 垃圾邮件过滤
假设邮件服务器通过发送方的邮件域或者IP地址对垃圾邮件进行过滤,那么就需要判断当前的邮件域或者IP地址是否处于黑名单之中。如果邮件服务器的通信邮件数量非常大(也可以认为数据量级上亿),那么也可以使用Bloom Filter算法。


1、BloomFilter能解决什么问题?
以少量的内存空间判断一个元素是否属于这个集合, 代价是有一定的错误率

2、工作原理
1. 初始化一个数组, 所有位标为0, A={x1, x2, x3,…,xm} (x1, x2, x3,…,xm 初始为0)
2. 将已知集合S中的每一个数组, 按以下方式映射到A中
2.0 选取n个互相独立的hash函数 h1, h2, … hk
2.1 将元素通过以上hash函数得到一组索引值 h1(xi), h2(xi),…,hk(xi)
2.2 将集合A中的上述索引值标记为1(如果不同元素有重复, 则重复覆盖为1, 这是一个觅等操作)
3. 对于一个元素x, 将其根据2.0中选取的hash函数, 进行hash, 得到一组索引值 h1(x), h2(x), …,hk(x)
如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S


3、几个前提
1. hash函数的计算不能性能太差, 否则得不偿失
2. 任意两个hash函数之间必须是独立的.
即任意两个hash函数不存在单一相关性, 否则hash到其中一个索引上的元素也必定会hash到另一个相关的索引上, 这样多个hash没有意义


4、错误率
工作原理的第3步, 的出来的结论, 一个是绝对靠谱的, 一个是不能100%靠谱的。在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。关于具体的错误率,这和最优的哈希函数个数以及位数组的大小有关,而这是可以估算求得一个最优解的:
哈希函数个数k、位数组大小m及字符串数量n之间存在相互关系。相关文献证明了对于给定的m、n,当 k = ln(2)* m/n 时出错的概率是最小的。 具体的请看:http://blog.csdn.net/jiaomeng/article/details/1495500


5、基本特征
从以上对基本原理和数学基础的分析,我们可以得到Bloom filter的如下基本特征,用于指导实际应用。
(1)存在一定错误率,发生在正向判断上(存在性),反向判断不会发生错误(不存在性);
(2)错误率是可控制的,通过改变位数组大小、hash函数个数或更低碰撞率的hash函数来调节;
(3)保持较低的错误率,位数组空位至少保持在一半以上;
(4)给定m和n,可以确定最优hash个数,即k = ln2 * (m/n),此时错误率最小;
(5)给定允许的错误率E,可以确定合适的位数组大小,即m >= log2(e) * (n * log2(1/E)),继而确定hash函数个数k;
(6)正向错误率无法完全消除,即使不对位数组大小和hash函数个数进行限制,即无法实现零错误率;
(7)空间效率高,仅保存“存在状态”,但无法存储完整信息,需要其他数据结构辅助存储;
(8)不支持元素删除操作,因为不能保证删除的安全性。


6、应用场景举例:
(1)拼写检查、数据库系统、文件系统
(2)假设要你写一个网络蜘蛛(web crawler)。由于网络间的链接错综复杂,蜘蛛在网络间爬行很可能会形成“环”。为了避免形成“环”,就需要知道蜘蛛已经访问过那些URL。给一个URL,怎样知道蜘蛛是否已经访问过呢?
(3)网络应用
  P2P网络中查找资源操作,可以对每条网络通路保存Bloom Filter,当命中时,则选择该通路访问。
  广播消息时,可以检测某个IP是否已发包。
  检测广播消息包的环路,将Bloom Filter保存在包里,每个节点将自己添加入Bloom Filter。
  信息队列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾邮件地址过滤
  像网易,QQ这样的公众电子邮件(email)提供商,总是需要过滤来自发送垃圾邮件的人(spamer)的垃圾邮件。一个办法就是记录下那些发垃圾邮件的email 地址。由于那些发送者不停地在注册新的地址,全世界少说也有几十亿个发垃圾邮件的地址,将他们都存起来则需要大量的网络服务器。如果用哈希表,每存储一亿个 email 地址,就需要1.6GB 的内存(用哈希表实现的具体办法是将每一个email 地址对应成一个八字节的信息指纹,然后将这些信息指纹存入哈希表,由于哈希表的存储效率一般只有50%,因此一个email 地址需要占用十六个字节。一亿个地址大约要1.6GB, 即十六亿字节的内存)。因此存贮几十亿个邮件地址可能需要上百GB 的内存。而Bloom Filter只需要哈希表1/8 到1/4 的大小就能解决同样的问题。Bloom Filter决不会漏掉任何一个在黑名单中的可疑地址。而至于误判问题,常见的补救办法是在建立一个小的白名单,存储那些可能别误判的邮件地址。
(5)Bloomfilter在HBase中的作用
HBase利用Bloomfilter来提高随机读(Get)的性能,对于顺序读(Scan)而言,设置Bloomfilter是没有作用的(0.92以后,如果设置了bloomfilter为ROWCOL,对于指定了qualifier的Scan有一定的优化,但不是那种直接过滤文件,排除在查找范围的形式)
Bloomfilter在HBase中的开销?
Bloomfilter是一个列族(cf)级别的配置属性,如果你在表中设置了Bloomfilter,那么HBase会在生成StoreFile时包含一份bloomfilter结构的数据,称其为MetaBlock;MetaBlock与DataBlock(真实的KeyValue数据)一起由LRUBlockCache维护。所以,开启bloomfilter会有一定的存储及内存cache开销。
Bloomfilter如何提高随机读(Get)的性能?
对于某个region的随机读,HBase会遍历读memstore及storefile(按照一定的顺序),将结果合并返回给客户端。如果你设置了bloomfilter,那么在遍历读storefile时,就可以利用bloomfilter,忽略某些storefile。
注意:hbase的bloom filter是惰性加载的,在写压力比较大的情况下,会有不停的compact并产生storefile,那么新的storefile是不会马上将bloom filter加载到内存的,等到读请求来的时候才加载。
这样问题就来了,第一,如果storefile设置的比较大,max size为2G,这会导致bloom filter也比较大;第二,系统的读写压力都比较大。这样或许会经常出现单个 GET请求花费3-5秒的超时现象。

 使用布隆过滤器,之前,需要先对需要处理的key,生成处理的二进制文件,代码如下:

package com.reducebloomfilterjoin;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.util.zip.GZIPInputStream;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.util.bloom.BloomFilter;import org.apache.hadoop.util.bloom.Key;import org.apache.hadoop.util.hash.Hash;/** * 布隆过滤器 * 生成二进制文件数据 *  * **/public class TrainingBloomData {    public static int getOptimalBloomFilterSize(int numRecords,    float falsePosRate) {        int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math                .pow(Math.log(2), 2));        return size;    }        public static int getOptimalK(float numMembers, float vectorSize) {                        return (int) Math.round(vectorSize / numMembers * Math.log(2));                    }        public static void main(String[] args) throws IOException {                 Path inputFile = new Path("hdfs://192.168.75.130:9000/root/bloom/kk.txt");        int numMembers = Integer.parseInt("10");        float falsePosRate = Float.parseFloat("0.01");        Path bfFile = new Path("hdfs://192.168.75.130:9000/root/bloom/bloom.bin");                 FileSystem fss=FileSystem.get(new Configuration());         if(fss.exists(bfFile)){             fss.delete(bfFile, true);             System.out.println("存在此路径,已经删除!");         }         // Calculate our vector size and optimal K value based on approximations        int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);        int nbHash = getOptimalK(numMembers, vectorSize);         // create new Bloom filter        BloomFilter filter = new BloomFilter(vectorSize, nbHash,                Hash.MURMUR_HASH);        // Open file for read         System.out.println("Training Bloom filter of size " + vectorSize                + " with " + nbHash + " hash functions, " + numMembers                + " approximate number of records, and " + falsePosRate                + " false positive rate");         String line = null;        int numRecords = 0;        FileSystem fs = FileSystem.get(new Configuration());        for (FileStatus status : fs.listStatus(inputFile)) {            BufferedReader rdr;            // if file is gzipped, wrap it in a GZIPInputStream            if (status.getPath().getName().endsWith(".gz")) {                rdr = new BufferedReader(new InputStreamReader(                        new GZIPInputStream(fs.open(status.getPath()))));            } else {                rdr = new BufferedReader(new InputStreamReader(fs.open(status                        .getPath())));            }             System.out.println("Reading " + status.getPath());            while ((line = rdr.readLine()) != null) {                filter.add(new Key(line.getBytes()));                ++numRecords;            }             rdr.close();        }         System.out.println("Trained Bloom filter with " + numRecords                + " entries.");         System.out.println("Serializing Bloom filter to HDFS at " + bfFile);        FSDataOutputStream strm = fs.create(bfFile);        filter.write(strm);         strm.flush();        strm.close();         System.out.println("Done training Bloom filter.");     }                }

测试数据如下:
小表模拟数据
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862

大表模拟数据
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10

 

package com.reducebloomfilterjoin;import java.io.BufferedReader;import java.io.DataInput;import java.io.DataInputStream;import java.io.DataOutput;import java.io.FileInputStream;import java.io.FileReader;import java.io.IOException;import java.net.URI;import java.util.ArrayList;import java.util.HashSet;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.filecache.DistributedCache;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.bloom.BloomFilter;import org.apache.hadoop.util.bloom.Key;/*** *  * Hadoop1.2的版本 *  *  * ReduceJoin+布隆过滤器   实现Reduce Join *  * @author qindongliang *  *    大数据交流群:376932160 *  搜索技术交流群:324714439 *  *  *  * **/public class ReduceBoolmFilter {                 /**     *      *      * 自定义一个输出实体     *      * **/    private static class CombineEntity implements WritableComparable<CombineEntity>{                private Text joinKey;//连接key        private Text flag;//文件来源标志        private Text secondPart;//除了键外的其他部分的数据                        public CombineEntity() {            // TODO Auto-generated constructor stub            this.joinKey=new Text();            this.flag=new Text();            this.secondPart=new Text();        }                public Text getJoinKey() {            return joinKey;        }        public void setJoinKey(Text joinKey) {            this.joinKey = joinKey;        }        public Text getFlag() {            return flag;        }        public void setFlag(Text flag) {            this.flag = flag;        }        public Text getSecondPart() {            return secondPart;        }        public void setSecondPart(Text secondPart) {            this.secondPart = secondPart;        }        @Override        public void readFields(DataInput in) throws IOException {            this.joinKey.readFields(in);            this.flag.readFields(in);            this.secondPart.readFields(in);                    }        @Override        public void write(DataOutput out) throws IOException {            this.joinKey.write(out);            this.flag.write(out);            this.secondPart.write(out);                    }        @Override        public int compareTo(CombineEntity o) {            // TODO Auto-generated method stub            return this.joinKey.compareTo(o.joinKey);        }                            }                    private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{                private CombineEntity combine=new CombineEntity();        private Text flag=new Text();        private  Text joinKey=new Text();        private Text secondPart=new Text();        /**         *使用布隆过滤器存储key         * 代替原来的HashSet存储         *          * */        //private HashSet<String> joinKeySet=new HashSet<String>();                BloomFilter filter=new BloomFilter();        @Override        protected void setup(Context context)throws IOException, InterruptedException {                     //读取文件流            BufferedReader br=null;            String temp;            // 获取DistributedCached里面 的共享文件            Path paths[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());                  for (Path path : paths) {                                                                if (path.toString().contains("bloom.bin")) {                                                                    DataInputStream strm = new DataInputStream(                                                                            new FileInputStream(path.toString()));                                                                    // Read into our Bloom filter.                                                                    filter.readFields(strm);                                                                    strm.close();                                                                }                                                            }                                                           }                                @Override        protected void map(LongWritable key, Text value,Context context)                throws IOException, InterruptedException {                                   //获得文件输入路径            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();                    if(pathName.endsWith("a.txt")){                                String  valueItems[]=value.toString().split(",");                                                /**                 * 在这里过滤必须要的连接字符                 *                  * */                //if(joinKeySet.contains(valueItems[0])){                if(filter.membershipTest(new Key(valueItems[0].getBytes()))){                    //设置标志位                    flag.set("0");                       //设置链接键                    joinKey.set(valueItems[0]);                              //设置第二部分                    secondPart.set(valueItems[1]+"\t"+valueItems[2]);                                        //封装实体                    combine.setFlag(flag);//标志位                    combine.setJoinKey(joinKey);//链接键                    combine.setSecondPart(secondPart);//其他部分                                         //写出                    context.write(combine.getJoinKey(), combine);                    }else{                    System.out.println("a.txt里");                    System.out.println("在小表中无此记录,执行过滤掉!");                    for(String v:valueItems){                        System.out.print(v+"   ");                    }                                        return ;                                    }                                                            }else if(pathName.endsWith("b.txt")){                String  valueItems[]=value.toString().split(",");                                /**                 *                  * 判断是否在集合中                 *                  * */                if(filter.membershipTest(new Key(valueItems[0].getBytes()))){                //if(joinKeySet.contains(valueItems[0])){                    //if(joinKeySet.contains(valueItems[0])){                    //设置标志位                    flag.set("1");                                           //设置链接键                    joinKey.set(valueItems[0]);                              //设置第二部分注意不同的文件的列数不一样                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);                                        //封装实体                    combine.setFlag(flag);//标志位                    combine.setJoinKey(joinKey);//链接键                    combine.setSecondPart(secondPart);//其他部分                                         //写出                    context.write(combine.getJoinKey(), combine);                                                        }else{                                        //执行过滤 ......                    System.out.println("b.txt里");                    System.out.println("在小表中无此记录,执行过滤掉!");                    for(String v:valueItems){                        System.out.print(v+"   ");                    }                                        return ;                                                        }                                                                        }                                                                                          }            }            private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{                        //存储一个分组中左表信息        private List<Text> leftTable=new ArrayList<Text>();        //存储一个分组中右表信息        private List<Text> rightTable=new ArrayList<Text>();                private Text secondPart=null;                private Text output=new Text();                                 //一个分组调用一次        @Override        protected void reduce(Text key, Iterable<CombineEntity> values,Context context)                throws IOException, InterruptedException {             leftTable.clear();//清空分组数据             rightTable.clear();//清空分组数据                                       /**              * 将不同文件的数据,分别放在不同的集合              * 中,注意数据量过大时,会出现              * OOM的异常              *               * **/                          for(CombineEntity ce:values){                                  this.secondPart=new Text(ce.getSecondPart().toString());                                                   //左表                                  if(ce.getFlag().toString().trim().equals("0")){                     leftTable.add(secondPart);                                      }else if(ce.getFlag().toString().trim().equals("1")){                                          rightTable.add(secondPart);                                      }                                                                                 }                          //=====================             for(Text left:leftTable){                                  for(Text right:rightTable){                                          output.set(left+"\t"+right);//连接左右数据                     context.write(key, output);//输出                 }                              }                                                           }            }                                    public static void main(String[] args)throws Exception {                                  //Job job=new Job(conf,"myjoin");         JobConf conf=new JobConf(ReduceBoolmFilter.class);            conf.set("mapred.job.tracker","192.168.75.130:9001");            conf.setJar("tt.jar");                                //小表共享            String bpath="hdfs://192.168.75.130:9000/root/bloom/bloom.bin";            //添加到共享cache里        DistributedCache.addCacheFile(new URI(bpath), conf);                  Job job=new Job(conf, "aaaaa");         job.setJarByClass(ReduceBoolmFilter.class);         System.out.println("模式:  "+conf.get("mapred.job.tracker"));;                           //设置Map和Reduce自定义类         job.setMapperClass(JMapper.class);         job.setReducerClass(JReduce.class);                  //设置Map端输出         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(CombineEntity.class);                  //设置Reduce端的输出         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(Text.class);                      job.setInputFormatClass(TextInputFormat.class);         job.setOutputFormatClass(TextOutputFormat.class);                       FileSystem fs=FileSystem.get(conf);                  Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew5");                  if(fs.exists(op)){             fs.delete(op, true);             System.out.println("存在此输出路径,已删除!!!");         }                        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));      FileOutputFormat.setOutputPath(job, op);             System.exit(job.waitForCompletion(true)?0:1);                                                                                                                                        }            }

运行情况

模式:  192.168.75.130:9001存在此输出路径,已删除!!!WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicableWARN - LoadSnappy.<clinit>(46) | Snappy native library not loadedINFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404300232_0003INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404300232_0003INFO - Counters.log(585) | Counters: 29INFO - Counters.log(587) |   Job Counters INFO - Counters.log(589) |     Launched reduce tasks=1INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12165INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0INFO - Counters.log(589) |     Launched map tasks=2INFO - Counters.log(589) |     Data-local map tasks=2INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9890INFO - Counters.log(587) |   File Output Format Counters INFO - Counters.log(589) |     Bytes Written=172INFO - Counters.log(587) |   FileSystemCountersINFO - Counters.log(589) |     FILE_BYTES_READ=237INFO - Counters.log(589) |     HDFS_BYTES_READ=455INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169734INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172INFO - Counters.log(587) |   File Input Format Counters INFO - Counters.log(589) |     Bytes Read=227INFO - Counters.log(587) |   Map-Reduce FrameworkINFO - Counters.log(589) |     Map output materialized bytes=243INFO - Counters.log(589) |     Map input records=10INFO - Counters.log(589) |     Reduce shuffle bytes=243INFO - Counters.log(589) |     Spilled Records=16INFO - Counters.log(589) |     Map output bytes=215INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944INFO - Counters.log(589) |     CPU time spent (ms)=1810INFO - Counters.log(589) |     Combine input records=0INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228INFO - Counters.log(589) |     Reduce input records=8INFO - Counters.log(589) |     Reduce input groups=4INFO - Counters.log(589) |     Combine output records=0INFO - Counters.log(589) |     Physical memory (bytes) snapshot=444403712INFO - Counters.log(589) |     Reduce output records=4INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184323072INFO - Counters.log(589) |     Map output records=8

运行结果

1    三劫散仙    13575468248    B    89    2013-02-052    凤舞九天    18965235874    C    69    2013-03-093    忙忙碌碌    15986854789    A    99    2013-03-053    忙忙碌碌    15986854789    D    56    2013-06-07