文档章节

Spark 之SparkContext 源码精读2

柯里昂
 柯里昂
发布于 2016/04/05 08:48
字数 858
阅读 181
收藏 1
点赞 1
评论 0

前文说到SparkContext的3大核心对象被创建。

分别是TaskScheduler、SchedulerBackend、DAGScheduler。

 

这三大对象创建完成后,紧接着,调用了TaskScheduler的start方法。

// SparkContext.scala line 527

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()

上面的注释也很好的说明了,在调用TaskScheduler.start 前,需要在DAGScheduler的构造中,设置TaskScheduler的DAGScheduler的引用。这个在前文创建DAGScheduler的时候已经说明,他们之间是相互引用的。

 

当TaskScheduler.start被调用时,我们看看具体发生了什么。

// TaskSchedulerImpl.scala line 143
override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}

首先:beakend.start被调用了。这里提到,TaskScheduler中有ShedulerBackend的引用。我们也认为SchedulerBackend就是TaskScheduler的Backend。因此在start的时候,Backend要先start。这样应该会容易理解。

那跟踪进SchedulerBackend的start方法中瞧瞧吧。这里的SchedulerBackend是SparkDeploySchedulerBackend的实例。

// SparkDeploySchedulerBackend.scala line 52
override def start() {
  super.start()
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
    RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

在start方法中,先调用了super.start,上文可知,super是CoarseGrainedSchedulerBackend。

在父类的start方法中,除了设置spark.开头的配置外,做了一件很重要的事情:创建了一个Endpoint,为DriverEndpoint。

// CoarseGrainedSchedulerBackend.scala line 303

override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  // TODO (prashant) send conf instead of properties
  driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties)) // 创建DriverEndpoint并向RpcEnv注册
}

Endpoint的名字叫:CoarseGrainedScheduler

// CoarseGrainedSchedulerBackend.scala line 510

private[spark] object CoarseGrainedSchedulerBackend {
  val ENDPOINT_NAME = "CoarseGrainedScheduler"
}

而这里创建的DriverEndpoint是实现了ThreadSafeRpcEndpoint trait,而ThreadSafeRpcEndpoint 又继承了 RpcEndpoint trait。Spark Rpc通信请移步于此

// CoarseGrainedSchedulerBackend.scala line 303

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}

了解了生命周期之后,我们再回到 DriverEndpoint的部分。当DriverEndpoint创建之后,按照生命周期顺序,下一步就会调用onStart方法。

在onStart方法中,是每隔一段时间发送ReviveOffers 类型的消息,默认是每隔1秒。

// CoarseGrainedSchedulerBackend.scala line 95

