Spark job submission in detail

2018/08/08 18:31

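1. After spark-submit, nothing runs until an action operator triggers a job. Reading the source shows that every action ends up calling SparkContext's runJob method. Take collect as an example:

collect calls SparkContext's runJob method:

/**
 * Return an array that contains all of the elements in this RDD.
 * All of the RDD's data is fetched and then held in memory on the driver.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}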
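To make the shape of collect concrete, here is a minimal sketch that runs without Spark. ToyRdd and this runJob are hypothetical stand-ins (an "RDD" is just a list of in-memory partitions); only the two-step pattern, one array per partition followed by a concatenation, is taken from the code above.

```scala
import scala.reflect.ClassTag

object CollectSketch {
  // Hypothetical stand-in for an RDD: a sequence of partitions held in memory.
  final case class ToyRdd[T](partitions: Seq[Seq[T]])

  // Run `func` once per partition and return one result per partition.
  def runJob[T, U: ClassTag](rdd: ToyRdd[T], func: Iterator[T] => U): Array[U] =
    rdd.partitions.map(p => func(p.iterator)).toArray

  // Same shape as RDD.collect: build one array per partition, then concatenate.
  def collect[T: ClassTag](rdd: ToyRdd[T]): Array[T] = {
    val results = runJob(rdd, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
}
```

Because every partition's array ends up in one place, the whole dataset has to fit in the caller's memory, which is why collect is only suitable for small results.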

It then calls an overload of runJob:

/**
 * Run a job on all partitions in an RDD and return the results in an array.
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

That overload calls another one:

/**
 * Run a job on a given set of partitions of an RDD, but take a function of type
 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

Which calls yet another overload:

/**
 * Run a function on a given set of partitions in an RDD and return the results as an array.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

And finally the last runJob overload in SparkContext:

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
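The overload chain boils down to one idea: the array-returning variants allocate the result array and pass a callback that fills it in by index. The sketch below reproduces only that idea on plain collections; the data parameter and the object name are assumptions made for illustration, not Spark API.

```scala
import scala.reflect.ClassTag

object RunJobOverloads {
  // Innermost form: results are delivered through a callback, keyed by the
  // position of the partition inside `partitions` (not by the partition id).
  def runJob[T, U](
      data: IndexedSeq[Seq[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit =
    partitions.zipWithIndex.foreach { case (p, index) =>
      resultHandler(index, func(data(p).iterator))
    }

  // Array-returning form: allocate the array and let the handler fill it.
  def runJob[T, U: ClassTag](
      data: IndexedSeq[Seq[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](data, func, partitions, (index: Int, res: U) => results(index) = res)
    results
  }
}
```

Keeping the callback form as the innermost method lets a caller consume results as they arrive instead of waiting for a full array.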

This calls DAGScheduler's runJob method:

/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

This runJob calls submitJob and then waits for the result, logging the outcome whether the job succeeds or fails.

Inside submitJob, the first step reads rdd.partitions.length and checks that every requested partition exists. The key call is eventProcessLoop.post:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
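submitJob returns at once with a waiter, and runJob blocks on that waiter until every task has reported in. The sketch below shows that hand-off with a hypothetical MiniJobWaiter built on a CountDownLatch; this is a simplification chosen for the example and is not how Spark's JobWaiter is implemented internally.

```scala
import java.util.concurrent.CountDownLatch

object JobWaiterSketch {
  // Hypothetical simplification of a job waiter: the scheduler side reports each
  // finished task, and the submitting thread blocks in awaitResult().
  final class MiniJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
    private val remaining = new CountDownLatch(totalTasks)

    // Called by the scheduler side once per successful task.
    def taskSucceeded(index: Int, result: U): Unit = {
      resultHandler(index, result)
      remaining.countDown()
    }

    // Called by the submitting side; returns immediately when totalTasks == 0.
    def awaitResult(): Unit = remaining.await()
  }

  // Submit three "tasks" on another thread and wait for all of them.
  def demo(): Array[Int] = {
    val results = new Array[Int](3)
    val waiter = new MiniJobWaiter[Int](3, (i, r) => results(i) = r)
    val worker = new Thread(new Runnable {
      def run(): Unit = (0 until 3).foreach(i => waiter.taskSucceeded(i, i * 10))
    })
    worker.start()
    waiter.awaitResult()
    results
  }
}
```

The zero-task case mirrors the early return in submitJob: a waiter created for 0 tasks is already complete.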

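The core step is posting a JobSubmitted case class instance to eventProcessLoop. Here eventProcessLoop is an instance of DAGSchedulerEventProcessLoop, a subclass of EventLoop that overrides EventLoop's onReceive and delegates to its own doOnReceive. (DAGScheduler itself is not a subclass of EventLoop; it holds the loop as a field.) EventLoop owns a thread, eventThread, whose run method loops over the message queue: it keeps taking events from eventQueue (a LinkedBlockingDeque[E]) and passes each one to onReceive, which calls doOnReceive with the DAGSchedulerEvent that was handed to post.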

 

The eventProcessLoop.post method:

// The event queue
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

/**
 * Put the event into the event queue. The event thread will process it later.
 */
def post(event: E): Unit = {
  eventQueue.put(event)
}

 

// The event thread drains the event queue and handles each event with onReceive(event);
// onReceive(event) is overridden in the subclass DAGSchedulerEventProcessLoop.
private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}
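The producer/consumer pattern behind post and eventThread can be tried in isolation. The following is a sketch with a hypothetical MiniEventLoop class that keeps only the queue, the daemon thread and the onReceive hook; the runOnce helper exists purely to demonstrate it.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object EventLoopSketch {
  // Hypothetical, stripped-down event loop: one queue, one daemon consumer thread.
  abstract class MiniEventLoop[E](name: String) {
    private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    private val stopped = new AtomicBoolean(false)

    private val eventThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get()) {
            val event = eventQueue.take() // blocks until an event arrives
            try { onReceive(event) } catch { case NonFatal(e) => onError(e) }
          }
        } catch {
          case _: InterruptedException => // stop() interrupts a blocked take()
        }
      }
    }

    def start(): Unit = eventThread.start()
    def stop(): Unit = if (stopped.compareAndSet(false, true)) eventThread.interrupt()
    def post(event: E): Unit = eventQueue.put(event) // called by producer threads

    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit = ()
  }

  // Post `events` from the calling thread and return them in handling order.
  def runOnce(events: Seq[String]): List[String] = {
    val handled = ArrayBuffer.empty[String]
    val done = new CountDownLatch(events.size)
    val loop = new MiniEventLoop[String]("mini-event-loop") {
      protected def onReceive(event: String): Unit = {
        handled += event // only the event thread writes here
        done.countDown()
      }
    }
    loop.start()
    events.foreach(e => loop.post(e))
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    handled.toList
  }
}
```

With a single consumer thread, events are handled strictly in the order they were posted, so handlers never run concurrently with each other.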

 

/**
 * The main event loop of the DAG scheduler. The real work happens in doOnReceive(event).
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

 

// For a JobSubmitted event this runs dagScheduler.handleJobSubmitted, the core method:
// creating the stages, resolving the dependencies between stages, and creating the job
// all happen there.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
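The dispatch above is an ordinary pattern match over a family of event types. A reduced sketch of the same shape follows; the three events and the returned strings are hypothetical and only show how one case per event routes to one handler.

```scala
object DispatchSketch {
  // Hypothetical event hierarchy; only the shape mirrors DAGSchedulerEvent.
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  final case class JobCancelled(jobId: Int) extends Event
  case object ResubmitFailedStages extends Event

  // One case per event type; each branch names the handler it would delegate to.
  def doOnReceive(event: Event): String = event match {
    case JobSubmitted(jobId) => s"handleJobSubmitted($jobId)"
    case JobCancelled(jobId) => s"handleJobCancellation($jobId)"
    case ResubmitFailedStages => "resubmitFailedStages()"
  }
}
```

Declaring the trait as sealed lets the compiler warn when a new event type is added without a matching case.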

 

The dagScheduler.handleJobSubmitted() method:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, the job runs on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // The dependencies between stages are resolved here, and stage ids are generated
    // in ascending order following those dependencies.
    // finalStage is the last stage described in the previous post, i.e. the ResultStage;
    // the stages before it are ShuffleMapStages. Spark creates stages from back to
    // front, according to the type of each dependency.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)

  submitWaitingStages()
}

newResultStage resolves the dependencies between stages and assigns stage ids in ascending order along those dependencies:

/**
 * Create a ResultStage associated with the provided jobId.
 */
private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Get the stages this stage depends on, plus a new stage id
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

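getParentStagesAndId matters here: it returns the parent stages and the new stage id. Because the parents are resolved before the id is taken, parent stages receive smaller ids than their children:

/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}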

The getParentStages method:

/**
 * Get or create the list of parent stages for a given RDD. The new Stages will be created
 * with the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We maintain the stack manually here to prevent a StackOverflowError
  // caused by visiting recursively.
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match { // this pattern match is the core
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}

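A ShuffleDependency adds its shuffle map stage to parents. For a narrow dependency, the parent RDD (dep.rdd) is pushed onto the top of the stack; the loop then pops it and inspects its dependencies in turn. At the end, parents is converted to a List. The nextStageId used above is just an integer counter: new AtomicInteger(0). stageIdToStage(id) = stage records the stage under its id, and updateJobIdStageIdMaps then registers the stage with the current job, since the job needs its stages to carry out the computation.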
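The traversal can be reproduced on a toy lineage graph. In this sketch Node, Narrow and Shuffle are hypothetical types standing in for RDDs and their dependencies; the walk follows narrow dependencies with an explicit stack and stops at every shuffle, which is where a parent stage begins.

```scala
import scala.collection.mutable

object ParentStageSketch {
  // Hypothetical toy lineage: each node lists its dependencies.
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Shuffle(parent: Node) extends Dep
  final class Node(val name: String, val deps: List[Dep])

  // Walk back from `last` through narrow dependencies only and collect the
  // nodes that sit behind a shuffle boundary.
  def shuffleParents(last: Node): Set[String] = {
    val parents = mutable.HashSet[String]()
    val visited = mutable.HashSet[Node]()
    var waitingForVisit = List(last) // explicit stack instead of recursion
    while (waitingForVisit.nonEmpty) {
      val r = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (visited.add(r)) {
        r.deps.foreach {
          case Shuffle(p) => parents += p.name                      // stage boundary
          case Narrow(p)  => waitingForVisit = p :: waitingForVisit // same stage
        }
      }
    }
    parents.toSet
  }

  // Example: e depends narrowly on c and through a shuffle on d;
  // c = narrow(b), and b depends on a through a shuffle.
  def exampleParents: Set[String] = {
    val a = new Node("a", Nil)
    val b = new Node("b", List(Shuffle(a)))
    val c = new Node("c", List(Narrow(b)))
    val d = new Node("d", Nil)
    val e = new Node("e", List(Narrow(c), Shuffle(d)))
    shuffleParents(e)
  }
}
```

In the example, a and d are reported because each is reached through a shuffle, while b and c belong to the same stage as e.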

 

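That covers the stage-creation code.

Back to the dagScheduler.handleJobSubmitted() method:

A Spark jobId is generated before the job reaches the event loop. The action makes SparkContext run runJob, which in turn runs DAGScheduler's submitJob, where

val jobId = nextJobId.getAndIncrement() produces the jobId. nextJobId is an AtomicInteger maintained by DAGScheduler, and each call to getAndIncrement() yields a new job ID.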
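As a small illustration of that counter, the sketch below wraps an AtomicInteger in a hypothetical IdGenerator class. getAndIncrement returns the current value and then increments it atomically, so concurrent callers never receive the same ID.

```scala
import java.util.concurrent.atomic.AtomicInteger

object IdSketch {
  // Hypothetical wrapper; the counter starts at 0 like new AtomicInteger(0).
  final class IdGenerator {
    private val nextId = new AtomicInteger(0)
    // Returns the current value, then bumps the counter atomically.
    def next(): Int = nextId.getAndIncrement()
  }
}
```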

Now look at the ActiveJob created in dagScheduler.handleJobSubmitted():

private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  /**
   * Number of partitions we need to compute for this job. Note that result stages may not
   * need to compute all partitions in their target RDD, for actions like first() and lookup().
   * Cut horizontally, a job runs through every stage and is split into parallel tasks;
   * finished is an array of numPartitions flags that records whether each task has
   * completed, initialized to false.
   */
  val numPartitions = finalStage match {
    case r: ResultStage => r.partitions.length
    case m: ShuffleMapStage => m.rdd.partitions.length
  }

  /** Which partitions of the stage have finished */
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0
}
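The two mutable fields are enough to tell when a job is complete. The sketch below shows one plausible way to use them, assuming the scheduler marks a partition once per successful task and ignores duplicates; Progress and markFinished are hypothetical names, not part of ActiveJob.

```scala
object ActiveJobSketch {
  // Hypothetical tracker with the same two fields as ActiveJob: a per-partition
  // flag array plus a running count.
  final class Progress(numPartitions: Int) {
    val finished: Array[Boolean] = Array.fill(numPartitions)(false)
    var numFinished = 0

    // Returns true once every partition has reported in; duplicates are ignored.
    def markFinished(outputId: Int): Boolean = {
      if (!finished(outputId)) {
        finished(outputId) = true
        numFinished += 1
      }
      numFinished == numPartitions
    }
  }
}
```

Keeping a flag per partition, and not only a count, prevents a retried or speculative task from being counted twice.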

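Continuing with dagScheduler.handleJobSubmitted(): the rest of the method registers the active job, posts a SparkListenerJobStart event, and submits the final stage. The key call is the submission function, submitStage:

listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)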

Here is the submitStage method:

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Find the parent stages that are still missing
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        // No missing parents are left, so this stage's tasks can be submitted
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Recursively submit the parent stages
        for (parent <- missing) {
          submitStage(parent)
        }
        // Park this stage in waitingStages until its parents are done
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
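The effect of the recursion is that parents always run before their children, lowest stage id first. The sketch below simulates that ordering on a hypothetical Stage case class. It takes one loud shortcut: parents are assumed to complete synchronously, whereas the real scheduler parks the child in waitingStages and resubmits it after its parents finish.

```scala
import scala.collection.mutable

object SubmitStageSketch {
  // Hypothetical toy stage: an id plus the parent stages it depends on.
  final case class Stage(id: Int, parents: List[Stage])

  // Returns the order in which stages would run their tasks.
  def submissionOrder(finalStage: Stage): List[Int] = {
    val done = mutable.LinkedHashSet[Int]() // stages whose tasks have run, in order

    def submitStage(stage: Stage): Unit =
      if (!done(stage.id)) {
        val missing = stage.parents.filterNot(p => done(p.id)).sortBy(_.id)
        missing.foreach(submitStage) // parents first, lowest id first
        done += stage.id             // toy shortcut: parents finished synchronously
      }

    submitStage(finalStage)
    done.toList
  }
}
```

A stage shared by two children appears only once in the result, because it is no longer missing by the time the second child asks for it.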

Now for the submitMissingTasks method:

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  // Clear the partitions that are in the pending state
  stage.pendingPartitions.clear()

  // Find the partitions that have not been computed yet (indices start at 0)
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Create internal accumulators if the stage has no accumulators initialized.
  // Reset internal accumulators only if this stage is not partially submitted;
  // otherwise we may override existing accumulator values from some tasks.
  //
  // Set up the accumulators: each stage collects the accumulators of all its tasks
  if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
    stage.resetInternalAccumulators()
  }

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage
  // Create the required number of tasks for the stage.
  // A key step here is computing the preferred location of each task.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // Determine the preferred locations of every task to be computed. This calls
  // getPreferredLocs(rdd, partition); the preferred locations it returns for a
  // partition are the preferred locations of the task for that partition
  // (this drives where tasks are dispatched).
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        val job = s.activeJob.get
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  // Create a new attempt for this stage (with a new attempt id)
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    // Serialize the task binary
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }
    // Broadcast the serialized task binary
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  // Wrap the task information in a sequence of Task objects
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }

      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)

    // Submit the task set; the TaskSet is handed to a TaskSetManager to manage
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
  }
}
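Stripped of serialization and error handling, the method does three things: find the partitions that still need computing, look up a preferred location for each, and build one task per missing partition. The sketch below shows just that flow; ToyTask, the outputs vector and the locs function are hypothetical stand-ins for the Spark types.

```scala
object MissingTasksSketch {
  // Hypothetical task description: which partition to compute and where it
  // would prefer to run.
  final case class ToyTask(stageId: Int, partitionId: Int, preferredHosts: Seq[String])

  // outputs(i) is true when partition i already has a result available.
  def findMissingPartitions(outputs: IndexedSeq[Boolean]): Seq[Int] =
    outputs.indices.filterNot(i => outputs(i))

  // Build one task per missing partition, attaching its preferred locations.
  def buildTasks(
      stageId: Int,
      outputs: IndexedSeq[Boolean],
      locs: Int => Seq[String]): Seq[ToyTask] = {
    val partitionsToCompute = findMissingPartitions(outputs)
    val taskIdToLocations = partitionsToCompute.map(id => id -> locs(id)).toMap
    partitionsToCompute.map(id => ToyTask(stageId, id, taskIdToLocations(id)))
  }
}
```

Because only the missing partitions are turned into tasks, resubmitting a partially completed stage does not redo work whose output is still available.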

 

Now for the taskScheduler.submitTasks method:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // First create a TaskSetManager. A TaskSetManager manages a single TaskSet within
    // TaskSchedulerImpl: it tracks every task, retries a failed task until the maximum
    // number of retries is reached, and performs locality-aware scheduling of tasks
    // via delay scheduling.
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Add the TaskSetManager to the rootPool scheduling pool; schedulableBuilder decides
    // the scheduling order. SchedulableBuilder is an application-level scheduler that
    // supports two modes, FIFO and FAIR. The mode is chosen with the spark.scheduler.mode
    // configuration property (a Spark property set through SparkConf, --conf or
    // spark-defaults.conf, not through spark-env.sh) and defaults to FIFO.
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Finally, submitTasks calls this method
  backend.reviveOffers()
}

Now for backend.reviveOffers(), which CoarseGrainedSchedulerBackend overrides:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

This sends a ReviveOffers case object to driverEndpoint. The message is in fact sent by the driver to its own endpoint (see Spark's RpcEnv module for details), so driverEndpoint itself ends up handling it. On receiving ReviveOffers, driverEndpoint calls CoarseGrainedSchedulerBackend's makeOffers method, which presents the cluster's resources as offers to the TaskSchedulerImpl above it. makeOffers calls scheduler.resourceOffers (scheduler being the TaskSchedulerImpl) to obtain the Seq[Seq[TaskDescription]] to run, and CoarseGrainedSchedulerBackend then dispatches those tasks to the executors.

private def makeOffers() {
  // All executors that can be used for computation
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // workOffers describes the free CPU cores on every available ExecutorBackend
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // scheduler.resourceOffers decides which ExecutorBackend each task runs on;
  // launchTasks then sends the tasks to the ExecutorBackends for execution
  launchTasks(scheduler.resourceOffers(workOffers))
}
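To give a feel for what matching tasks to offers means, here is a deliberately naive sketch: a greedy round-robin that hands each executor one task per pass while it has free cores. It is not the algorithm used by scheduler.resourceOffers, which takes more into account (task locality, for example); WorkerOffer is redefined locally, and Assignment and assign are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

object OfferSketch {
  // Local stand-ins for illustration only.
  final case class WorkerOffer(executorId: String, host: String, cores: Int)
  final case class Assignment(taskId: Int, executorId: String)

  // Greedy round-robin: keep passing over the offers, giving each executor one
  // task per pass while it still has at least cpusPerTask free cores.
  def assign(
      taskIds: Seq[Int],
      offers: Seq[WorkerOffer],
      cpusPerTask: Int = 1): Seq[Assignment] = {
    val free = offers.map(_.cores).toArray
    val pending = taskIds.iterator
    val out = ArrayBuffer.empty[Assignment]
    var progressed = true
    while (pending.hasNext && progressed) {
      progressed = false
      for (i <- offers.indices) {
        if (pending.hasNext && free(i) >= cpusPerTask) {
          out += Assignment(pending.next(), offers(i).executorId)
          free(i) -= cpusPerTask
          progressed = true
        }
      }
    }
    out.toList
  }
}
```

Tasks that do not fit stay unassigned; in the flow described above they would be picked up on a later round of offers.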

 

Next comes the launchTasks method, which sends each task to an ExecutorBackend for execution:

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
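The per-task decision in launchTasks is a size check followed by core bookkeeping. The sketch below isolates that decision as a pure function; tryLaunch and all of its parameters are hypothetical, and the limits are passed in where the real code reads them from configuration.

```scala
import java.nio.ByteBuffer

object LaunchSketch {
  // Left(reason) when the task is too large to send,
  // Right(remaining free cores on the executor) otherwise.
  def tryLaunch(
      serializedTask: ByteBuffer,
      freeCores: Int,
      frameSize: Int,
      reserved: Int,
      cpusPerTask: Int): Either[String, Int] =
    if (serializedTask.limit() >= frameSize - reserved) {
      Left(s"Serialized task was ${serializedTask.limit()} bytes, which exceeds max allowed")
    } else {
      Right(freeCores - cpusPerTask)
    }
}
```

An oversized task aborts its whole task set in the code above, which is why the error message suggests broadcast variables for large values.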

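That is the detailed process of submitting a Spark job (some details may have been left out).

Summary: 1. After spark-submit, a job only starts when an action operator triggers it. Reading the source shows that every action ends up calling SparkContext's runJob method.

2. Several runJob overloads call one another in turn until DAGScheduler's runJob method is reached.

3. DAGScheduler's submitJob method is called; inside it, eventProcessLoop.post puts the event on the event queue.

4. The event thread takes events off the queue and calls onReceive(event), overridden in the subclass DAGSchedulerEventProcessLoop, which in turn calls dagScheduler.handleJobSubmitted. That method creates all the stages, resolves the dependencies among them, and creates the job.

5. Stage submission starts from the last stage and recursively finds all parent stages; if a parent stage has not been submitted yet, it is submitted first.

6. Submitting a stage requires working out which partitions to compute. The job is split across each stage into parallel tasks, and the preferred location of each task is computed.

7. Finally the tasks are wrapped in a task array (a TaskSet) and submitted.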
