文档章节

第12课:Spark Streaming源码解读之Executor容错安全性

 大家风范
发布于 2016/05/22 12:05
字数 1714
阅读 11
收藏 0

这一课我们将从安全角度来考虑sparkstreaming。因为sparkstreaming会不断的接收数据、不断的产生job、不断的提交job。所以有一个至关重要的问题就是数据安全性。由于sparkstreaming是基于sparkcore的,如果我们可以确保数据安全可靠的话(sparkstreaming生产job的时候里面是基于RDD),即使运行的时候出现错误或者故障,也可以基于RDD的容错的能力自动进行恢复。所以要确保数据的安全性。

对于executor的安全容错主要是数据的安全容错。Executor计算时候的安全容错是借助spark core的RDD的,所以天然是安全的。

数据安全性的一种方式是存储一份副本,另一种方式是不做副本,但是数据源支持重放(也就是可以反复的读取数据源的数据),如果之前读取的数据出现问题,可以重新读取数据。

做副本的方式可以借助blockmanager做备份。Blockmanager存储数据的时候有很多storagelevel,Receiver接收数据后,存储的时候指定storagelevel为MEMORY_AND_DISK_SER_2的方式。Blockmanager早存储的时候会先考虑memory,只有memory不够的时候才会考虑disk,一般memory都是够的。所以至少两个executor上都会有数据,假设一个executor挂掉,就会马上切换到另一个executor。

ReceiverSupervisorImpl在存储数据的时候会有两种方式,一种是WAL的方式,究竟是不是WAL得方式是通过配置修改的。默认是false。如果用WAL的方式必须有checkpoint的目录,因为WAL的数据是放在checkpoint的目录之下的。

def enableReceiverLog(conf: SparkConf): Boolean = {
  conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}

Storagelevel是在构建inputDstream的时候传入的,默认就是MEMORY_AND_DISK_SER_2。

* @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

 

现在来看ReceiverSupervisorImpl在存储数据的另一种方式(副本方式)。注释中说的很清楚,根据指定的storagelevel把接收的blocks交给blockmanager。也就是通过blockmanager来存储。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */

private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)

 

Blockmanager存储的时候会分为多种不同的数据类型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。

Blockmanager存储数据前面已经讲过了。Receiver在接收到数据后除了在自己这个executor上面存储,还会在另外一个executor上存储。如果一个executor出现问题会瞬间切换到另一个executor。

WAL的方式原理:在具体的目录下会做一份日志,假设后续处理的过程中出了问题,可以基于日志恢复,日志是写在checkpoint下。在生产环境下checkpoint是在HDFS上,这样日志就会有三份副本。

下面就是用WAL存储数据的类,先写日志再交给blockmanager存储。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */

private[streaming] class WriteAheadLogBasedBlockHandler(

 

如果采用WAL的方式,存储数据的时候就不需要有两份副本,这样太浪费内存,如果storagelevel.replication大于1就会打印警告日志。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

这里采用两条线程的线程池,使得blockmanager存储数据和write ahead log可以并发的执行。

 

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

 

这个是把日志写入WAL中

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
  writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

负责读写WAL的是WriteAheadLog,这是一个抽象类,负责写入、读取、清除数据的功能。在写入数据后会返回一个句柄,以供读取数据使用。

看一下具体写入数据的实现。如果失败并且失败次数小于最大的失败次数就会重试。确实是返回了一个句柄。

/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */

def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
  var fileSegment: FileBasedWriteAheadLogSegment = null
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      fileSegment = getLogWriter(time).write(byteBuffer)
      if (closeFileAfterWrite) {
        resetWriter()
      }
      succeeded = true
    catch {
      case ex: Exception =>
        lastException = ex
        logWarning("Failed to write to write ahead log")
        resetWriter()
        failures += 1
    }
  }
  if (fileSegment == null) {
    logError(s"Failed to write to write ahead log after $failures failures")
    throw lastException
  }
  fileSegment
}

下面就是把数据写入HDFS的代码

/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
  assertOpen()
  data.rewind() // Rewind to ensure all data in the buffer is retrieved
  val lengthToWrite = data.remaining()
  val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
  stream.writeInt(lengthToWrite)
  if (data.hasArray) {
    stream.write(data.array())
  } else {
    // If the buffer is not backed by an array, we transfer using temp array
    // Note that despite the extra array copy, this should be faster than byte-by-byte copy
    while (data.hasRemaining) {
      val array = new Array[Byte](data.remaining)
      data.get(array)
      stream.write(array)
    }
  }
  flush()
  nextOffset stream.getPos()
  segment
}

 

