首页 > 代码库 > Scala实现Mapreduce程序2-----Top5

Scala实现Mapreduce程序2-----Top5

输入n个数,返回TOP5的数字

scala实现,以各个数字为key,""为空,按照key进行排序,取出前5个

object Top5 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("")
val sc = new SparkContext(conf)
val one = sc.textFile("/spark/test")
var index=0
val text=one.filter(x=>(x.trim.length>0)&&(x.split(",").length==4)).map(_.split(",")(2).toInt).
map(x=>(x,"")).sortByKey(false).map(x=>x._1).take(5).foreach(x=>{
index=index+1
println("top index:"+index+"\t"+x)
})

}
}

Mapreduce实现,(key,"") =>(index+"",key)

MapReduce中的IntWritable默认是按照降序排列的,要实现升序排序,自己实现MyIntWritabel
public class MyIntWritable implements WritableComparable<MyIntWritable> {
private Integer num;

public MyIntWritable(Integer num){
this.num=num;
}

public MyIntWritable(){}

public void write(DataOutput output) throws IOException {
output.writeInt(num);
}

public void readFields(DataInput input) throws IOException {
this.num=input.readInt();
}

public int compareTo(MyIntWritable o){
int minux=this.num-o.num;
return minux*(-1);
}

@Override
public int hashCode() {
return this.num.hashCode();
}

public String toSting(){
return this.num+"";
}

public boolean equals(Object obj) {
if (obj instanceof MyIntWritable) {
return false;
}
MyIntWritable ok2 = (MyIntWritable) obj;
return (this.num == ok2.num);
}
}
package HadoopvsSpark;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created by Administrator on 2017/5/26.
*/
public class TopN {
public static class TopNMapper extends Mapper<LongWritable,Text,MyIntWritable,Text>{
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
String line=value.toString();
if(line.trim().length()>0){
String str[]=line.split( "," );
if(str.length==4){
context.write( new MyIntWritable( Integer.parseInt( str[2] ) ),new Text( "" ) );
}
}
}
}

public static class TopNReducer extends Reducer<MyIntWritable,Text,Text,MyIntWritable>{
private int index=0;
public void reduce(MyIntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
index++;
if(index<=5){
context.write( new Text( index+" " ),key );
}
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {


org.apache.hadoop.conf.Configuration conf=new org.apache.hadoop.conf.Configuration();
Job job=new Job(conf,"topn");
job.setJarByClass( TopN.class );

job.setMapperClass( TopNMapper.class );
job.setMapOutputKeyClass( MyIntWritable.class );
job.setMapOutputValueClass( Text.class );

job.setReducerClass( TopNReducer.class );
job.setOutputKeyClass( Text.class);
job.setOutputValueClass( MyIntWritable.class );

FileInputFormat.addInputPath( job,new Path( args[0] ) );
Path outputdir=new Path( args[1] );
FileSystem fs=FileSystem.get( conf ); //判断输出目录是否存在
if(fs.exists( outputdir )){
fs.delete( outputdir,true );
}
FileOutputFormat.setOutputPath( job,outputdir ) ;
System.out.println(job.waitForCompletion( true )?1:0);
}
}

 

Scala实现Mapreduce程序2-----Top5