文档章节

Hive 数据倾斜总结

Avner
 Avner
发布于 2017/08/11 13:32
字数 3216
阅读 27
收藏 0

本文内容主要转至以下两篇文章:

http://www.cnblogs.com/skyl/p/4776083.html

http://sunyi514.github.io/2013/09/01/%E6%95%B0%E6%8D%AE%E4%BB%93%E5%BA%93%E4%B8%AD%E7%9A%84sql%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%EF%BC%88hive%E7%AF%87%EF%BC%89/

map/reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢(或卡在完成率99%左右),导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。

 

倾斜分成group by造成的倾斜和join造成的倾斜,需要分开看。

 

GroupBy倾斜

group by造成的倾斜相对来说比较容易解决。hive提供两个参数可以解决,一个是hive.map.aggr默认值已经为true,他的意思是做map aggregation,也就是在mapper里面做聚合。这个方法不同于直接写mapreduce的时候可以实现的combiner,但是却实现了类似combiner的效果。事实上各种基于mr的框架如pig,cascading等等用的都是map aggregation(或者叫partial aggregation)而非combiner的策略,也就是在mapper里面直接做聚合操作而不是输出到buffer给combiner做聚合对于map aggregation,hive还会做检查,如果aggregation的效果不好,那么hive会自动放弃map aggregation。判断效果的依据就是经过一小批数据的处理之后,检查聚合后的数据量是否减小到一定的比例,默认是0.5,由hive.map.aggr.hash.min.reduction这个参数控制。所以如果确认数据里面确实有个别取值倾斜,但是大部分值是比较稀疏的,这个时候可以把比例强制设为1,避免极端情况下map aggr失效。

hive.map.aggr还有一些相关参数,比如map aggr的内存占用等,具体可以参考这篇文章http://dev.bizo.com/2013/02/map-side-aggregations-in-apache-hive.html

另一个参数是hive.groupby.skewindata。这个参数的意思是做reduce操作的时候,拿到的key并不是所有相同值给同一个reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。所以这个参数其实跟hive.map.aggr做的是类似的事情,只是拿到reduce端来做,而且要额外启动一轮job,所以其实不怎么推荐用,效果不明显

另外需要注意的是count distinct操作往往需要改写SQL,可以按照下面这么做:

/*改写前*/
select a, count(distinct b) as c from tbl group by a;

/*改写后*/
select a, count(*) as c from (select a, b from tbl group by a, b) group by a;

 

Join倾斜

join造成的倾斜,常见情况是不能做map join的两个表(能做map join的话基本上可以避免倾斜),其中一个是行为表,另一个应该是属性表。比如我们有三个表,一个用户属性表users,一个商品属性表items,还有一个用户对商品的操作行为表日志表logs。假设现在需要将行为表关联用户表:

select * from logs a join users b on a.user_id = b.user_id;

其中logs表里面会有一个特殊用户user_id = 0,代表未登录用户,假如这种用户占了相当的比例,那么个别reduce会收到比其他reduce多得多的数据,因为它要接收所有user_id = 0的记录进行处理,使得其处理效果会非常差,其他reduce都跑完很久了它还在运行。

hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。当然你要告诉hive这个join是个skew join,即:

set hive.optimize.skewjoin = true;

还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值。总结起来,skew join的流程可以用下图描述:

不过,这种方法还要去考虑阈值之类的情况,其实也不够通用所以针对join倾斜的问题,一般都是通过改写sql解决。对于上面这个问题,我们已经知道user_id = 0是一个特殊key,那么可以把特殊值隔离开来单独做join,这样特殊值肯定会转化成map join,非特殊值就是没有倾斜的普通join了:

select *
from (select * from logs where user_id = 0)  a 
join (select * from users where user_id = 0) b 
on a.user_id =  b.user_id
union all
select * 
from logs a join users b
on a.user_id <> 0 and a.user_id = b.user_id;

上面这种个别key倾斜的情况只是一种倾斜情况。最常见的倾斜是因为数据分布本身就具有长尾性质,比如我们将日志表和商品表关联:

select * from logs a join items b on a.item_id = b.item_id;

这个时候,分配到热门商品的reducer就会很慢,因为热门商品的行为日志肯定是最多的,而且我们也很难像上面处理特殊user那样去处理item这个时候就会用到加随机数的方法,也就是在join的时候增加一个随机数,随机数的取值范围n相当于将item给分散到n个reducer