override def onStart() {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
// org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessage.scala line 67
// Internal messages in driver
case object ReviveOffers extends CoarseGrainedClusterMessage

这个类型的消息挺有意思,上面注释也很明确的告知,driver内部的消息

可以这样理解,Driver启动之后,会定时掐一下自己,给自己发送一个ReviveOffers的消息。这里的send方法是self的方法,回忆Spark Rpc通信的内容,可知self是一个RpcEndPointRef对象。

 

那这个消息发送之后,由DriverEndpoint的receive接收后处理

// CoarseGrainedSchedulerBackend.scala line 122

case ReviveOffers =>
  makeOffers()

至此,DriverEndpoint创建完成,进入运行时状态了。同时CoarseGrainedSchedulerBackend也创建完成。具体的运行时,咱后续再分析。

 

下篇将介绍Driver向Master注册信息

 

© 著作权归作者所有

共有 人打赏支持
柯里昂
粉丝 24
博文 163
码字总数 78966
作品 0
徐汇
技术主管
1、Spark预编译版本下载安装与启动

1、下载 2、安装 3、hadoop 4、spark-shell scala> file.first() 5、Resilient Distributed Dataset 6、通过编译方式安装spark 轻量级高速集群计算。针对大规模的数据处理快速通用的引擎。比...

chenkangyao
2017/10/26
0
0
SparkContext 初始化内部原理

如果编写Spark程序,那么第⼀⾏代码就是new SparkContext().setMaster(“”).setAppName(“xx”),可以说SparkContext是整个Spark 计算的启动器,只有将sparkContext 启动起来,后续的关于调 ...

大数据之路
2012/11/12
0
0
Spark的运行架构分析(一)之架构概述

本博客转载自:https://blog.csdn.net/gamer_gyt/article/details/51822765 1:Spark的运行模式 2:Spark中的一些名词解释 3:Spark的运行基本流程 4:RDD的运行基本流程 一:Spark的运行模式...

lubin2016
04/18
0
0
【Spark亚太研究院系列丛书】Spark实战高手之路-第一章 构建Spark集群(第四步)(1)

第一步:通过Spark的shell测试Spark的工作 Step1:启动Spark集群,这一点在第三讲讲的极为细致,启动后的WebUI如下: Step2:启动Spark Shell: 此时可以通过如下Web控制台查看shell的情况: ...

Spark亚太研究院
2014/09/11
0
0
Spark的运行架构分析(二)之运行模式详解

在上一篇博客 spark的运行架构分析(一)中我们有谈到Spark的运行模式是多种多样的,那么在这篇博客中我们来具体谈谈Spark的运行模式 本博客转载自https://blog.csdn.net/gamer_gyt/article...

lubin2016
04/18
0
0
Spark中的一些基本概念,及运行流程

1 Spark中的一些基本概念 话不多说,先来一个官网地址:链接 1. Application:用户基于spark的代码,由一个Driver和多个Excutor组成。 2. Application jar:将用户基于spark的代码打包成的jar...

yu0_zhang0
04/23
0
0
Spark 的Core深入(二)

Spark 的 Core 深入(二) 标签(空格分隔): Spark的部分 一、日志清洗的优化: 1.1 日志清洗有脏数据问题 rdd.partitions.length rdd.cacherdd.count 一个分区默认一个task 分区去处理默认...

flyfish225
05/08
0
0
Spark2.1.0之剖析spark-shell

通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢? 脚本分析 在Spark安装目录的bin文件夹下可以找...

beliefer
04/20
0
0
Apache Spark源码分析-- Job的提交与运行

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。 实验环境搭建 在进行后续操作前,确保下列条件已满足。 1. 下载spark binary 0.9.1 2. 安装scala 3. ...

超人学院
2015/05/28
0
0
Spark源码分析:多种部署方式之间的区别与联系(1)

 从官方的文档我们可以知道,Spark的部署方式有很多种:local、Standalone、Mesos、YARN.....不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多。  ...

Ryan-瑞恩
2015/09/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

vue基础知识练习

一、Hello World <div id="itany">{{msg}} <!-- 两对大括号{{}}称为模板,用来进行数据的绑定显示在页面中 --> </div><script src="js/vue.js"></script><script>var vm=new Vue({......

一个yuanbeth
11分钟前
0
0
spring @Transactional注解参数详解

原文:事物注解方式: @Transactional 当标于类前时, 标示类中所有方法都进行事物处理 , 例子: 1 @Transactional public class TestServiceBean implements TestService {} 当类中某些方法不需...

binhu
14分钟前
0
0
CORS 跨域实践

本文首发于个人微信公众号《andyqian》,期待你的关注~ 前言 系统通常都是由单体应用逐渐演化而来,演化成为前后端分离的分布式应用。在享受分布式系统带来的诸多好处之时,随之而来的也有不...

andyqian
21分钟前
7
0
开源 java CMS - FreeCMS2.8 会员管理

项目地址:http://www.freeteam.cn/ 会员组管理 会员管理 会员管理 从左侧管理菜单点击会员管理进入。 添加会员 在会员列表下方点击“添加”按钮。 填写相关属性后点击“保存”按钮即可。 编...

freeteam
22分钟前
0
0
bboss升级至 v5.0.6.8版本,改善对Elasticsearch SQL 的支持

v5.0.6.8功能改进如下: (1)持久层支持支持Elasticsearch SQL,使用参考文档:玩转Elasticsearch SQL功能 (2)解决持久层/elasticsearch模板变量解析多层级不起作用问题 (3)完善国际化功能 (4...

linux-tao
23分钟前
1
0
扫码二维码跳转到某个网站

添加maven依赖 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.0.0</version></dependency><dependency><groupId>com.goog......

gaomq
30分钟前
0
0
Windows平台下搭建Git服务器的图文教程

Git没有客户端服务器端的概念,但是要共享Git仓库,就需要用到SSH协议(FTP , HTTPS , SFTP等协议也能实现Git共享,此文档不讨论),但是SSH有客户端服务器端,所以在windows下的开发要把自己...

MKChan
36分钟前
0
0
告警系统主脚本&告警系统配置文件&告警系统监控项目

20.20 告警系统主脚本 准备工作 定义监控系统的各个目录,然后再去定义主脚本,因为是分布式的,所以需要每一台机器都需要定义,事先创建好各个脚本和各个目录,随后脚本直接拷贝过去即可,然...

影夜Linux
36分钟前
0
0
谈谈神秘的ES6——(一)初识ECMAScript

谈谈神秘的ES6——(一)初识ECMAScript 在《零基础入门JavaScript》我们就说过,ECMAScript是JavaScript的核心,是JavaScript语法和语义的解释器,同时也是一个标准。而ECMAScript标准其实也...

JandenMa
今天
1
0
第16章 Tomcat配置

16.1 Tomcat介绍 ####Tomcat介绍 LNMP架构针对的开发语言是PHP语言,php 是一门开发web程序非常流行的语言,早些年流行的是asp,在Windows平台上运行的一种编程语言,但安全性差,就网站开发...

Linux学习笔记
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部