Spark (三) 性能优化
博客专区 > bigsloth 的博客 > 博客详情
Spark (三) 性能优化
bigsloth 发表于1年前
Spark (三) 性能优化
  • 发表于 1年前
  • 阅读 106
  • 收藏 0
  • 点赞 0
  • 评论 0

参数配置

1、spark-env.sh

2、程序通过SparkConf或System.setProperty

 

性能观察与日志

1)Web UI。
2)Driver程序控制台日志。
3)logs文件夹下日志。
4)work文件夹下日志。
5)Profiler工具。

 

调度与分区优化

1.小分区合并

频繁的过滤或者过滤掉的数据量过大就会产生问题,造成大量小分区的产生。Spark是每个数据分区都会分配一个任务执行,如果任务过多,则每个任务处理的数据量很小,会造成线程切换开销大,很多任务等待执行,并行度不高;

解决方式:可以采用RDD中重分区的函数进行数据紧缩,减少分区数,将小分区合并变为大分区。
通过coalesce函数来减少分区。这个函数会返回一个含有numPartitions数量个分区的新RDD,即将整个RDD重分区。
当分区由10000重分区到100时,由于前后两个阶段的分区是窄依赖的,所以不会产生Shuffle的操作。
但是如果分区数量急剧减少,如极端状况从10000重分区为一个分区时,就会造成一个问题:数据会分布到一个节点上进行计算,完全无法开掘集群并行计算的能力。为了规避这个问题,可以设置shuffle=true

由于Shuffle可以分隔Stage,这就保证了上一阶段Stage中的上游任务仍是10000个分区在并行计算。如果不加Shuffle,则两个上下游的任务合并为一个Stage计算,这个Stage便会在1个分区状况下进行并行计算。

另一个需求,即当前的每个分区数据量过大,需要将分区数量增加,以利用并行计算能力,这就需要把Shuffle设置为true,然后执行coalesce函数,将分区数增大,在这个过程中,默认使用Hash分区器将数据进行重分区。

reparition本质上就是调用的coalesce方法。因此如果用户不想进行Shuffle,就需用coalese配置重分区,为了方便起见,可以直接用repartition进行重分区。

 

2.倾斜问题

倾斜(skew)问题是分布式大数据计算中的重要问题,倾斜有数据倾斜和任务倾斜两种情况,数据倾斜导致的结果即为任务倾斜,在个别分区上,任务执行时间过长。当少量任务处理的数据量和其他任务差异过大时,任务进度长时间维持在99%(或100%),此时,任务监控页面中有少量(1个或几个)reduce子任务
未完成。单一reduce的记录数与平均记录数差异过大,最长时长远大于平均时长,常可能达到3倍甚至更多

数据倾斜
产生数据倾斜的原因大致有以下几种。
1)key的数据分布不均匀(一般是分区key取得不好或者分区函数设计得不好)。
2)业务数据本身就会产生数据倾斜(像TPC-DS为了模拟真实环境负载特意用有倾斜的数据进行测试)。
3)结构化数据表设计问题。
4)某些SQL语句会产生数据倾斜

任务倾斜

任务倾斜的原因较为隐蔽,一般就是那台机器的正在执行的Executor执行时间过长,因为服务器架构,或JVM,也可能是来自线程池的问题,等等。

解决方式:可以通过考虑在其他并行处理方式中间加入聚集运算,以减少倾斜数据量。
数据倾斜一般可以通过在业务上将极度不均匀的数据剔除解决。这里其实还有Skew Join的一种处理方式,将数据分两个阶段处理,倾斜的key数据作为数据源处理,剩下的key的数据再做同样的处理。二者分开做同样的处理

任务执行速度倾斜

产生原因可能是数据倾斜,也可能是执行任务的机器在架构,OS、JVM各节点配置不同或其他原因。
解决方式:设置spark.speculation=true把那些执行时间过长的节点去掉,重新调度分配任务,这个方式和Hadoop MapReduce的speculation是相通的。同时可以配置多长时间来推测执行,spark.speculation.interval用来设置执行间隔进行配置。在源码中默认是配置的100

解决方案

1)增大任务数,减少每个分区数据量:增大任务数,也就是扩大分区量,同时减少单个分区的数据量。
2)对特殊key处理:空值映射为特定Key,然后分发到不同节点,对空值不做处理。
3)广播。
①小数据量表直接广播。
②数据量较大的表可以考虑切分为多个小表,多阶段进行Map Side Join。
4)聚集操作可以Map端聚集部分结果,然后Reduce端合并,减少Reduce端压力。
5)拆分RDD:将倾斜数据与原数据分离,分两个Job进行计算。

 