select a.*, b.*
from (select *, cast(rand() * 10 as int) as r_id from logs)a
join (select *, r_id 
      from items 
      lateral view explode(range_list(1,10)) rl as r_id
      )b
on a.item_id = b.item_id and a.r_id = b.r_id

上面的写法里,对行为表的每条记录生成一个1-10的随机整数对于item属性表,每个item生成10条记录,随机key分别也是1-10,这样就能保证行为表关联上属性表。其中range_list(1,10)代表用udf实现的一个返回1-10整数序列的方法这个做法是一个解决join倾斜比较根本性的通用思路,就是如何用随机数将key进行分散。当然,可以根据具体的业务场景做实现上的简化或变化。

除了上面两类情况,还有一类情况是因为业务设计导致的问题,也就是说即使行为日志里面join key的数据分布本身并不明显倾斜,但是业务设计导致其倾斜。比如对于商品item_id的编码,除了本身的id序列,还人为的把item的类型也作为编码放在最后两位,这样如果类型1(电子产品)的编码是00,类型2(家居产品)的编码是01,并且类型1是主要商品类,将会造成以00为结尾的商品整体倾斜。这时,如果reduce的数量恰好是100的整数倍,会造成partitioner把00结尾的item_id都hash到同一个reducer,引爆问题这种特殊情况可以简单的设置合适的reduce值来解决,但是这种坑对于不了解业务的情况下就会比较隐蔽。

 

 

1.万能膏药:hive.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个 MR Job。
  第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的
  第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。 

1.1.参数调优:hive.map.aggr=true. Map端部分聚合,相当于Combiner 。

2. 大小表关联:
可以使用Map Join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce.

3. 大表和大表关联:
把空值NULL的key变成一个字符串加上随机数把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。例如:Demo1.空值数据倾斜 (下面的例子)。

4. count distinct大量相同特殊值:
count distinct时,将值为空的情况单独处理。如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union

Demo1.空值数据倾斜
场景:如日志中,常会有信息丢失的问题,比如全网日志中的user_id,如果取其中的user_id和bmw_users关联,会碰到数据倾斜的问题。
解决方法1: user_id为空的不参与关联

Select * From log a 
   Join bmw_users b 
     On a.user_id is not nullAnd a.user_id = b.user_id
Union all 
Select * from log a where a.user_id is null;

解决方法2 :赋予空值新的key值

Select * from log a 
  left outer Join bmw_users b 
on case when a.user_id is null then concat(‘dp_hive’,rand()) else a.user_id 
   end = b.user_id; 

结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。

方法1的log读取两次,jobs是2。方法2的job数是1。这个优化适合无效id(比如-99,’’,null等)产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题

Demo2.不同数据类型关联产生数据倾斜 
场景:一张表s8的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题,s8的日志中有字符串商品id,也有数字的商品id,类型是string的,但商品中的数字id是bigint的。
问题原因:把s8的商品id转成数字id做hash来分配reduce,所以字符串id的s8日志,都到一个reduce上了,解决的方法验证了这个猜测。
解决方法:把数字类型转换成字符串类型;

Select * from s8_log a
 Left outer join r_auction_auctions b
  On a.auction_id = cast(b.auction_id as string);

Demo3.大表Join的数据偏斜
MapReduce编程模型下开发代码需要考虑数据偏斜的问题,Hive代码也是一样。数据偏斜的原因包括以下两点:
  1. Map输出key数量极少,导致reduce端退化为单机作业。
  2. Map输出key分布不均,少量key对应大量value,导致reduce端单机瓶颈。

Hive中我们使用MapJoin解决数据偏斜的问题,即将其中的某个小表(全量)分发到所有Map端的内存进行Join,从而避免了reduce这要求分发的表可以被全量载入内存
极限情况下,Join两边的表都是大表,就无法使用MapJoin。这种问题最为棘手,目前已知的解决思路有两种:
1. 如果是上述情况1,考虑先对Join中的一个表去重,以此结果过滤无用信息。
  这样一般会将其中一个大表转化为小表,再使用MapJoin 。一个实例是广告投放效果分析,
  例如将广告投放者信息表i中的信息填充到广告曝光日志表w中,使用投放者id关联。因为实际广告投放者数量很少(但是投放者信息表i很大),因此可以考虑先在w表中去重查询所有实际广告投放者id列表,以此Join过滤表i,这一结果必然是一个小表,就可以使用MapJoin。

