文档章节

spark 数据倾斜问题

西二旗之猫
 西二旗之猫
发布于 2017/04/21 17:29
字数 1374
阅读 0
收藏 0

spark数据倾斜问题

数据倾斜问题的描述

在Spark中,同一个Stage的不同Partition可以并行处理,而具有依赖关系的不同Stage之间是串行处理的。假设某个Spark Job分为Stage 0和Stage 1两个Stage,且Stage 1依赖于Stage 0,那Stage 0完全处理结束之前不会处理Stage 1。而Stage 0可能包含N个Task,这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成,而另外一个Task却耗时1分钟,那该Stage的总时间至少为1分钟。换句话说,一个Stage所耗费的时间,主要由最慢的那个Task决定。换句话说 木桶能装多少水取决于最短的那个木板.

由于同一个stage的所有task都执行相同的计算逻辑,所有在机器节点性能相同的情况下,不同task消耗的时间主要取决于计算的数据数量的大小,所有如果某一个task的数据量明显大于其他的,那么这种情况就是数据发生了倾斜,

数据倾斜的解决方法

数据倾斜引起的原因很多,所有不能说某一个方法就能解决数据倾斜,其实数据倾斜本身不是问题,即使发生了数据倾斜计算任务一样可以完成并且计算出正确的结果,但是代价是浪费了资源和时间,解决数据倾斜本质上是对计算过程的调优,

解决数据倾斜的第一步应该是定位问题发生的位置,打开spark 任务监控ui找到发生倾斜的task,即明显输入数量大于平均值的task或者用时明显超过平均值的task,接下来就是找到 这个task对应的stage 进而找到对应的spark算子,这样就可以定位到引起数据倾斜的代码,下面就是参考下文中的解决方式解决

引起数据倾斜的原因

在进行shuffle的时候需要把形同的key拉取到某一个节点的task中进行处理,比如聚合操作或者join操作,如果某一个key对应的数据量远大于平均值就发生了数据倾斜

常见数据倾斜的原因和解决方式

  • 第一种 ,提高shuffle并行度,给shuffle算子传入一个值 比如比如reduceByKey(1000),可以让原本分配给一个task的多个key分配给多个task,进而减少倾斜问题
  • 第二种 对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,采用分阶段聚合(局部聚合+全局聚合),第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)
  • 将 reduce join 转换为map join,类似于hive的map join 在小表join大表的时候把小表读入内存在map中完成对每个大表的join,具体的操作是不适用join操作,将数据量较小的rdd直接通过collect算子拉取到Driver端的内存中,然后创建一个broadcast(广播)变量,然后将小的rdd赋值给广播变量,然后对目标rdd执行map算子,在map内部读取广播变量里面小的rdd的全部数据进行计算,及与当前rdd的key进行连接计算,这种方式从根本上避免了shuffle,但是只是用于大表和小表的join的情况
  • 采样倾斜key并且拆分join操作之后再union合并到一起,具体操作先用sample算子采样,之后统计一个每个key的数量,找到数据量比较大的那几个key,将这几个key对应的数据单独拆分出一个rdd,之后给每个key打上n以内随机数的前缀,之后把需要join的rdd中的对应的key也拆分出一个新的rdd,然后把每个key对应的数据膨胀为原来的n倍,注意这里的n对应n以内随机数的n,并且这些膨胀后的数据前面顺序加上0-n,之后对这两个rdd进行join,因为已经将原来key拆分成了n分,所以会分散到多个task中去执行join,再这之后再将原有的普通rdd照常join就好了,再两个join执行完之后 union 合并到一起

 

 

© 著作权归作者所有

西二旗之猫
粉丝 7
博文 54
码字总数 39074
作品 0
私信 提问
如何避免Spark SQL做数据导入时产生大量小文件

什么是小文件? 生产上,我们往往将Spark SQL作为Hive的替代方案,来获得SQL on Hadoop更出色的性能。因此,本文所讲的是指存储于HDFS中小文件,即指文件的大小远小于HDFS上块(dfs.block.s...

Kent_Yao
02/01
0
0
Apache Spark 系列技术直播 - Spark SQL进阶与实战

Spark SQL进阶与实战 Spark相关组件介绍 Spark及其依赖组件 Hive Metastore介绍 Spark Thrift Server介绍 表与ETL Spark表基本概念 Spark建表最佳实践 Spark ETL最佳实践 动态分区表示例分析...

开源大数据
2018/12/05
0
0
Spark 从 Kafka 读数并发问题

经常使用 Apache Spark 从 Kafka 读数的同学肯定会遇到这样的问题:某些 Spark 分区已经处理完数据了,另一部分分区还在处理数据,从而导致这个批次的作业总消耗时间变长;甚至导致 Spark 作...

Kafka
2018/09/09
0
0
漫谈千亿级数据优化实践:数据倾斜

0x00 前言 引用 数据倾斜是大数据领域绕不开的拦路虎,当你所需处理的数据量到达了上亿甚至是千亿条的时候,数据倾斜将是横在你面前一道巨大的坎。 迈的过去,将会海阔天空!迈不过去,就要做...

GordonNemo
2018/11/14
23
0
Spark性能优化篇三:数据倾斜调优

Spark性能优化篇三:数据倾斜调优 前言 1.数据倾斜调优 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 某个task执行特别慢的情况 某个task莫名其妙内存溢出...

u010262291
2018/05/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
5
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部