文档章节

spark 整理的一些知识

tuoleisi77
 tuoleisi77
发布于 06/18 17:28
字数 11087
阅读 10
收藏 0
点赞 0
评论 0

Spark 知识点

  1. 请描述spark RDD原理与特征?

 

RDD全称是resilient distributed dataset(具有弹性的分布式数据集)。一个RDD仅仅是一个分布式的元素集合。在Spark中,所有工作都表示为创建新的RDDs、转换现有的RDDs,或者调用RDDs上的操作来计算结果。

       Spark,一个RDD仅仅是一个不可变的分布式对象集合.每个RDD被切分成多个可以在不同集群节点上进行计算的partitions.RDDs可以包含Python,JavaScala任何类型的对象,包括用户自定义的class文件。

       用户创建RDD的两种方法:通过加载外部数据集或者通过在使用者运行的driver程序中分配一个对象集合。

一旦创建,RDD提供两种操作类型:转换(transformations)和执行(actions)Transformations会根据之前的RDD构造一个新的RDD

Transformationsactions的区别在于Spark计算RDD的方式.尽管你可以随时定义新的RDD,但是Spark只在他们第一次被action使用的时候进行计算。

最终,每当你在SparkRDD上运行一个action,默认会重新计算。如果你想在多个action上重新使用一个RDDRDD.persist()方法可以进行保存。

总的来讲,每个Spark项目或者shell对话都包含以下步骤:

  • 1.从外部数据创建一些输入RDD
  • 2.通过使用transformations如filter()来转换RDD成为新的RDD
  • 3.使用persist()来持久化任何需要被重用的中间RDD
  • 4.启动actions如count()和first()来启动并行计算,然后Spark会对这些计算进行优化和执行

特征:

RDD上的transformation(转换)是惰性求值,意味着Spark除非看到一个action(动作),否则不会开始执行。

  • 它是不变的数据结构存储
  • 它是支持跨集群的分布式数据结构
  • 可以根据数据记录的key对结构进行分区
  • 提供了粗粒度的操作,且这些操作都支持分区
  • 它将数据存储在内存中,从而提供了低延迟性

 

 

  1. 如何理解Spark RDD 宽依赖与窄依赖,并结合示例描述?

 

 

  1. 基于spark-1.6.1版本,请描述spark统一内存管理机制,和各部分内存区域的用途?

 

Spark统一内存管理主要是指Executor 端的内存模型,分为堆内内存和堆外内存两大区域。

(1),堆内内存

默认情况下,executor仅仅使用堆内内存(参数spark.memory.offHeap.enabled设置为true可以开启堆外内存),分为4个部分:

Execution memory:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据;

Storage memory:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;

User Memory:主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。

Reserved Memory:系统预留内存,会用来存储Spark内部对象。

可以概括为下图:

Spark 内存管理

说明如下:

systemMemory: executor的内存总和,通过spark.executor.memory 或--executor-memory 配置

reservedMemory :默认为300M,一般不可修改,测试场景下可以通过spark.testing.reservedMemory进行指定。

usableMemory = systemMemory - reservedMemory,这个就是 Spark任务的可用内存。

通过参数spark.memory.fraction(默认0.6)和spark.memory.storageFraction(默认0.5)参数可以调整User Memory,Execution memory,Storage memory的大小。

 

Execution memory Storage memory的内存具有共享特性,概述如下:

程序提交时我们都会设定基本的 Execution 内存和 Storage 内存区域;

在程序运行时,如果双方的空间都不足时,则存储到硬盘;

Execution memory的空间充足,而storage memory不足时,storage可以占用execution memory。如果后续execution需要用到被占用的内存时,可以要求storage将相关数据转存到硬盘来归还内存

Storage memory的空间充足,而Execution memory不足时,Execution可以占用storage内存,但是在storage需要内存而execution仍然在占用时,则无法要求归还,只能等待execution主动释放。

 

(2),堆外内存

这种模式不在 JVM 内申请内存,而是调用 Java unsafe 相关 API 直接向操作系统申请内存,由于这种方式不进过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。

