Block Manager

2018/02/26 23:14
阅读数 42

在Spark中,将数据抽象为Block(不论是shuffle数据,还是节点本身存储的数据),而每个driver/executor中的block都是由`BlockManager`这个类来负责管理的。对于block实际的读取,`BlockManager`根据block存储位置的不同是交由`ShuffleManager`, `MemoryStore`或`DiskStore`来实际处理的。`BlockManager`管理的block可能是shuffle后的文件,也可能是缓存的数据。对于shuffle文件中的block是通过`ShuffleManager`来管理的。对于节点缓存的数据,对于保存在内存中的block,由`MemoryStore`来管理,对于保存在磁盘的block,由`DiskStore`来管理。Spark在不开启`spark.shuffle.service.enabled`(设为false)配置的情况下,结构如下:

在开启`spark.shuffle.service.enabled`(设为true)配置的情况下,结构如下:

可见在Spark中,每一个driver/executor节点都有一个`BlockManager`与之对应,用来管理节点数据以及向其他节点请求/返回数据。`BlockManager`中主要与两类节点进行通信:一种是executor中的`BlockManager`需要与driver节点通信,来上报executor中`BlockManager`管理的block,同时接受来自driver节点操作executor上block的指令(图中与executor与driver之间的连线)。另一种是executor节点之间的`BlockManager`需要相互通信,来从彼此获取需要的block(图中executor之间,以及executor与`ExternalShuffleService`之间的连线)。

数据结构

在正式介绍Spark的存储结构之前,先来了解一下Spark存储结构中的数据结构,为下文的介绍打下基础。

BlockManagerId

在Spark中,每个`BlockManager`都有一个唯一的id与之对应,而这个id不是一个普通的string或者long型,而是一个特殊的数据结构`BlockManagerId`:

class BlockManagerId private ( private var executorId_ : String, private var host_ : String, private var port_ : Int) 

可见`BlockManagerId`中保存了executorId(executor节点的唯一标识),host(executor节点的地址),port(`NettyBlockTransferService`的端口(`NettyBlockTransferService`在`BlockManager`中称为ShuffleClient,是用来向其他节点请求/提供Block数据的server,关于`NettyBlockTransferService`的分析见下文))

BlockManagerInfo

`BlockManagerInfo`用来记录`BlockManager`的元数据:

class BlockManagerInfo(
    val blockManagerId: BlockManagerId, timeMs: Long, val maxMem: Long, val slaveEndpoint: RpcEndpointRef) 

其中blockManagerId为上文介绍的`BlockManagerId`类型,timeMs为`BlockManager`注册到driver的时间,maxMem为`BlockManager`中`MemoryManager`管理的最大可用的堆内Storage内存大小(关于`MemoryManager`的分析见【Spark内存管理】),slaveEndpoint为`BlockManager`的`BlockManagerSlaveEndpoint`对应的句柄`RpcEndpointRef`(`BlockManagerSlaveEndpoint`分析见下文)。

BlockId

`BlockId`用来唯一标识Spark中的一个Block:

