文档章节

spark-性能调优

crayzer_yixiu
 crayzer_yixiu
发布于 2016/10/29 11:03
字数 2902
阅读 100
收藏 1

问题:
1、分配哪些资源?
2、在哪里分配这些资源?
3、为什么多分配了这些资源以后,性能会得到提升?

  • 分配哪些资源?executor、cpu per executor、memory per executor、driver memory
  • 在哪里分配这些资源?在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数
    /usr/local/spark/bin/spark-submit \
    --class cn.spark.sparktest.core.WordCountCluster \
    --num-executors 3 \  配置executor的数量
    --driver-memory 100m \  配置driver的内存(影响很大)
    --executor-memory 100m \  配置每个executor的内存大小
    --executor-cores 3 \  配置每个executor的cpu core数量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

     

  • 调节到多大,算是最大呢?
    • 第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你心里应该清楚每台机器还能够给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的情况,去调节每个spark作业的资源分配。比如说你的每台机器能够给你使用4G内存,2个cpu core;20台机器;executor,20;平均每个executor:4G内存,2个cpu core。

    • 第二种,Yarn。资源队列。资源调度。应该去查看,你的spark作业,要提交到的资源队列,大概有多少资源?500G内存,100个cpu core;executor,50;平均每个executor:10G内存,2个cpu core。

    • 设置队列名称:spark.yarn.queue default

    • 一个原则,你能使用的资源有多大,就尽量去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)

  • 为什么调节了资源以后,性能可以提升?

    • 增加executor:

      如果executor数量比较少,那么,能够并行执行的task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。比如有3个executor,每个executor有2个cpu core,那么同时能够并行执行的task,就是6个。6个执行完以后,再换下一批6个task。增加了executor数量以后,那么,就意味着,能够并行执行的task数量,也就变多了。比如原先是6个,现在可能可以并行执行10个,甚至20个,100个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~数十倍。

      有时候数据量比较少,增加大量的task反而性能会降低,为什么?(想想就明白了,你用多了,别人用的就少了。。。。)

    • 增加每个executor的cpu core:

      也是增加了执行的并行能力。原本20个executor,每个才2个cpu core。能够并行执行的task数量,就是40个task。现在每个executor的cpu core,增加到了5个。能够并行执行的task数量,就是100个task。执行的速度,提升了2.5倍。

      SparkContext,DAGScheduler,TaskScheduler,会将我们的算子,切割成大量的task,
      提交到Application的executor上面去执行。

    • 增加每个executor的内存量:

      增加了内存量以后,对性能的提升,有三点:
      1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。
      2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,
      甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
      3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

Spark并行度指的是什么?

Spark作业,Application,Jobs,action(collect)触发一个job,1个job;每个job拆成多个stage,
发生shuffle的时候,会拆分出一个stage,reduceByKey。

stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)

stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()

reduceByKey,stage0的task,在最后,执行到reduceByKey的时候,会为每个stage1的task,都创建一份文件(也可能是合并在少量的文件里面);每个stage1的task,会去各个节点上的各个task创建的属于自己的那一份文件里面,拉取数据;每个stage1的task,拉取到的数据,一定是相同key对应的数据。对相同的key,对应的values,才能去执行我们自定义的function操作(_ + _)

并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段(stage)的并行度。

如果不调节并行度,导致并行度过低,会怎么样?

  • task没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,ok,那么同时在运行的task,只有100个,每个executor只会并行运行2个task。每个executor剩下的一个cpu core,就浪费掉了。
  • 你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,而且每个task主要处理1G的数据就可以。
  • 很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。
    1. task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况,比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)

    2. 官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,比如150个cpu core,基本要设置task数量为300~500;实际情况,与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task,可能会慢一点,要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。

    3. 如何设置一个Spark Application的并行度?

      spark.default.parallelism 
      SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

       

