首页 > 代码库 > 用一个MapReduce job实现去重,多目录输出功能

用一个MapReduce job实现去重,多目录输出功能

总结之前工作中遇到的一个问题。


背景:
运维用scribe从apache服务器推送过来的日志有重复记录,所以这边的ETL处理要去重,还有个需求是要按业务类型多目录输出,方便挂分区,后面的使用。
这两个需求单独处理都没有问题,但要在一个mapreduce里完成,需要一点技巧。


1、map输入数据,经过一系列处理,输出时:
 if(ttype.equals("other")){
        	file = (result.toString().hashCode() & 0x7FFFFFFF)%400;
        }else if(ttype.equals("client")){
        	file = (result.toString().hashCode() & 0x7FFFFFFF)%260;
        }else{
        	file = (result.toString().hashCode()& 0x7FFFFFFF)%60;
        }
        tp = new TextPair(ttype+"_"+file, result.toString());
        
        context.write(tp, valuet);

 valuet是空的,什么都没有。
 我这里有三个类型,other,client,wap,分别代表日志来源平台,要按他们分目录输出。
 result就是整条记录。file得到的是最终输出文件名,hash,位操作,取模是为了输出均衡。
 map的输出结构<key,value> =(ttype+"_"+file,result.toString())
 这样做的目的是:保证相同的记录得到相同的key,同时还要保存类型。partition要按textPair的left,也就是这个key,
 保证了后面要写到同一个输出文件的所有记录都到同一个reduce里去,一个reduce可以写多个输出文件,但是一个输出文件不能来自多个reduce,原因很明了。
 这样的话大概400+260+60=720个输出文件,每个文件数据量大概差不多,job的reduce数我这里设置的240,这个数连同取模400,260,60都是根据我的数据量来定的,来尽量避免reduce的数据倾斜。
 
 
2、reduce方法去重:
 
 public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        
        rcfileCols = getRcfileCols(key.getSecond().toString().split("\001"));
        context.write(key.getFirst(), rcfileCols);


    }

    
  不用迭代,对相同的key组,只输出一次。注意这里job用到的比较器,一定不能是FirstComparator,而是整个textpair对的比较。(先比较left,再比较right)
  
  我的程序里输出文件格式是rcfile。
  
3、多目录输出:

 job.setOutputFormatClass(WapApacheMutiOutputFormat.class);
   
public class WapApacheMutiOutputFormat extends RCFileMultipleOutputFormat<Text, BytesRefArrayWritable> {
	Random r = new Random();
	protected String generateFileNameForKeyValue(Text key, BytesRefArrayWritable value,
			Configuration conf) {
		
		    String typedir = key.toString().split("_")[0];


			return typedir+"/"+key.toString();


	}
}



这里的RCFileMultipleOutputFormat是自己继承自FileOutputFormat 自己写的,主要实现了recordWriter。


最终输出去重的,分目录的数据文件。


理解的关键主要是partition key的设计,reduce的原理。