默认情况下堆外内存是关闭的,可以通过spark.memory.offHeap.enabled参数开启,通过spark.memory.offHeap.size参数设置堆外内存大小。

堆外内存只分为:Execution  memory Storage memory,如下图

Spark 内存管理

如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互补影响,这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也一样。

 

 

  1. Spark 中的rdd、dataframe和dataset有什么区别?

 

RDDDataFrameDataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;都有惰性机制,都会根据spark的内存情况自动缓存运算,三者都有partition的概念

区别如下:

 

RDD:

编译时类型安全;支持非结构化数据;直接通过类名点的方式来操作数据;默认采用的是java序列化方式方式,序列化结果较大效率较低;数据存储在java堆内存中,有可能导致频繁gc;

 

DataFrame

编译时不能进行类型转化安全检查DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,每一列的值没法用类点名直接访问;支持结构化数据;采用Kryo进行序列化,而且序列化时不需要带上元信息,大大的减少了序列化大小;数据可以保存在堆外内存中,减少了gc次数;支持Rpython

Dataset

支持结构化和非结构化数据;编译时类型安全;可以采用堆外内存进行存储;Dataset中,每一行是什么类型是不一定的

 

  1. 请详细阐述spark运行时job、stage、task的划分原则,以及其相互之间的关系?

Spark通过action切分出job,每个job里面通过shuffle切分stage,而每一个stage里面有若干task,task与RDD的partition对应,一个RDD有若干个partition,stage对输入RDD的处理时,每个partition就对应了一个task。

 

  1. 请描述spark hash shuffle 与 sort shuffle 之间的区别与优劣点

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

(1)HashShuffleManager运行原理

未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

http://tech.meituan.com/img/spark-tuning/hash-shuffle-common.png

 

优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

(2)SortShuffleManager运行原理

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

http://tech.meituan.com/img/spark-tuning/sort-shuffle-common.png

 

bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

•       shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

•       不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

http://tech.meituan.com/img/spark-tuning/sort-shuffle-bypass.png

 

  1. spark 计算的容错性如何保证?

 

spark的容错性依赖于Lineage机制与Checkpoint机制。RDDLineage记录的是粗颗粒度的特定数据Transformation操作(如filtermapjoin等)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区;RDDLineage依赖方面分为两种:窄依赖与宽依赖,用来解决数据容错的高效性。

第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD

第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长血统链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制

RDD计算中,通过检查点机制进行容错,传统做检查点有两种方式:通过冗余数据和日志记录更新操作。在RDD中的doCheckPoint方法相当于通过冗余数据来缓存数据,而之前介绍的血统就是通过相当粗粒度的记录更新操作来实现容错的。

检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。

 

 

  1. 请描述spark 广播变量原理与特点?

 

Spark广播变量可以解决闭包函数引用外部大变量引起的性能问题。Spark使用高效的广播算法分发广播变量,以降低通信成本。广播变量会以只读形式缓存在每个机器的本地,可以使用它们以高效的方式为每个节点提供大型输入数据集的副本。

Broadcast的实现有两种:HTTPBroadcastTorrentBroadcast;HTTPBroadcast是通过http来传输,该方式可能会造成Driver所在的节点网络堵塞。TorrentBroadcast。类似于常见的BitTorrent技术,基本思想是将data切分为一组blocks存储于DriverBlockManager中。假设现在有一些Executors获取到了一些blocks,那么这些Executors就可以成为data server。随着fetchExecutor越来越多,也就意味着更多的data server加入,那么data就很快能够传输到所有Executor

TorrentBroadcast具体实现的原理是:

Driver端:

1. Driver先把data序列化为byte array,然后再切割成blockSize大小的data blocksArray[ByteBuffer])。

2. 完成切割后,将每个分块数据存储到driver自己的BlockManager中,StorageLevelMEMORY_AND_DISK_SER,当存储完毕后会向BlockManagerMaster进行汇报。

 

Executor端:

