
Lesson 8: Spark Streaming source code walkthrough, a thorough study of the full RDD generation lifecycle

葛晨鑫
Published 2016/05/14 15:38

In this session:

1. A thorough study of the relationship between DStream and RDD

2. A thorough study of how RDDs are generated in Spark Streaming

 

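How are RDDs generated? What are they generated from, and on what basis? Does RDD execution in Spark Streaming differ from RDD execution in Spark Core? And once an RDD has run, what do we do with it?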

 

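An RDD is itself just an ordinary object. If the BatchInterval is 1 second, for example, a new RDD is produced every second, and memory cannot hold all of these objects indefinitely. So after the job for each BatchInterval has finished, how are the existing RDDs managed?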

 

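A ForEachDStream does not necessarily cause a Job to execute: whether a Spark job actually runs depends on whether the function it wraps performs an action.

Jobs are produced by the Spark Streaming framework itself, not directly by the user's code.

foreachRDD is Spark Streaming's back door: it lets you operate on the RDDs directly.

A DStream is a template for RDDs, and each DStream depends on the DStream before it.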

 

val lines = jsc.socketTextStream("127.0.0.1", 9999) creates a SocketInputDStream.

lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print() then turns the SocketInputDStream into a FlatMappedDStream, then a MappedDStream, then a ShuffledDStream, and finally a ForEachDStream.
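The chain above can be modelled without Spark. The sketch below is hypothetical (`Node`, `child` and `WordCountChain` are illustrative names, not Spark classes): each node keeps only a reference to its parent, which is how the real DStream subclasses record their single dependency, and walking the parents from the last node reproduces the back-to-front dependency order.

```scala
// Hypothetical toy model of the WordCount DStream chain (no Spark needed).
// Each node only remembers its parent, mirroring how FlatMappedDStream,
// MappedDStream, ShuffledDStream and ForEachDStream keep a `parent` field.
final case class Node(name: String, parent: Option[Node]) {
  // Create a downstream node that depends on this one.
  def child(childName: String): Node = Node(childName, Some(this))

  // Walk from this node back to the source (back-to-front order).
  def lineage: List[String] = name :: parent.map(_.lineage).getOrElse(Nil)
}

object WordCountChain {
  // One node per DStream created by the WordCount program in the text.
  def build(): Node =
    Node("SocketInputDStream", None) // socketTextStream(...)
      .child("FlatMappedDStream")    // flatMap(_.split(" "))
      .child("MappedDStream")        // map(word => (word, 1))
      .child("ShuffledDStream")      // reduceByKey(_ + _)
      .child("ForEachDStream")       // print()

  def main(args: Array[String]): Unit =
    // Reverse the back-to-front lineage to print it source-first.
    println(build().lineage.reverse.mkString(" -> "))
}
```

Running `main` prints the chain source-first, while `lineage` itself starts from the ForEachDStream and works backwards, which is the direction in which the framework actually traverses it.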

 

The scaladoc of the DStream class describes it like this:

* DStreams internally is characterized by a few basic properties:
*  - A list of other DStreams that the DStream depends on
*  - A time interval at which the DStream generates an RDD
*  - A function that is used to generate an RDD after each time interval

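Roughly, this means:

1. A DStream depends on other DStreams.

2. A DStream generates one RDD per time interval (its slideDuration, which for most DStreams equals the BatchDuration).

3. A function inside the DStream (compute) generates that RDD at each interval.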
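The three properties map onto three members of a class. The sketch below is a simplified model, not Spark's API: `Time` is a plain millisecond wrapper, an "RDD" is a `Seq`, and `SketchDStream`, `ListSource`, `UpperCased` and `isValidTime` are hypothetical names. `isValidTime` follows the idea of the check that Spark's `DStream.isTimeValid` performs before `getOrCompute` computes anything: a time is accepted only if it lies after `zeroTime` by a whole multiple of the slide duration.

```scala
// Simplified stand-ins: Time is a timestamp in ms and an "RDD" is a Seq.
final case class Time(ms: Long)

// Hypothetical model of the three properties listed in the DStream scaladoc.
abstract class SketchDStream[T](val zeroTimeMs: Long) {
  // 1. The DStreams this DStream depends on.
  def dependencies: List[SketchDStream[_]]
  // 2. The interval at which this DStream generates an RDD.
  def slideDurationMs: Long
  // 3. The function that generates the RDD for a given time.
  def compute(time: Time): Option[Seq[T]]

  // Accept only times strictly after zeroTime by a whole multiple of the slide.
  def isValidTime(time: Time): Boolean = {
    val elapsed = time.ms - zeroTimeMs
    elapsed > 0 && elapsed % slideDurationMs == 0
  }
}

// A source with no dependencies that yields the same records every interval.
final class ListSource(records: Seq[String], zero: Long, val slideDurationMs: Long)
    extends SketchDStream[String](zero) {
  def dependencies: List[SketchDStream[_]] = Nil
  def compute(time: Time): Option[Seq[String]] =
    if (isValidTime(time)) Some(records) else None
}

// A derived stream: one dependency, inherited slide, transforms the parent's output.
final class UpperCased(parent: SketchDStream[String])
    extends SketchDStream[String](parent.zeroTimeMs) {
  def dependencies: List[SketchDStream[_]] = List(parent)
  def slideDurationMs: Long = parent.slideDurationMs
  def compute(time: Time): Option[Seq[String]] =
    parent.compute(time).map(_.map(_.toUpperCase))
}

object ThreePropertiesDemo {
  def main(args: Array[String]): Unit = {
    val source = new ListSource(Seq("a b", "c"), zero = 0L, slideDurationMs = 1000L)
    val upper  = new UpperCased(source)
    // Only whole multiples of the 1-second slide produce an "RDD".
    for (t <- Seq(500L, 1000L, 1500L, 2000L))
      println(s"$t ms -> ${upper.compute(Time(t))}")
  }
}
```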

 

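DStreams depend on one another from back to front, just as RDDs do, because DStreams express the Spark Streaming business logic that is eventually carried out by RDDs. Like RDDs, DStreams are lazy. The dependency relationships between DStreams must therefore stay closely aligned with the dependency relationships between the RDDs they generate.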
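To make "back to front" and "lazy" concrete, here is a minimal sketch with a hypothetical `LazyStream` class (not Spark code, and without caching). Building the chain computes nothing; asking the last stream for a batch makes each stream ask its parent first, so the request travels from the last stream back to the source, and the data then flows forward through each transformation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy stream: `parent` is None for the source.
final class LazyStream(
    val name: String,
    parent: Option[LazyStream],
    f: Seq[Int] => Seq[Int],
    log: ListBuffer[String]) {

  // Nothing runs when a stream is constructed; work starts only here.
  def compute(batch: Seq[Int]): Seq[Int] = {
    log += s"enter $name"                                     // request goes back to front
    val input = parent.map(_.compute(batch)).getOrElse(batch) // the source uses the raw batch
    log += s"exit $name"                                      // data comes front to back
    f(input)
  }
}

object LazyDemo {
  def build(log: ListBuffer[String]): LazyStream = {
    val source  = new LazyStream("source", None, identity, log)
    val doubled = new LazyStream("doubled", Some(source), _.map(_ * 2), log)
    new LazyStream("filtered", Some(doubled), _.filter(_ > 2), log)
  }

  def main(args: Array[String]): Unit = {
    val log  = ListBuffer.empty[String]
    val last = build(log)
    println(s"after building: ${log.size} calls") // nothing has been computed yet
    println(last.compute(Seq(1, 2, 3)))           // triggered from the last stream only
    println(log.mkString(", "))
  }
}
```

Only `last.compute` is called explicitly; the log shows the calls entering from the last stream and unwinding from the source.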

 

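The generatedRDDs field of the DStream class stores the RDD instance generated for each batch time, and every DStream instance has its own generatedRDDs. During the actual computation, because dependencies are followed from back to front, computation is invoked only on the last DStream.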

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

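How does generatedRDDs get filled? DStream's getOrCompute method first checks whether the HashMap already holds an RDD for the given time. If not, and the time is valid, it calls compute to produce the RDD, persists it if a storage level is set, marks it for checkpointing when the time falls on a checkpoint interval, and puts it into the HashMap.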

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
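The caching behaviour of getOrCompute reduces to a memoised lookup in a map keyed by batch time. The sketch below is a simplified model (hypothetical `MemoStream` class, `Long` times, a `Seq` instead of an RDD, and no persist or local-property handling): it checks the map first, validates the time, computes on a miss, records whether the batch would be marked for checkpointing using the same "offset from zeroTime is a multiple of the interval" test as the source, and stores the result.

```scala
import scala.collection.mutable

// Hypothetical simplified model of DStream.getOrCompute; an "RDD" is a Seq.
final class MemoStream(
    zeroTimeMs: Long,
    slideMs: Long,
    checkpointMs: Option[Long], // None plays the role of checkpointDuration == null
    computeFn: Long => Option[Seq[Int]]) {

  // Counterpart of generatedRDDs: batch time -> generated "RDD".
  val generated = mutable.HashMap.empty[Long, Seq[Int]]
  val checkpointed = mutable.Set.empty[Long]
  var computeCalls = 0

  private def isTimeValid(t: Long): Boolean =
    t > zeroTimeMs && (t - zeroTimeMs) % slideMs == 0

  def getOrCompute(t: Long): Option[Seq[Int]] =
    generated.get(t).orElse {        // 1. cache hit: reuse the stored result
      if (isTimeValid(t)) {
        computeCalls += 1
        val result = computeFn(t)    // 2. cache miss: compute it
        result.foreach { rdd =>
          // 3. mark for checkpointing when the offset is a multiple of the interval
          if (checkpointMs.exists(cp => (t - zeroTimeMs) % cp == 0)) checkpointed += t
          generated.put(t, rdd)      // 4. remember it for later callers
        }
        result
      } else None                    // invalid time: generate nothing
    }
}

object MemoDemo {
  def main(args: Array[String]): Unit = {
    val s = new MemoStream(0L, 1000L, Some(2000L), t => Some(Seq(t.toInt)))
    s.getOrCompute(1000L)
    s.getOrCompute(1000L) // served from the map, no second compute
    s.getOrCompute(2000L)
    println(s"compute calls: ${s.computeCalls}, checkpointed: ${s.checkpointed}")
  }
}
```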

 

Take ReceiverInputDStream, a subclass of DStream, as an example of compute: internally it calls the createBlockRDD method.

/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
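The shape of this compute method is: return an empty RDD for times before the graph started; otherwise look up the blocks allocated to this stream for this batch and report how many records they hold. A hedged sketch with hypothetical types (`BlockMeta`, `BatchInput`, `ReceiverComputeSketch`) and a plain `Map` standing in for the result of `receiverTracker.getBlocksOfBatch`, which, as the `.getOrElse(id, Seq.empty)` call suggests, is keyed by stream id:

```scala
// Hypothetical stand-in for ReceivedBlockInfo: numRecords may be unknown.
final case class BlockMeta(blockId: String, numRecords: Option[Long])

// Result of one batch for one input stream: block ids plus the reported count.
final case class BatchInput(blockIds: Seq[String], reportedRecords: Long)

object ReceiverComputeSketch {
  // Mirrors the branch structure of ReceiverInputDStream.compute in the text.
  def computeBatch(
      streamId: Int,
      validTimeMs: Long,
      startTimeMs: Long,
      blocksOfBatch: Map[Int, Seq[BlockMeta]]): BatchInput =
    if (validTimeMs < startTimeMs) {
      // Before the context start time: produce an empty batch.
      BatchInput(Seq.empty, 0L)
    } else {
      // Only blocks allocated to this stream id belong to this DStream.
      val infos = blocksOfBatch.getOrElse(streamId, Seq.empty)
      // Unknown record counts are skipped, like blockInfos.flatMap(_.numRecords).sum.
      BatchInput(infos.map(_.blockId), infos.flatMap(_.numRecords).sum)
    }

  def main(args: Array[String]): Unit = {
    val allocated = Map(
      0 -> Seq(BlockMeta("input-0-1", Some(10L)), BlockMeta("input-0-2", None)),
      1 -> Seq(BlockMeta("input-1-1", Some(5L))))
    println(computeBatch(streamId = 0, validTimeMs = 2000L, startTimeMs = 1000L, blocksOfBatch = allocated))
  }
}
```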

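createBlockRDD returns a BlockRDD (or a WriteAheadLogBackedBlockRDD when write ahead log handles are available). Because ReceiverInputDStream has no parent dependency, it generates its RDD by itself.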

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
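The branching in createBlockRDD boils down to a small decision table. The sketch below reproduces only that decision, using hypothetical types (`BlockInfoLite`, `RddChoice`, `CreateBlockRddSketch`) and a `Set` standing in for the `blockManager.master.contains` lookup; it creates no real RDDs and omits the logging.

```scala
// Hypothetical stand-in for ReceivedBlockInfo.
final case class BlockInfoLite(blockId: String, hasWalHandle: Boolean)

// Which kind of RDD createBlockRDD would build, and over which block ids.
sealed trait RddChoice
final case class WalBackedRdd(blockIds: Seq[String]) extends RddChoice
final case class PlainBlockRdd(blockIds: Seq[String]) extends RddChoice

object CreateBlockRddSketch {
  def choose(
      infos: Seq[BlockInfoLite],
      walEnabled: Boolean,
      blocksKnownToMaster: Set[String]): RddChoice =
    if (infos.nonEmpty) {
      if (infos.forall(_.hasWalHandle)) {
        // Every block can be re-read from the write ahead log.
        WalBackedRdd(infos.map(_.blockId))
      } else {
        // Otherwise keep only blocks the block manager master still knows about;
        // the real code logs a warning when some are missing.
        PlainBlockRdd(infos.map(_.blockId).filter(blocksKnownToMaster.contains))
      }
    } else if (walEnabled) {
      WalBackedRdd(Seq.empty)  // no blocks yet, receiver WAL configured
    } else {
      PlainBlockRdd(Seq.empty) // no blocks yet, receiver WAL not configured
    }

  def main(args: Array[String]): Unit = {
    val infos = Seq(BlockInfoLite("b1", hasWalHandle = false), BlockInfoLite("b2", hasWalHandle = false))
    // b2 is no longer known to the master, so only b1 ends up in the plain BlockRDD.
    println(choose(infos, walEnabled = false, blocksKnownToMaster = Set("b1")))
  }
}
```

Note the mixed case: if only some blocks carry WAL handles, the sketch (like the original code) falls back to a plain BlockRDD over the blocks that are still present.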

Next, take the DStream subclass MappedDStream: its compute method calls the parent DStream's getOrCompute method to obtain an RDD and then applies map to it.

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

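These two subclasses show that the first DStream, the InputDStream, obtains its data and builds its RDD by itself in compute, whereas every other DStream depends on its parent: it calls the parent DStream's getOrCompute method and then applies its own computation.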
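The contrast between the two kinds of compute can be sketched in plain Scala. All names here (`MiniDStream`, `MiniInput`, `MiniMapped`, `DelegationDemo`) are hypothetical, and times are plain `Long`s. The input stream fetches data itself, while the mapped streams only call `parent.getOrCompute`; because each stream caches per batch time, two children sharing one input trigger a single fetch for that batch.

```scala
import scala.collection.mutable

// Hypothetical base class: getOrCompute caches per batch time, as in the text.
abstract class MiniDStream[T] {
  private val generated = mutable.HashMap.empty[Long, Seq[T]]
  protected def compute(time: Long): Option[Seq[T]]

  final def getOrCompute(time: Long): Option[Seq[T]] =
    generated.get(time).orElse {
      val rdd = compute(time)
      rdd.foreach(generated.put(time, _))
      rdd
    }
}

// Like an InputDStream: no parent, so compute fetches the data itself.
final class MiniInput(fetch: Long => Seq[String]) extends MiniDStream[String] {
  var fetches = 0
  protected def compute(time: Long): Option[Seq[String]] = {
    fetches += 1
    Some(fetch(time))
  }
}

// Like MappedDStream: compute asks the parent and maps over its result.
final class MiniMapped[T, U](parent: MiniDStream[T], f: T => U) extends MiniDStream[U] {
  protected def compute(time: Long): Option[Seq[U]] =
    parent.getOrCompute(time).map(_.map(f))
}

object DelegationDemo {
  def main(args: Array[String]): Unit = {
    val input   = new MiniInput(t => Seq(s"line@$t"))
    val lengths = new MiniMapped[String, Int](input, _.length)
    val upper   = new MiniMapped[String, String](input, _.toUpperCase)
    println(lengths.getOrCompute(1000L))
    println(upper.getOrCompute(1000L))
    // Both children reuse the parent's cached batch, so the input fetched once.
    println(s"fetches = ${input.fetches}")
  }
}
```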

 

This shows that operations on a DStream ultimately become operations on RDDs.

 

Next, look at another DStream subclass, ForEachDStream. Its compute method does nothing (it simply returns None), but it overrides the generateJob method.

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
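The key point of generateJob is that it only packages foreachFunc into a Job; nothing runs until the Job is executed later. That is why a ForEachDStream does not by itself execute anything. A minimal sketch with hypothetical stand-ins (`Job`, `MiniForEach`, a function in place of the parent's getOrCompute):

```scala
// Hypothetical stand-in: a Job wraps a batch time and a deferred function.
final case class Job(timeMs: Long, func: () => Unit) {
  def run(): Unit = func()
}

// Simplified model of ForEachDStream: it produces no data of its own
// (its compute would return None); it only wraps foreachFunc into a Job.
final class MiniForEach[T](
    parentGetOrCompute: Long => Option[Seq[T]],
    foreachFunc: (Seq[T], Long) => Unit) {

  def generateJob(timeMs: Long): Option[Job] =
    parentGetOrCompute(timeMs) match {
      case Some(rdd) => Some(Job(timeMs, () => foreachFunc(rdd, timeMs)))
      case None      => None // no parent RDD for this time, so no Job
    }
}

object ForEachDemo {
  def main(args: Array[String]): Unit = {
    var printed = List.empty[String]
    val out = new MiniForEach[Int](
      t => if (t % 1000 == 0) Some(Seq(1, 2)) else None,
      (rdd, t) => printed ::= s"$t: ${rdd.sum}")

    val job = out.generateJob(1000L)
    println(s"job created, printed so far = $printed") // foreachFunc has not run
    job.foreach(_.run())                                // now it runs
    println(s"after run, printed = $printed")
  }
}
```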

 

Now approach it from job generation: JobGenerator's generateJobs method internally calls DStreamGraph's generateJobs method.

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Get the concrete data (received blocks) for this specific batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph.generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
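The control flow of generateJobs is: allocate blocks, ask the graph for jobs, then either submit the job set or report the error, and in both cases post a checkpoint event. A hedged sketch in which every collaborator is a plain function (the `MiniJobGenerator` class and its parameters are hypothetical; jobs are just strings):

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical simplified JobGenerator: each collaborator is passed in as a function.
final class MiniJobGenerator(
    allocateBlocksToBatch: Long => Unit,
    graphGenerateJobs: Long => Seq[String],
    submitJobSet: (Long, Seq[String]) => Unit,
    reportError: (String, Throwable) => Unit,
    doCheckpoint: Long => Unit) {

  def generateJobs(timeMs: Long): Unit = {
    Try {
      allocateBlocksToBatch(timeMs) // 1. assign received blocks to this batch
      graphGenerateJobs(timeMs)     // 2. ask the graph for this batch's jobs
    } match {
      case Success(jobs) => submitJobSet(timeMs, jobs) // 3a. hand the jobs over
      case Failure(e)    => reportError(s"Error generating jobs for time $timeMs", e) // 3b.
    }
    doCheckpoint(timeMs)            // 4. checkpoint is requested either way
  }
}

object JobGeneratorDemo {
  def main(args: Array[String]): Unit = {
    val gen = new MiniJobGenerator(
      t => println(s"allocate blocks for $t"),
      t => Seq(s"job@$t"),
      (t, jobs) => println(s"submit $jobs for $t"),
      (msg, e) => println(s"$msg: ${e.getMessage}"),
      t => println(s"checkpoint $t"))
    gen.generateJobs(1000L)
  }
}
```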

DStreamGraph's generateJobs method calls generateJob on each output stream, and the output streams here are ForEachDStream instances.

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
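Because each output stream returns an `Option[Job]`, `flatMap` collects only the jobs that exist for a given batch. The sketch below simulates that, plus a timer firing once per 1-second batch; `Job`, `OutputLike`, `MiniGraph` and `EveryNth` are hypothetical names, not Spark classes.

```scala
// Hypothetical stand-ins for Job and an output stream such as ForEachDStream.
final case class Job(timeMs: Long, source: String)

trait OutputLike {
  def name: String
  def generateJob(timeMs: Long): Option[Job]
}

// Mirrors DStreamGraph.generateJobs: one Option[Job] per output stream, flattened.
final class MiniGraph(outputStreams: Seq[OutputLike]) {
  def generateJobs(timeMs: Long): Seq[Job] = this.synchronized {
    outputStreams.flatMap(_.generateJob(timeMs))
  }
}

// An output stream whose parent only has an RDD every `periodMs`.
final class EveryNth(val name: String, periodMs: Long) extends OutputLike {
  def generateJob(timeMs: Long): Option[Job] =
    if (timeMs % periodMs == 0) Some(Job(timeMs, name)) else None
}

object GraphDemo {
  def main(args: Array[String]): Unit = {
    val graph = new MiniGraph(Seq(new EveryNth("print", 1000L), new EveryNth("save", 2000L)))
    // Simulate the JobGenerator timer firing once per 1-second batch.
    for (t <- 1000L to 4000L by 1000L)
      println(s"$t -> ${graph.generateJobs(t).map(_.source)}")
  }
}
```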

 

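Summary: A DStream is a template for RDDs, and its generatedRDDs field holds the RDD instance generated for each BatchDuration. The dependencies between DStreams form the RDD dependency chain, so when computing from back to front it is enough to compute the last DStream. Every BatchDuration, JobGenerator calls DStreamGraph's generateJobs method, which calls ForEachDStream's generateJob. That method first calls the parent DStream's getOrCompute to obtain the RDD and then performs its computation. Tracing back to the front, the first DStream is the ReceiverInputDStream: its compute method fetches the block metadata for the corresponding time period from receiverTracker and creates a BlockRDD, which getOrCompute then stores in generatedRDDs.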

 

Notes:

1. DT大数据梦工厂 WeChat official account: DT_Spark
2. IMF big data hands-on live stream at 8 pm nightly, YY channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains

© Copyright belongs to the author