不管是WAL还是直接交给blockmanager都是采用副本的方式。还有一种是数据源支持数据存放,典型的就是kafka。Kafka已经成为了数据存储系统,它天然具有容错和数据副本。

Kafka有receiver和direct的方式。Receiver的方式其实是交给zookeper来管理matadata的(偏移量offset),如果数据处理失败后,kafka会基于offset重新读取数据。为什么可以重新读取?如果程序崩溃或者数据没处理完是不会给zookeper发ack。Zookeper就认为这个数据没有被消费。实际生产环境下越来越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。这就可以保证有且只有一次的容错处理。DirectKafkaInputDstream,它会去看最新的offset,并把这个内容放入batch中。

获取最新的offset,通过最新的offset减去上一个offset就可以确定读哪些数据,也就是一个batch中的数据。

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}

容错的弊端就是消耗性能,占用时间。也不是所有情况都不能容忍数据丢失。有些情况下可以不进行容错来提高性能。

 

假如一次处理1000个block,但是有1个block出错,就需要把1000个block进行重新读取或者恢复,这也有性能问题。

 1、DT大数据梦工厂微信公众号DT_Spark 王家林
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

 

© 著作权归作者所有

粉丝 0
博文 19
码字总数 18058
作品 0
万州
私信 提问
Spark Streaming源码解析之容错

---title: sparkStreaming源码解析之容错subtitle: sparkStream的数据容错机制description: sparkStream的数据容错思维脑图keywords: [spark,streaming,源码,容错]date: 2018-12-09tags: [s......

freeli
2018/12/07
89
0
探秘Hadoop生态10:Spark架构解析以及流式计算原理

导语 spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,对spark技术的...

你的猫大哥
2017/03/08
0
0
Spark_Streaming源码解析之思维脑图

sparkStreaming源码思维脑图: 脑图详解: 此博文共分为四个部分: DAG定义 Job动态生成 数据的产生与导入 容错 脑图制作参考

freeli
2018/12/07
40
0
Spark及Spark Streaming核心原理及实践

  【IT168 技术】Spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,...

中国大数据
2018/05/31
0
0
7.Spark Streaming

Spark Streaming是Spark核心api的一个拓展,可以实现高吞吐量/具备容错机制的实时流数据的处理 Spark Streaming 与 Spark Core 的关系可以用下面的经典部件图来表述: 基于Spark做Spark Str...

山间浓雾有路灯
06/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

VMware vSphere ESXi主机的访问控制

在vShpere中,访问ESXi主机的途径很多,如下: ESXi DCUI ESXi Shell ESXi SSH ESXi Host Client vCenter --> vSphere web client / vSphere Client VMware vSphere ESXi主机的访问控制,除了......

大别阿郎
22分钟前
3
0
大神讲解CGI、FastCGI和PHP-FPM关系图解

参考资料 概念了解:CGI,FastCGI,PHP-CGI与PHP-FPM:http://www.nowamagic.net/librarys/veda/detail/1319 php中fastcgi和php-fpm是什么东西:https://www.zybuluo.com/phper/note/50231 ......

网络小虾米
31分钟前
3
0
《DNS攻击防范科普系列3》 -如何保障 DNS 操作安全

引言 前两讲我们介绍了 DNS 相关的攻击类型,以及针对 DDoS 攻击的防范措施。这些都是更底层的知识,有同学就来问能否讲讲和我们的日常操作相关的知识点,今天我们就来说说和我们日常 DNS 操...

Mr_zebra
32分钟前
3
0
zk中ServerCnxn

实现接口Stats, Watcher 内部类 DisconnectReason CloseRequestException EndOfStreamException(流关闭) 属性 方法 getSessionTimeout 获取session失效时间 sendResponse 发送回复数据 se......

writeademo
36分钟前
3
0
如何将 Redis 用于微服务通信的事件存储

来源:Redislabs 作者:Martin Forstner 翻译:Kevin (公众号:中间件小哥) 以我的经验,将某些应用拆分成更小的、松耦合的、可协同工作的独立逻辑业务服务会更易于构建和维护。这些服务(也...

中间件小哥
40分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部