sealed abstract class BlockId { // 全局唯一的块标识符,用来序列化/反序列化块 def name: String // convenience methods def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None def isRDD: Boolean = isInstanceOf[RDDBlockId] def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId] override def toString: String = name override def hashCode: Int = name.hashCode override def equals(other: Any): Boolean = other match { case o: BlockId => getClass == o.getClass && name.equals(o.name) case _ => false } } 

这个类是一个抽象类,在Spark中有10种类型的BlockId,如`RDDBlockId`,`ShuffleBlockId`,`TaskResultBlockId`等。这些子类都继承了`BlockId`并重写了name字段。在Spark中实际就是靠name字段的不同来区分不同类型的`BlockId`。

BlockInfo

`BlockInfo`用来保存Block的元数据:

class BlockInfo(
    val level: StorageLevel, val classTag: ClassTag[_], val tellMaster: Boolean) 

其中level保存了Block的存储级别(存储级别见[rdd-persistence](http://spark.apache.org/docs/latest/programming-guide.html\#rdd-persistence\)),classTag保存了Block用来序列化/反序列化的类型,tellMaster用来标示在Block状态发生变化时是否要上报给driver。

Block数据管理

Block数据管理,包括executor向driver上报`BlockManager`所管理的Block元数据,executor向driver请求获取/更新Block元数据,driver向executor发送指令来删除某个Block,driver向executor返回Block元数据信息等。可见Block数据管理是executor与driver间的相互通信。还记得我们在【Spark Rpc分析】中的介绍,Spark中应用程序要实现节点间的通信只需要实现`RpcEndpoint`类型即可,在这里也不例外。其中在driver节点负责通信的类为`BlockManagerMasterEndpoint`。而在executor节点负责通信的类为`BlockManagerSlaveEndpoint`。这两个类都是在driver/executor启动时注册到各自的`RpcEnv`中,并向其他节点暴露`RpcEndpointRef`句柄。这样executor就可以使用`BlockManagerMasterEndpoint`的`RpcEndpointRef`向driver发送请求,driver可以使用executor `BlockManagerSlaveEndpoint`的`RpcEndpointRef`向executor发送响应。需要注意的是,由于Spark只在driver节点才会建立`TransportServer`,而不会在executor建立`TransportServer`,所以driver是不会主动向executor建立连接发送请求。而只会executor主动向driver建立连接发送请求,driver在收到请求后利用建立的连接向executor发送请求/响应。下面详细分析一下Block数据管理中这两个核心的`RpcEndpoint`类型:

BlockManagerMasterEndpoint

`BlockManagerMasterEndpoint`类型是注册在driver节点上的。在`BlockManagerMasterEndpoint`中维护了几个map用来管理与Block相关的映射关系:

// 保存BlockManagerId到BlockManager的映射
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] // 保存executorId到BlockManagerId的映射 private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] // 保存BlockId到BlockManagerId set的映射 private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] 

通过blockMangerInfo,driver可以根据`BlockManagerId`找到对应`BlockManager`的元数据`BlockManagerInfo`,而我们知道`BlockManagerInfo`中包含了`BlockManager`与driver通信的`RpcEndpointRef`句柄,driver就可以通过这个与executor的`BlockManager`进行通信。通过blockManagerIdByExecutor,driver可以根据executorId找到对应的`BlockManagerId`。通过blockLocations,driver维护了一个`BlockId`所有副本在集群中的位置,这样在executor发现自身没有某个Block时,可以从driver获取到集群中保存了这个缺少的Block的节点列表,之后executor就可以向保存了Block的节点发送请求来获取Block数据。

BlockManagerSlaveEndpoint

`BlockManagerSlaveEndpoint`类是executor或driver上的`BlockManager`用来接收来自driver `BlockManagerMasterEndpoint`发来的指令的类。主要用来删除`BlockManager`管理的Block,以及返回Block状态等,`BlockManagerSlaveEndpoint`可以处理的消息类型很多,这里我们以`RemoveBlock`消息为例:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RemoveBlock(blockId) => doAsync[Boolean]("removing block " + blockId, context) { blockManager.removeBlock(blockId) true } } private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) { val future = Future { logDebug(actionMessage) body } future.onSuccess { case response => logDebug("Done " + actionMessage + ", response is " + response) context.reply(response) logDebug("Sent response: " + response + " to " + context.senderAddress) } future.onFailure { case t: Throwable => logError("Error in " + actionMessage, t) context.sendFailure(t) } } 

可见其中的很多方法都调用了`BlockManagerSlaveEndpoint.doAsync`方法,用来实现异步调用,在处理完成时,调用对应的`Future.onSuccess`或`Future.onFailure`方法向driver上报处理结果。

BlockManagerMaster

