文档章节

基于案例贯通Spark Streaming流计算框架运行源码12

柯里昂
 柯里昂
发布于 2016/05/12 15:24
字数 1042
阅读 60
收藏 2

上文提到Job已经创建完成。

先贴案例

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // 每5秒收割一次数据
    val lines = ssc.socketTextStream("localhost", 9999) // 监听 本地9999 socket 端口
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flat map 后 reduce
    words.print() // 打印结果
    ssc.start() // 启动
    ssc.awaitTermination()
    ssc.stop(true)
  }
}

再来回溯下触发过程。

定时器定时触发执行某个方法。这里是 longTime => eventLoop.post(GenerateJobs(new Time(longTime))),将一个 GenerateJobs 类型的事件消息发送到 eventLoop的 队列中。

// JobGenerator.scala line 58
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

另一方便,eventLoop一直循环取出队列中的事件消息,当取出 GenerateJobs类型的事件消息时。会调用onReceive(event) 。

// EventLoop.scala line 48
    onReceive(event)

此时的onReceive(event)在 JobGenerator实例化 eventLoop时已经override了。

// JobGenerator.scala line 87
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

调用generatorJobs(time)

// JobGenerator.scala line 181
      case GenerateJobs(time) => generateJobs(time)

graph.generateJobs

// JobGenerator.scala line 248
graph.generateJobs(time)

通过outputStream.generateJob 还原出RDD的整个依赖,并创建出Job。这个outputStream就是ForEachDStream。

// DStreamGraph.scala line 115
        val jobOption = outputStream.generateJob(time)
在本案例中,按照 SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream 的依赖关系
调用parent.getOrCompute,此getOrCompute只在DStream中有定义,所有子类都没重写过此方法。
在此方法中,会调用当前DStream的compute方法,而compute中又调用了parent.getOrCompute,同时将当前的DStream的func加入到串联的RDD之后。

一直循环,直到inputStream,本例中为SocketInputDStream的compute被执行,实际上执行的是ReceiverInputDStream.compute,创建出BlockRDD。

至此整个RDD被还原出来。作为参数传入Job的构造中。 

至此Job创建成功,但是此Job为Spark Core中的Job,而且也并没有被提交到spark集群中。

获取给定时间对应的输入数据的信息,此时得到的都是元数据,即输入数据的元数据。

再创建成JobSet,并提交JobSet

// JobGenerator.scala line 251
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

jobScheduler.submitJobSet

交由jobExecutor线程池来处理,这里显然可以推测出,JobHandler一定是一个Runnable或者Callable接口的实现。

另外jobExecutor默认的线程数量是1,从并发性考虑,建议与outputStreams的数量保持一致:DStreamGraph.outputStreams.size

// JobScheduler.scala line 122
  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
JobHandler中封装的run方法
  1. 发送JobStarted事件消息,用于监控
  2. job.run,真正的Job提交,注意,这里的Job提交是指提交Streaming的Job到Spark 集群,类似普通Spark程序将RDD提交给Spark集群运行
// JobScheduler.scala line 202
    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }

job.run

// Job.scala line 38
  def run() {
    _result = Try(func())
  }

执行func(),而此时的func就是在ForEachDStream中封装Job的第二个参数。

在本例中,即为

() => foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(t=>t.split(" "))).map(_.map[U](t=>(t,1))).combineByKey[C](t=>t, (t1,t2)=>t1+t2, (t1,t2)=>t1+t2,partitioner, true),time)

至于如何推导出此RDD,可参考前文

读者们,至此,是否有很熟悉的感觉,很明显,上面的代码就是一个函数,函数没有参数,方法体中,执行的代码中,从new BlockRDD开始,就是我们普通的Spark的程序:新建RDD,然后一连串transform,最后将结果交给foreachFunc 处理。

由此,SparkStreaming最终是转变为普通的Spark Application来提交给Spark 集群来执行。是否也可以理解Spark Streaming其实就是Spark 的一个应用程序。而已。

 

至此,Spark Streaming的主体运行框架已经过滤完。

 

© 著作权归作者所有

柯里昂
粉丝 25
博文 175
码字总数 83106
作品 0
徐汇
技术主管
私信 提问
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第1节②

三、你为什么需要Spark; 你需要Spark的十大理由: 1,Spark是可以革命Hadoop的目前唯一替代者,能够做Hadoop做的一切事情,同时速度比Hadoop快了100倍以上: Logistic regression in Hadoo...

Spark亚太研究院
2014/12/16
432
2
Spark Streaming入门

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。 什么...

腾讯云加社区
2018/05/16
0
0
什么是 Apache Spark?大数据分析平台如是说

自从 Apache Spark 2009 年在 U.C. Berkeley 的 AMPLab 默默诞生以来,它已经成为这个世界上最重要的分布式大数据框架之一。Spark 可以用多种方式部署,它为 Java、Scala、Python,和 R 编程...

oschina
2017/11/22
713
0
如何成为云计算大数据Spark高手

Spark采用一个统一的技术堆栈解决了云计算大数据的如流处理、图技术、机器学习、NoSQL查询等方面的所有核心问题,具有完善的生态系统,这直接奠定了其一统云计算大数据领域的霸主地位; 要想...

Spark亚太研究院
2014/06/13
2.1K
3
教你如何成为Spark大数据高手

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。 分享之前我还是要推荐下我自己创建的大数据学习交...

风火数据
2018/05/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

CRM、DMP、CDP都是什么?有什么区别?

Markter对CRM系统(Customer Relationship Management System,客户关系管理系统),营销自动化等概念都已经比较熟悉,也许DMP(Data Management Platform,数据管理平台)也多多少少有些了解。...

怡海软件-CRM
8分钟前
1
0
中台是什么,到底要解决什么问题?

故事的开始 这个最早由阿里在2015年提出的“大中台,小前台”战略中延伸出来的概念,最近在国内大热。阿里、腾讯、百度、京东、美团、滴滴等一众互联网巨头,从去年到今年,接连开始组织架构...

喵二狸
19分钟前
2
0
Linux Centos 7 - MySQL 5.7离线安装

内部网络通过离线包的方式进行安装。 一、下载 下载地址:https://dev.mysql.com/downloads/mysql/ 进入页面后,点击右侧链接。 下载对应版本。 通过xftp6等工具上传到服务器上。 二、安装和...

华山猛男
20分钟前
2
0
EventBus 3 全解

EventBus 3 全解 [TOC] 使用 一个基于观察者模式的事件发布/订阅框架. 用于模块间通信和解耦, 使用方便,性能高. 基本使用 1. gradle导入依赖库 implementation 'org.greenrobot:eventbus:3....

马湖村第九后羿
22分钟前
3
0
HTTP 协议

什么是HTTP协议? HTTP是hypertext transport protocol的缩写,即超文本传输协议。 是用于万维网服务器与本地浏览器之间传输超文本的传送协议。可以使浏览器更加高效,使网络传输减少。能够保...

彩色泡泡糖
32分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部