Hive JOIN使用详解

转自http://shiyanjun.cn/archives/588.html

Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL。有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job。 对于最基本的HQL查询我们不再累述,这里主要说明Hive中进行统计分析时使用到的JOIN操作。在说明Hive JOIN之前,我们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助我们理解HQL转换到底层的MR Job后是如何执行的。我们重点说明MapReduce执行过程中,从Map端到Reduce端这个过程(Shuffle)的执行情况,如图所示(来自《Hadoop: The Definitive Guide》):

基本执行过程,描述如下:

  1. 一个InputSplit输入到map,会运行我们实现的Mapper的处理逻辑,对数据进行映射操作。
  2. map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,可以通过io.sort.mb配置)。
  3. map自带的buffer使用容量达到一定门限(默认0.80或80%,可以通过io.sort.spill.percent配置),一个后台线程会准备将buffer中的数据写入到磁盘。
  4. 这个后台线程在将buffer中数据写入磁盘之前,会首先将buffer中的数据进行partition(分区,partition数为Reducer的个数),对于每个的数据会基于Key进行一个in-memory排序。
  5. 排序后,会检查是否配置了Combiner,如果配置了则直接作用到已排序的每个partition的数据上,对map输出进行化简压缩(这样写入磁盘的数据量就会减少,降低I/O操作开销)。
  6. 现在可以将经过处理的buffer中的数据写入磁盘,生成一个文件(每次buffer容量达到设置的门限,都会对应着一个写入到磁盘的文件)。
  7. map任务结束之前,会对输出的多个文件进行合并操作,合并成一个文件(若map输出至少3个文件,在多个文件合并后写入之前,如果配置了Combiner,则会运行来化简压缩输出的数据,文件个数可以通过min.num.splits.for.combine配置;如果指定了压缩map输出,这里会根据配置对数据进行压缩写入磁盘),这个文件仍然保持partition和排序的状态。
  8. reduce阶段,每个reduce任务开始从多个map上拷贝属于自己partition(map阶段已经做好partition,而且每个reduce任务知道应该拷贝哪个partition;拷贝过程是在不同节点之间,Reducer上拷贝线程基于HTTP来通过网络传输数据)。
  9. 每个reduce任务拷贝的map任务结果的指定partition,也是先将数据放入到自带的一个buffer中(buffer默认大小为Heap内存的70%,可以通过mapred.job.shuffle.input.buffer.percent配置),如果配置了map结果进行压缩,则这时要先将数据解压缩后放入buffer中。
  10. reduce自带的buffer使用容量达到一定门限(默认0.66或66%,可以通过mapred.job.shuffle.merge.percent配置),或者buffer中存放的map的输出的数量达到一定门限(默认1000,可以通过mapred.inmem.merge.threshold配置),buffer中的数据将会被写入到磁盘中。
  11. 在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。
  12. 当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件,这时开始执行合并操作,并保持每个map输出数据中Key的有序性,将多个文件合并成一个文件(在reduce端可能存在buffer和磁盘上都有数据的情况,这样在buffer中的数据可以减少一定量的I/O写入操作开销)。
  13. 最后,执行reduce阶段,运行我们实现的Reducer中化简逻辑,最终将结果直接输出到HDFS中(因为Reducer运行在DataNode上,输出结果的第一个replica直接在存储在本地节点上)。

通过上面的描述我们看到,在MR执行过程中,存在Shuffle过程的MR需要在网络中的节点之间(Mapper节点和Reducer节点)拷贝数据,如果传输的数据量很大会造成一定的网络开销。而且,Map端和Reduce端都会通过一个特定的buffer来在内存中临时缓存数据,如果无法根据实际应用场景中数据的规模来使用Hive,尤其是执行表的JOIN操作,有可能很浪费资源,降低了系统处理任务的效率,还可能因为内存不足造成OOME问题,导致计算任务失败。 下面,我们说明Hive中的JOIN操作,针对不同的JOIN方式,应该如何来实现和优化:

生成一个MR Job

多表连接,如果多个表中每个表都使用同一个列进行连接(出现在JOIN子句中),则只会生成一个MR Job,例如:

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key1)

三个表a、b、c都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个JOIN子句中,从而只生成一个MR Job。

生成多个MR Job

多表连接,如果多表中,其中存在一个表使用了至少2个字段进行连接(同一个表的至少2个列出现在JOIN子句中),则会至少生成2个MR Job,例如:

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key2)

三个表基于2个字段进行连接,这两个字段b.key1和b.key2同时出现在b表中。连接的过程是这样的:首先a和b表基于a.key和b.key1进行连接,对应着第一个MR Job;表a和b连接的结果,再和c进行连接,对应着第二个MR Job。

表连接顺序优化

多表连接,会转换成多个MR Job,每一个MR Job在Hive中称为JOIN阶段(Stage)。在每一个Stage,按照JOIN顺序中的最后一个表应该尽量是大表,因为JOIN前一阶段生成的数据会存在于Reducer的buffer中,通过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据可能是JOIN顺序中,前面表连接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的Key,与大表中的指定Key进行连接,速度会更快,也可能避免内存缓冲区溢出。例如:

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key1)

