文档章节

第13课:Spark Streaming源码解读之Driver容错安全性

葛晨鑫
 葛晨鑫
发布于 2016/05/22 14:18
字数 714
阅读 113
收藏 2

本期内容:

1,ReceivedBlockTracker容错安全性

2,DStreamGraph和JobGenerator容错安全性

从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息。

从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关。

ReceivedBlockTracker在接收到元数据信息后调用addBlock方法,先写入磁盘中,然后在写入内存中。

根据batchTime分配属于当前BatchDuration要处理的数据到timToAllocatedBlocks数据结构中。

Time类的是一个case class,记录时间,重载了操作符,隐式转换,值得借鉴。

case class Time(private val millis: Long) {
  def milliseconds: Long = millis
  def < (that: Time): Boolean = (this.millis < that.millis)
  def <= (that: Time): Boolean = (this.millis <= that.millis)
  def > (that: Time): Boolean = (this.millis > that.millis)
  def >= (that: Time): Boolean = (this.millis >= that.millis)
  def + (that: Duration): Time = new Time(millis + that.milliseconds)
  def - (that: Time): Duration = new Duration(millis - that.millis)
  def - (that: Duration): Time = new Time(millis - that.milliseconds)
  // Java-friendlier versions of the above.
  def less(that: Time): Boolean = this < that
  def lessEq(that: Time): Boolean = this <= that
  def greater(that: Time): Boolean = this > that
  def greaterEq(that: Time): Boolean = this >= that
  def plus(that: Duration): Time = this + that
  def minus(that: Time): Duration = this - that
  def minus(that: Duration): Time = this - that
  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }
  def isMultipleOf(that: Duration): Boolean =
    (this.millis % that.milliseconds == 0)
  def min(that: Time): Time = if (this < that) this else that
  def max(that: Time): Time = if (this > that) this else that
  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  override def toString: String = (millis.toString + " ms")
}
object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}

跟踪Time对象,ReceiverTracker的allocateBlocksToBatch方法中的入参batchTime是被JobGenerator的generateJobs方法调用的。

JobGenerator的generateJobs方法是被定时器发送GenerateJobs消息调用的。

GenerateJobs中的时间参数就是nextTime,而nextTime+=period,这个period就是ssc.graph.batchDuration.milliseconds。

nextTime的初始值是在start方法中传入的startTime赋值的,即RecurringTimer的getStartTime方法的返回值,是当前时间period的(整数倍+1)。

Period这个值是我们调用new StreamingContext来构造StreamingContext时传入的Duration值。

ReceivedBlockTracker会清除过期的元数据信息,从HashMap中移除,也是先写入磁盘,然后在写入内存。

元数据的生成,消费和销毁都有WAL,所以失败时就可以从日志中恢复。从源码分析中得出只有设置了checkpoint目录,才进行WAL机制。

 

对传入的checkpoint目录来创建日志目录进行WAL。

这里是在checkpoint目录下创建文件夹名为receivedBlockMetadata的文件夹来保存WAL记录的数据。

把当前的DStream和JobGenerator的状态进行checkpoint,该方法是在generateJobs方法最后通过发送DoCheckpoint消息,来调用的。

总结:

ReceivedBlockTracker是通过WAL方式来进行数据容错的。

DStreamGraph和JobGenerator是通过checkpoint方式来进行数据容错的。

© 著作权归作者所有

葛晨鑫
粉丝 9
博文 25
码字总数 22470
作品 0
杭州
私信 提问
Spark Streaming源码解析之容错

---title: sparkStreaming源码解析之容错subtitle: sparkStream的数据容错机制description: sparkStream的数据容错思维脑图keywords: [spark,streaming,源码,容错]date: 2018-12-09tags: [s......

freeli
2018/12/07
90
0
探秘Hadoop生态10:Spark架构解析以及流式计算原理

导语 spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,对spark技术的...

你的猫大哥
2017/03/08
0
0
Spark及Spark Streaming核心原理及实践

  【IT168 技术】Spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,...

中国大数据
2018/05/31
0
0
Spark(五) -- Spark Streaming介绍与基本执行过程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45770881 Spark Streaming作为Spark上的四大子框架之一,肩负着实时流计算的重...

jchubby
2015/05/16
0
0
课时17 第三课Spark内部原理剖析与源码阅读(五)

为何spark shuffle比mapreduce shuffle慢? 主要是spark shuffle的shuffle read阶段还不够优秀,它是基于hashmap实现的,shuffle read会把shuffel write阶段已经排序数据给重新转成乱序的,转...

刀锋
2018/12/13
5
0

没有更多内容

加载失败,请刷新页面

加载更多

读书笔记:深入理解ES6 (五)

第五章 解构:使数据访问更便捷 第1节 为什么使用解构功能?   在ES5中,开发者们从对象、数组中获取特定数据并赋值给变量,编写了很多看起来同质化的代码。例如: 1 let options = {2 ...

张森ZS
2分钟前
0
0
CentOS7 yum方式安装MySQL5.7

在CentOS中默认安装有MariaDB,这个是MySQL的分支,但为了需要,还是要在系统中安装MySQL,而且安装完成之后可以直接覆盖掉MariaDB。 1 下载并安装MySQL官方的 Yum Repository [root@localho...

roockee
10分钟前
0
0
Allegro三种自定义设置快捷键的方法

Allegro自定义设置快捷键的三种方法: 1、在Allegro PCB editor 命令窗口直接定义 2、通过修改用户变量env文件来设置快捷键 3、定义笔画为快捷键 1、在Allegro PCB editor 命令窗口直接定义 ...

demyar
15分钟前
0
0
如何做一张能让人眼前一亮的大屏?

作为在职场驰骋的社会人,提到数据可视化大家应该都不陌生了。数据可视化的作用也不用我多说,主要是利用图形化手段,更清晰直观地将数据展示。多层次、交互式的可视化分析能够方便决策者理解...

朕想上头条
15分钟前
0
0
TL138/1808/6748-EthEVM开发板硬件CPU、FLASH、RAM

TL138/1808/6748-EthEVM是广州创龙基于SOM-TL138/1808/6748核心板开发的一款开发板,具有三个网络接口。由于SOM-TL138/1808/6748核心板管脚兼容,所以此三个核心板共用同一个底板。开发板采用...

Tronlong创龙
20分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部