首页 > 代码库 > Hadoop0.20.2 Bloom filter应用示例

Hadoop0.20.2 Bloom filter应用示例

1. 简介

    参见《Hadoop in Action》P102 以及 《Hadoop实战(第2版)》(陆嘉恒)P69

    

           

2. 案例

    网上大部分的说明只是照搬《Hadoop in Action》中的示例代码(需要自己实现BloomFilter)。本文使用的是Hadoop 0.20.2版本,该版本已经内置了BloomFilter的实现(org.apache.hadoop.util.bloom.BloomFilter),可以直接使用。

    案例文件如下:

    customers.txt

    1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000

    -----------------------------------------------------------------

    orders.txt

    3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009
    5,E,34.59,05-Jan-2010
    6,F,28.67,16-Jan-2008
    7,G,49.82,24-Jan-2009

    两个文件通过customer ID(即每行的第一个字段)关联。

3. 代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomMRMain {
	public static class BloomMapper extends Mapper<Object, Text, Text, Text> {
		BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);
		
		/**
		 * Populates {@code bloomFilter} with the customer IDs listed in customers.txt.
		 *
		 * <p>The file is read line by line, each line is split on commas, and the
		 * first field is added to the filter as a {@link Key}.
		 *
		 * @param context task context; only its job configuration is used here
		 * @throws IOException if the customers file cannot be opened or read
		 * @throws InterruptedException declared by the overridden {@code Mapper.setup} signature
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			Configuration conf = context.getConfiguration();

			// NOTE(review): the location of the customers file is hard-coded; consider
			// passing it through the job configuration instead.
			String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt";
			Path file = new Path(path);

			// The FileSystem handle itself is intentionally left open: FileSystem.get
			// may hand back an instance shared with the rest of the task (TODO confirm
			// for this Hadoop version), so only the stream opened here is closed.
			FileSystem hdfs = FileSystem.get(conf);
			FSDataInputStream dis = hdfs.open(file);
			BufferedReader reader = new BufferedReader(new InputStreamReader(dis));
			try {
				String temp;
				while ((temp = reader.readLine()) != null) {
					String[] tokens = temp.split(",");
					// A line made up solely of commas splits into an empty array.
					if (tokens.length > 0) {
						// NOTE(review): getBytes() uses the platform charset; the lookup
						// side must encode keys the same way.
						bloomFilter.add(new Key(tokens[0].getBytes()));
					}
				}
			} finally {
				// Closing the reader also closes the wrapped HDFS input stream, which
				// the original code leaked on every task.
				reader.close();
			}
		}
		
		protected void map(Object key, Text value, Context context) throws IOException ,InterruptedException {
			//获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.contains("customers")) {
            	String data = http://www.mamicode.com/value.toString();>