这个JOIN语句,会生成一个MR Job,在选择JOIN顺序的时候,数据量相比应该是b < c,表a和b基于a.key = b.key1进行连接,得到的结果(基于a和b进行连接的Key)会在Reducer上缓存在buffer中,在与c进行连接时,从buffer中读取Key(a.key=b.key1)来与表c的c.key进行连接。 另外,也可以通过给出一些Hint信息来启发JOIN操作,这指定了将哪个表作为大表,从而得到优化。例如:

1 SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON(c.key = b.key1)

上述JOIN语句中,a表被视为大表,则首先会对表b和c进行JOIN,然后再将得到的结果与表a进行JOIN。

基于条件的LEFT OUTER JOIN优化

左连接时,左表中出现的JOIN字段都保留,右表没有连接上的都为空。对于带WHERE条件的JOIN语句,例如:

1 SELECT a.val, b.val FROM LEFT OUTER JOIN ON (a.key=b.key)
2 WHERE a.ds=‘2009-07-07‘ AND b.ds=‘2009-07-07‘

执行顺序是,首先完成2表JOIN,然后再通过WHERE条件进行过滤,这样在JOIN过程中可能会输出大量结果,再对这些结果进行过滤,比较耗时。可以进行优化,将WHERE条件放在ON后,例如:

1 SELECT a.val, b.val FROM LEFT OUTER JOIN b
2 ON (a.key=b.key AND b.ds=‘2009-07-07‘ AND a.ds=‘2009-07-07‘)

这样,在JOIN的过程中,就对不满足条件的记录进行了预先过滤,可能会有更好的表现。

左半连接(LEFT SEMI JOIN)

左半连接实现了类似IN/EXISTS的查询语义,使用关系数据库子查询的方式实现查询SQL,例如:

1 SELECT a.key, a.value FROM WHERE a.key IN (SELECT b.key FROM b);

使用Hive对应于如下语句:

1 SELECT a.key, a.val FROM LEFT SEMI JOIN ON (a.key = b.key)

需要注意的是,在LEFT SEMI JOIN中,表b只能出现在ON子句后面,不能够出现在SELECT和WHERE子句中。 关于子查询,这里提一下,Hive支持情况如下:

  • 在0.12版本,只支持FROM子句中的子查询;
  • 在0.13版本,也支持WHERE子句中的子查询。

Map Side JOIN

Map Side JOIN优化的出发点是,Map任务输出后,不需要将数据拷贝到Reducer节点,降低的数据在网络节点之间传输的开销。 多表连接,如果只有一个表比较大,其他表都很小,则JOIN操作会转换成一个只包含Map的Job,例如:

1 SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM JOIN ON a.key = b.key

对于表a数据的每一个Map,都能够完全读取表b的数据。这里,表a与b不允许执行FULL OUTER JOIN、RIGHT OUTER JOIN。

BUCKET Map Side JOIN

我们先看两个表a和b的DDL,表a为:

