文档章节

Spark SQL 中 Broadcast Join 一定比 Shuffle Join 快?那你就错了。

o
 osc_k8v7r34l
发布于 07/06 11:07
字数 1370
阅读 13
收藏 0

行业解决方案、产品招募中!想赚钱就来传!>>>

本资料来自 Workday 的软件开发工程师 Jianneng Li 在 Spark Summit North America 2020 的 《On Improving Broadcast Joins in Spark SQL》议题的分享。

背景

相信使用 Apache Spark 进行数据分析的同学对 Spark 中的 Broadcast Join 比较熟悉,其在 Join 之前会把一端比较小的表广播到参与 Join 的 worker 端,具体如下:

On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

相比 Shuffle Join,Broadcast Join 的优势主要有:

  • 避免把大表的数据 shuffle 到其他节点;
  • 很自然地处理数据倾斜
On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

很多人得出结论:在 Broadcast Join 适用的情况下,Broadcast Join 是要比 Shuffle Join 快!但事实是这样的吗?

TPC-H 测试

在得出结论之前我们先来进行 TPC-H 测试,来看下是不是 Broadcast Join 一定要比 Shuffle Join 快。测试条件如下:

  • 数据集 10GB;
  • 查询:6千万条数据的 lineitem 表 join 1.5千万的 orders 表
  • Driver 的配置:1 core, 12 GB
  • Executor 的配置:一个 instance,18 cores, 102 GB
On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

从上面可以结果可以看出,Broadcast Join 比 Shuffle Join 跑的慢!

Broadcast Join 机制

在理解上面结果之前,我们先来看下 Broadcast Join 的运行机制。

On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

在进行 Broadcast Join 之前,Spark 需要把处于 Executor 端的数据先发送到 Driver 端,然后 Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多,比如我们把 spark.sql.autoBroadcastJoinThreshold 这个参数设置到 1G,但是我们的 Driver 端的内存值设置为 500M,那这种情况下会导致 Driver 端出现 OOM。
根据前面的分析,上面 TPC-H 结果慢是因为:

  • Driver 端需要 collects 1.5千万条的数据;
  • Driver 端构建 hashtable;
  • Driver 把构建好的 hashtable 发送到 Executor 端;
  • Executor deserializes hashtable。

所以说由于当前 Broadcast Join 的运行机制,这就导致即使在 Broadcast Join 适用的情况下,Broadcast Join 不一定比 Shuffle Join 快。

过往记忆大数据提示,大家如果对这部分代码感兴趣可以参看 BroadcastExchangeExec.scala 类的相关代码,其先调用 org.apache.spark.sql.execution.SparkPlan 类里面的 executeCollectIterator 方法,其主要是将数据从 Executor 发送到 Driver,大家可以看到里面调用了 getByteArrayRdd().collect()

private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = {
    val countsAndBytes = getByteArrayRdd().collect()
    val total = countsAndBytes.map(_._1).sum
    val rows = countsAndBytes.iterator.flatMap(countAndBytes => decodeUnsafeRows(countAndBytes._2))
    (total, rows)
}

然后到 relationFuture 变量初始化:

private1 lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
    SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
      sqlContext.sparkSession, BroadcastExchangeExec.executionContext) {
          try {
            // 这个地方就是前面说的将数据 Collect 到 Driver 端:
            val (numRows, input) = child.executeCollectIterator()
            // 这里省去了一部分代码

            // Construct the relation.
            val relation = mode.transform(input, Some(numRows))

            // 这里省去了一部分代码            

            val beforeBroadcast = System.nanoTime()
            longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeBuild)

            // Broadcast the relation
            // 这个地方就是前面说的需要先 broadcast 数据到 Executor 端
            val broadcasted = sparkContext.broadcast(relation)
            longMetric("broadcastTime") += NANOSECONDS.toMillis(
              System.nanoTime() - beforeBroadcast)
            val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
            SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
            promise.trySuccess(broadcasted)
            broadcasted
          } catch {

            // 这里省去了一部分代码   
          }
    }
}

提升 Broadcast Join 的性能

针对上面的分析,我们能不能不把数据 collect 到 Driver 端,而直接在 Executor 端之间进行数据交换呢?这就是 Workday 的工程师团队给我们带来的 Executor 端的 broadcast,这项工作可以参见 SPARK-17556。我们来看看 Executor 端的 broadcast 工作原理:

  • Executors 把 Join 需要的数据 broadcasted 给其他 Executors;
  • Driver 端只负责记录 Executors 端的 block 信息,这样其他 Executor 就可以知道 block 可以从哪些 Executor 获取。