3.并行度

Spark会根据文件的大小,默认配置Map阶段任务数量,也就是分区数量(也可以通过SparkContext.textFile等方法进行配置)。而Reduce的阶段任务数量配置可以有两种方式

第一种方式:写函数的过程中通过函数的第二个参数进行配置

第二种方式:通过配置spark.default.parallelism来进行配置。它们的本质原理一致,均是控制Shuffle过程的默认任务数量

Spark官方推荐选择每个CPU Core分配2~3个任务,即cpu corenum*2(或3)数量的并行度。
如果并行度太高,任务数太多,就会产生大量的任务启动和切换开销。
如果并行度太低,任务数太小,就会无法发挥集群的并行计算能力,任务执行过慢,同时可能会造成内存combine数据过多占用内存,而出现内存溢出(out of memory)的异常。

 

3. DAG调度执行优化

1)同一个Stage中尽量容纳更多的算子,以减少Shuffle的发生。

由于Stage中的算子是按照流水线方式执行的,所以更多的Transformation放在一起执行能够减少Shuffle的开销和任务启动和切换的开销

2)复用已经cache过的数据。可以使用cache和persist函数将数据缓存在内存,其实可以按单机的方式理解,存储仍然是多级存储,数据存储在访问快的存储设备中,提高快速存储命中率会提升整个应用程序的性能

 

内存存储优化

1.JVM调优

内存调优过程的大方向上有三个方向是值得考虑的。
1)应用程序中对象所占用的内存空间。
2)访问这些内存对象的代价。
3)垃圾回收的开销。
通常状况下,Java的对象访问速度是很快的,但是相对于对象中存储的原始数据,Java对象整体会耗费2~5倍的内存空间。

1)不同的Java对象都会有一个对象头(object header),这个对象头大约为16byte,包含指向这个对象的类的指针等信息,对一些只有少量数据的对象,这是极为不经济的。例如,只有一个Int属性的对象,这个头的信息所占空间会大于对象的数据空间。
2)Java中的字符串(String)占用40byte空间。String的内存是将真正字符串的信息存储在一个char数组中,并且还会存储其他的信息,如字符串长度,同时如果采用UTF-16编码,一个字符就占用2byte的空间。综合以上,一个10字符的字符串会占用超过60byte的内存空间。
3)常用的一些集合类,如LinkedList等是采用链式数据结构存储的,对底层的每个数据项进行了包装,这个对象不只存储数据,还会存储指向其他数据项的指针,这些指针也会产生数据空间的占用和开销。
4)集合类中的基本数据类型常常采用一些装箱的对象存储,如java.lang.Ingeger。装箱与拆箱的机制在很多程序设计语言中都有,Java中装箱意味着将这些基本数据类型包装为对象存储在内存的Java堆中,而拆箱意味着将堆中对象转换为栈中存储的数据。

计算数据在集群内存占用的空间的大小的最好方法是创建一个RDD,读取这些数据,将数据加载到cache,在驱动程序的控制台查看SparkContext的日志。这些日志信息会显示每个分区占用多少空间

调整数据结构

减少对象嵌套;使用数字的ID或者枚举对象;序列化存储RDD;

当内存小于32GB时,官方推荐配置JVM参数-XX:+UseCompressedOops,进而将指针由8byte压缩为4byte。OOP的全称是ordinary object pointer,即普通对象指针。在64位HotSpot中,OOP使用32位指针,默认64位指针会比32位指针使用的内存多1.5倍,启用CompressOops后,会压缩的对象如下。
①每个Class的属性指针(静态成员变量)。
②每个对象的属性指针。
③普通对象数组每个元素的指针。
但是,指向PermGen的Class对象指针、本地变量、堆栈元素、入参、返回值、NULL指针不会被压缩。可以通过配置文件spark-env.sh配置这个参数,从而在Spark中启用JVM指针压缩。

JVM垃圾回收(GC)调优

