
Implementing MapReduce Programs in Scala (4): Data Deduplication

Data deduplication: each key is output exactly once.

Scala implementation: call groupByKey(), then sortByKey(), then output the keys.

 

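import org.apache.spark.{SparkConf, SparkContext}

object Reduplicate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("remove duplication")
    val sc = new SparkContext(conf)
    // The input path is left empty in the original; supply a real path before running.
    val lines = sc.textFile("")
    lines.filter(_.trim.length > 0)   // drop blank lines
      .map(line => (line.trim, ""))   // the whole line is the key, the value is a dummy
      .groupByKey()                   // identical lines collapse into a single key
      .sortByKey()
      .keys
      .collect()
      .foreach(println)               // print each distinct line once
    sc.stop()
  }
}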
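To see why grouping by key removes duplicates, the same steps can be mimicked on an ordinary in-memory list. The following is only a sketch in plain Java and does not use Spark; the class `GroupSortKeysSketch` and the method `distinctSorted` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper class; not part of the original post and not Spark code.
class GroupSortKeysSketch {

    // Mirrors filter -> map -> groupByKey -> sortByKey -> keys on a plain list.
    static List<String> distinctSorted(List<String> lines) {
        // A TreeMap keeps its keys sorted, standing in for groupByKey + sortByKey.
        TreeMap<String, List<String>> grouped = new TreeMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // same effect as filter(_.trim.length > 0)
            }
            // The whole line is the key; the value is a dummy empty string.
            grouped.computeIfAbsent(line, k -> new ArrayList<>()).add("");
        }
        // Taking only the keys yields each distinct line exactly once.
        return new ArrayList<>(grouped.keySet());
    }

    public static void main(String[] args) {
        // prints [a, b]
        System.out.println(distinctSorted(List.of("b", " a ", "", "b", "a")));
    }
}
```

The dummy values are never read, which is why the Scala version can pair every line with an empty string. Spark's RDD API also provides `distinct()`, which may be a shorter way to get the same set of lines when the grouping itself is not needed.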

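MapReduce implementation: use the entire record as the key, and have the reduce phase output only the key. The shuffle delivers all identical keys to a single reduce call, so each distinct record is written once.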

package HadoopvsSpark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;

// Deduplicate the data: use the entire record as the key and output only the key in the reduce phase.

/**
* Created by Administrator on 2017/5/25.
*/
public class Duplicate {

    // Mapper: emit each input line as the key, paired with an empty value.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static final Text EMPTY = new Text("");

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, EMPTY);
        }
    }

    // Reducer (also used as the combiner): identical lines arrive under one key, so write the key once.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "192.169.1.101:8200");

        // Generic Hadoop options are consumed here; what remains must be <in> <out>.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Deduplication <in> <out>");
            System.exit(2);
        }

        // Create the job from conf so that the settings above take effect.
        Job job = Job.getInstance(conf, "Data Deduplication");
        job.setJarByClass(Duplicate.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Exit with 0 on success and 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
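The job above registers the same Reduce class as both combiner and reducer. A small in-memory model can illustrate why that is safe here: collapsing duplicates within each split first does not change the final set of keys. This is a sketch only; it does not use the Hadoop API, and `DedupFlowSketch`, `map`, `reduce`, and `run` are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the job; it does not use the Hadoop API.
class DedupFlowSketch {

    // Map phase: emit (line, "") for every record, as the Mapper does.
    static List<Map.Entry<String, String>> map(List<String> lines) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String line : lines) {
            out.add(Map.entry(line, ""));
        }
        return out;
    }

    // Group pairs by key (sorted, like the shuffle), then emit each key once.
    // Serves as both the per-split combiner and the final reducer.
    static List<Map.Entry<String, String>> reduce(List<Map.Entry<String, String>> pairs) {
        TreeMap<String, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String key : grouped.keySet()) {
            out.add(Map.entry(key, ""));
        }
        return out;
    }

    // Runs the flow over several input splits, with or without the combiner.
    static List<String> run(List<List<String>> splits, boolean useCombiner) {
        List<Map.Entry<String, String>> shuffled = new ArrayList<>();
        for (List<String> split : splits) {
            List<Map.Entry<String, String>> mapped = map(split);
            shuffled.addAll(useCombiner ? reduce(mapped) : mapped);
        }
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> e : reduce(shuffled)) {
            keys.add(e.getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(
                List.of("x", "y", "x"),
                List.of("y", "z"));
        System.out.println(run(splits, false)); // prints [x, y, z]
        System.out.println(run(splits, true));  // prints [x, y, z]
    }
}
```

The combiner only reduces how many pairs cross the shuffle; because the reduce step ignores the values and its output type matches its input type, applying it once per split and again at the end gives the same keys as applying it once.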
