文档章节

spark工作的一些总结

tuoleisi77
 tuoleisi77
发布于 2018/06/19 14:08
字数 3991
阅读 189
收藏 0

「深度学习福利」大神带你进阶工程师,立即查看>>>

 

  1. 请描述spark RDD原理与特征?

spark RDD原理:是一个容错的、并行的(弹性分布式)数据结构,可以控制数据存储至磁盘或者内存,能够获取数据的分区。其具体特征,如下:

1)创建:rdd创建有2种方式,一种为从稳定存储中读取创建;另一种从父RDD转换得到新的RDD。

2)只读:状态不可变,不能修改。

3)分区:支持使 RDD 中的元素根据那个 key 来分区 ( partitioning ) ,保存到多个节点上。

4)容错:在 RDD 中血统 ( lineage ) ,即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的。

5)持久化:支持将会被重用的 RDD 缓存 ( 如 in-memory 或溢出到磁盘 )。

6)延迟计算: transformation操作,RDD都不会真正执行运算(记录lineage),只有当action操作被执行时,运算才会触发。。

7)操作:丰富的转换(transformation)和动作 ( action ) , count/reduce/collect/save 等。

  1. 如何理解Spark RDD 宽依赖与窄依赖,并结合示例描述?

窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作会产生窄依赖;宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作会产生宽依赖。如:spark map算子,其父RDD 的每个partition只被子RDD的一个partition所使用,因此,map算子数据不需求重洗,即不需要进行shuffle操作。而groupByKey,其父RDD的partition被多个子RDD的partition使用,其过程需要进行shuffle操作。

  1. 基于spark-1.6.1版本,请描述spark统一内存管理机制,和各部分内存区域的用途?

从Spark 1.6版本推出以后,Spark采用了统一内存管理模型。通过spark.memory.useLegacyMode配置,可以控制选择的内存管理器模式。在统一内存管理下,Spark一个executor的JVM Heap内存主要分成了三大部分:ReservedMemory、UserMemory、SparkMemory。

​Reserved Memory区域的内存是Spark内部保留内存,会存储一些spark的内部对象等内容,也是我们无法使用的部分,默认大小是300MB。

User Memory区域的内存是用户在程序中开的对象存储等一系列非Spark管理的内存开销所占用的内存(默认值为(JVM Heap Size - Reserved Memory) * (1-spark.memory.fraction))。

Spark Memory区域的内存是用于Spark管理的内存开销。主要分成了两个部分,Execution Memory和Storage Memory,通过spark.memory.storageFraction来配置两块各占的大小(默认值0.5)。1)Storage Memory。主要用来存储我们Cache的数据和临时空间序列化时Unroll的数据,以及Broadcast变量Cache级别存储的内容。2)Execution Memory。主要用来存储Spark Task执行时使用的内存(比如Shuffle时排序所需要的临时存储空间)。

为了提高内存利用率,Spark统一内存管理模型针对StorageMemory 和 Execution Memory有如下策略:1)一方空闲、一方内存不足情况下,内存不足一方可以向空闲一方借用内存。2)Execution Memory可以强制拿回StorageMemory在Execution Memory空闲时,借用的Execution Memory的部分内存(强制取回,而Storage Memory数据丢失,重新计算即可)。3)Storage Memory只能等待Execution Memory主动释放占用的StorageMemory空闲时的内存(不强制取回,因为如果task执行,数据丢失就会导致task 失败)。

 

  1. Spark 中的rdd、dataframe和dataset有什么区别?

Rdd以分区、只读等特性的弹性分布式数据集,并提供多种数据类型数据转换操作;

Dataframe在spark RDD基础上,按照行为对象组织的结构化分布式数据集,类似于传统关系数据库的表,同时,相比RDD优化部分性能。

dataframe为DataSet的特列,即dataframe可以采用该方式表示dataSet<Row>。

 

  1. 请详细阐述spark运行时job、stage、task的划分原则,以及其相互之间的关系?