当Spark程序产生大数据量的RDD时,JVM的垃圾回收就会成为一个问题。当Spark任务的工作内存空间和RDD的缓存数据空间产生干扰时,垃圾回收同样会成为一个问题,可以通过控制分给RDD的缓存来缓解这个问题。GC来说,一个重要的配置参数就是内存给RDD用于缓存的空间大小。默认情况下,Spark用配置好的Executor 60%的内存(spark.executor.memory)缓存RDD。这就意味着40%的剩余内存空间可以让Task在执行过程中缓存新创建的对象。在有些情况下,用户的任务变慢,而且JVM频繁地进行垃圾回收或者出现内存溢出(out of memory异常),这时可以调整这个百分比参数为50%。这个百分比参数可以通过配置spark-env.sh中的变量spark.storage.memoryFraction=0.5进行配置。同时结合序列化的缓存存储对象减少内存空间占用,将会更加有效地缓解垃圾回收问题

度量GC的影响:-verbose:gc-XX:+PrintGCDetails-XX:+PrintGCTime-Stamps

如果任务是从HDFS读取数据,内存空间的占用可以通过从HDFS读取的数据块大小和数量估计。需要注意的是,一般情况下,压缩的数据压缩之后通常为原来数据块大小的2~3倍。因此如果一个JVM中要执行3~4个任务,同时HDFS的数据块大小是64MB,就可以估计需要的Eden代大小是4×3×64MB大小的空间。

OOM的原因还很可能是Shuffle类操作符在任务执行过程中在内存建立的Hash表过大。在这种情况下,可以通过增加任务数,即分区数来提升并行性度,减小每个任务的输入数据,减少内存占用来解决

2.磁盘临时目录空间优化

配置参数spark.local.dir能够配置Spark在磁盘的临时目录,默认是/tmp目录。在Spark进行Shuffle的过程中,中间结果会写入Spark在磁盘的临时目录中,或者当内存不能够完全存储RDD时,内存放不下的数据会写到配置的磁盘临时目录中。这个临时目录设置过小会造成No space left on device异常。也可以配置多个盘块spark.local.dir=/mn1/spark,/mnt2/spar,/mnt3/spark来扩展Spark的磁盘临时目录,让更多的数据可以写到磁盘,加快I/O速度

 

网络传输优化

1.大任务分发优化

在任务的分发过程中会序列化任务的元数据信息,以及任务需要的jar和文件。任务的分发是通过AKKA库中的Actor模型之间的消息传送的。因为Spark采用了Scala的函数式风格,传递函数的变量引用采用闭包方式传递,所以当需要传输的数据通过Task进行分发时,会拖慢整体的执行速度。配置参数spark.akka.frameSize(默认buffer的大小为10MB)可以缓解过大的任务造成AKKA缓冲区溢出的问题,但是这个方式并不能解决本质的问题。

spark.akka.frameSize控制Spark框架内使用的AKKA框架中,Actor通信消息的最大容量(如任务(Task)的输出结果),因为整个Spark集群的消息传递都是通过Actor进行的,默认为10MB。当处理大规模数据时,任务的输出可能会大于这个值,需要根据实际数据设置一个更高的值。如果是这个值不够大而产生的错误,则可以从Worker节点的日志中排查。通常Worker上的任务失败后,主节点Master的运行日志上提示“Lost TID:”,可通过查看失败的Worker日志文件$SPARK_HOME/work/目录下面的日志文件中记录的任务的Serializedsize of result是否超过10MB来确定通信数据超过AKKA的Buffer异常

2.Broadcast在调优场景的使用

Spark的Broadcast(广播)变量对数据传输进行优化,通过Broadcast变量将用到的大数据量数据进行广播发送,可以提升整体速度。Broadcast主要用于共享Spark在计算过程中各个task都会用到的只读变量,Broadcast变量只会在每台计算机器上保存一份,而不会每个task都传递一份,这样就大大节省了空间,节省空间的同时意味着传输时间的减少,效率也高。在Spark的HadoopRDD实现中,就采用Broadcast进行Hadoop JobConf的传输。官方文档的说法是,当task大于20KB时,可以考虑使用Broadcast进行优化,还可以在控制台日志看到任务是多大,进而决定是否优化。还需要注意,每次迭代所传输的Broadcast变量都
会保存在从节点Worker的内存中,直至内存不够用,Spark才会把旧的Broadcast变量释放掉,不能提前进行释放。BroadCast变量有一些应用场景,如MapSideJoin中的小表进行广播、机器学习中需要共享的矩阵的广播等

3.Collect结果过大优化

当收集的最终结果数据过大时,可以将数据存储在分布式的HDFS或其他分布式持久化层上。将数据分布式地存储,可以减小单机数据的I/O开销和单机内存存储压力。或者当数据不太大,但会超出AKKA传输的Buffer大小时,需要增加AKKA Actor的buffer,可以通过配置参数spark.akka.frameSize(默认大小为10MB)进行调整。

 

