Pig、Hive、MapReduce 解决分组 Top K 问题(转)

问题:

有如下数据文件 city.txt (id, city, value)

cat city.txt 
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city 分组聚合,然后从每组数据中取出前两条value最大的记录。

1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:

1 a = load ‘/data/city.txt‘  using PigStorage(‘ ‘as (id:chararray, city:chararray, value:int);
2 b = group by city;
3 c = foreach b {c1=order by value desc; c2=limit c1 2; generate group,c2.value;};
4 d = stream c through `sed ‘s/[(){}]//g‘`;
5 dump d;

结果:

1 (bj,600,300)
2 (sh,900,400)
3 (wh,500,200)

这几行代码其实也实现了mysql中的 group_concat 函数的功能:

1 a = load ‘/data/city.txt‘  using PigStorage(‘ ‘as (id:chararray, city:chararray, value:int);
2 b = group by city;
3 c = foreach b {c1=order by value desc;  generate group,c1.value;};
4 d = stream c through `sed ‘s/[(){}]//g‘`;
5 dump d;

结果:

1 (bj,600,300,100)
2 (sh,900,400,200)
3 (wh,500,200,100)

2、下面我们再来看看hive如何处理group topk的问题:

本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,

比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?

1 select from city a where
2 2>(select count(1) from city where cname=a.cname and value>a.value)
3 distribute by a.cname sort by a.cname,a.value desc;

http://my.oschina.net/leejun2005/blog/78904

但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:

排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。

好了,上代码:

(1)定义UDF:

01 package com.example.hive.udf;
02 import org.apache.hadoop.hive.ql.exec.UDF;
03       
04 public final class Rank extends UDF{
05     private int  counter;
06     private String last_key;
07     public int evaluate(final String key){
08       if ( !key.equalsIgnoreCase(this.last_key) ) {
09          this.counter = 0;
10          this.last_key = key;
11       }
12       return this.counter++;
13     }
14 }

(2)注册jar、建表、导数据,查询:

1 add jar Rank.jar;
2 create temporary function rank as ‘com.example.hive.udf.Rank‘;
3 create table city(id int,cname string,value int) row format delimited fields terminated by ‘ ‘;
4 LOAD DATA LOCAL INPATH ‘city.txt‘ OVERWRITE INTO TABLE city;
5 select cname, value from (
6     select cname,rank(cname) csum,value from (
7         select id, cname, value from city distribute by cname sort by cname,value desc
8     )a
9 )b where csum < 2;

(3)结果:

1 bj  600
2 bj  300
3 sh  900
4 sh  400
5 wh  500
6 wh  200

可以看到,hive相比pig来说,处理起来稍微复杂了点,但随着hive的日渐完善,以后比pig更简洁也说不定。

REF:hive中分组取前N个值的实现

http://baiyunl.iteye.com/blog/1466343

3、最后我们来看一下原生态的MR:

01 import java.io.IOException;
02 import java.util.TreeSet;
03  
04 import org.apache.hadoop.conf.Configuration;
05 import org.apache.hadoop.fs.Path;
06 import org.apache.hadoop.io.IntWritable;
07 import org.apache.hadoop.io.LongWritable;
08 import org.apache.hadoop.io.Text;
09 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.util.GenericOptionsParser;
15  
16 public class GroupTopK {
17     // 这个 MR 将会取得每组年龄中 id 最大的前 3 个
18     // 测试数据由脚本生成:http://my.oschina.net/leejun2005/blog/76631
19     public static class GroupTopKMapper extends
20             Mapper<LongWritable, Text, IntWritable, LongWritable> {
21         IntWritable outKey = new IntWritable();
22         LongWritable outValue = new LongWritable();
23         String[] valArr = null;
24  
25         public void map(LongWritable key, Text value, Context context)
26                 throws IOException, InterruptedException {
27             valArr = value.toString().split("\t");
28             outKey.set(Integer.parseInt(valArr[2]));// age int
29             outValue.set(Long.parseLong(valArr[0]));// id long
30             context.write(outKey, outValue);
31         }
32     }
33  
34     public static class GroupTopKReducer extends
35             Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
36  
37         LongWritable outValue = new LongWritable();
38  
39         public void reduce(IntWritable key, Iterable<LongWritable> values,
40                 Context context) throws IOException, InterruptedException {
41             TreeSet<Long> idTreeSet = new TreeSet<Long>();
42             for (LongWritable val : values) {
43                 idTreeSet.add(val.get());
44                 if (idTreeSet.size() > 3) {
45                     idTreeSet.remove(idTreeSet.first());
46                 }
47             }
48             for (Long id : idTreeSet) {
49                 outValue.set(id);
50                 context.write(key, outValue);
51             }
52         }
53     }
54  
55     public static void main(String[] args) throws Exception {
56         Configuration conf = new Configuration();
57         String[] otherArgs = new GenericOptionsParser(conf, args)
58                 .getRemainingArgs();
59  
60         System.out.println(otherArgs.length);
61         System.out.println(otherArgs[0]);
62         System.out.println(otherArgs[1]);
63  
64         if (otherArgs.length != 3) {
65             System.err.println("Usage: GroupTopK <in> <out>");
66             System.exit(2);
67         }
68         Job job = new Job(conf, "GroupTopK");
69         job.setJarByClass(GroupTopK.class);
70         job.setMapperClass(GroupTopKMapper.class);
71         job.setReducerClass(GroupTopKReducer.class);
72         job.setNumReduceTasks(1);
73         job.setOutputKeyClass(IntWritable.class);
74         job.setOutputValueClass(LongWritable.class);
75         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
76         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
77         System.exit(job.waitForCompletion(true) ? 0 1);
78     }
79 }

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

结果:

hadoop fs -cat /tmp/1/part-r-00000
0       12869695
0       12869971
0       12869976
1       12869813
1       12869870
1       12869951

......

数据验证:

awk ‘$3==0{print $1}‘ record_new.txt|sort -nr|head -3
12869976
12869971
12869695

可以看到结果没有问题。

注:测试数据由以下脚本生成:

http://my.oschina.net/leejun2005/blog/76631

PS:

如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。

pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。

附几个HIVE UDAF链接,有兴趣的同学自己看下:

Hive UDAF和UDTF实现group by后获取top值 http://blog.csdn.net/liuzhoulong/article/details/7789183
hive中自定义函数(UDAF)实现多行字符串拼接为一行 http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
编写Hive UDAF http://www.fuzhijie.me/?p=118
Hive UDAF开发 http://richiehu.blog.51cto.com/2093113/386113

时间: 08-20

Pig、Hive、MapReduce 解决分组 Top K 问题(转)的相关文章

pig询问top k,每个返回hour和ad_network_id最大的两个记录(SUBSTRING,order,COUNT_STAR,limit)

pig里面有一个TOP功能.我不知道为什么用不了.有时间去看看pig源代码. SET job.name 'top_k'; SET job.priority HIGH; --REGISTER piggybank.jar; REGISTER wizad-etl-udf-0.1.jar; --DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader(); DEFINE SequenceFileLoader

Top K问题的两种解决思路

Top K问题在数据分析中非常普遍的一个问题(在面试中也经常被问到),比如: 从20亿个数字的文本中,找出最大的前100个. 解决Top K问题有两种思路, 最直观:小顶堆(大顶堆 -> 最小100个数): 较高效:Quick Select算法. LeetCode上有一个问题215. Kth Largest Element in an Array,类似于Top K问题. 1. 堆 小顶堆(min-heap)有个重要的性质--每个结点的值均不大于其左右孩子结点的值,则堆顶元素即为整个堆的最小值.J

Top k 问题

Top K的问题: 给出大量数据,找出其中前K个最大(小)的数,或者在海量数据中找到出现频率最好的前K个数. 一.给出大量数据(N个),找出其中前K个最大数(没有其他资源上的限制) 1.使用排序算法 直接使用排序算法,如快速排序,然后遍历找到最大的K个数.时间复杂度为O(NlogN): 2.部分排序 因为,只要求出前K个最大值,所以我们不必全部排好.思路是:随意选出K个数形成一个数组,然后按从大到小进行排序,再从剩下的数中,选取一个数和数组中的最小值进行比较,若小于最小值,则取下一个数继续比较:

Top k问题(线性时间选择算法)

问题描述:给定n个整数,求其中第k小的数. 分析:显然,对所有的数据进行排序,即很容易找到第k小的数.但是排序的时间复杂度较高,很难达到线性时间,哈希排序可以实现,但是需要另外的辅助空间. 这里我提供了一种方法,可以在O(n)线性时间内解决Top k问题.关于时间复杂度的证明,不再解释,读者可以查阅相关资料.具体的算法描述如下: 算法:LinearSelect(S,k) 输入:数组S[1:n]和正整数k,其中1<=k<=n: 输出:S中第k小的元素 1. If  n<20  Then  

面试题-10亿个数中找出最大的10000个数(top K问题)

一个较好的方法:先拿出10000个建立小根堆,对于剩下的元素,如果大于堆顶元素的值,删除堆顶元素,再进行插入操作,否则直接跳过,这样知道所有元素遍历完,堆中的10000个就是最大的10000个.时间复杂度: m + (n-1)logm = O(nlogm) 优化的方法:可以把所有10亿个数据分组存放,比如分别放在1000个文件中(如果是字符串hash(x)%M).对每个文件,建立大小为10000的小根堆,然后按有序数组的合并合并起来,取出最大的10000个即是答案. top K问题 在大规模数据

排序算法Java版,以及各自的复杂度,以及由堆排序产生的top K问题

常用的排序算法包括: 冒泡排序:每次在无序队列里将相邻两个数依次进行比较,将小数调换到前面, 逐次比较,直至将最大的数移到最后.最将剩下的N-1个数继续比较,将次大数移至倒数第二.依此规律,直至比较结束.时间复杂度:O(n^2) 选择排序:每次在无序队列中"选择"出最大值,放到有序队列的最后,并从无序队列中去除该值(具体实现略有区别).时间复杂度:O(n^2) 直接插入排序:始终定义第一个元素为有序的,将元素逐个插入到有序排列之中,其特点是要不断的 移动数据,空出一个适当的位置,把待插

Top K问题!!!!!!!!!!!!!

转:http://blog.csdn.net/boo12355/article/details/11788655 Top K 算法详解应用场景: 搜索引擎会通过日志文件把用户每次检索使用的所有检索串都记录下来,每个查询串的长度为1-255字节.        假设目前有一千万个记录(这些查询串的重复度比较高,虽然总数是1千万,但如果除去重复后,不超过3百万个.一个查询串的重复度越高,说明查询它的用户越多,也就是越热门.),请你统计最热门的10个查询串,要求使用的内存不能超过1G. 必备知识:什么

大数据 hadoop pig hive 关系

初接触hadoop技术的朋友肯定会对它体系下寄生的个个开源项目糊涂了,我敢保证Hive,Pig,HBase这些开源技术会把你搞的有些糊涂,不要紧糊涂的不止你一个,如某个菜鸟的帖子的疑问,when to use Hbase and when to use Hive?....请教了^_^没关系这里我帮大家理清每个技术的原理和思路. Pig 一种操作hadoop的轻量级脚本语言,最初又雅虎公司推出,不过现在正在走下坡路了.当初雅虎自己慢慢退出pig的维护之后将它开源贡献到开源社区由所有爱好者来维护.不

pig hive hbase比较

Pig 一种操作hadoop的轻量级脚本语言,最初又雅虎公司推出,不过现在正在走下坡路了.当初雅虎自己慢慢退出pig的维护之后将它开源贡献到开源社区由所有爱好者来维护.不过现在还是有些公司在用,不过我认为与其使用pig不如使用hive.:) Pig是一种数据流语言,用来快速轻松的处理巨大的数据. Pig包含两个部分:Pig Interface,Pig Latin. Pig可以非常方便的处理HDFS和HBase的数据,和Hive一样,Pig可以非常高效的处理其需要做的,通过直接操作Pig查询可以节