文档章节

Spark 之SparkContext 源码精读1

柯里昂
 柯里昂
发布于 2016/04/04 19:29
字数 1465
阅读 368
收藏 1
点赞 1
评论 1

前文可知,SparkContext是Spark的核心中的核心。

SparkContext 是Spark功能的入口,表示与Spark 集群的连接。用于创建 RDD、广播变量、累加器变量;

 

接下来,咱们一起从源码开始,强烈建议读者把源码下载下来,和我一起分析源码。

引用老师的话,"源码能说明一切问题"

创建核心组件:TaskScheduler和SchedulerBackend

// SparkContext.scala line 521
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
// SparkContext.scala line 2592
  /**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
        
// ... 若干模式匹配
// line 2629
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
        
// ... 若干模式匹配

      case zkUrl if zkUrl.startsWith("zk://") =>
        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
        createTaskScheduler(sc, "mesos://" + zkUrl)

      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
  }
}

这里着重看 SPARK_REGEX模式 。line 2630。首先创建 TaskSchedulerImpl 

 

// TaskSchedulerImpl.scala line 110
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
  case e: java.util.NoSuchElementException =>
    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
}

在创建TaskScheduler时,指定了调度模型,默认是FIFO:先入先出。

其他的变量都是初始化,暂先不细究。

其他的方法都是需要从实例的对象中去调用的,也暂不细究。

 

此时TaskSchedulerImpl创建成功,然后将创建的TaskScheduler实例作为构造参数,创建SchedulerBackend。见SparkContext.scala line 2632。

此时创建的SchedulerBackend实例实际上是 SparkDeploySchedulerBackend 类。

// SparkDeploySchedulerBackend.scala line 30
private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with AppClientListener
  with Logging

看SparkDeploySchedulerBackend 的定义可知,是继承自父类CoarseGrainedSchedulerBackend。让我们看看CoarseGrainedSchedulerBackend的定义。

// CoarseGrainedSchedulerBackend.scala line 31
/**
 * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
 * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
 * executors whenever a task is done and asking the scheduler to launch a new executor for
 * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
 * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
 * (spark.deploy.*).
 */
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging

从上面的注释可知,这个类是粗粒度【coarse grained】的实现。扩展下,fine-grained【细粒度】的实现是MesosSchedulerBackend

大家应该知道,在创建一个对象实例是,会先执行父类的构造函数。这里,创建SparkDeploySchedulerBackend会先调用CoarseGrainedSchedulerBackend的构造。

至此SchedulerBackend创建成功。

值得注意的是:SchedulerBackend实例中有一个TaskScheduler类型的成员变量。后续有一些TaskScheduler关于调度的方法,会在SchedulerBackend中被调用。

紧接着,看上面代码,调用了

// SparkContext.scala line 2633
scheduler.initialize(backend)

上述代码清楚的说明了,将SchedulerBackend作为参数传进来。

至此,咱们可以简单的这么认为,SchedulerBackend中Backend 实际上就是指TaskScheduler。SchedulerBackend是TaskScheduler的后端。

再来看下TaskSchedulerImpl的initialize方法

// TaskSchedulerImpl.scala line 125
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}

很显然,TaskScheduler将SchedulerBackend实例作为成员变量保存了。后续源码分析中可见,此成员变量被频繁的使用。

同时,初始化了一个调度池,然后根据上面提到的调度模式初始化了调度建造器,之后就创建了调度池。调度模式后面会单独提到。尽请期待。

 

createTaskScheduler调用之后,会返回一个Tuple,具体是SchedulerBackend[SparkDeploySchedulerBackend] 和 TaskScheduler[TaskSchedulerImpl]

之后,将SparkContext作为参数,创建DAGScheduler。

DAG是Directed Acyclic Graph的缩写,是指有向无环图。

// SparkContext.scala line 521
// Create and start the scheduler ,实际上,方法调用完,只是创建了实例,并没有start,start是在 line 530
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this) // line 525 
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()  // line 530

大家可能会问,明明上面创建了TaskScheduler和SchedulerBackend,却有没有马上start,是因为什么呢?

请看 line 530 上面的注释,已经很清楚的说明了原因。那么让我们看看DAGScheduler的构造中做了什么?