在Spark中,executor/driver的`BlockManager`在向driver的`BlockManagerMasterEndpoint`发送请求时,不是直接使用`BlockManagerMasterEndpoint`的句柄`RpcEndpointRef`来发送,而是调用了`BlockManagerMaster`这个类来发送请求。`BlockManagerMaster`可以看做是对发送方法做了一层封装,其中的方法绝大部分是阻塞请求,即在发送请求后需要等待driver返回响应才会返回。

总结一下Block数据管理,Block数据管理是executor/driver中的`BlockManager`需要将管理的Block信息上报给driver,在driver中维护了集群中所有Block与executor对应关系,Block所在位置等信息。集群中节点都是通过driver保存的Block信息来实现集群Block相互发现的。Block数据管理主要是网络通信,其中`BlockManager`使用`BlockManagerMaster`来向driver发送请求。driver使用`BlockManagerMasterEndpoint`来接收来自`BlockManager`的请求,以及向`BlockManager`发送响应/指令。`BlockManager`使用`BlockManagerSlaveEndpoint`来接收来自driver的指令。他们的关系如下图所示:

`BlockManager`通过`BlockManagerMaster`将Block元数据上报给driver的`BlockManagerMasterEndpoint`,driver使用`BlockManagerSlaveEndpoint`的句柄向`BlockManager`发送指令。需要注意的是driver不会主动与executor的`BlockManager`建立连接,只有当executor的`BlockManager`主动与driver建立连接发送请求后,driver才可以使用这个建立的连接向executor的`BlockManager`发送指令。

Block数据传输

`BlockManager`管理的Block按照产生来源可以分为两大类:一类是缓存类型的数据(不论是存在内存中还是磁盘上),一类是在进行map-reduce操作时,map阶段生成的shuffle数据。对于缓存类型的Block,可以保存在内存中,也可以保存在磁盘上。而对于shuffle类型的Block,都是保存在磁盘上。Spark使用`MemoryStore`来管理保存在内存的数据(数据的增删改查),使用`DiskStore`来管理保存在磁盘的数据(数据的增删改查)。`BlockManager`针对Block的在内存或磁盘的管理实质上都是委托给`MemoryStore`和`DiskStore`来实现的。

ShuffleClient

在Spark的`BlockManager`中`ShuffleClient`是executor用来从其他executor获取shuffle后的map文件。

`ShuffleClient`是一个抽象类,在Spark中有两种实现:`NettyBlockTransferService`和`ExternalShuffleClient`。还记得我们之前所说,`BlockManager`管理了两类数据:缓存数据和shuffle数据。其中`NettyBlockTransferService`不仅是可以请求/返回shuffle数据,还用来请求/返回缓存数据(如果整个action都没有进行缓存,则最终会触发从数据源获取数据)。而`ExternalShuffleClient`是与`ExternalShuffleService`搭配使用的。 `ExternalShuffleClient`只用来向`ExternalShuffleService`请求shuffle数据。而`ExternalShuffleService`是每台机器在启动时启动的server,用来管理这台机器上所有executor产生的shuffle数据。

那么Spark中为何要实现这两种`ShuffleClient`呢,都是用`NettyBlockTransferService`不就好了?这里我们要说明一下,`NettyBlockTransferService`的生命周期与其所在的executor的生命周期相同,也就是说当executor退出时,`NettyBlockTransferService`也就退出了,不能再向其他节点返回其所管理的shuffle数据或缓存数据。而`ExternalShuffleClient`只是用来向`ExternalShuffleService`请求数据的客户端,虽然`ExternalShuffleClient`随着executor的退出而退出,但是由于shuffle数据是单独由独立于executor的`ExternalShuffleService`管理的,所以其他节点的`ExternalShuffleClient`还可以向`ExternalShuffleService`来获取数据。也就是说`NettyBlockTransferService`可以看做是内嵌在executor中的一个服务器,而`ExternalShuffleClient`配套使用的`ExternalShuffleService`可以看做是独立于executor的服务器,可以在executor退出后依旧提供服务。这里可能读者会有疑问了,executor难道不都是在整个job执行完成后退出的吗,executor退出了,job不也执行完成了吗,使用独立于executor的服务器的意义何在呢?

我们知道Spark的几种集群部署方式(standalone,yarn,mesos)都是支持资源的动态调整的(可以根据集群情况动态的增加或减少executor数量),也就是说在job没有执行完之前,之前使用的executor就可能退出了。这样就会遇到当集群资源充足需要回收executor时,导致executor上的数据丢失。对于executor的`BlockManager`中缓存的数据还好,可以在下次计算时从上游计算出来再次保存到新的executor上。而对于shuffle类型的数据,executor的退出导致shuffle数据的丢失对性能的影响就比较大了。我们知道map-reduce操作是一个极其耗时间的操作,如果有shuffle的文件丢失,会导致shuffle到这个退出节点的executor的操作重算。为了避免这种情况,Spark提供了独立于executor的外部shuffle管理器`ExternalShuffleService`(这个管理器在每个机器上启动一个,而不是每一个executor上,详见[dynamic-resource-allocation](http://spark.apache.org/docs/latest/job-scheduling.html\#dynamic-resource-allocation\))。如果配置了\`spark.shuffle.service.enabled=true\`,则在\`BlockManager\`创建时,会将\`BlockManager\`的\`DiskStore\`管理的目录相关信息上报给机器上的\`ExternalShuffleService\`,目录信息最终会保存在\`ExternalShuffleBlockResolver\`的map中。调用链路如下:

BlockManager.registerWithExternalShuffleService -> ExternalShuffleClient.registerWithShuffleServer -> 向ExternalShuffleService发送RegisterExecutor消息 -> ExternalShuffleBlockHandler.receive -> ExternalShuffleBlockResovler.registerExecutor

最终消息是保存在`ExternalShuffleBlockResovler`中的executors这个map中:

final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;

public static class AppExecId { public final String appId; public final String execId; } public class ExecutorShuffleInfo implements Encodable { /** The base set of local directories that the executor stores its shuffle files in. */ public final String[] localDirs; /** Number of subdirectories created within each localDir. */ public final int subDirsPerLocalDir; /** Shuffle manager (SortShuffleManager) that the executor is using. */ public final String shuffleManager; } 

而在shuffle阶段,在执行reduce任务的executor会调用`BlockManager.shuffleClient`来向执行map任务的executor获取shuffle文件,这时就会根据`ShuffleClient`的不同,来决定是调用`NettyBlockTransferService`中的方法还是`ExternalShuffleClient`中的方法来从其他executor还是机器单独的`ExternalShuffleService`获取shuffle文件。

Block保存

在`BlockManager`中提供了序列化数据的保存方法:`putBytes`,非序列化数据的保存方法:`putIterator`以及Block数据的保存方法:`putBlockData`三种方法,其中`putBlockData`实际就是调用`putBytes`方法。

上文我们提到,Spark可以通过配置来设置数据的保存级别(详见[rdd-persistence](http://spark.apache.org/docs/latest/programming-guide.html\#rdd-persistence\))

在保存序列化数据的方法`putBytes`方法中,直接调用了`doPutBytes`,所以让我们直接看一下`doPutBytes`方法:

private def doPutBytes[T]( blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel, classTag: ClassTag[T], tellMaster: Boolean = true, keepReadLock: Boolean = false): Boolean = { // 省略非关键代码和异常检查 doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => // 由于存储的是bytes(序列化的数据),这里在将数据存储到本地之前就将副本写出 // 由于数据已经是序列化的,所以发送数据会很快 val replicationFuture = if (level.replication > 1) { Future { replicate(blockId, bytes, level, classTag) }(futureExecutionContext) } else { null } val size = bytes.size // 存储级别使用内存存储 if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. // 如果存储时需要反序列化,则首先反序列化数据,并调用memoryStore的putIteratorAsValues方法 val putSucceeded = if (level.deserialized) { val values = serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag) memoryStore.putIteratorAsValues(blockId, values, classTag) match { case Right(_) => true case Left(iter) => // If putting deserialized values in memory failed, we will put the bytes directly to // disk, so we don't need this iterator and can close it to free resources earlier. iter.close() false } } else { // 不需要反序列化则调用memoryStore的putBytes直接存储 memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes) } // 如果保存到内存上失败,并且存储级别包括磁盘,则将数据直接存到磁盘上(序列化的数据) if (!putSucceeded && level.useDisk) { diskStore.putBytes(blockId, bytes) } } else if (level.useDisk) { // 如果存储级别只包括磁盘,则直接存到磁盘上 diskStore.putBytes(blockId, bytes) } // 获取存储状态,如果成功向master会报存储状态 val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { info.size = size if (tellMaster) { reportBlockStatus(blockId, info, putBlockStatus) } } // 如果需要写副本,则在这里等待上文调用方法的回调 if (level.replication > 1) { // Wait for asynchronous replication to finish Await.ready(replicationFuture, Duration.Inf) } }.isEmpty } 

可见`doPutBytes`流程如下:

  1. 如果当前配置的存储级别为需要副本,则首先启动一个异步线程在集群中随机选取另一个节点,将数据备份到另一个节点上

  2. 如果当前配置的存储级别包含内存存储,则首先尝试将数据保存到内存中。如果数据在内存中需要以非序列化形式保存,则要先将数据反序列化。如果保存到内存失败(如内存空间不足),且存储级别包含磁盘存储,则将数据保存到磁盘上

  3. 如果当前配置的存储级别只包含磁盘存储,则直接将数据写到磁盘上

  4. 在保存成功后,如果需要向driver节点上报保存结果,则需要将保存的Block信息上报给driver

  5. 如果当前配置的存储级别为需要副本,则在方法返回前需要等待数据备份的完成

在保存非序列化的方法`putIterator`中,实际是调用`doPutIterator`方法:

private def doPutIterator[T]( blockId: BlockId, iterator: () => Iterator[T], level: StorageLevel, classTag: ClassTag[T], tellMaster: Boolean = true, keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = { // 省略非关键代码和异常检查 doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None // Size of the block in bytes var size = 0L if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. if (level.deserialized) { memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match { case Right(s) => size = s case Left(iter) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk) { diskStore.put(blockId) { fileOutputStream => serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag) } size = diskStore.getSize(blockId) } else { iteratorFromFailedMemoryStorePut = Some(iter) } } } else { // !level.deserialized // 如果需要序列化存储,则调用memoryStore的putIteratorAsBytes方法 memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match { case Right(s) => size = s case Left(partiallySerializedValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk) { diskStore.put(blockId) { fileOutputStream => partiallySerializedValues.finishWritingToStream(fileOutputStream) } size = diskStore.getSize(blockId) } else { iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator) } } } } else if (level.useDisk) { // 首先将数据序列化,然后在存到磁盘上 diskStore.put(blockId) { fileOutputStream => serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag) } size = diskStore.getSize(blockId) } val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { // Now that the block is in either the memory, externalBlockStore, or disk store, // tell the master about it. info.size = size if (tellMaster) { reportBlockStatus(blockId, info, putBlockStatus) } if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis val bytesToReplicate = doGetLocalBytes(blockId, info) try { replicate(blockId, bytesToReplicate, level, classTag) } finally { bytesToReplicate.dispose() } } } iteratorFromFailedMemoryStorePut } } 

可见`doPutIterator`的流程如下:

  1. 如果当前配置的存储级别包含内存存储,则首先尝试将数据保存到内存中。如果内存中数据需要序列化存储,则首先将数据序列化,否则直接尝试将数据保存到内存。如果保存失败(说明内存空间不足),且当前配置的存储级别包含磁盘存储,则首先将数据序列化后,再保存到磁盘

  2. 如果当前存储级别只包含内存存储,则将数据序列化后保存到磁盘

  3. 在数据保存成功后,如果需要将结果上报给driver,则将保存的Block信息上报给driver

  4. 在数据保存成功后,如果配置的存储级别需要备份,则需要在集群中随机选择另一个节点,将数据备份到另一个节点上

无论是`doPutBytes`还是`doPutIterator`方法中,我们看到实际都是调用了`MemoryStore`或`DiskStore`类来保存数据。对于这两个类以及对应的内存存储/磁盘存储分析见文章【内存存储】和【磁盘存储】。存储级别详见[rdd-persistence](http://spark.apache.org/docs/latest/programming-guide.html\#rdd-persistence\)

Block读取

`BlockManager`提供了五种获取Block的方法:`getBlockData`(获取本地的Block数据),`getLocalValues`(获取本地的非序列化数据),`getLocalBytes`(获取本地的序列化数据),`getRemoteValues`(获取远端的非序列化数据),`getRemoteBytes`(获取远端的序列化数据)。其中`getBlockData`实际是调用了`getLocalBytes`方法,而`getRemoteValues`方法实际是在调用`getRemoteBytes`方法获取序列化数据后,进行反序列化并返回。所以下面我们主要看一下`getLocalValues`,`getLocalBytes`和`getRemoteBytes`这三个方法。

首先我们看一下`getLocalValues`方法:

def getLocalValues(blockId: BlockId): Option[BlockResult] = { // 省略非关键代码和异常检查 val level = info.level if (level.useMemory && memoryStore.contains(blockId)) { val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get } else { serializerManager.dataDeserializeStream( blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag) } val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) } else if (level.useDisk && diskStore.contains(blockId)) { val iterToReturn: Iterator[Any] = { val diskBytes = diskStore.getBytes(blockId) if (level.deserialized) { val diskValues = serializerManager.dataDeserializeStream( blockId, diskBytes.toInputStream(dispose = true))(info.classTag) maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) } else { val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes) .map {_.toInputStream(dispose = false)} .getOrElse { diskBytes.toInputStream(dispose = true) } serializerManager.dataDeserializeStream(blockId, stream)(info.classTag) } } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId)) Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } else { handleLocalReadFailure(blockId) } } 

可见`getLocalValues`流程如下:

  1. 如果配置的存储级别包含内存存储,且内存中存在查询的Block,则直接从内存查询。如果内存中的数据是以序列化方式保存的,则在返回之前需要反序列化数据,否则直接返回

  2. 如果配置的存储级别只包含磁盘存储,且磁盘中存在查询的Block,则直接从磁盘查询。在查出数据后,需要将数据缓存到内存中。如果内存中的数据以非序列化的方式保存,则首先需要将查出的数据序列化,否则直接保存到内存中。最后将查出的数据反序列化后返回

让我们在看一下`getLocalBytes`方法:

def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { // 省略非关键代码和异常检查 // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. Option(new ChunkedByteBuffer( shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())) } else { blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) } } } private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = { val level = info.level // In order, try to read the serialized bytes from memory, then from disk, then fall back to // serializing in-memory objects, and, finally, throw an exception if the block does not exist. if (level.deserialized) { // Try to avoid expensive serialization by reading a pre-serialized copy from disk: if (level.useDisk && diskStore.contains(blockId)) { // Note: we purposely do not try to put the block back into memory here. Since this branch // handles deserialized blocks, this block may only be cached in memory as objects, not // serialized bytes. Because the caller only requested bytes, it doesn't make sense to // cache the block's deserialized objects since that caching may not have a payoff. diskStore.getBytes(blockId) } else if (level.useMemory && memoryStore.contains(blockId)) { // The block was not found on disk, so serialize an in-memory copy: serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get) } else { handleLocalReadFailure(blockId) } } else { // storage level is serialized if (level.useMemory && memoryStore.contains(blockId)) { memoryStore.getBytes(blockId).get } else if (level.useDisk && diskStore.contains(blockId)) { val diskBytes = diskStore.getBytes(blockId) maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes) } else { handleLocalReadFailure(blockId) } } } 

在`getLocalBytes`方法中首先会判断查询的Block是否是shuffle类型的Block。如果是shuffle类型的Block,则直接调用`IndexShuffleBlockResolver`来查询(关于`IndexShuffleBlockResolver`的详细介绍见文章【Shuffle详解】)。如果不是shuffle类型的Block,则调用`doGetLocalBytes`方法。`doGetLocalBytes`方法流程如下:

  1. 如果内存中的存储级别为非序列化,且如果当前配置的存储级别包含磁盘存储,且磁盘中包含查询的Block,则直接从磁盘查询并返回(这里直接查磁盘,而不是内存是为了减少从内存读取时序列化数据的时间)

  2. 如果内存中存储的级别为非序列化,且只有内存中包含查询的Block,则从内存中查询数据,并进行序列化后返回

  3. 如果内存中的存储级别为序列化,且在内存中包含查询的Block,则直接读取内存中的数据并返回。否则如果存储级别包含磁盘存储,且磁盘中包含Block,则从磁盘中读取数据,并将数据缓存到内存中,并返回

最后让我们看一下`getRemoteBytes`方法:

def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { // 省略非关键代码和异常检查 var runningFailureCount = 0 var totalFailureCount = 0 val locations = getLocations(blockId) val maxFetchFailures = locations.size var locationIterator = locations.iterator while (locationIterator.hasNext) { val loc = locationIterator.next() // 直接通过一次网络请求获取到需要的blockId的数据 val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { case NonFatal(e) => runningFailureCount += 1 totalFailureCount += 1 // 如果失败次数超过了含有块的地址数量,则直接抛出异常 if (totalFailureCount >= maxFetchFailures) { // Give up trying anymore locations. Either we've tried all of the original locations, // or we've refreshed the list of locations from the master, and have still // hit failures after trying locations from the refreshed list. throw new BlockFetchException(s"Failed to fetch block after" + s" ${totalFailureCount} fetch failures. Most recent failure cause:", e) } // If there is a large number of executors then locations list can contain a // large number of stale entries causing a large number of retries that may // take a significant amount of time. To get rid of these stale entries // we refresh the block locations after a certain number of fetch failures // 如果失败次数超过了规定的最大尝试次数,则重新获取块的位置信息 if (runningFailureCount >= maxFailuresBeforeLocationRefresh) { locationIterator = getLocations(blockId).iterator runningFailureCount = 0 } // This location failed, so we retry fetch from a different one by returning null here null } if (data != null) { return Some(new ChunkedByteBuffer(data)) } } None } 

可见`getRemoteBytes`的执行流程如下:

  1. 首先调用`getLocations`方法,从driver节点获取保存了查询的Block的节点列表,之后选择第一个节点调用`BlockTransferService.fetchBlockSync`方法向这个节点请求数据,如果请求成功则直接返回

  2. 如果请求失败,重新向driver请求保存了查询的Block的节点列表,然后执行`BlockTransferService.fetchBlockSync`方法,请求数据

  3. 如果重试次数超过重试上限,则直接返回空数据

总结一下Block数据传输:在Spark中,`BlockManager`管理两类的数据,即shuffle数据和缓存数据。其中shuffle数据来自Spark的shuffle操作(也称为map-reduce操作)中map阶段产生的shuffle文件,缓存数据来自用户程序中调用cache或checkpoint方法时将来自上游RDD计算出的数据缓存到本地内存或磁盘,以便在之后迭代时,无需再执行上游RDD生成数据的计算过程。由于`BlockManager`随着所在的executor退出而终止,所以在可以动态分配资源的集群中(如standalone,yarn,mesos),Spark可以在每台机器上启动一个专门管理shuffle数据的服务`ExternalShuffleService`,这样所有读取shuffle文件的操作实际都通过向`ExternalShuffleService`请求来获取的。在`BlockManager`中提供了对于非序列化数据/序列化数据的读写接口,并通过底层的`MemoryStore`和`DiskStore`来实际保存和读取数据。在`BlockManager`中如果设置了需要副本的存储级别,则在保存数据到本节点的同时,需要向集群的另外一个节点备份写入的数据。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部