Spark job以spark action算子,触发产生具体spark job;根据spark 宽窄依赖关系,确定spark shuffle的过程,且shuffle之间形成不同的stage;不同stage下,spark 每个RDD的分区,将形成spark task。因此,spark job包含多个stage,而stage又包含多个task。

 

  1. 请描述spark hash shuffle 与 sort shuffle 之间的区别与优劣点?

spark shuffle是把一组无规则的数据尽量转换成一组具有一定规则的数据,其将上一个stage数据传至下一个stage。Spark 1.2以前,默认采用hash shuffle

优缺点:hash shuffle优点:小规模计算时,效率高;不需要过多内存用户数据排序,且shuffle过程中,数据读写各只有一次。缺点:大规模数据计算时,将产生大量的临时文件和磁盘随机IO,降低计算性能。

Sort shuffle在hash shuffle基础上优化,其在map阶段合并大量的临时文件,大规模计算时,提升整体计算性能。缺点:涉及数据的排序,存在内存溢出的风险。

  1. spark 计算的容错性如何保证?

Spark选择容错性采用“记录更新”的方式。RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

Lineage根据spark RDD的依赖关系,当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。

  1. 请描述spark 广播变量原理与特点?

Broadcast主要用于共享Spark在计算过程中各个task都会用到的只读变量,Broadcast变量只会在每台计算机器上保存一份,而不会每个task都传递一份,这样就大大节省了空间,节省空间的同时意味着传输时间的减少,效率也高。

Broadcast(广播)变量对数据传输进行优化,通过Broadcast变量将用到的大数据量数据进行广播发送,可以提升整体速度

  1. Spark中的任务调度的调度模式有几种,分别描述其实现原理?

任务调度模式有2种:FIFO、FAIR模式。其中,FIFO模式的基本原理:每个Job被切分为多个Stage。第一个Job优先获取所有可用的资源,接下来第二个Job再获取剩余资源。以此类推,如果第一个Job并没有占用所有的资源,则第二个Job还可以继续获取剩余资源,这样多个Job可以并行运行。如果第一个Job很大,占用所有资源,则第二个Job就需要等待第一个任务执行完,释放空余资源,再申请和分配Job。FAIR模式的基本原理:Spark在多Job之间以轮询(round robin)方式为任务分配资源,所有的任务拥有大致相当的优先级来共享集群的资源。这就意味着当一个长任务正在执行时,短任务仍可以分配到资源,提交并执行,并且获得不错的响应时间。这样就不用像以前一样需要等待长任务执行完才可以。这种调度模式很适合多用户的场景。用户可以通过配置spark.scheduler.mode方式来让应用以FAIR模式调度。FAIR调度器同样支持将Job分组加入调度池中调度,用户可以同时针对不同优先级对每个调度池配置不同的调度权重。这种方式允许更重要的Job配置在高优先级池中优先调度。

  1. 请描述spark 算子reduce与reduceByKey的区别?

reduce是Action操作,reduceByKey是Transformation操作。

Reduce操作会将结果汇集的driver节点,当任务很多,任务的结果数据又比较大时Driver容易造成性能瓶颈。reduceByKey不是把数据汇集到Driver节点,是分布式进行的,因此不会存在reduce那样的性能瓶颈。

  1. 请阐述spark yarn-client与yarn-cluster模式的原理,并分析其特点?

client模式的原理,Client提交任务给resourceManager,在提交任务的时候,在提交任务的那台机器上面开启一个driver服务进程。resourceManager在接收到client提交的任务以后,在集群中随机选择一台机器分配一个container,在该container里面开启一个applicationmaster服务进程。然后,driver去找applicationmaster,applicationmaster又找resourcemanager申请资源,resourcemanager会分配container,在其中开启excuter,excuter会反向向driver注册,driver把task放入到excuter里面执行。Cluster模式的原理,Spark集群会在集群中开启一个driver,此时开启就是applicationmaster和driver合二为一了。其他的都相同。Yarn-client模式适合spark常驻应用,便于应用管理;yarn-cluster模式适合批量场景应用,减少client端压力。

  1. Spark on yarn模式下,Spark executor oom(java.lang.OutOfMemoryError)时,请列举有哪些可以解决/调优的方式,请具体描述?