// DAGScheduler.scala line 131
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
// DAGScheduler.scala line 121
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
  this(
    sc,
    taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env)
}

    最后的目的,就是为了将TaskScheduler传进去

// DAGScheduler.scala line 110
private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging

并在后续构造中,将DAGScheduler赋值给TaskScheduler的成员变量

// DAGScheduler.scala line 185
taskScheduler.setDAGScheduler(this)

由此可见,TaskScheduler和DAGScheduler相互之间是有引用的。

那这两个Scheduler之间有什么区别呢?

DAGScheduler可以认为是总监,他是负责阶段性进展的管控。

TaskScheduler可以认为是监工,负责具体的一个任务的进度。

比如一个项目有一期二期。一期下面有A、B、C,二期下面有D、E、F。

DAGScheduler是管控项目每一期的进展的。也就是一期、二期项目顺利完成。

而TaskScheduler是管控每一期中的子任务的进展的。也就是一期中的A、B、C。二期中的D、E、F 任务顺利完成。

当然顺利完成也代表着,如果遇到任务中断了,需要调整,重新开始。即失败重试机制。

而这两者的共同点是,他们都是负责不同粒度的任务顺利完成的。换言之,他们是不管完成任务所需资源从哪来的。

 

当DAGScheduler也创建完成后,SparkContext的3大核心对象已创建完成;分别是TaskScheduler,SchedulerBackend,DAGScheduler。

 

2016-04-04 ,时间不早了,今天就更新到此。

 

下篇介绍Driver的创建

 

© 著作权归作者所有

共有 人打赏支持
柯里昂
粉丝 22
博文 160
码字总数 78816
作品 0
徐汇
技术主管
加载中

评论(1)

柯里昂
柯里昂
DAGScheduler中只包含TaskScheduler的引用,没有SchedulerBackend的引用,再次说明了,SchedulerBackend中Scheduler是指TaskScheduler
Spark的运行架构分析(一)之架构概述

本博客转载自:https://blog.csdn.net/gamer_gyt/article/details/51822765 1:Spark的运行模式 2:Spark中的一些名词解释 3:Spark的运行基本流程 4:RDD的运行基本流程 一:Spark的运行模式...

lubin2016 ⋅ 04/18 ⋅ 0

Spark2.1.0之剖析spark-shell

通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢? 脚本分析 在Spark安装目录的bin文件夹下可以找...

beliefer ⋅ 04/20 ⋅ 0

Spark中的一些基本概念,及运行流程

1 Spark中的一些基本概念 话不多说,先来一个官网地址:链接 1. Application:用户基于spark的代码,由一个Driver和多个Excutor组成。 2. Application jar:将用户基于spark的代码打包成的jar...

yu0_zhang0 ⋅ 04/23 ⋅ 0

Spark 的Core深入(二)

Spark 的 Core 深入(二) 标签(空格分隔): Spark的部分 一、日志清洗的优化: 1.1 日志清洗有脏数据问题 rdd.partitions.length rdd.cacherdd.count 一个分区默认一个task 分区去处理默认...

flyfish225 ⋅ 05/08 ⋅ 0

Spark的运行架构分析(二)之运行模式详解

在上一篇博客 spark的运行架构分析(一)中我们有谈到Spark的运行模式是多种多样的,那么在这篇博客中我们来具体谈谈Spark的运行模式 本博客转载自https://blog.csdn.net/gamer_gyt/article...

lubin2016 ⋅ 04/18 ⋅ 0

Spark初识入门core (一)

Spark初识入门core (一) 标签(空格分隔): Spark的部分 一:spark 简介 1.1 spark 的来源 Hive Storm Mahout Griph Spark Core Spark SQL Spark Streaming Spark ML Spark GraphX Spark......

flyfish225 ⋅ 05/07 ⋅ 0

Spark初识入门Core (一)

Spark初识入门core (一) 标签(空格分隔): Spark的部分 一:spark 简介 1.1 spark 的来源 Hive Storm Mahout Griph Spark Core Spark SQL Spark Streaming Spark ML Spark GraphX Spark......

flyfish225 ⋅ 05/07 ⋅ 0

pycharm pyspark 配置