序列化与压缩

1.通过序列化优化

序列化的本质作用是将链式存储的对象数据,转化为连续空间的字节数组存储的数据

1)对象可以以数据流方式进行进程间传输(包含网络传输),同样可以以连续空间方式存储到文件或者其他持久化层中。
2)连续空间的存储意味着可以进行压缩。这样减少数据存储空间和传输时间。
3)减少了对象本身的元数据信息和基本数据类型的元数据信息的开销。
4)对象数减少也会减少GC的开销和压力。

通过spark.serializer="org.apache.spark.serializer.KryoSerializer"来配置是否使用Kyro进行序列化

Kyro相对于Java序列化库能够更加快速和紧凑地进行序列化(通常有10倍的性能优势),但是Kyro并不能支持所有可序列化的类型,如果对程序有较高的性能优化要求,就需要自定义注册类。官方推荐对于网络传输密集型(network-intensive)计算,采用Kyro序列化性能更好。

如果对象占用空间很大,需要增加Kryo的缓冲区容量,就需要增加配置项spark.kryoserializer.buffer.mb的数值,默认是2MB,但参数值应该足够大,以便容纳最大的序列化后对象的传输。如果用户不注册自定义的类,Kyro仍可以运行,但是它会针对每个对象存储一次整个类名,这样会造成很大的空间浪费。

2.通过压缩方式优化

在Spark应用中,有很大一部分作业是I/O密集型的。数据压缩对I/O密集型的作业带来性能的大大提升,但是如果用户的jobs作业是CPU密集型的,那么再压缩就会降低性能,这就要判断作业的类型,权衡是否要压缩数据。

Spark目前支持LZF和Snappy两种解压缩方式。Snappy提供了更高的压缩速度,LZF提供了更高的压缩比,用户可以根据具体的需求选择压缩方式

 

批处理优化

调用外部资源,如数据库连接等,这些连接通过JDBC或者ODBC与外部数据源进行交互。将单条记录写转化为数据库的批量写,每个分区的数据写一次,这样可以利用数据库的批量写优化减少开销和减轻数据库压力

否则,因为整个RDD的数据项很大,整个集群会在短时间内产生高并发写入数据库的操作,对数据库压力很大,将产生很大的写入开销

 

reduce和reduceByKey的优化

reduce是Action操作,reduceByKey是Transformation操作

reduce是一种聚合函数,可以把各个任务的执行结果汇集到一个节点,还可以指定自定义的函数传入reduce执行。Spark也对reduce的实现进行了优化,可以把同一个任务内的结果先在本地Worker节点执行聚合函数,再把结果传给Driver执行聚合。但最终数据还是要汇总到主节点,而且reduce会把接收到的数据保存到内存中,直到所有任务都完成为止。因此,当任务很多,任务的结果数据又比较大时Driver容易造成性能瓶颈,这样就应该考虑尽量避免reduce的使用,而将数据转化为Key-Value对,并使用reduceByKey实现逻辑,使计算变为分布式计算。
reduceByKey也是聚合操作,是根据key聚合对应的value。同样的,在每一个mapper把数据发送给reducer前,会在Map端本地先合并(类似于MapReduce中的Combiner)。与reduce不同的是,reduceByKey不是把数据汇集到Driver节点,是分布式进行的,因此不会存在reduce那样的性能瓶颈。

 

Shuffle操作符的内存使用

有时候OOM并不是因为内存大小不能够容纳RDD,而是因为执行任务中使用的数据集合太大(如groupByKey)。Spark的Shuffle操作符(sortByKey、groupByKey、reduceByKey、join等都可以算是Shuffle操作符,因为这些操作会引发Shuffle)在执行分组操作的过程中,会在每个任务执行过程中,在内存创建Hash表来对数据进行分组,而这个Hash表在很多情况下通常变得很大。最简单的一种解决
方案就是增加并行度,即增加任务数量和分区数量。这样每轮次每个Executor执行的任务数是固定的,每个任务接收的输入数据变少会减少Hash表的大小,占用的内存就会减少,从而避免内存溢出OOM的发生。
Spark通过多任务复用Worker的JVM,每个节点所有任务的执行是在同一个JVM上的线程池中执行的,这样就减少了线程的启动开销,可以高效地支持单个任务200ms的执行时间。通过这个机制,可以安全地将任务数量的配置扩展到超过集群的整体的CPU core数,而不会出现问题。

共有 人打赏支持
粉丝 5
博文 53
码字总数 47326
×
bigsloth
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: