首页 > 代码库 > Hadoop DistributedCache分布式缓存的使用

Hadoop DistributedCache分布式缓存的使用

转载请注明:http://www.cnblogs.com/demievil/p/4059141.html

做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签。因为标签库不是很大,没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件。


main方法中的配置:

//分布式缓存要存储的文件路径String cachePath[] = {                "hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv",                "hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"        };//向分布式缓存中添加文件        job.addCacheFile(new Path(cachePath[0]).toUri());        job.addCacheFile(new Path(cachePath[1]).toUri());

参考上面代码即可向分布式缓存中添加文件。


在Mapper和Reducer方法中读取分布式缓存文件:

/* * 重写Mapper的setup方法,获取分布式缓存中的文件 */    @Override    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)                   throws IOException, InterruptedException {        // TODO Auto-generated method stub        super.setup(context);        URI[] cacheFile = context.getCacheFiles();        Path tagSetPath = new Path(cacheFile[0]);        Path tagedUrlPath = new Path(cacheFile[1]);        文件操作(如把内容读到set或map中);    }@Overridepublic void map(LongWritable key, Text value, Context context)            throws IOException, InterruptedException {            在map()中使用读取出的数据;      }

同样,如果在Reducer中也要读取分布式缓存文件,示例如下:

/* * 重写Reducer的setup方法,获取分布式缓存中的文件 */    @Override    protected void setup(Context context)                    throws IOException, InterruptedException {        super.setup(context);        mos = new MultipleOutputs<Text, Text>(context);              URI[] cacheFile = context.getCacheFiles();        Path tagSetPath = new Path(cacheFile[0]);        Path tagSetPath = new Path(cacheFile[1]);        文件读取操作;    } @Override  public void reduce(Text key, Iterable<Text> values, Context context)              throws IOException, InterruptedException {      while(values.iterator().hasNext()){          使用读取出的数据;      }       context.write(key, new Text(sb.toString()));      }

 

以上。

Hadoop DistributedCache分布式缓存的使用