1 、安装了pycharm,下载spark(官网下载,我下的是spark-2.1.1-bin-hadoop2.7.tgz,解压缩后为文件夹spark-2.1.1-bin-hadoop2.7,我将文件放在了/Applications/spark/下,这个文件夹里面有p...

张欢19933 ⋅ 05/09 ⋅ 0

Comprehensive Introduction to Apache Spark

Introduction Industry estimates that we are creating more than 2.5 Quintillion bytes of data every year. Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you i......

grasp_D ⋅ 06/15 ⋅ 0

Spark核心原理 - 消息通信分析

Spark将通信层设计的非常巧妙,融合了各种设计/架构模式,将一个分布式集群系统的通信层细节完全屏蔽,这样在上层的计算框架的设计中能够获得很好的灵活性。同时,如果上层想要增加各种新的特...

oitebody ⋅ 05/30 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

OSChina 周六乱弹 —— 假如你被熊困到树上

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @小小编辑:推荐歌曲《如果写不出好的和弦就该在洒满阳光的钢琴前一起吃布丁》 《如果写不出好的和弦就该在洒满阳光的钢琴前一起吃布丁》- 谢...

小小编辑 ⋅ 40分钟前 ⋅ 4

vbs 取文件大小 字节

dim namedim fs, s'name = Inputbox("姓名")'msgbox(name)set fs = wscript.createobject("scripting.filesystemobject") 'fs为FSO实例if (fs.folderexists("c:\temp"))......

vga ⋅ 今天 ⋅ 1

高并发之Nginx的限流

首先Nginx的版本号有要求,最低为1.11.5 如果低于这个版本,在Nginx的配置中 upstream web_app { server 到达Ip1:端口 max_conns=10; server 到达Ip2:端口 max_conns=10; } server { listen ...

算法之名 ⋅ 今天 ⋅ 0

Spring | IOC AOP 注解 简单使用

写在前面的话 很久没更新笔记了,有人会抱怨:小冯啊,你是不是在偷懒啊,没有学习了。老哥,真的冤枉:我觉得我自己很菜,还在努力学习呢,正在学习Vue.js做管理系统呢。即便这样,我还是不...

Wenyi_Feng ⋅ 今天 ⋅ 0

博客迁移到 https://www.jianshu.com/u/aa501451a235

博客迁移到 https://www.jianshu.com/u/aa501451a235 本博客不再更新

为为02 ⋅ 今天 ⋅ 0

win10怎么彻底关闭自动更新

win10自带的更新每天都很多,每一次下载都要占用大量网络,而且安装要等得时间也蛮久的。 工具/原料 Win10 方法/步骤 单击左下角开始菜单点击设置图标进入设置界面 在设置窗口中输入“服务”...

阿K1225 ⋅ 今天 ⋅ 0

Elasticsearch 6.3.0 SQL功能使用案例分享

The best elasticsearch highlevel java rest api-----bboss Elasticsearch 6.3.0 官方新推出的SQL检索插件非常不错,本文一个实际案例来介绍其使用方法。 1.代码中的sql检索 @Testpu...

bboss ⋅ 今天 ⋅ 0

informix数据库在linux中的安装以及用java/c/c++访问

一、安装前准备 安装JDK(略) 到IBM官网上下载informix软件:iif.12.10.FC9DE.linux-x86_64.tar放在某个大家都可以访问的目录比如:/mypkg,并解压到该目录下。 我也放到了百度云和天翼云上...

wangxuwei ⋅ 今天 ⋅ 0

PHP语言系统ZBLOG或许无法重现月光博客的闪耀历史[图]

最近在写博客,希望通过自己努力打造一个优秀的教育类主题博客,名动江湖,但是问题来了,现在写博客还有前途吗?面对强大的自媒体站点围剿,还有信心和可能型吗? 至于程序部分,我选择了P...

原创小博客 ⋅ 今天 ⋅ 0

IntelliJ IDEA 2018.1新特性

工欲善其事必先利其器,如果有一款IDE可以让你更高效地专注于开发以及源码阅读,为什么不试一试? 本文转载自:netty技术内幕 3月27日,jetbrains正式发布期待已久的IntelliJ IDEA 2018.1,再...

Romane ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部