问题:
有如下数据文件 city.txt (id, city, value)
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city 分组聚合,然后从每组数据中取出前两条value最大的记录。
1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:
1 | a = load ‘/data/city.txt‘ using PigStorage( ‘ ‘ ) as (id:chararray, city:chararray, value: int ); |
3 | c = foreach b {c1= order a by value desc ; c2=limit c1 2; generate group ,c2.value;}; |
4 | d = stream c through `sed ‘s/[(){}]//g‘ `; |
结果:这几行代码其实也实现了mysql中的 group_concat 函数的功能:1 | a = load ‘/data/city.txt‘ using PigStorage( ‘ ‘ ) as (id:chararray, city:chararray, value: int ); |
3 | c = foreach b {c1= order a by value desc ; generate group ,c1.value;}; |
4 | d = stream c through `sed ‘s/[(){}]//g‘ `; |
结果:2、下面我们再来看看hive如何处理group topk的问题:
本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,
比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?
1 | select * from city a where |
2 | 2>( select count (1) from city where cname=a.cname and value>a.value) |
3 | distribute by a.cname sort by a.cname,a.value desc ; |
http://my.oschina.net/leejun2005/blog/78904
但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:
排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。
好了,上代码:
(1)定义UDF:
01 | package com.example.hive.udf; |
02 | import org.apache.hadoop.hive.ql. exec .UDF; |
04 | public final class Rank extends UDF{ |
06 | private String last_key; |
07 | public int evaluate(final String key ){ |
08 | if ( ! key .equalsIgnoreCase(this.last_key) ) { |
12 | return this.counter++; |
(2)注册jar、建表、导数据,查询:2 | create temporary function rank as ‘com.example.hive.udf.Rank‘ ; |
3 | create table city(id int ,cname string,value int ) row format delimited fields terminated by ‘ ‘ ; |
4 | LOAD DATA LOCAL INPATH ‘city.txt‘ OVERWRITE INTO TABLE city; |
5 | select cname, value from ( |
6 | select cname,rank(cname) csum,value from ( |
7 | select id, cname, value from city distribute by cname sort by cname,value desc |
(3)结果:
可以看到,hive相比pig来说,处理起来稍微复杂了点,但随着hive的日渐完善,以后比pig更简洁也说不定。
REF:hive中分组取前N个值的实现
http://baiyunl.iteye.com/blog/1466343
3、最后我们来看一下原生态的MR:
01 | import java.io.IOException; |
02 | import java.util.TreeSet; |
04 | import org.apache.hadoop.conf.Configuration; |
05 | import org.apache.hadoop.fs.Path; |
06 | import org.apache.hadoop.io.IntWritable; |
07 | import org.apache.hadoop.io.LongWritable; |
08 | import org.apache.hadoop.io.Text; |
09 | import org.apache.hadoop.mapreduce.Job; |
10 | import org.apache.hadoop.mapreduce.Mapper; |
11 | import org.apache.hadoop.mapreduce.Reducer; |
12 | import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
13 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
14 | import org.apache.hadoop.util.GenericOptionsParser; |
16 | public class GroupTopK { |
19 | public static class GroupTopKMapper extends |
20 | Mapper<LongWritable, Text, IntWritable, LongWritable> { |
21 | IntWritable outKey = new IntWritable(); |
22 | LongWritable outValue =http://www.mamicode.com/ new LongWritable(); |
23 | String[] valArr = null ; |
25 | public void map(LongWritable key, Text value, Context context) |
26 | throws IOException, InterruptedException { |
27 | valArr = value.toString().split( "\t" ); |
28 | outKey.set(Integer.parseInt(valArr[ 2 ])); |
29 | outValue.set(Long.parseLong(valArr[ 0 ])); |
30 | context.write(outKey, outValue); |
34 | public static class GroupTopKReducer extends |
35 | Reducer<IntWritable, LongWritable, IntWritable, LongWritable> { |
37 | LongWritable outValue =http://www.mamicode.com/ new LongWritable(); |
39 | public void reduce(IntWritable key, Iterable<LongWritable> values, |
40 | Context context) throws IOException, InterruptedException { |
41 | TreeSet<Long> idTreeSet = new TreeSet<Long>(); |
42 | for (LongWritable val : values) { |
43 | idTreeSet.add(val.get()); |
44 | if (idTreeSet.size() > 3 ) { |
45 | idTreeSet.remove(idTreeSet.first()); |
48 | for (Long id : idTreeSet) { |
50 | context.write(key, outValue); |
55 | public static void main(String[] args) throws Exception { |
56 | Configuration conf = new Configuration(); |
57 | String[] otherArgs = new GenericOptionsParser(conf, args) |
60 | System.out.println(otherArgs.length); |
61 | System.out.println(otherArgs[ 0 ]); |
62 | System.out.println(otherArgs[ 1 ]); |
64 | if (otherArgs.length != 3 ) { |
65 | System.err.println( "Usage: GroupTopK <in> <out>" ); |
68 | Job job = new Job(conf, "GroupTopK" ); |
69 | job.setJarByClass(GroupTopK. class ); |
70 | job.setMapperClass(GroupTopKMapper. class ); |
71 | job.setReducerClass(GroupTopKReducer. class ); |
72 | job.setNumReduceTasks( 1 ); |
73 | job.setOutputKeyClass(IntWritable. class ); |
74 | job.setOutputValueClass(LongWritable. class ); |
75 | FileInputFormat.addInputPath(job, new Path(otherArgs[ 1 ])); |
76 | FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 2 ])); |
77 | System.exit(job.waitForCompletion( true ) ? 0 : 1 ); |
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
结果:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
数据验证:
awk ‘$3==0{print $1}‘ record_new.txt|sort -nr|head -3
12869976
12869971
12869695
可以看到结果没有问题。
注:测试数据由以下脚本生成:
http://my.oschina.net/leejun2005/blog/76631
PS:
如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。
pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。
附几个HIVE UDAF链接,有兴趣的同学自己看下:
Hive UDAF和UDTF实现group by后获取top值 http://blog.csdn.net/liuzhoulong/article/details/7789183
hive中自定义函数(UDAF)实现多行字符串拼接为一行 http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
编写Hive UDAF http://www.fuzhijie.me/?p=118
Hive UDAF开发 http://richiehu.blog.51cto.com/2093113/386113