默认情况下,多次对一个RDD执行算子,去获取不同的RDD;都会对这个RDD以及之前的父RDD,全部重新计算一次;读取HDFS->RDD1->RDD2-RDD4这种情况,是绝对绝对,一定要避免的,一旦出现一个RDD重复计算的情况,就会导致性能急剧降低。比如,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟

  1. RDD架构重构与优化尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。
  2. 公共RDD一定要实现持久化。就好比北方吃饺子,现包现煮。你人来了,要点一盘饺子。馅料+饺子皮+水->包好的饺子,对包好的饺子去煮,煮开了以后,才有你需要的熟的,热腾腾的饺子。现实生活中,饺子现包现煮,当然是最好的了。但是Spark中,RDD要去“现包现煮”,那就是一场致命的灾难。对于要多次计算和使用的公共RDD,一定要进行持久化。持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。
  3. 持久化,是可以进行序列化的如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM内存溢出。当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。内存+磁盘,序列化。
  4. 为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足.

持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别

  • 如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()方法来替代
    • StorageLevel.MEMORY_ONLY_SER(),第二选择
    • StorageLevel.MEMORY_AND_DISK(),第三选择
    • StorageLevel.MEMORY_AND_DISK_SER(),第四选择
    • StorageLevel.DISK_ONLY(),第五选择
  • 如果内存充足,要使用双副本高可靠机制,选择后缀带_2的策略
    • StorageLevel.MEMORY_ONLY_2()

© 著作权归作者所有

上一篇: spark-shuffle调优
下一篇: 8-Storm
crayzer_yixiu
粉丝 26
博文 57
码字总数 87921
作品 0
杭州
高级程序员
私信 提问
spark submit参数及调优

spark submit参数介绍 你可以通过spark-submit --help或者spark-shell --help来查看这些参数。 使用格式: ./bin/spark-submit --class --master --deploy-mode --conf = ... # other option......

citibank
2018/07/17
0
0
Spark 通用的性能配置方法:内存和CPU的配置

前言 本文主要介绍关于通过配置Spark任务运行时的内存和CPU(Vcore)来提升Spark性能的方法。通过配置内存和CPU(Vcore)是比较基础、通用的方法。本文出现的Demo以X-Pack Spark数据工作台为...

云hbase+spark
2019/07/08
0
0
Spark性能优化篇一:资源调优

Spark性能优化篇一:资源调优 所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参...

u010262291
2018/05/30
0
0
高性能Spark作业基础:你必须知道的调优原则及建议

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操...

李雪蕤
2018/08/22
0
0
巨杉Tech | SparkSQL+SequoiaDB 性能调优策略

当今时代,企业数据越发膨胀。数据是企业的价值,但数据处理也是一种技术挑战。在海量数据处理的场景,即使单机计算能力再强,也无法满足日益增长的数据处理需求。所以,分布式才是解决该类问...

巨杉数据库
2019/10/31
35
0

没有更多内容

加载失败,请刷新页面

加载更多

每天AC系列(六):有效的括号

1 题目 LeetCode第20题,这题比较简单,匹配括号. 2 栈 这是栈的典型应用,括号匹配,当然不需要直接使用栈,使用一个StringBuilder即可: if(s.isEmpty()) return true;char a = s.charAt(0);...

Blueeeeeee
今天
27
0
Spring AOP-06-切入点类型

切入点是匹配连接点的拦截规则。之前使用的是注解@Pointcut,该注解是AspectJ中的。除了这个注解之外,Spring也提供了其他一些切入点类型: • 静态方法切入点StaticMethodMatcherPointcut •...

moon888
昨天
90
0
Class Loaders in Java

1. Introduction to Class Loaders Class loaders are responsible for loading Java classes during runtime dynamically to the JVM (Java Virtual Machine). Also, they are part of the ......

Ciet
昨天
96
0
以Lazada为例,看电商系统架构演进

什么是Lazada? Lazada 2012年成立于新加坡,是东南亚第一电商,2016年阿里投资10亿美金,2017年完成对lazada的收购。 业务模式上Lazada更偏重自营,类似于亚马逊,自建仓储和为商家提供服务...

春哥大魔王的博客
昨天
62
0
【自用】 Flutter Timer 简单用法

dart: void _startTime() async { _timer = Timer(Duration(seconds: sec), () { fun(xxx,yyy,zzz); }); } @override void dispose() { _timer.cancel()......

Tensor丨思悟
昨天
65
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部