Spark Driver Startup Flow: A Code Walkthrough

大胖和二胖
Published 2016/08/19 11:02

SparkContext.scala

    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)

    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

Next, let's look at what SparkContext.createTaskScheduler actually does. For a standalone master URL (spark://...), the relevant branch is:

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

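So _taskScheduler is an instance of TaskSchedulerImpl and _schedulerBackend is an instance of StandaloneSchedulerBackend (note that the returned tuple is (backend, scheduler), so the variable named sched in SparkContext actually holds the backend). The backend is handed to the task scheduler through scheduler.initialize(backend).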
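To make the URL handling concrete, here is a minimal standalone sketch of what that branch does with a comma-separated master list. The names MasterUrlSketch and masterUrls are made up for illustration, the regex is assumed to be equivalent to the pattern behind SPARK_REGEX, and no Spark classes are involved.

```scala
object MasterUrlSketch {
  // Assumption: mirrors SPARK_REGEX, capturing everything after "spark://".
  private val SparkRegex = """spark://(.*)""".r

  // Turns "spark://h1:7077,h2:7077" into one URL per master.
  def masterUrls(master: String): Seq[String] = master match {
    case SparkRegex(sparkUrl) =>
      sparkUrl.split(",").map("spark://" + _).toSeq
    case other =>
      throw new IllegalArgumentException(s"Not a standalone master URL: $other")
  }
}
```

Only the first entry carries the spark:// prefix in the input, which is why the prefix is re-added to every piece after the split.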

Now let's see what _taskScheduler.start() does:

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
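The speculation branch above is just a flag-guarded periodic timer. As a rough sketch of the same pattern using only the JDK (SpeculationTimerSketch, runChecks and the latch are hypothetical names; in Spark the periodic task is checkSpeculatableTasks()):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object SpeculationTimerSketch {
  // Runs a periodic task only when `enabled` is true and reports whether
  // it fired at least `ticks` times within five seconds.
  def runChecks(enabled: Boolean, intervalMs: Long, ticks: Int): Boolean =
    if (!enabled) {
      false // like spark.speculation=false: no timer is created at all
    } else {
      val scheduler = Executors.newSingleThreadScheduledExecutor()
      val latch = new CountDownLatch(ticks)
      try {
        scheduler.scheduleAtFixedRate(new Runnable {
          // Stand-in for the real periodic check.
          override def run(): Unit = latch.countDown()
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
        latch.await(5, TimeUnit.SECONDS)
      } finally {
        scheduler.shutdownNow()
      }
    }
}
```

The initial delay and the period are the same value, matching the way SPECULATION_INTERVAL_MS is passed twice in the original call.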

The first thing it does is call backend.start(). The implementation of start can be found in StandaloneSchedulerBackend:

  override def start() {
    super.start()
    launcherBackend.connect()

    // The endpoint for executors to talk to us
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
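One detail worth noticing in the code above is that args contains placeholders such as {{EXECUTOR_ID}} and {{CORES}}, because the driver cannot know these values yet. The sketch below assumes they are filled in later, on the side that actually launches the executor process. The helper substitute and the sample values are hypothetical and only illustrate the template idea, not Spark's own substitution code.

```scala
object PlaceholderSketch {
  // Replaces every {{KEY}} token in each argument with its value, if known.
  def substitute(args: Seq[String], values: Map[String, String]): Seq[String] =
    args.map { arg =>
      values.foldLeft(arg) { case (acc, (key, value)) =>
        acc.replace("{{" + key + "}}", value)
      }
    }
}
```

Arguments without a placeholder, and placeholders without a known value, pass through unchanged.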

Now let's look at what client.start() does:

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

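endpoint is an AtomicReference. rpcEnv.setupEndpoint does two things: it registers an endpoint and it returns that endpoint's ref. But where does anything actually get started here? Control clearly has to reach ClientEndpoint's onStart method, so how does it get there?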

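The code below came up in the earlier article on the RPC mechanism. The line that matters is receivers.offer(data), which was highlighted in red in the original post. It did not seem important at the time, but it turns out that whenever an endpoint is registered with the RpcEnv, its OnStart event is triggered automatically.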

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }
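To see why registration alone is enough to trigger onStart, here is a stripped-down, single-threaded sketch of the idea. It assumes each endpoint's inbox is created with an OnStart item already in it, so the first time the dispatcher services that endpoint it calls onStart before any ordinary message. All names here (OnStartSketch, Dispatcher, post, drain) are invented for the example; Spark's real dispatcher uses a thread pool and a richer inbox.

```scala
import java.util.{ArrayDeque, Queue}
import java.util.concurrent.ConcurrentHashMap

object OnStartSketch {
  trait Endpoint {
    def onStart(): Unit
    def receive(msg: Any): Unit
  }

  sealed trait InboxItem
  case object OnStart extends InboxItem
  final case class Message(body: Any) extends InboxItem

  final class EndpointData(val name: String, val endpoint: Endpoint) {
    // Assumption: the inbox is pre-seeded so OnStart is always processed first.
    val inbox: Queue[InboxItem] = new ArrayDeque[InboxItem]()
    inbox.add(OnStart)
  }

  final class Dispatcher {
    private val endpoints = new ConcurrentHashMap[String, EndpointData]()
    private val receivers: Queue[EndpointData] = new ArrayDeque[EndpointData]()

    def register(name: String, endpoint: Endpoint): Unit = {
      val data = new EndpointData(name, endpoint)
      if (endpoints.putIfAbsent(name, data) != null) {
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
      receivers.offer(data) // wake-up for the pending OnStart item
    }

    // Assumes `name` has been registered.
    def post(name: String, body: Any): Unit = {
      val data = endpoints.get(name)
      data.inbox.add(Message(body))
      receivers.offer(data)
    }

    // Synchronous stand-in for the message loop threads.
    def drain(): Unit = {
      var data = receivers.poll()
      while (data != null) {
        var item = data.inbox.poll()
        while (item != null) {
          item match {
            case OnStart       => data.endpoint.onStart()
            case Message(body) => data.endpoint.receive(body)
          }
          item = data.inbox.poll()
        }
        data = receivers.poll()
      }
    }
  }
}
```

Registering an endpoint and then draining the dispatcher calls onStart once, even if no message was ever posted to it.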

Next, find ClientEndpoint (an inner class of StandaloneAppClient) and look at its onStart method:

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

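registerWithMaster also calls itself recursively, but that recursion only exists to implement retries, so we can go straight to tryRegisterAllMasters(). Both methods are shown here: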

    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))

          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

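The two key lines (highlighted in red in the original post) are rpcEnv.setupEndpointRef(...) and masterRef.send(...): the first obtains a reference to the master's endpoint, and the second sends the RegisterApplication message to it. The master endpoint lives on the master node of the Spark cluster, so from the point of view of the RpcEnv on the driver it is a remote endpoint.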
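The retry structure of registerWithMaster is easier to follow in isolation. The sketch below keeps only the timer logic: try once, then check after a timeout whether registration succeeded, and retry or give up. The names RegistrationRetrySketch, simulate, succeedOnAttempt, maxRetries and timeoutMs are hypothetical; the last two stand in for REGISTRATION_RETRIES and REGISTRATION_TIMEOUT_SECONDS, and the fake tryRegister replaces the real RPC send.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object RegistrationRetrySketch {
  // Returns (registered?, number of attempts made).
  def simulate(succeedOnAttempt: Int, maxRetries: Int, timeoutMs: Long): (Boolean, Int) = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val registered = new AtomicBoolean(false)
    val attempts = new AtomicInteger(0)
    val done = new CountDownLatch(1)

    // Stand-in for tryRegisterAllMasters(): the "master" only answers
    // on the attempt chosen by the caller.
    def tryRegister(): Unit =
      if (attempts.incrementAndGet() == succeedOnAttempt) registered.set(true)

    def registerWithMaster(nthRetry: Int): Unit = {
      tryRegister()
      timer.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) done.countDown()               // success, stop retrying
          else if (nthRetry >= maxRetries) done.countDown()  // give up
          else registerWithMaster(nthRetry + 1)              // retry after the timeout
        }
      }, timeoutMs, TimeUnit.MILLISECONDS)
    }

    try {
      registerWithMaster(1)
      done.await(10, TimeUnit.SECONDS)
      (registered.get, attempts.get)
    } finally {
      timer.shutdownNow()
    }
  }
}
```

The recursion never grows the call stack, because each retry is started from a fresh timer callback rather than from inside the previous call.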

Now open Master.scala and start with its class declaration:

private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

Then find its receive method:

override def receive: PartialFunction[Any, Unit]

We only need to look at one case inside it:

    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))

        schedule()
      }

The steps are: 1. create the app; 2. register the app; 3. persist the app; 4. send a message to the driver's endpoint; 5. call schedule().

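For step 4, the driver reference arrives together with the RPC message, and the master uses it to send the driver a response confirming that the app has been registered.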

Back on the driver side, ClientEndpoint.receive handles that response:

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredApplication(appId_, masterRef) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
        // 2. Receive multiple RegisteredApplication from different masters because the master is
        // changing.
        appId.set(appId_)
        registered.set(true)
        master = Some(masterRef)
        listener.connected(appId.get)

      // ... remaining cases omitted
    }
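Put together, the two receive methods form a small handshake: the request carries a reference to its sender, and the reply is sent to that reference. Here is a minimal in-process sketch of that shape. The Ref trait, the simplified message fields and the app id format are assumptions for the example; real Spark messages carry more data and travel over the network.

```scala
object RegisterHandshakeSketch {
  trait Ref { def send(msg: Any): Unit }

  // Simplified messages: the request carries the sender's own ref.
  final case class RegisterApplication(appName: String, driver: Ref)
  final case class RegisteredApplication(appId: String, master: Ref)

  final class Master(standby: Boolean) extends Ref {
    private var nextId = 0
    val apps = scala.collection.mutable.LinkedHashMap.empty[String, String]

    val receive: PartialFunction[Any, Unit] = {
      case RegisterApplication(name, driver) =>
        if (!standby) {                       // a standby master stays silent
          val id = f"app-$nextId%04d"         // hypothetical id format
          nextId += 1
          apps(id) = name                     // "register" the app
          driver.send(RegisteredApplication(id, this))
        }
    }

    def send(msg: Any): Unit = if (receive.isDefinedAt(msg)) receive(msg)
  }

  final class Client extends Ref {
    @volatile var appId: Option[String] = None
    @volatile var master: Option[Ref] = None

    def send(msg: Any): Unit = msg match {
      case RegisteredApplication(id, masterRef) =>
        appId = Some(id)
        master = Some(masterRef)
      case _ => ()
    }
  }
}
```

Because a standby master never replies, a client that only reached a standby master stays unregistered, which is exactly the situation the retry timer exists for.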

For step 5, let's look at what schedule() does:

  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

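launchDriver(worker, driver) asks the chosen worker to launch the waiting driver. Note that waitingDrivers only holds drivers that were submitted to the cluster itself (cluster deploy mode), and the worker starts such a driver as a separate process rather than as a thread.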
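The round-robin loop in schedule() can be tried out on plain data. In this sketch the worker list is used in the given order instead of being shuffled, so the result is reproducible, and the resource deduction inside the loop is a simplification standing in for what launchDriver does. The Worker and Driver case classes and the place function are hypothetical.

```scala
object DriverPlacementSketch {
  final case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  // Returns driverId -> workerId for every driver that could be placed.
  def place(workers: IndexedSeq[Worker], waiting: Seq[Driver]): Map[String, String] = {
    var curPos = 0
    var placed = Map.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      // Continue from where the previous driver stopped; visit each worker at most once.
      while (visited < workers.size && !launched) {
        val worker = workers(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem   // simplified bookkeeping
          worker.coresFree -= driver.cores
          placed += (driver.id -> worker.id)
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
    }
    placed
  }
}
```

A driver that fits on no worker is simply left out of the result, just as it stays in waitingDrivers in the original code.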

Now look at startExecutorsOnWorkers() and the helpers it calls:

  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }

    }
  }
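The worker selection at the top of startExecutorsOnWorkers() is a plain filter-and-sort, sketched here on a hypothetical Worker case class: keep alive workers with enough free memory and cores for one executor, then order them by free cores, largest first.

```scala
object UsableWorkersSketch {
  final case class Worker(id: String, alive: Boolean, memoryFreeMb: Int, coresFree: Int)

  def usableWorkers(
      workers: Seq[Worker],
      memoryPerExecutorMb: Int,
      coresPerExecutor: Option[Int]): Seq[Worker] =
    workers
      .filter(_.alive)
      .filter(w => w.memoryFreeMb >= memoryPerExecutorMb &&
        w.coresFree >= coresPerExecutor.getOrElse(1)) // at least one core if unset
      .sortBy(_.coresFree).reverse
}
```

When spark.executor.cores is not set, a worker only needs a single free core to qualify.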

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
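The arithmetic in allocateWorkerResourceToExecutors is short enough to check by hand. This sketch reuses the same two expressions and returns the core count of each executor that would be launched on one worker; the split function itself is a made-up wrapper.

```scala
object ExecutorSplitSketch {
  // One entry per executor launched on a single worker.
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): Seq[Int] = {
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    Seq.fill(numExecutors)(coresToAssign)
  }
}
```

With 8 assigned cores, split(8, Some(2)) describes four 2-core executors, while split(8, None) describes a single executor that takes all 8 cores.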

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))

    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))

  }

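1. Record the executor on the master's local worker object (worker.addExecutor)

2. Tell the worker to launch an executor

3. Tell the driver that an executor has been added (ExecutorAdded)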
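The three steps above can be mimicked with in-memory mailboxes. In this sketch Mailbox stands in for an RPC endpoint ref, the messages carry fewer fields than the real ones, and the free-resource bookkeeping inside addExecutor is an assumption about what recording an executor involves.

```scala
import scala.collection.mutable.ArrayBuffer

object LaunchExecutorSketch {
  // Stand-in for an endpoint ref: it just remembers what was sent.
  final class Mailbox {
    val received = ArrayBuffer.empty[Any]
    def send(msg: Any): Unit = { received += msg; () }
  }

  final case class LaunchExecutor(appId: String, execId: Int, cores: Int, memoryMb: Int)
  final case class ExecutorAdded(execId: Int, workerId: String, cores: Int, memoryMb: Int)

  final class WorkerInfo(val id: String, var coresFree: Int, var memoryFreeMb: Int) {
    val endpoint = new Mailbox
    val executors = ArrayBuffer.empty[Int]
    def addExecutor(execId: Int, cores: Int, memoryMb: Int): Unit = {
      executors += execId
      coresFree -= cores
      memoryFreeMb -= memoryMb
    }
  }

  def launchExecutor(worker: WorkerInfo, driver: Mailbox, appId: String,
      execId: Int, cores: Int, memoryMb: Int): Unit = {
    worker.addExecutor(execId, cores, memoryMb)                           // step 1
    worker.endpoint.send(LaunchExecutor(appId, execId, cores, memoryMb))  // step 2
    driver.send(ExecutorAdded(execId, worker.id, cores, memoryMb))        // step 3
  }
}
```

The master updates its own view of the worker first, so later scheduling decisions already see the reduced free resources before the worker has actually started anything.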

That covers the basic startup flow of a Spark driver. As our understanding deepens we will keep extending and tidying up this document, so stay tuned.

© Copyright belongs to the author
