文档章节

spark内核揭秘-07-DAGScheduler源码解读初体验

stark_summer
 stark_summer
发布于 2015/01/19 20:57
字数 414
阅读 69
收藏 2

当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:


进入其构造函数中:





可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。

LiveListenerBus:


MapOutputTrackerMaster:


BlockManagerMaster:


通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法

private def initializeEventProcessActor() {  // blocking the thread until supervisor is started, which ensures eventProcessActor is  // not null before any job is submitted  // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null  implicit val timeout = Timeout(30 seconds)  val initEventActorReply =    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()

DAGSchedulerEventProcessActor:

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)  extends Actor with Logging {  override def preStart() {    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always    // valid when the messages arrive    // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候    // eventProcessActor已经准备就绪    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }  /**   * The main event loop of the DAG scheduler.   */  def receive = {    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,        listener, properties)    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }  override def postStop() {    // Cancel any active jobs in postStop hook    dagScheduler.cleanUpAfterSchedulerStop()
  }
}


可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。

© 著作权归作者所有

共有 人打赏支持
stark_summer
粉丝 61
博文 75
码字总数 51050
作品 0
朝阳
CEO
私信 提问
Spark源码分析:多种部署方式之间的区别与联系(1)

 从官方的文档我们可以知道,Spark的部署方式有很多种:local、Standalone、Mesos、YARN.....不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多。  ...

Ryan-瑞恩
2015/09/02
0
0
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
05/24
0
0
教你如何成为Spark大数据高手

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。 分享之前我还是要推荐下我自己创建的大数据学习交...

风火数据
05/20
0
0
SparkContext 初始化内部原理

如果编写Spark程序,那么第⼀⾏代码就是new SparkContext().setMaster(“”).setAppName(“xx”),可以说SparkContext是整个Spark 计算的启动器,只有将sparkContext 启动起来,后续的关于调 ...

大数据之路
2012/11/12
0
0
Spark2.1.0之模型设计与基本架构

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80468095 提示:在阅读本文前,最好先阅读《Spark2.1.0之初识Spark》、《Spark2.1....

泰山不老生
06/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

云环境所面临的安全威胁

导读 为了云计算的健康发展和等级保护工作在新形势下顺利推进,就不能忽视对云计算面临的各类安全威胁的研究和分析,并制定和建立相应的等级保护政策、技术体系,应对即将到来的云浪潮。 为了...

问题终结者
45分钟前
1
0
大牛五分钟教你如何创建并运行java线程

Java线程类也是一个object类,它的实例都继承自java.lang.Thread或其子类。 可以用如下方式用java中创建一个线程: 执行该线程可以调用该线程的start()方法: 在上面的例子中,我们并没有为线程...

java知识分子
46分钟前
1
0
监控某进程工作状态的脚本

很多时候,我们需要监控某个脚本是否正在执行,如果不在执行,则需将它重启。 思路: 监测脚本或进程的执行状态,可以通过ps -ef,再加上grep命令进行定位。 如果该进程在执行,则上述命令会...

vinci321
47分钟前
1
0
彻底弃用PHP转入Kotlin阵营

痛定思痛,决定彻底弃用PHP,转入Kotlin,采用Ktor方案。Kotlin从去年年底到现在,我都一直保持关注,这一年下来,从1.0升级到1.3,增加了很多特性和支持。其实,只要是JetBrains扶持着的,想...

曾建凯
51分钟前
1
0
php动态扩展模块安装

11月21日任务 11.32 php扩展模块装安装 1.PHP扩展模块安装 示例一:编译安装一个扩展模块 /usr/local/php/bin/php -m //查看模块 下面安装一个redis的模块 cd /usr/local/src/ 进入该目录下 ...

hhpuppy
55分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部