1 CREATE TABLE a(key INT, othera STRING)
2 CLUSTERED BY(keyINTO 4 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED BY ‘\001‘
5 COLLECTION ITEMS TERMINATED BY ‘\002‘
6 MAP KEYS TERMINATED BY ‘\003‘
7 STORED AS SEQUENCEFILE;

表b为:

1 CREATE TABLE b(key INT, otherb STRING)
2 CLUSTERED BY(keyINTO 32 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED BY ‘\001‘
5 COLLECTION ITEMS TERMINATED BY ‘\002‘
6 MAP KEYS TERMINATED BY ‘\003‘
7 STORED AS SEQUENCEFILE;

现在要基于a.key和b.key进行JOIN操作,此时JOIN列同时也是BUCKET列,JOIN语句如下:

1 SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM JOIN ON a.key = b.key

并且表a有4个BUCKET,表b有32个BUCKET,默认情况下,对于表a的每一个BUCKET,都会去获取表b中的每一个BUCKET来进行JOIN,这回造成一定的开销,因为只有表b中满足JOIN条件的BUCKET才会真正与表a的BUCKET进行连接。 这种默认行为可以进行优化,通过改变默认JOIN行为,只需要设置变量:

1 set hive.optimize.bucketmapjoin = true

这样,JOIN的过程是,表a的BUCKET 1只会与表b中的BUCKET 1进行JOIN,而不再考虑表b中的其他BUCKET 2~32。 如果上述表具有相同的BUCKET,如都是32个,而且还是排序的,亦即,在表定义中在CLUSTERED BY(key)后面增加如下约束:

1 SORTED BY(key)

则上述JOIN语句会执行一个Sort-Merge-Bucket (SMB) JOIN,同样需要设置如下参数来改变默认行为,优化JOIN时只遍历相关的BUCKET即可:

1 set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
2 set hive.optimize.bucketmapjoin = true;
3 set hive.optimize.bucketmapjoin.sortedmerge = true;
时间: 07-15

Hive JOIN使用详解的相关文章

Hadoop Hive sql语法详解

Hive 是基于Hadoop 构建的一套数据仓库分析系统,它提供了丰富的SQL查询方式来分析存储在Hadoop 分布式文件系统中的数据,可以将结构化的数据文件映射为一张数据库表,并提供完整的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行,通过自己的SQL 去查询分析需要的内容,这套SQL 简称Hive SQL,使不熟悉mapreduce 的用户很方便的利用SQL 语言查询,汇总,分析数据.而mapreduce开发人员可以把己写的mapper 和reducer 作为插件来支持

Hive的配置详解和日常维护

Hive的配置详解和日常维护 一.Hive的参数配置详解 1>.mapred.reduce.tasks  默认为-1.指定Hive作业的reduce task个数,如果保留默认值,则Hive 自己决定应该使用多少个task. 2>.hive.mapred.mode  2.x下的默认值为strict,1.x以及之前的版本默认值为nonstrict.如果 设为strict,Hive将禁止一些危险的查询:分区表未用分区字段筛选: order by语句后未跟limit子句:join后没有on语句从而形

SQL中的JOIN语法详解

参考以下两篇博客: 第一个是 sql语法:inner join on, left join on, right join on详细使用方法 讲了 inner join, left join, right join的意义和用法. 第二个是 SQL中的left outer join,inner join,right outer join用法详解 讲了关系运算背后的数学原理,以及提到了更多类型的连接操作: inner join- 笛卡尔乘积再选取, left outer join, right out

T-SQL之JOIN关键字详解

这几天由于工作的需要,写了一个业务逻辑较复杂的存储过程,其中多次用到了JOIN.LEFT JOIN.RIGHT JOIN在处理表之间的逻辑的作用被渲染的淋漓尽致.说明一下,之前少处理数据库后台的经验,特别是没有好好的研究SQL的知识细节.不过也简单的学习了,自以为看明白了.就懂了.现在想来那时就是浅尝辄止.用了之后,对它的理解更深了.现在及时总结下.以备忘和日后查看.下面示例的demo代码是来自W3School.接下来直入主题. JOIN 1 SELECT Persons.LastName, P

Java线程join示例详解

Java线程的join方法可用于暂停当前线程的执行直至目标线程死亡.Thread中一共有三个join的重载方法. public final void join():该方法将当前线程放入等待队列中,直至被它调用的线程死亡为止.如果该线程被中断,则会抛出InterruptedException异常. public final synchronized void join(long millis):该方法用于让当前线程进入等待状态,直至被它调用的线程死亡或是经过millis毫秒.由于线程的执行依赖于操

[Hive] - Hive参数含义详解

hive中参数分为三类,第一种system环境变量信息,是系统环境变量信息:第二种是env环境变量信息,是当前用户环境变量信息:第三种是hive参数变量信息,是由hive-site.xml文件定义的以及当前hive会话定义的环境变量信息.其中第三种hive参数变量信息中又由hadoop hdfs参数(直接是hadoop的).mapreduce参数.metastore元数据存储参数.metastore连接参数以及hive运行参数构成. Hive-0.13.1-cdh5.3.6参数变量信息详解 参数

Hive的基本操作详解

一 Hive数据类型 1.1 基本数据类型 Hive数据类型 Java数据类型 长度 例子 TINYINT byte 1byte有符号整数 20 SMALINT short 2byte有符号整数 20 INT int 4byte有符号整数 20 BIGINT long 8byte有符号整数 20 BOOLEAN boolean 布尔类型,true或者false TRUE  FALSE FLOAT float 单精度浮点数 3.14159 DOUBLE double 双精度浮点数 3.14159

Hadoop Hive sql 语法详解

Hive 是基于Hadoop 构建的一套数据仓库分析系统,它提供了丰富的SQL查询方式来分析存储在Hadoop 分布式文件系统中的数据,可以将结构化的数据文件映射为一张数据库表,并提供完整的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行,通过自己的SQL 去查询分析需要的内容,这套SQL 简称Hive SQL,使不熟悉mapreduce 的用户很方便的利用SQL 语言查询,汇总,分析数据.而mapreduce开发人员可以把己写的mapper 和reducer 作为插件来支持

hive sql 语法详解

Hive 是基于Hadoop 构建的一套数据仓库分析系统,它提供了丰富的SQL查询方式来分析存储在Hadoop 分布式文件系统中的数据,可以将结构 化的数据文件映射为一张数据库表,并提供完整的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行,通过自己的SQL 去查询分析需 要的内容,这套SQL 简称Hive SQL,使不熟悉mapreduce 的用户很方便的利用SQL 语言查询,汇总,分析数据.而mapreduce开发人员可以把 己写的mapper 和reducer 作为插件