select /*+mapjoin(x)*/ * 
from log a 
left outer join 
  (
  select /*+mapjoin(c)*/ d.*
  from (select distinct user_id from log ) c
  join users d 
    on c.user_id = d.user_id
   ) x 
on a.user_id = b.user_id;

2. 如果是上述情况2,考虑切分Join中的一个表为多片,以便将切片全部载入内存,然后采用多次MapJoin得到结果。
一个实例是商品浏览日志分析,例如将商品信息表i中的信息填充到商品浏览日志表w中,使用商品id关联。但是某些热卖商品浏览量很大,造成数据偏斜。例如,以下语句实现了一个inner join逻辑,将商品信息表拆分成2个表:

select * from
       (
        select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
       from w 
         left outer join i sampletable(1 out of 2 on id) i1
       )
union all
       ( select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat
      from w 
        left outer join i sampletable(1 out of 2 on id) i2
       );

 

© 著作权归作者所有

共有 人打赏支持
上一篇: Hive 文件格式
下一篇: Hive分桶表总结
Avner
粉丝 8
博文 60
码字总数 54757
作品 0
杭州
程序员
私信 提问
hive 数据倾斜总结

数据倾斜总结 在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得...

八戒_o
2016/03/30
21
0
漫谈千亿级数据优化实践:数据倾斜

0x00 前言 引用 数据倾斜是大数据领域绕不开的拦路虎,当你所需处理的数据量到达了上亿甚至是千亿条的时候,数据倾斜将是横在你面前一道巨大的坎。 迈的过去,将会海阔天空!迈不过去,就要做...

GordonNemo
前天
0
0
Hadoop 中的数据倾斜

最近几次被问到关于数据倾斜的问题,这里找了些资料也结合一些自己的理解. 在并行计算中我们总希望分配的每一个task 都能以差不多的粒度来切分并且完成时间相差不大,但是集群中可能硬件不同...

HIVE
2016/07/02
38
0
浅析 Hadoop 中的数据倾斜

最近几次被问到关于数据倾斜的问题,这里找了些资料也结合一些自己的理解. 在并行计算中我们总希望分配的每一个task 都能以差不多的粒度来切分并且完成时间相差不大,但是集群中可能硬件不同...

大数据之路
2013/01/09
0
2
007.hive调优:大数据倾斜

-----成王败寇(陈小春) 原文章地址:http://www.tbdata.org/archives/2109 hive大数据倾斜总结 在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要...

片刻
2014/01/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Confluence 6 教程:在 Confluence 中导航

当你对 Confluence 有所了解后,你会发现 Confluence 使用起来非常简单。这个教程主要是针对你使用的 Confluence 界面进行一些说明,同时向你展示在那里可以进行一些通用的任务和操作。 空间...

honeymose
今天
2
0
sed, awk 练习

1. sed打印某行到某行之间的内容 2. sed 转换大小写 将单词首字母转化大写 将所有小写转化大写 3. sed 在某一行最后面添加一个数字 4. 删除某行到最后一行 解析: {:a;N;$!ba;d} :a : 是...

Fc丶
今天
2
0
babel6升级到7,jest-babel报错:Requires Babel "^7.0.0-0", but was loaded with "6.26.3".

自从将前端环境更新到babel7,jest-babel之前是基于babel6的,执行时候就会报:Requires Babel "^7.0.0-0", but was loaded with "6.26.3". 很烦,因为连续帮好几台电脑修复这个问题,所以记...

曾建凯
今天
1
0
探索802.11ax

802.11ax承诺在真实条件下改善峰值性能和最差情况。 如何改善今天的Wi-Fi? 在决定如何改进当前版本以外的Wi-Fi时,802.11ac,IEEE和Wi-Fi联盟调查了Wi-Fi部署和行为,以确定更广泛使用的障碍...

linuxprobe16
今天
2
0
使用linux将64G的SDCARD格式化为FAT32

一、命令如下: sudo fdisk -lsudo mkfs.vfat /dev/sda -Isudo fdisk /dev/sda Welcome to fdisk (util-linux 2.29.2). Changes will remain in memory only, until you decide to wri......

mbzhong
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部