Spark(Storage)

原创
2017/06/17 11:57
阅读数 53

存储管理

    diskstore
    memorystore


一大组件

  BlockManager

 

消息通信(Actor模型)

    master(BlockManager) to slave(BlockManager)
    slave(BlockManager) to master(BlockManager)


    
RDD/Block

  在调度层, RDD由多个partition/bucket构成
  在存储层, RDD又是以block为单位进行存取,
     对于DiskStore,则一个blcok一个物理文件,再由hash map管理id和路径
     对于MemoryStore,则直接由HashMap管理id和block
  在task的rdd.iterator()中,即partition/bucket要进行运算时,会处理通过BlockManager管理block

  (rdd的partition和block为一一对应)

 

spark 中的block是rdd在被task执行之前,其基本组成partition被blockManage映射而来的一种抽象
spark 中,在storage模块里面所有的操作都是和block相关的,但是在RDD里面所有的运算都是基于partition的
如果当前RDD的storage level不是NONE的话,表示该RDD在BlockManager中有存储,那么调用CacheManager中的getOrCompute()函数计算RDD,
在这个函数中partition和block发生了关系:
  首先根据RDD id和partition index构造出block id (rdd_xx_xx),接着从BlockManager中取出相应的block
需要注意的是block的计算和存储是阻塞的,若另一线程也需要用到此block则需等到该线程block的loading结束

(hdfs 中的 block 是存储的最小单元)

spark中的RDD-Cache, Shuffle-output, 以及broadcast的实现都是基于BlockManager来实现, BlockManager提供了数据存储(内存/文件存储)接口.
这里的Block和HDFS中谈到的Block块是有本质区别:
  HDFS中是对大文件进行分Block进行存储,Block大小固定为512M等;
  Spark中的Block是用户的操作单位, 一个Block对应一块有组织的内存,一个完整的文件或文件的区间端,并没有固定每个Block大小的做法;
  
(hdfs 中的 block 是存储的最小单元)

trait BlockDataManager {
  def getBlockData(blockId: String): Option[ManagedBuffer]
  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
}

Spark中Block类型

    RDDBlock:"rdd_" + rddId + "_" + splitIndex; 即每个RDD block表示一个特定rdd的一个分片
    ShuffleBlock:关于shuffle,在Spark的1.1版本中发布一个sort版本的shuffle,原先的版本为hash,因此两种类型的shuffle也对应了两种数据结构
        Hash版本,ShuffleBlock:"shuffle_" + shuffleId + "" + mapId + "" + reduceId
        Sort版本,对于每一个bucket(shuffleId + "" + mapId + "" + reduceId组合)由ShuffleDataBlock和ShuffleIndexBlock两种block组成
            "shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".data"
            "shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".index"
    BroadcastBlock:"broadcast_" + broadcastId + "_" + field)
    TaskResultBlock:"taskresult_" + taskId;Spark中task运行的结果也是通过BlockManager进行管理
    StreamBlock: "input-" + streamId + "-" + uniqueId应该是用于streaming中
    TempBlock: "temp_" + id

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部