一文带你搞清楚什么是“数据倾斜”

? 什么是数据倾斜

我们在用hive取数的时候,有的时候只是跑一个简单的join语句,但是却跑了很长的时间,有的时候我们会觉得是集群资源不够导致的,但是很大情况下就是出现了"数据倾斜"的情况。

在了解数据倾斜之前,我们应该有一个常识,就是现实生活中的数据分布是不均匀的,俗话说"28定理",80%的财富集中在20%的人手中之类的故事相信大家都看得不少。所以,在我们日常处理的现实数据中,也是符合这种数据分布的,数据倾斜一般有两种情况:

数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被partition分配到一个分区里,造成了"一个人累死,其他人闲死"的情况,这违背了并行计算的初衷,整体的效率是十分低下的。

? 数据倾斜的原因

当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。

而导致这个的原因,大致可以分为下面几点:

具体可以体现在下面的常见操作:

备注:图片文字内容来自网络

? Hadoop计算框架的特点

在了解如何避免数据倾斜之前,我们先来看看Hadoop框架的特性:

?? 优化的常用手段

说的是优化手段,但更多的是"踩坑"的经验之谈。

?? 优化之道:

?? SQL语句调节:

看完上面的经验总结还是有点懵逼?说实话我也是的,太多的信息量冲击,不过我们可以收藏起来以后继续看多几次,加深印象。

接下来,我们将从一些具体的案例来讲讲SQL语句优化的技巧,非常常用,对我们日常写SQL很有帮助。

优化案例

场景1:RAC常用(real application clusters的缩写,译为“实时应用集群”)

有一张user表,为卖家每天收入表,user_id,dt(日期)为key,属性有主营类目(cat),指标有交易金额,交易比数。每天要取每个user的主营类目在过去10天的总收入,总比数。

常规做法:取每个user_id最近一天的主营类目,存入临时表t1,汇总过去10天的总交易金额,交易比数,存入临时表t2,连接t1,t2,得到最终的结果。

优化做法:

SELECT user_id , substr(MAX(concat(dt, cat)), 9) AS main_cat , SUM(qty), SUM(amt)FROM usersWHERE dt BETWEEN 20101201 AND 20101210GROUP BY user_id;

场景2:空值产生的数据倾斜(最常见的现象)

日志中,常会有信息丢失的问题,比如全网日志中的 user_id,如果取其中的 user_id 和 bmw_users 关联,会碰到数据倾斜的问题。

解决方式1:user_id为空的不参与关联

SELECT *FROM log a JOIN bmw_users b ON a.user_id IS NOT NULL

AND a.user_id = b.user_idUNION ALLSELECT *FROM log aWHERE a.user_id IS NULL;

解决方式2:赋与空值分新的key值

SELECT *FROM log a LEFT JOIN bmw_users b ON CASE

WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand)

ELSE a.user_id END = b.user_id;

结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1 log表被读取了两次,jobs是2。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

场景3:不同数据类型关联产生数据倾斜

一张表 s8_log,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8_log 中有字符串商品 id,也有数字的商品 id。字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。

解决方式:把数字类型转换成字符串类型

SELECT *FROM s8_log a LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);

场景4:多表 union all 会优化成一个 job

推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。

SELECT *FROM effect a JOIN (

SELECT auction_id AS auction_id FROM auctions UNION ALL

SELECT auction_string_id AS auction_id FROM auctions ) b ON a.auction_id = b.auction_id;

结论:这样子比分别过滤数字 id,字符串 id ,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个 对,。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。

场景5:消灭子查询内的 group by

原写法:

SELECT *FROM (

SELECT *

FROM t1 GROUP BY c1, c2, c3 UNION ALL

SELECT *

FROM t2 GROUP BY c1, c2, c3) t3GROUP BY c1, c2, c3;

优化写法:

SELECT *FROM (

SELECT *

FROM t1 UNION ALL

SELECT *

FROM t2) t3GROUP BY c1, c2, c3;

结论:从业务逻辑上说,子查询内的 group by 功能与外层的 group by 重复,除非子查询内有 count(distinct)。经过测试,并未出现 union all 的 hive bug,数据是一致的。MR 的作业数由3减少到1。t1 相当于一个目录,t2 相当于一个目录,对map reduce程序来说,t1,t2 可以做为 map reduce 作业的 mutli inputs。这可以通过一个 map reduce 来解决这个问题。Hadoop的计算框架,不怕数据多,怕作业数多。

场景6:消灭子查询内的count(distinct),max,min

原写法:

SELECT c1, c2, c3, sum(pv)FROM (

SELECT c1, c2, c3, COUNT(c4)

FROM t1 GROUP BY c1, c2, c3 UNION ALL

SELECT c1, c2, c3, COUNT(DISTINCT c4)

FROM t2 GROUP BY c1, c2, c3) t3GROUP BY c1, c2, c3;

这种我们不能直接union 再groupby,因为其中有一个表的操作用到了去重,这种情况,我们可以通过建立临时表来消灭这种数据倾斜问题。

优化写法:

INSERT INTO t4SELECT c1, c2, c3, COUNT(DISTINCT c4)FROM t2GROUP BY c1, c2, c3;SELECT c1, c2, c3, SUM(pv)FROM (

SELECT c1, c2, c3, COUNT(c4)

FROM t1 UNION ALL

SELECT *

FROM t4) t3GROUP BY c1, c2, c3;

场景7:两张大表join

有两张表,一张是用户访问日志表log,一张是用户表users,其中log表上T,user表也上G,如何每日做到快速连接呢?

解决方法:

SELECT *FROM log a LEFT JOIN (

SELECT d.*

FROM (

SELECT DISTINCT memberid FROM log ) c JOIN users d ON c.memberid = d.memberid ) x ON a.memberid = b.memberid;

上面代码的意思,就是我们可以通过缩小主键的范围来达到减少表的连接操作,比如说限值某段时间,这样子,memberid就会有所减少了,而不是全量数据。

场景8:reduce的时间过长

还是场景7的例子,假设一个memberid对应的log里有很多数据,那么最后合并的时候,也是十分耗时的,所以,这里需要找到一个方法来解决这种reduce分配不均的问题。

解决方法:

SELECT *FROM log a LEFT JOIN (

SELECT memberid, number FROM users d JOIN num e ) b ON a.memberid = b.memberid AND mod(a.pvtime, 30) + 1 = b.number;

解释一下,上面的num是一张1列30行的表,对应1-30的正整数,把users表膨胀成N份(基于倾斜程度做一个合适的选择),然后把log数据根据memberid和pvtime分到不同的reduce里去,这样可以保证每个reduce分配到的数据可以相对均匀。

场景9:过多的where条件

有的时候,我们会写超级多的where条件来限制查询,其实这样子是非常低效的,主要原因是因为这个and条件hive在生成执行计划时产生了一个嵌套层次很多的算子。

解决方案:

1)把筛选条件对应的值写入一张小表,再一次性join到主表;

2)或者写个udf(user-defined function,用户定义函数),把这些预设值读取进去,udf来完成这个and数据过滤操作。

场景10:分组结果很多,但是你只需要topK

原写法:

SELECT mid, url, COUNT(1) AS cntFROM (

SELECT *

FROM r_atpanel_log WHERE pt = '20190610'

AND pagetype = 'normal') subqGROUP BY mid, urlORDER BY cnt DESCLIMIT 15;

优化写法:

SELECT *FROM (

SELECT mid, url, COUNT(1) AS cnt FROM (

SELECT *

FROM r_atpanel_log WHERE pt = '20190610'

AND pagetype = 'normal'

) subq GROUP BY mid, url) subq2WHERE cnt > 100ORDER BY cnt DESCLIMIT 15;

可以看出,我们先过滤掉无关的内容,再进行排序,这样子快很多。

? References

本文作者:SAMshare 来源:CIO之家的朋友们

CIO之家 www.ciozj.com 微信公众号:imciow

展开阅读全文

页面更新:2024-03-15

标签:随机数   数据   作业   写法   字符串   场景   分配   情况   数字   商品

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top