spark-性能调优

原创
2016/10/29 11:03
阅读数 109

问题:
1、分配哪些资源?
2、在哪里分配这些资源?
3、为什么多分配了这些资源以后,性能会得到提升?

  • 分配哪些资源?executor、cpu per executor、memory per executor、driver memory
  • 在哪里分配这些资源?在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数
    /usr/local/spark/bin/spark-submit \
    --class cn.spark.sparktest.core.WordCountCluster \
    --num-executors 3 \  配置executor的数量
    --driver-memory 100m \  配置driver的内存(影响很大)
    --executor-memory 100m \  配置每个executor的内存大小
    --executor-cores 3 \  配置每个executor的cpu core数量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

     

  • 调节到多大,算是最大呢?
    • 第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你心里应该清楚每台机器还能够给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的情况,去调节每个spark作业的资源分配。比如说你的每台机器能够给你使用4G内存,2个cpu core;20台机器;executor,20;平均每个executor:4G内存,2个cpu core。

    • 第二种,Yarn。资源队列。资源调度。应该去查看,你的spark作业,要提交到的资源队列,大概有多少资源?500G内存,100个cpu core;executor,50;平均每个executor:10G内存,2个cpu core。

    • 设置队列名称:spark.yarn.queue default

    • 一个原则,你能使用的资源有多大,就尽量去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)

  • 为什么调节了资源以后,性能可以提升?

    • 增加executor:

      如果executor数量比较少,那么,能够并行执行的task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。比如有3个executor,每个executor有2个cpu core,那么同时能够并行执行的task,就是6个。6个执行完以后,再换下一批6个task。增加了executor数量以后,那么,就意味着,能够并行执行的task数量,也就变多了。比如原先是6个,现在可能可以并行执行10个,甚至20个,100个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~数十倍。

      有时候数据量比较少,增加大量的task反而性能会降低,为什么?(想想就明白了,你用多了,别人用的就少了。。。。)

    • 增加每个executor的cpu core:

      也是增加了执行的并行能力。原本20个executor,每个才2个cpu core。能够并行执行的task数量,就是40个task。现在每个executor的cpu core,增加到了5个。能够并行执行的task数量,就是100个task。执行的速度,提升了2.5倍。

      SparkContext,DAGScheduler,TaskScheduler,会将我们的算子,切割成大量的task,
      提交到Application的executor上面去执行。

    • 增加每个executor的内存量:

      增加了内存量以后,对性能的提升,有三点:
      1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。
      2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,
      甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
      3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

Spark并行度指的是什么?

Spark作业,Application,Jobs,action(collect)触发一个job,1个job;每个job拆成多个stage,
发生shuffle的时候,会拆分出一个stage,reduceByKey。

stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)

stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()

reduceByKey,stage0的task,在最后,执行到reduceByKey的时候,会为每个stage1的task,都创建一份文件(也可能是合并在少量的文件里面);每个stage1的task,会去各个节点上的各个task创建的属于自己的那一份文件里面,拉取数据;每个stage1的task,拉取到的数据,一定是相同key对应的数据。对相同的key,对应的values,才能去执行我们自定义的function操作(_ + _)

并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段(stage)的并行度。

如果不调节并行度,导致并行度过低,会怎么样?

  • task没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,ok,那么同时在运行的task,只有100个,每个executor只会并行运行2个task。每个executor剩下的一个cpu core,就浪费掉了。
  • 你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,而且每个task主要处理1G的数据就可以。
  • 很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。
    1. task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况,比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)

    2. 官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,比如150个cpu core,基本要设置task数量为300~500;实际情况,与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task,可能会慢一点,要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。

    3. 如何设置一个Spark Application的并行度?

      spark.default.parallelism 
      SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

       

默认情况下,多次对一个RDD执行算子,去获取不同的RDD;都会对这个RDD以及之前的父RDD,全部重新计算一次;读取HDFS->RDD1->RDD2-RDD4这种情况,是绝对绝对,一定要避免的,一旦出现一个RDD重复计算的情况,就会导致性能急剧降低。比如,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟

  1. RDD架构重构与优化尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。
  2. 公共RDD一定要实现持久化。就好比北方吃饺子,现包现煮。你人来了,要点一盘饺子。馅料+饺子皮+水->包好的饺子,对包好的饺子去煮,煮开了以后,才有你需要的熟的,热腾腾的饺子。现实生活中,饺子现包现煮,当然是最好的了。但是Spark中,RDD要去“现包现煮”,那就是一场致命的灾难。对于要多次计算和使用的公共RDD,一定要进行持久化。持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。
  3. 持久化,是可以进行序列化的如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM内存溢出。当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。内存+磁盘,序列化。
  4. 为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足.

持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别

  • 如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()方法来替代
    • StorageLevel.MEMORY_ONLY_SER(),第二选择
    • StorageLevel.MEMORY_AND_DISK(),第三选择
    • StorageLevel.MEMORY_AND_DISK_SER(),第四选择
    • StorageLevel.DISK_ONLY(),第五选择
  • 如果内存充足,要使用双副本高可靠机制,选择后缀带_2的策略
    • StorageLevel.MEMORY_ONLY_2()
展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部