首页 > 代码库 > hadoop1-TopK问题实现

hadoop1-TopK问题实现

1、对于排名,一般都是很热衷的,那么如何实现在数据量多的情况下,得到所需要的数据呢,选取前几名的实际应用中,也会有许多,形成统一的算法实现,比着参考就可以了。

2、数据文件a.txt:

2

4

6

7

9

6

4

3、输出数据为(例如取前三名,前面为数据,后面为名次,名次可通过输入参数配置):

9        1

7        2

6        3

4、设计思路:

Map端:

           1)因为是topK问题,自然想到可以自己设计key,重写比较方法,按自己所想要的比较方法实现即可。

           2)map端输出,仅需输出最后的n个最大数即可,所以需要在最后的那个cleanup方法里实现,通过一个成员变量list保存数据。

           3)最后输出此list中的数据即可

    4)list为自定义的一个容器,在往里面添加元素的时候,会进行比较。

    5)在这个例子中,没有使用list,而是直接采用map的suffle对key进行的排序。一个map的输出量会很多。下一篇优化里面使用了list

Reduce端:

  1)  类似一个map,只针对key进行处理即可,输出前n的数值(此n个key已是经过suffle排过序的,取前n个即可,其它丢弃)。

5、程序实现:

  

package test;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class TopK {	public static class Map extends Mapper<Object, Text, MyKey, NullWritable>{		//此处可以进行优化,只需输出此map处理的最大的那几个值即可。		protected void map(Object key, Text value, Context context) 				throws java.io.IOException ,InterruptedException {			try {				int num = Integer.parseInt(value.toString());				context.write(new MyKey(num), NullWritable.get());			} catch (Exception e) {				// TODO: handle exception				return ;			}		};	}	public static class Reduce extends Reducer<MyKey, NullWritable, Text, NullWritable>{		private static Text k = new Text();		//默认值,获取的个数		private static int count = 5;		//开始值		private static int start = 1;				//初始化时获取配置的参数		protected void setup(Context context) 				throws IOException ,InterruptedException {			//取得配置的参数,需要获取几个值			count = Integer.parseInt(context.getConfiguration().get("top_num"));		};				protected void reduce(MyKey key, Iterable<NullWritable> values, Context context) 				throws IOException ,InterruptedException {			//因为key是排序过来的,所以输出count个值就可以了			if(start <= count){				k.set(key.getNum()+"\t"+start);				context.write(k, NullWritable.get());				start++;			}		};	}	public static void main(String[] args) throws Exception {		Configuration conf = new Configuration();				String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();		if(otherArgs.length != 3){			System.err.println("Usage:TopK");			System.exit(2);		}		//参数3 为要获取的最大个数		conf.set("top_num", args[2]);		Job job = new Job(conf, "TopK");		job.setJarByClass(TopK.class);				job.setMapperClass(Map.class);		job.setReducerClass(Reduce.class);				job.setMapOutputKeyClass(MyKey.class);		job.setMapOutputValueClass(NullWritable.class);		job.setOutputKeyClass(Text.class);		job.setOutputValueClass(NullWritable.class);				FileInputFormat.addInputPath(job, new Path(args[0]));		FileOutputFormat.setOutputPath(job, new Path(args[1]));				System.exit(job.waitForCompletion(true) ? 0 : 1);	}	private static class MyKey implements WritableComparable<MyKey>{		private int num;				public int getNum() {			return num;		}		public MyKey() {		}		public MyKey(int num) {			super();			this.num = num;		}		@Override		public void readFields(DataInput in) throws IOException {			// TODO Auto-generated method stub			num = in.readInt();		}		@Override		public void write(DataOutput out) throws IOException {			// TODO Auto-generated method stub			out.writeInt(num);		}		@Override		public int compareTo(MyKey o) {			// TODO Auto-generated method stub			//反序输出			return o.num - this.num;		}	}}

 

计算结果:

9        1

7        2

6        3