Diagnostic Messages for this Task:

Container [pid=28020,containerID=container_1459428698943_31555_01_004570] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.0 GB of 16.8 GB virtual memory used. Killing container。

减少每个cup核(线程)处理的数据量,可调整参数:spark.sql.shuffle.partitions、spark.default.parallelism ;增加每个CPU核,可使用的内存:增大executor-memory或者executor-cores减少。

  1. 请分析以下异常发生原因,并概述spark 监听器机制?

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler

当消息队列中的消息数超过其spark.scheduler.listenerbus.eventqueue.size设置的数量(如果没有设置,默认为10000)时,会将最新的消息移除,这些消息本来是通知任务运行状态的,由于你移除了,状态无法得到更新,所以会出现上面描述的现象。

Spark监听器模型由以下几个部分组成:sparkListenerEvent、LiveListenerBus、SparkListener。SparklistenerEnent将事件发送至liveListerBus中,sparkListener从LiveListerBus消费事件。

  1. 举例分析spark数据倾斜现象、原因及解决方法?

现象:绝大多数task执行得很快,个别task执行很慢或处理数据大很多;历史任务,某天突然出现OOM。

原因:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。

常见解决方法:从数据角度,将数据的key增加部分随机,以避免某一key的数据量较大;从参数调优角度,增加shuffle过程的分区数(spark.default.parallelism/ spark.sql.shuffle.partitions),减少单个task任务的数据量。减少executor的核数(executor-cores),以增加每个task可用的内存。整体避免数据倾斜导致的OOM。

  1. Spark streaming中Dstream是如何产生,请描述其过程?

Dstream表示从数据源获取持续性的数据流以及经过转换后的数据流,且由持续的RDD序列组成。

从开发的角度,Dstream可认为是对RDD的封装。其按照时间分片,比如此采用1分钟为时间间隔,那么在连续的1分钟内收集到的数据作为一个RDD处理单元,且随着时间的推移,形成一系列离散的RDD处理单元。

  1. Spark streaming与kafka集成应用时,从kafka读取数据有几种获取方式,且各有什么不同?

Spark streaming从kafka读取数据的方式有2种:基于Receiver的方式、Direct方式。其中,基于Receiver的方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。

Direct方式相较于Receiver方式的优势在于:

简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。

高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。

精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

  1. 请描述Spark thriftServer增量(配置参数spark.sql.thriftServer.incrementalCollect)取数机制?

Spark thriftserver增量取数的基本原理:driver从executor,按照增量和顺序的方式获取每个分区数据,并发送给client端。而非增量取数方式,采用collect算子,将所有数据汇集在driver进程下的数组,再发送给client端。增量取数方式效率相当低,但减少driver内存溢出的风险。

  1. 以“select count(*) from table”的sql语言为例,描述spark task产生过程?

1)sql语句经过sqlParser解析成Unresolved LogicalPlan;

2)使用analyzer结合数据字典(catalog)进行绑定,生产Resloved LogicalPlan;

3)使用optimizer对Resloved LogicalPlan进行优化,生Optimized LogicalPlan;

4)使用SparkPlan将LogicalPlan转换成PhysicalPlan;

5)使用prepareForExecution将physicalPlan转换成可执行物理计划;

6)使用execute()执行可执行物理计划,生产schemaRDD。

  1. 请说明Spark-sql产生小文件的原因,请描述其调优/解决方法?

原因:spark shuffle过程中,下游stage的task数量由partition参数决定,一个task操作最后在hive/hdfs落地为一个文件。如果分区表下,hive表分区数*spark shuffle partition数。

