文档章节

Spark Programming--- Shuffle operations

o
 osc_mervd488
发布于 2018/04/23 18:19
字数 2234
阅读 0
收藏 0

精选30+云产品,助力企业轻松上云!>>>

 

Spark Programming--- Shuffle operations

http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations

一些spark的特定操作将会触发被称为shuffle的事件。Shuffle是Spark用于重新分布数据的机制,这样可以在不同的分区来分组。这通常涉及到在executor和机器之间进行拷贝数据,所以shuffle是一个很复杂并且消耗高的操作。

背景

为了了解shuffle期间发生了什么,我们可以考虑reduceByKey操作作为例子。reduceByKey操作生成了一个新的RDD通过所有的单个键值组合为一个元组-关键字和针对与该关键字相关的所有值执行reduce函数的结果。这里的挑战不是所有的值对一个单独的键都在同一个分区上或者甚至说在一台机器上,而是它们必须被重新分布来计算结果。
在Spark,数据通常不会跨分区分布到特定操作的必要位置。在计算中,一个单独的任务将会在一个单独的分区上操作-然而为了组织所有的数据来被一个的单独reduceByKey 的reduce任务来执行,Spark需要来执行一个all-to-all操作。它必须读取所有分区来找到所有键的值,然后将它们带到一起跨分区来为每一个键计算最终的结果---这个被称为shuffle。
尽管在每一个分区中的新的shuffled数据的元素集是很重要的,同样分区自己的顺序也很重要,而元素之间的顺序就不是了。如果一个想要预测shuffle中的顺序数据那么可以使用:

  1. mapPartitions 来排序每一个分区,比如,.sorted
  2. repartitionAndSortWithinPartitions 来有效分区同时同步重新分区。
  3. sortBy 创造一个全局的排序的RDD

可以引起一个shuffle 的操作包括:repartition 和 coalesce,ByKey的操作,除了counting之外的比如:groupByKey 和reduceByKey,以及join操作比如cogroup 和 join。

性能影响

Shuffle是一个昂贵的操作因为它涉及到磁盘I/O,数据序列化和网络I/O。为了给shuffle组织数据,spark生成一系列任务-maps用于组织数据,以及一系列reduce任务来聚集它。这个命名系统来自于MapReduce而且并不直接和SparK的map,reduce操作有关。
在内部,单独的map任务的结果会被保存在内存中直到它们不适用。然后这些结果会被根据目标分区排序并且写向单一的文件。在reduce方面,任务读取相关的排序块。
一定的shuffle操作会消耗明显的数量的堆内存因为它们使用的是在内存中的数据结构来组织记录在传输之前或者之后。明显的,reduceByKey和AggregateByKey创造了这些结构在map阶段,以及 ‘Bykey的操作生成了它们在reduce阶段。当数据不能放进内存中时,Spark将会将这些表散落到硬盘中,会引起而外的磁盘I/O和增加垃圾回收次数。
Shuffle同样会生成大量的中间文件在磁盘中。从Spark1.3开始,这些文件被保存直到对应的RDDs不再被使用以及已经被垃圾回收了。这样做是为了shuffle文件不需要被重新创造如果lineage被重新计算时。垃圾回收也许会发生只有在一段很长时间,如果这个应用保留了对RDD的引用或者如果GC没有频繁的发生。这意味着长期运行的spark任务也许会消耗大量的磁盘空间。这个零时的磁盘目录会被spark.local.dir参数所指定。
Shuffle行为可以被调整通过一系列的参数。可以参考 Spark Configuration Guide.‘Shuffle Behavior’章节。

Shuffle Behavior

属性名称Property Name 默认值Default 含义Meaning
spark.reducer.maxSizeInFlight 48m 从每一个reduce任务中同步获取的map输出的最大值。由于每一个输出需要我们创造一个缓存来接受它,这个代表了每个任务的固定的内存开销,所以尽量保证它较小除非你有很多内存。
spark.reducer.maxReqsInFlight Int.MaxValue 这个配置限制了任意给定点远程请求获取块数。当集群中的主机数量增加的时候,它也许会导致一个非常大数量的内部连接到一到多个节点,引起worker在负载下失败。通过允许它来限制获取请求的数量,这个情况也许会缓解
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 这个配置限制了每一个从给定端口里的的reduce任务可以获取的远程端口数量。当一个大量的block被一个给定的地址在一次单独获取或者同步获取所请求时,可能会冲垮服务的executor或者Node Manager。这个配置对于减少Node Manager的负载尤为有用当外部的shuffle是被允许的。你可以通过设定一个较低值来减轻这个情况。
spark.maxRemoteBlockSizeFetchToMem Long.MaxValue 远程的块将会被获取到磁盘中,当这个块的大小超过了这个配置的值在byte单位上。这个用于避免一个巨大的请求占据了太多的内存。我们可以将这个配置为一个指定的值(比如,200M)。注意到这个配置将会影响到shuffle的获取以及远程块获取的块管理。对于允许了外部shuffle服务的用户,这个特性只会在外部shuffle服务版本高于Spark2。2时有效。
spark.shuffle.compress true 是否压缩map的输出文件,通常是一个好想法。压缩将会使用spark.io.compression.codec.
spark.shuffle.file.buffer 32k 对每一个shuffle文件输出流的在内存中的缓存大小,单位是KiB除非有其他的特别指定。这些缓存减少了硬盘查找和系统调用创建中间shuffle文件的过程。
spark.shuffle.io.maxRetries 3 (Netty only)最大自动重复尝试的次数如果这个值没有被设置为0.这个重试逻辑有助于稳定大型的shuffle在长时间的GC暂停或者暂时的网络连接问题上。
spark.shuffle.io.numConnectionsPerPeer 1 (Netty only) 节点之间的连接的重复使用为了减少大型集群中重复建立连接的情况。对于有很多硬盘和很少主机的集群,这个将会导致并发行不足以饱和所有硬盘,因此用户可能会考虑增加这个值。
spark.shuffle.io.preferDirectBufs true (Netty only) 堆外缓冲区在shuffle和缓存块转移期间被用于减少垃圾回收。对于对外缓存内存数量有限的环境,用户也许想要关掉这个来强迫所有的来自于Netty的分配都是在堆上。
spark.shuffle.io.retryWait 5s (Netty only) 在每一次重试直接需要等待多久。最大的延迟时间默认是15秒,maxRetries * retryWait.
spark.shuffle.service.enabled false 允许外部shuffle服务。这个服务保存了通过executor所写的shuffle文件,这样这个executor可以安全的被移除。这个配置必须被允许如果spark.dynamicAllocation.enabled是“true”。这个外部的shuffle服务必须被启动。查看dynamic allocation configuration and setup documentation 来获得更多信息。
spark.shuffle.service.port 7337 外部shuffle服务将会运行的端口。
spark.shuffle.service.index.cache.size 100m 缓存条目限制在指定的内存占用空间中,以字节为单位
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE 在shuffle服务中同一时间最大允许传输的块数量。注意到新来的连接将会被关闭如果达到了最大数量。这个客户端将会尝试重新连接根据shuffle的重试配置(see spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait),如果这个限制也被达到了,那么这个任务将会失败。
spark.shuffle.sort.bypassMergeThreshold 200 (Advanced)在基于排序的shuffle管理中,避免合并排序数据如果这里没有map-side的聚合和这里最多有配置的这么多的reduce分区。
spark.shuffle.spill.compress true 是否压缩溢出的数据在shuffle期间
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 阀值是以bytes为单位,高于此值将准确记录HighlyCompressedMapStatus中的shuffle块的大小。这个用于帮助阻止OOM通过避免错误估计了shuffle块大小当获取了shuffle块时。
spark.shuffle.registration.timeout 5000 注册外部shuffle服务的超时时间,单位是毫秒
spark.shuffle.registration.maxAttempts 3 当注册外部shuffle服务失败的时候,我们会重复尝试的最大次数
spark.io.encryption.enabled false 允许IO编码。目前支持所有的模式除了Mesos。当使用这个特性的时候,我们推荐RPC编码。
spark.io.encryption.keySizeBits 128 IO编码的值大小单位为bit。支持的值有128,192和256.
spark.io.encryption.keygen.algorithm HmacSHA1 当生成一个IO编码键值时使用的算法。被支持的算法在Java Cryptography Architecture Standard Algorithm Name 文档的KeyGenerator章节中被描述。
o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
Spark 1.2.0发布啦

阿里云智能数据库事业部 OLAP 平台团队现向 2021 届海内外院校应届毕业生(毕业时间在2020年11月-2021年10月之间)招聘大数据系统工程师,有机会转正,有意请加 iteblog 微信内推。   Spa...

Spark
2014/12/19
0
0
Spark学习笔记

Spark Core 1.1 RDD 概念:The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the clus......

osc_nd4uekgu
2019/06/15
1
0
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
2018/11/09
0
0
Spark Streaming + Kafka Integration Guide

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partition......

刺猬一号
2018/07/18
301
0
来自 Facebook 的 Spark 大作业调优经验

文章目录 1 Facebook Spark 的使用情况 2 扩展 Spark Driver 3 扩展 Spark Executor 4 扩展 External Shuffle Service 5 应用程序调优 Facebook Spark 的使用情况 在介绍下面文章之前我们来看...

osc_wcq210y3
06/15
3
0

没有更多内容

加载失败,请刷新页面

加载更多

深入浅出Zabbix 3.0 -- 第二章 Zabbix Web操作与定义

第二章 Zabbix Web操作与定义 本章介绍Zabbix 中一些基本概念的定义和web前端页面的操作,包括Zabbix中使用的一些术语的定义,Web页面中用户管理、主机和主机组的管理,以及监控项、模板、触...

osc_5zaxkz1e
32分钟前
14
0
深入浅出Zabbix 3.0 -- 第一章 Zabbix 安装与配置

第一章 Zabbix 安装与配置 1.1 Zabbix 介绍 Zabbix是一个企业级的开源监控软件,可以监控IT基础架构的可用性和应用的性能,为用户提供集中管理、分布式监控的一站式(all in one)监控解决方...

osc_nvkeo9cj
32分钟前
10
0
PHP 实现抽奖逻辑

public static function get_rand($proArr) { $result = ''; //概率数组的总概率精度 $proSum = array_sum($proArr); //概率数组循环 forea......

chenhongjiang
32分钟前
18
0
struts2 上传 下载

东方部落: http://11144439.blog.51cto.com struts中上传文件功能小测试。这里jar是 2.5 版本。 项目结构图 废话不多说,直接代码。 2. web.xml配置 <?xml version="1.0" encoding="UTF-8......

osc_1qix3fyb
34分钟前
31
0
SVN管理系统安装及其操作

SVN管理系统安装及操作 防伪码:学习永远不晚! 前言: SVN是Subversion的简称,是一个开放源代码的版本控制系统,相较于RCS、CVS,它采用了分支管理系统,它的设计目标就是取代CVS。互联网上...

osc_afifi2qt
35分钟前
24
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部