文档章节

spark内核揭秘-07-DAGScheduler源码解读初体验

stark_summer
 stark_summer
发布于 2015/01/19 20:57
字数 414
阅读 91
收藏 2

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:


进入其构造函数中:





可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。

LiveListenerBus:


MapOutputTrackerMaster:


BlockManagerMaster:


通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法

private def initializeEventProcessActor() {  // blocking the thread until supervisor is started, which ensures eventProcessActor is  // not null before any job is submitted  // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null  implicit val timeout = Timeout(30 seconds)  val initEventActorReply =    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()

DAGSchedulerEventProcessActor:

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)  extends Actor with Logging {  override def preStart() {    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always    // valid when the messages arrive    // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候    // eventProcessActor已经准备就绪    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }  /**   * The main event loop of the DAG scheduler.   */  def receive = {    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,        listener, properties)    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }  override def postStop() {    // Cancel any active jobs in postStop hook    dagScheduler.cleanUpAfterSchedulerStop()
  }
}


可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。

© 著作权归作者所有

stark_summer
粉丝 61
博文 75
码字总数 51050
作品 0
朝阳
CEO
私信 提问
加载中

评论(0)

Spark源码分析:多种部署方式之间的区别与联系(1)

 从官方的文档我们可以知道,Spark的部署方式有很多种:local、Standalone、Mesos、YARN.....不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多。  ...

Ryan-瑞恩
2015/09/02
106
0
王家林大咖清华大学新书Spark第二版已上市:前浪致 Spark + AI 后浪

王家林大咖清华大学新书Spark第二版已上市:前浪 致 Spark + AI 后浪 大咖心声 新书图片 新书介绍 编辑推荐 内容简介 作者简介 新书目录 第二版前言 第一版前言 新书案例讲解 第二版网购链接...

段智华
05/25
0
0
大数据技术学习: Spark一门必须要掌握的语言

大数据Spark技术的普及推广,对专业人才的需求也日益增加。Spark这门语言大数据培训中心也有相对应的课程,学习Spark,也是一个阶段慢慢的学习,通常来讲需要经历以下阶段: 第一阶段:熟练的...

osc_qhb83isy
2019/06/08
7
0
SPARK 源码分析技术分享(带bilibili视频)

SPARK 源码分析技术分享 (带bilibili视频) 【本站点正在持续更新中…2018-12-05…】 SPARK 1.6.0-cdh5.15.0 Hadoop 2.6.0-cdh5.15.0 spark-scala-maven 微信(技术交流) : thinktothings SPA...

thinktothings
2018/12/02
0
0
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
2018/05/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Windows 10 中安装 Anaconda 3

首先通过下面链接地址下载 Anaconda 的个人版本。 https://www.anaconda.com/products/individual 从上面下载的地址中,选择你需要的版本,目前 Windows 应该基本上都是 64 位的了。 在你下载...

honeymoose
57分钟前
25
0
如何禁用textarea的resizable属性? - How do I disable the resizable property of a textarea?

问题: I want to disable the resizable property of a textarea . 我想禁用textarea的resizable属性。 Currently, I can resize a textarea by clicking on the bottom right corner of t......

技术盛宴
57分钟前
29
0
即时通信E聊SDK简介(1)

2.简介: E聊SDK是一套适用于PC端, 移动端的即时通讯解决方案,源代码开放。E聊整合了即时通讯的基础能力,使用E聊,您可以让您的应用快速接入即时聊天的功能。E聊现已适配PC Web, 移动Web, ...

E聊
今天
15
0
多个知乎账号一起登陆,同时管理运营的神器!

随着互联网生态的优化,从2016年开始,信息内容产业相当有关注度,其和粉丝互动起来很方便、流量大到惊人、可长远发展等等优势,迅速聚集了无数企业和个人,为了有非常全面的播放数据,大家通...

易媒助手
今天
22
0
403禁止vs 401未经授权的HTTP响应 - 403 Forbidden vs 401 Unauthorized HTTP responses

问题: For a web page that exists, but for which a user does not have sufficient privileges (they are not logged in or do not belong to the proper user group), what is the prope......

fyin1314
今天
19
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部