
Deduplicating Data with MapReduce

1. Principle Analysis

  Because MapReduce merges identical keys together between the map and reduce phases, removing duplicate lines is almost free. The mapper needs no real logic: it simply writes each input line, unmodified, to the context as the key, i.e. the value the map call receives. The reducer likewise does nothing beyond writing each key it receives to the output file once.

  I originally assumed the map phase used a HashMap and relied on hash uniqueness, but that is not what happens: the framework sorts the map output by key during the shuffle, so equal keys end up grouped into a single reduce call.

  The map method runs once for every line of the input file, since the default TextInputFormat delivers one line per record.
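
As a concrete illustration with made-up sample data (not from the original post), here is how four input lines, one of them duplicated, flow through the job:

input file:      aaa  bbb  aaa  ccc            (one map call per line)
map emits:       (aaa,"")  (bbb,"")  (aaa,"")  (ccc,"")
shuffle groups:  aaa -> ["",""]   bbb -> [""]   ccc -> [""]
reduce writes:   aaa  bbb  ccc                  (one output line per key)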

2. Code

2.1 Mapper

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateRemoveMapper extends
		Mapper<LongWritable, Text, Text, Text> {

	// The input file happens to contain numbers, but it may also hold
	// other characters, so the output key type is Text, not LongWritable.
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Emit the whole line as the key. The value must be an empty
		// Text, not null: map output values are serialized during the
		// shuffle, and null would cause a NullPointerException.
		context.write(value, new Text());
	}
}


2.2 Reducer

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DuplicateRemoveReducer extends Reducer<Text, Text, Text, Text> {

	public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// Each distinct line arrives here exactly once as a key; the
		// values are ignored. Writing null as the value is fine at this
		// stage: TextOutputFormat just prints the key and a newline.
		context.write(key, null);
	}
}
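
As an aside, a common variant (my sketch, not part of the original post) sidesteps the empty-Text workaround by using NullWritable as the value type. NullWritable carries no payload, so nothing has to be serialized and there is no null-pointer risk in either phase:

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical NullWritable-based variant of the two classes above
// (shown together for brevity; each would normally get its own file).
public class NullValueDedupMapper extends
		Mapper<LongWritable, Text, Text, NullWritable> {
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// NullWritable.get() returns the shared zero-byte placeholder.
		context.write(value, NullWritable.get());
	}
}

class NullValueDedupReducer extends
		Reducer<Text, NullWritable, Text, NullWritable> {
	public void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
		context.write(key, NullWritable.get());
	}
}

The driver would then declare job.setOutputValueClass(NullWritable.class) instead of Text.class.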


2.3 Main

package algorithm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DuplicateMainMR {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "DuplicateRemove");
		job.setJarByClass(DuplicateMainMR.class);
		job.setMapperClass(DuplicateRemoveMapper.class);
		job.setReducerClass(DuplicateRemoveReducer.class);
		job.setOutputKeyClass(Text.class);
		// The reducer writes null values, but the declared value class
		// still has to match the map output type, so it must be Text.
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(1);
		// The input directory name on HDFS was mistyped as
		// "DupblicateRemove" (an extra 'b'), and HDFS does not support
		// modifying files in place, so the typo is kept in the path.
		FileInputFormat.addInputPath(job, new Path(
				"hdfs://192.168.58.180:8020/ClassicalTest/DupblicateRemove/DuplicateRemove.txt"));
		FileOutputFormat.setOutputPath(job, new Path(
				"hdfs://192.168.58.180:8020/ClassicalTest/DuplicateRemove/DuplicateRemoveOut"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
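
One optional optimization, added here as a sketch rather than something from the original post: a combiner can collapse duplicates on the map side, so repeated lines are not all shuffled to the single reducer. The reducer above cannot be reused as the combiner, because combiner output is re-serialized as intermediate data and its null value would trigger the same NullPointerException noted in the mapper; a dedicated combiner has to write a real, empty Text:

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: collapses duplicate keys within each map task
// before the shuffle, shrinking the intermediate data volume.
public class DuplicateRemoveCombiner extends Reducer<Text, Text, Text, Text> {
	public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// This output goes back through the shuffle serializer, so the
		// value must be an empty Text rather than null.
		context.write(key, new Text());
	}
}

It would be wired up in main() with job.setCombinerClass(DuplicateRemoveCombiner.class); the "Combine input records=0" counter in the log below shows the original run did not use one.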


3. Output Analysis

3.1 Input and Output

There is not much to compare: the output is just the input with its duplicate lines dropped, so the original files are omitted. The counters in the console log below capture the effect anyway: 8 map input records went in and 6 reduce output records came out.
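
For reference, here is a made-up input/output pair consistent with those record counts (though not necessarily with the byte counters, which depend on the real data):

DuplicateRemove.txt (8 lines):   11 22 22 33 44 44 55 66
part-r-00000 (6 lines):          11 22 33 44 55 66

Note that the output also comes out sorted, since the shuffle sorts keys before they reach the reducer.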

3.2 Console


doop.mapreduce.Job.updateStatus(Job.java:323)
 INFO - Job job_local4032991_0001 completed successfully
DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:765)
 INFO - Counters: 38
	File System Counters
		FILE: Number of bytes read=560
		FILE: Number of bytes written=501592
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=48
		HDFS: Number of bytes written=14
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=8
		Map output records=8
		Map output bytes=26
		Map output materialized bytes=48
		Input split bytes=142
		Combine input records=0
		Combine output records=0
		Reduce input groups=6
		Reduce shuffle bytes=48
		Reduce input records=8
		Reduce output records=6
		Spilled Records=16
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=4
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
		Total committed heap usage (bytes)=457179136
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=24
	File Output Format Counters
		Bytes Written=14
DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:323)
DEBUG - stopping client from cache: org.apache.hadoop.ipc.Client@37afeb11
DEBUG - removing client from cache: org.apache.hadoop.ipc.Client@37afeb11
DEBUG - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@37afeb11
DEBUG - Stopping client
DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: closed
DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: stopped, remaining connections 0