调优方式:减少partition数值、spark post-shuffle机制,以文件大小决定落地的hive/hdfs、spark应用程序采用reparation算子,重定义分区。

  1. 请列举spark 常用的优化参数,并阐述其意义和应用场景?

Case1:executor lost(executor oom),通常由于executor内存配置内存有限,executor oom或触发rdd 自动清理,导致lost task。解决方法:适当增大executor memory和spark.default.parallelism。

Case2:Kryo serialization failed: Buffer overflow,调大spark.kryoserializer.buffer.max配置参数

Case3:Total size of serialized results of 251 tasks (1504.0 MB) is bigger than spark.driver.maxResultSize (1500.0 MB),调大spark.driver.maxResultSize配置参数。

Case4:spark thriftserver增量数据,配置参数spark.sql.thriftServer.incrementalCollect,减少driver端压力。

tuoleisi77
粉丝 4
博文 28
码字总数 43810
作品 0
深圳
程序员
私信 提问
加载中
请先登录后再评论。
建站引擎--PHPMyWind

PHPMyWind 是基于PHP+MySQL开发符合W3C标准的建站引擎。它将带给人们一系列高效的,成熟的企业网站建设解决方案,让您的信息以更健康的形式高速传递给需要的它的人们,同时让您感受通过PHPMy...

匿名
2013/01/14
4.4K
1
《The Way to Go》中文版

在接触 Go 语言之后,对这门编程语言非常着迷,期间也陆陆续续开始一些帮助国内编程爱好者了解和发展 Go 语言的工作,比如开始录制视频教程《Go编程基础》。但由于目前国内并没有比较好的 Go...

无闻
2013/04/14
2.5W
5
vss2svn2git

这个程序导入Visual SourceSafe(VSS)库到一个git存储库。 这是一个分叉来自vss2svn。再一次,我需要一些方法来从一个旧的VSS 6.0数据库中提取历史,而vss2git做这个事不正确。vss2svn预编译的...

匿名
2013/05/17
626
0
Python 解释器--Nuitka

Nuitka是一个Python的替代编译器。它可以无缝地替代和扩展Python的解释和编译工作。现在支持CPython2.6、2..7、3.2、3.3和3.4版本。它可以执行编译的代码,并能用很兼容的方式将目标代码一起...

匿名
2013/05/20
7.4K
0
分布式计算框架--DPark

DPark 是 Spark 的 Python 克隆,是一个Python实现的分布式计算框架,可以非常方便地实现大规模数据处理和迭代计算。 DPark 由豆瓣实现,目前豆瓣内部的绝大多数数据分析都使用DPark 完成,正...

Davies
2013/06/06
3.6K
1

没有更多内容

加载失败,请刷新页面

加载更多

如何在Git历史记录中grep(搜索)已提交的代码 - How to grep (search) committed code in the Git history

问题: I have deleted a file or some code in a file sometime in the past. 我过去某个时候已经删除了文件或文件中的某些代码。 Can I grep in the content (not in the commit messages)......

技术盛宴
8分钟前
0
0
二进制安装安装mysql 8.0.20

MySQL最新版本8.0.20正式发布。与之前8.0的系列版本一样,这次的发行版除了包含缺陷修复,也同样包括新功能。下面快速浏览一下。关键字:hash join、InnoDB双写缓冲、二进制日志事务压缩。 ...

程序员面试吧
12分钟前
7
0
关于python3.8+ pyside2 pyinstaller打包的一些坑

环境: python 3.8 pyinstaller 3.6 pyside2 5.14 打包过程中出现错误(1):   7607 WARNING: lib not found: pywintypes38.dll dependency of c:\users\have_\appdata\local\programs\pyth......

齐勇cn
13分钟前
0
0
备战秋招!静电的UI设计教室全能课程开始招生~系统进阶!提升核心竞争力

。 本文分享自微信公众号 - 静Design(JingDesign91)。 如有侵权,请联系 support@oschina.cn 删除。 本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。...

静电1983
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部