具体流程如下:

On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

测试结果

Workday 的工程师分别测试了以下三种测试场景:

  • 数据量不变,分别测试不同 core 的性能;
  • lineitem 表大小不同测性能;
  • 加大 orders 表的大小

结果如下:

On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop
On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop
On Improving Broadcast Joins in Spark SQL
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

总结起来就是:

  • 在数据量一样的情况下,如果 core 的个数比较多,Shuffle Join 是有优势的;
  • 如果非广播的表数据量数据量越来越大,Broadcast Join 是有优势的;
  • 如果加大广播表的数据量,Driver 端的 Broadcast 是跑不出结果,Executor 的 Broadcast Join 是比较快的。

根据上面的结论,所以大家要知道 Broadcast 不一定比 Shuffle 快。另外,Executor 端的 Broadcast 特性是2016年9月就提的,截止到最新的 Apache Spark 3.0.0 这个功能还没有合并到主分支,如果大家有需要这个,可以自行合并。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark SQL 中 Broadcast Join 一定比 Shuffle Join 快?那你就错了。】(https://www.iteblog.com/archives/9837.html)

喜欢 (0) 分享 (0)
o
粉丝 0
博文 68
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
数据库代码辅助工具--MaoCaiJun.Database

MaoCaiJun.DataBase 是一个用于 Microsoft Visual Studio 的数据库代码生成组件。它是基于 xml 文件的代码创建工具,支持sql2000,sql2005,sql2008,access, SQLite MaoCaiJun.Database 数据库...

mccj
2013/02/06
2.2K
1
SQLet

SQLet 可以让你通过简单的一个命令就从文本文件中执行多个 SQL 语句,同时也可以同时执行多个文件文件。 使用方法:>sqlet.py -d';' -A file1.txt -B file2.txt 'select * from A,B where a2...

匿名
2013/03/13
833
0
浏览器中的scheme解释器--SchemeScript

一个用javascript实现的scheme解释器,可以运行在浏览器中或node.js中。 刚刚看到编译原理与实践第二章,一时兴起,想写个以前就想写的scheme的解释器。昨天晚上开始写,到刚才为止,接近一天...

zoowii
2012/11/01
1.1K
0
IIS6.0 日志导出工具

IIS 6.0 日志导入工具是一个服务器日志分析工具,因为我们对文本内容分析起来非常吃力, 通常第一步是先导入数据库,而手工导入到数据库又是一个费时费力的事情, IIS 6.0 日志导入工具专门针...

小鱼干
2012/11/12
2.1K
0
远程的 Shell 客户端--Rlogin

Rlogin 是一个远程的 Shell 客户端,类似 SSH。其设计的理念是快速而且体积小。Rlogin 不是加密的,不适合用于高安全的环境,但是它最大的优点是速度奇快,而且服务器和客户端都不需要占用太...

匿名
2012/11/17
1.4K
0

没有更多内容

加载失败,请刷新页面

加载更多

PHP实现RabbitMQ消息队列

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异. php扩展地址: http://pecl.php.net/package/amqp 具体以官网为准 http://www.rabbitmq.com/getstarted.htm...

PHP圈子
7分钟前
0
0
pdd笔试题

拼多多提前批的笔试没有报名,但昨天听伙伴们说很难,所以一共4道题,挑了2道会的,自己编了一下。 #include<iostream>#include<vector>#include<algorithm>using namespace std;int ma...

osc_tylqml9v
8分钟前
0
0
拓扑排序算法

/** * 拓扑排序算法,拓扑都是有向无环图 * 使用场景:编译的时候,比如,springboot启动的时候要读取docker系统环境变量,还要读取各配置文件按照顺序 * 还有比如,a的包依赖...

osc_94gn551r
9分钟前
0
0
巨微代理MS1581蓝牙无线收发器

上海巨微MS1581包含8位单片机和低功耗、低成本的BLE收发器,内部集成了发射机、接收机、GFSK调制解调器和BLE基带处理。遵循BLE广播通道通信,具有成本低、体积小、控制方便等优点。巨微代理英...

英尚微电子
9分钟前
0
0
链接测试(内部)

1、长链 https://chelun.eclicks.cn/web/information?info_tid=156984 - 文章test http://cjjl-h5-test.chelun.com/2020/big/index.html - 以小博大test 2、scheme : 钱包 supercoach://myw......

osc_hwc3munb
10分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部