Spark(Memory)

原创
2017/06/05 10:54
阅读数 220

Memory管理

MemoryAllocator(Spark Tungsten)

  off-heap => org.apache.spark.unsafe.memory.UnsafeMemoryAllocator: spark.unsafe.offHeap=true, default false
  in-heap => org.apache.spark.unsafe.memory.HeapMemoryAllocator


MemoryManager(Spark Tungsten)

  executor level => org.apache.spark.unsafe.memory.ExecutorMemoryManager
  task/thread level => org.apache.spark.unsafe.memory.TaskMemoryManager
  StaticMemoryManager(<1.5)
  UnifiedMemoryManager(>=1.6), 主要管理user memory和spark memory


Java Heap
    Spark System Reserved Memory -> 300M

spark.testing.reservedMemory

    Spark inHeap/offHeap Memory:

spark.memory.fraction
User Memory -> 
     (("Java Heap"–"Reserved Memory")*(1.0–spark.memory.fraction), default("Java Heap"–300MB)*0.25)
        
Spark Memory ->
     (("Java Heap"–"Reserved Memory")*spark.memory.fraction, default("Java Heap"–300MB)*0.75)
            Storage Memory: cached data + “broadcast” variables + temporary space serialized data  
            Execution Memory: storing the objects required during the execution of Spark tasks

 
CPU管理

spark(yarn)集群: spark master node + spark worker node * n
spark(yarn)管理进程(JVM): yarn ResourceManager + yarn HistoryServer + yarn NodeManager * n 
spark(yarn) application进程(JVM): spark driver(yarn client) + Application Master + Container/Executor * n

spark(standalone)集群: spark master node + spark worker node * n
spark(standalone)管理进程(JVM): spark master + spark worker * n 
spark(standalone) application进程(JVM): spark driver + spark executors

每个application(多个JVM)可以包含一个或者多个job(可以认为一个job就是一次action)
每个job会分为多个stage,各个stage按顺序执行(stage的划分主要依据rdd之间的依赖关系)
每个stage由多个task并行或者顺序执行组成
对于并行的task, 一般一rdd对应多个partion, 一个partion对应需要一个task,即一个partition一个线程

使用partitioner的操作
  combineByKey
  aggregateByKey
  groupByKey
  reduceByKey
  cogroup
  join
  leftOuterJoin
  rightOuterJoin
  fullOuterJoin

单个job需要的executor(jvm进程)个数
    spark.executor.instances 
每个spark节点可以起一个或多个executor(jvm进程)
    执行的并行度:Executor数目 * 每个Executor核数  
    
每个executor拥有固定的核数(jvm线程)以及固定大小的堆(Spark User Memory)
    spark.executor.cores: 核数, 虚拟核(非cpu核心数)
    spark.executor.memory: 堆大小, 默认大小512m,代号sparkheap
        spark.storage.memoryFraction: block cache + broadcasts + task results, 默认0.6      
        spark.storage.safetyFraction:默认值0.8
        sparkheap*spark.storage.memoryFraction*spark.storage.safetyFraction
        
        spark.shuffle.memoryFraction: shuffles + joins + sorts + aggregations, 频繁IO需要的buffer, 默认0.2
        spark.shuffle.safetyFraction:默认值0.8
        sparkheap*spark.shuffle.memoryFraction*spark.shuffle.safetyFraction

        spark.storage.unrollFraction:默认值0.2, 序列化和反序列化
        spark.storage.safetyFraction:默认值0.9
        sparkheap *spark.storage.memoryFraction*spark.storage.unrollFraction*spark.storage.safetyFraction

每个task可用的内存通过这个公式计算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores 。 
(防止OOM, 可以使用spark.shuffle.safetyFraction)

 

硬盘和IO管理

(只讨论HDFS作为输入)

 Spark读取一个HDFS文件,并指定partition分区数
    一个HDFS文件由多个block(每个block可以认为是一个本地文件)组成
    Spark将同一个HDFS文件的若干个Block合并成一个InputSplit(输入分片),Spark依据这些输入分片生成具体的读取task(即一个InputSplit对应一个读task)
    一个InputSplit或者读task的结果是生成目标rdd的一个partition
    一个读task会被分配到某一个executor中,每个executor的虚拟核只会执行一个task
    一个executor的每个线程只能执行一个task(即task并发度最多是Executor数*Executor虚拟核数)
    Map阶段一个rdd的partition数目不变
    Reduce阶段一个rdd的partition数目不确定,依赖具体算子,某些不变,某些变少,某些可配置

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部