1. 首先从本地查询是否缓存了完整的data,如果查询到了,则立即返回;否则调用readBlocksDriver或者其他Executor拉取 Torrent 块。

2. 新建一组Array[BlockData](numBlocks)用于存储从远程拉取过来的Block,接着按照随机的索引顺序(假设有5个块,那么打乱之后,拉取的索引顺序可能为3-1-4-2-5)一个个的去fetch block data

3. 每个block data都有一个唯一ide.g. broadcast_xx_piece_0),首先会根据这个idBlockManager查询本地是否有数据,如果有则将数据放到数组中索引对应的位置;否则,根据idBlockManager从远程拉取数据。

4. BlockManager首先会向Driver查询该id对应的块在哪些locations上存在,当前Executor就会随机选择一个location进行下载,下载完成后,会报告给BlockManagerMaster。随着下载的Executor越多,那么这个location也就越多,data block服务器也就越多。

5. 当获取到所有的BlockData之后(所有的BlockData都将存储在Array[BlockData]中),接着会对该array中的块数据进行反序列化得到原始的data。最后,会将data放到当前ExecutorBlockManager中,那么以后再次获取data时,就可以直接从本地得到。

 

广播变量的一个最重要的特点就是,在同一个执行器上的所有任务都可以共享此Broadcast,而不是每个任务使用一个变量副本;还有在使用广播变量做join操作时,不会产生shuffle

 

 

  1. Spark中的任务调度的调度模式有几种,分别描述其实现原理?

 

spark的调度模式分为两种:FIFO(先进先出)和FAIR(公平调度)。默认是FIFO,即谁先提交谁先执行,而FAIR支持在调度池中再进行分组,可以有不同的权重,根据权重、资源等来决定谁先执行, 例如可以为重要的jobs创建高优先级的池,或者把不同用户的job放到不同的组,然后给用户配置相同的资源量从而不至于某些用户的作业少而得到更少的资源。spark的调度模式可以通过spark.scheduler.mode进行设置。

在DAGScheluer对job划分好stage并以TaskSet的形式提交给TaskScheduler后,TaskScheduler的实现类会为每个TaskSet创建一个TaskSetMagager对象,并将该对象添加到调度池中。schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。程序会根据配置来创建不同的调度池,schedulableBuilder有两种实现,分别是FIFOSchedulableBuilder和FairSchedulableBuilder。

 

FIFOSchedulableBuilder啥也没干。FairSchedulableBuilder的buildPools方法中会先去读取FAIR模式的配置文件默认位于SPARK_HOME/conf/fairscheduler.xml,也可以通过参数spark.scheduler.allocation.file设置用户自定义配置文件。FAIR可以配置多个调度池,即rootPool里面还是一组Pool,Pool中包含了TaskSetMagager。 FairSchedulableBuilder会根据配置文件创建buildFairSchedulerPool。根据每个字段值(未设置则为默认值)来实例化一个Pool对象,并添加到rootPool中。

 

一个spark应用程序包含一个TaskScheduler,一个TaskScheduler包含一个唯一的RootPool,FIFO只有一层Pool,包含TaskSetMagager,而FARI包含两层Pool,RootPool包含子Pool,子Pool包含TaskSetMagager,RootPool都是在实例化SchedulableBuilder的时候创建的。两种调度模式的最终实现都是一样,不过FAIR会在添加之前会获取需要使用的调度池,默认为名字为default的调度池。对于FIFO而言,parentPool都是RootPool,而FAIR,TaskSetMagager的parentPool都是RootPool的子Pool。

 

FIFO模式的算法类是FIFOSchedulingAlgorithm:

1、先比较priority,在FIFO中该优先级实际上是Job ID,越早提交的job的jobId越小,priority越小,优先级越高。

2、若priority相同,则说明是同一个job里的TaskSetMagager,则比较StageId,StageId越小优先级越高。

 

FAIR模式的算法实现类是FairSchedulingAlgorithm:

1、调度池运行的task数小于minShare的优先级比不小于的优先级要高。

2、若两者运行的task个数都比minShare小,则比较minShare使用率,使用率约低优先级越高。

3、若两者的minShare使用率相同,则比较权重使用率,使用率约低优先级越高。

4、若权重也相同,则比较名字。

 

 

  1. 请描述spark 算子reduce与reduceByKey的区别?

 

reduce(binary_function)

reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。

 

val c = sc.parallelize(1 to 4)

c.reduce((x, y) => x + y)

//结果10

 

具体过程,RDD有1 2 3 4个元素,

1+2=3

3+3=6

6+4=10

 

reduceByKey(binary_function)

reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

 

val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))

a.reduceByKey((x,y) => x + y).collect    

//结果 Array((1,5), (3,10))

具体过程,

(1,2),(1,3)被分到一组进行求和得到(1,5)

(3,4),(3,6) 被分到一组进行求和(3,10)

 

 

  1. 请阐述spark yarn-client与yarn-cluster模式的原理,并分析其特点?

 

答: Yarn中,每个application都有一个Application Master进程,负责从ResourceManager中申请资源。

 

Yarn-cluster模式下,driver运行在被YARN管理的Appliaction Master进程中。Client将其Application交给RM后可以立即关闭而不必持续到Application的生命周期。

http://www.aboutyun.com/data/attachment/forum/201503/31/152454ecw4pe8vweo8aw65.png

 

Yarn-client中,driver运行在client进程中,Application Master仅仅从Yarn中申请资源给Executor,之后client会跟container通信进行作业的调度。所以client得一直运行直到Application结束。

http://www.aboutyun.com/data/attachment/forum/201503/31/152418d6mesxsa9k0awcsc.png

In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

 

一般来说,如果提交任务的节点和Worker集群在同一个网络内,此时client mode比较合适。如果提交任务的节点和Worker集群相隔比较远,就会采用cluster mode来最小化DriverExecutor之间的网络延迟。

 

 

  1. Spark on yarn模式下,Spark executor oomjava.lang.OutOfMemoryError)时,请列举有哪些可以解决/调优的方式,请具体描述?

Diagnostic Messages for this Task:

Container [pid=28020,containerID=container_1459428698943_31555_01_004570] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.0 GB of 16.8 GB virtual memory used. Killing container

 

答:

1) map过程产生大量对象导致内存溢出:

这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。

 

2)shuffle后内存溢出:

    shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。

 

3) standalone模式下资源分配不均匀导致内存溢出:

在standalone的模式下如果配置了--total-executor-cores 和 --executor-memory 这两个参数,但是没有配置--executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置--executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。

 

4) 在内存不足的使用,使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():

    rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

 

 

  1. 请分析以下异常发生原因,并概述spark 监听器机制?

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler

 

  1. 举例分析spark数据倾斜现象、原因及解决方法?

 

  1. Spark streaming中Dstream是如何产生,请描述其过程?

 

根据时间实例产生RDDs,和batchDuration对齐的,如:timer实例就是1秒,1秒生成一个RDD,

每个RDD对应一个Job,因为RDD就是DStream操作的时间间隔的最后一个RDD,后面的RDD对前面

的RDD有依赖关系,后面对前面有依赖可以推到出整个依赖链条。

 

  1. Spark streaming与kafka集成应用时,从kafka读取数据有几种获取方式,且各有什么不同?

 

一、有两种方式:

①Receiver-base

②Direct

Receiver-base这种方式是先把数据从kafka中读取出来,然后缓存在内存,再定时处理。Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行

Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积

 

二、两种方式的优缺点如下:

Direct方式具有以下方面的优势:

1、简化并行(Simplified Parallelism)。不现需要创建以及union多输入源,Kafka topic的partition与RDD的partition一一对应

2、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此种方式需要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。

3、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,但是Offsets则是由Zookeeper保存。通过参数配置,可以实现at-least once消费,此种情况有重复消费数据的可能。

4、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

5、降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。

6、鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

Direct方式的缺点:

提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。

监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。

 

Receive-base优点:

1、Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。

Receive-base的缺点:

1、防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。

2、单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。

3、在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费。

4、提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。5、Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。

6、采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度

 

  1. 请描述Spark thriftServer增量(配置参数spark.sql.thriftServer.incrementalCollect)取数机制

 

SparkthirfServer是一个基于HiveServer2开发的用于多用户访问Spark的服务器,提供了类JDBC接口使得任何使用BI工具的用户能够方便接入Spark.

 

配置参数spark.sql.thriftServer.incrementalCollect的取值为boolean类型,默认取值为false

 

当值为false的时候,采用collect取数机制,这种机制会一次性从spark获取所有的数据返回给JDBC,这就意味着一个JDBC请求返回的大量数据集全部放在Spark sql thriftServer的堆内存里面,这样可能会导致内存溢出,可通过增加堆内存和手动分页来规避风险。

 

当取值为true时,采用叫做tolocalIterator的取数机制,具体为一次只返回一个Spark分区的数值,这样会对性能表现有所影响,但是会减少thriftSerer堆内存消耗。

 

 

  1. 以“select count(*) from table”的sql语言为例,描述spark task产生过程?

 

总共有7个步骤:

  1. SQL语句经过SqlParser解析成Unresolved LogicalPlan;

SalParser匹配select 语句,获取Distinct语句,投影字段projection,表relation,依次将匹配的字符串层层封装,最终形成一颗LogicPlan的Tree

  1. 使用analyzer结合数据字典(catalog)进行绑定,生成ResolvedLogicalPlan
  2. 使用optimizer对ResolvedLogicalPlan进行优化,生成Optimized LogicalPlan
  3. 使用SparkPlan将LogicalPlan转化成PhysicalPlan
  4. 使用prepareForExecution将PhysicalPlan转换成可执行物理计划
  5. 使用execute() 执行可执行物理计划,生成SchemaRDD
  6. 创建task

 

 

  1. 请说明Spark-sql产生小文件的原因,请描述其调优/解决方法?

 

答:产生小文件原因:Spark默认在执行聚合(即shuffle)时,会多线程并行往hdfs写数据(因为每个DataFrame/RDD分成若干个Partition,这些partition可以被并行处理,默认有200个分区,由conf变量“spark.sql.shuffle.partitions”定义)。其结果就是一个存下来的文件,其实是hdfs中一个目录,在这个目录下才是众多partition对应的文件,最坏的情况是出现好多size为0的文件。

调优/解决办法:调节参数spark.sql.shuffle.partitions,适当降低它的并行度;使用reparation(num)=coalesce(num, true)函数重分区coalesce和repartition,合并小文件

 

 

  1. 请列举spark 常用的优化参数,并阐述其意义和应用场景?

 

①num-executors

 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

 

②executor-memory

 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

 

③executor-cores

 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

 

④driver-memory

 参数说明:该参数用于设置Driver进程的内存。

 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

 

⑤spark.default.parallelism

 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

 

⑥spark.storage.memoryFraction

 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

 

⑦spark.shuffle.memoryFraction

 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

 

 

© 著作权归作者所有

共有 人打赏支持
tuoleisi77
粉丝 3
博文 28
码字总数 43655
作品 0
深圳
程序员
Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer
05/24
0
0
【DataMagic】如何在万亿级别规模的数据量上使用Spark

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文首发在云+社区,未经许可,不得转载。 作者:张国鹏 | 腾讯 运营开发工程师 一、前言 Spark作为大数据计算引擎,凭借其快速、...

04/18
0
0
spark1.2.0版本搭建伪分布式环境

、下载scala2.11.5版本,下载地址为:http://www.scala-lang.org/download/2.11.5.html 2、安装和配置scala: 第一步:上传scala安装包 并解压 第二步 配置SCALAHOME环境变量到bashprofile 第...

stark_summer
2015/02/09
0
2
2018最新版本的spark面试题及答案

Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,Spark提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)...

嘿你好夏天
04/03
0
0
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836
04/12
0
0
Hadoop CDH5 Spark部署

Spark是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速,Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark ...

China_OS
2014/05/30
0
0
Spark笔记整理(二):RDD与spark核心概念名词

[TOC] Spark RDD 非常基本的说明,下面一张图就能够有基本的理解: Spark RDD基本说明 1、Spark的核心概念是RDD (resilient distributed dataset,弹性分布式数据集),指的是一个只读的,可分...

xpleaf
04/25
0
0
如何在万亿级别规模的数据量上使用Spark?

  【IT168 技术】   一、前言   Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解...

网络大数据
05/30
0
0
如何在万亿级别规模的数据量上使用Spark

一、前言 Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思...

风火数据
05/14
0
0
2014年spark开发者大赛火热进行中!

“发现最有正能量的网络达人”,Spark开发者大赛火热进行! 2014年9月30日,2014 Spark开发者大赛在北京正式启动。本次大赛由Spark亚太研究院联合国内领先的IT技术创新与发展的互联网媒体平台...

Spark亚太研究院
2014/10/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

rabbitmq学习记录(六)交换机Exchange-direct

实现功能:一条消息发送给多个消费者 交换机模式:direct 相比于之前的fanout模式,可以进一步的筛选获取消息的消费者。 fanout模式下,只要消费者监听的队列,已经与接收生产者消息的交换机...

人觉非常君
11分钟前
0
0
Java 之 枚举

Java 中声明的枚举类,均是 java.lang.Enum 类的子类,Enun 类中的常用方法有: name() 返回枚举对象名称 ordinal() 返回枚举对象下标 valueOf(Class enumType, String name) 转换枚举对象 ...

绝世武神
19分钟前
0
0
使用爬虫实现代理IP池之放弃篇

啥叫代理IP以及代理IP池 概念上的东西网上搜索一下就好了,这里简单科普一下(大部分会读这篇文章的人,基本是不需要我来科普的),白话说就是能联网并提供代理访问互联网的服务器,它提供的...

一别丶经年
35分钟前
0
0
sqoop导入数据到Base并同步hive与impala

使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟 基础环境 Sqool和Hive、HBase简介 Sqoop Hive HBase 测试Sqoop 使用Sqoop从MySQL导入数据到Hive 使用复杂SQL 调整Hive数据类型 不断更新 ...

hblt-j
今天
0
0
Dart 服务端开发 文件上传

clent端使用angular组件 upload_component.html form id="myForm" method="POST" enctype="multipart/form-data"> <input type="file" name="fileData"> <!-- file field --></form>......

scooplol
今天
0
0
apache和tomcat同时开启,乱码问题

tomcat和apache同时开启,会走apache的转发,执行的是AJP/1.3协议。所以在tomcat的配置文件server中, <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" useBodyEncodingForU......

Kefy
今天
0
0
使用ssh-keygen和ssh-copy-id三步实现SSH无密码登录 和ssh常用命令

ssh-keygen 产生公钥与私钥对. ssh-copy-id 将本机的公钥复制到远程机器的authorized_keys文件中,ssh-copy-id也能让你有到远程机器的home, ~./ssh , 和 ~/.ssh/authorized_keys的权利 第一步...

xtof
今天
0
0
orcale 查询表结构

SELECT t.table_name, t.colUMN_NAME, t.DATA_TYPE || '(' || t.DATA_LENGTH || ')', t1.COMMENTS FROM User_Tab_Cols t, User_Col_Comments t1WHERE t.table_name......

wertwang
今天
0
0
华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大

华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大!华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大! 在华为最新发布的nova 3手机上,抖音通过华为himedia SDK集成了60fps、超级...

华为终端开放实验室
今天
0
0
多 SSH Key 实现同一台服务器部署多 Git 仓库

本文以以下需求为背景,介绍详细的做法: 需在同一台服务器同时部署两个不同的 Github 仓库(对 Bitbucket 等 git 服务同样适用) root 用户可在远程登录 SSH 后附上预期的 SSH Key 进行 gi...

yeahlife
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部