从源码角度对比Yarn下的App对任务失败的控制

原创
2017/09/26 19:05
阅读数 6.7K

在Yarn/MRV2/Spark里都有任务失败再重试的设置,Yarn作为资源管理框架,而MRV2/Spark作为计算框架,它们的失败重试有什么不同? 有哪些参数来控制失败重试? Spark on Yarn的时候参数又有什么影响?

Yarn的AM失败重试

Yarn作为资源管理框架,由RM负责AM(ApplicationMaster),具体的任务是由AM自己负责,所以Yarn对于一个Job的重试是在AM层级上的,其参数为 yarn.resourcemanager.am.max-attemptsyarn.resourcemanager.am.max-retries,默认值为2,即如果一个Job的AM死掉了,RM会重新分配container重启AM一次,而对于container的挂掉,则由具体的AppMaster实现来管理,该参数判断代码如下:

// RMAppImpl.java

public RMAppState transition(RMAppImpl app, RMAppEvent event) {
  int numberOfFailure = app.getNumFailedAppAttempts();
  // other code...
  if (numberOfFailure >= app.maxAppAttempts) {
    app.isNumAttemptsBeyondThreshold = true;
  }
  app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED);
}

注意:如果一个Job失败了,可不一定会触发这个重试,Job失败并不代表其AM失败,Job失败的原因是有多种的

MRV2的Task失败重试

对于AM的失败次数,Yarn提供了用户设置参数来单独控制每个任务,可以覆盖Yarn的默认参数值,其中在MRV2里通过参数 mapreduce.am.max-attempts 体现,该参数的默认值也为2,AM的失败次数由它和 yarn.resourcemanager.am.max-attempts 一起决定,判断逻辑如下:

// RMAppImpl.java

// yarn.resourcemanager.am.max-attempts
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
// mapreduce.am.max-attempts 或Spark里的 spark.yarn.maxAppAttempts
int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
if (individualMaxAppAttempts <= 0 || individualMaxAppAttempts > globalMaxAppAttempts) {
  this.maxAppAttempts = globalMaxAppAttempts;
  LOG.warn("The specific max attempts: " + individualMaxAppAttempts
          + " for application: " + applicationId.getId()
          + " is invalid, because it is out of the range [1, "
          + globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else {
  this.maxAppAttempts = individualMaxAppAttempts;
}

即在RMV2或Spark里的用户设置值在0到 yarn.resourcemanager.am.max-attempts 之间,那么取用户的设置值,如果不在,那么取 yarn.resourcemanager.am.max-attempts 设置值或默认值

由于AM控制了每个Job的运行,而Job由Map Task和Reduce Task组成,因此Job的失败就与Task相关,在MRV2里,提供了 mapreduce.map.maxattemptsmapreduce.reduce.maxattempts 两个值来控制MR Task的最大失败次数,两个参数的默认值都为4,但是在Uber模式的时候,两个参数的值被设为了1

// ReduceTaskImpl.java
protected int getMaxAttempts() {
  return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
}

// MapTaskImpl.java
protected int getMaxAttempts() {
  return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);
}

这两个参数的意思是,单个map task或reduce task的最大尝试次数是4,如果一个task尝试了4次还未成功,那么该Task就是失败的,从而整个Job也是失败的,这时由于AM并没有问题,所以不会引起Yarn对Job的重试

同时这两个参数是针对单个task的,并不是所有task的尝试次数总和,所以如果多个task都有失败,只要每个task的尝试次数不超过4次,Job就不是失败的,所以有时你看到一个Job有几十次或上百次失败,Job最后也是运行成功的!如下:

// TaskImpl.java

// TaskImpl是一个abstract class,Map和Reduce有不同的实现,代表的是单个Task,所以这里判断的是单个Task的尝试次数
if (attemptState == TaskAttemptState.FAILED) {
  failedAttempts.add(attempt.getID());
  if (failedAttempts.size() >= maxAttempts) {
    taces = TaskAttemptCompletionEventStatus.TIPFAILED;
  }
}

Spark on Yarn

对于Spark on Yarn,yarn只负责启动和管理AM以及分配资源,Spark有自己的AM实现,当Executor运行起来后,任务的控制是由Driver负责的,所以在重试上Yarn只负责AM的重试,没有重试的参数冲突

同MRV2一样,Spark可以使用 spark.yarn.maxAppAttempts 参数控制AM的尝试次数,该参数没有默认值,如果不设置则保持Yarn的设置,如果有设置,则与MRV2的 mapreduce.am.max-attempts 参数判断逻辑一致

其次,在Spark对ApplicationMaster的实现里,Spark提供了参数 spark.yarn.max.executor.failures 来控制Executor的失败次数,其默认值是 numExecutors * 2(如果dynamicallocation打开了,那么该值为最大的Executors个数乘2),同时其最小值不小于3。当Executor的失败次数达到这个值的时候,整个Spark Job(这里的Job是整个Spark任务,而不是DAG里的Job/Stage/Task)就失败了,判断逻辑如下:

// ApplicationMaster.scala

private val maxNumExecutorFailures = {
  val effectiveNumExecutors =
    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
      sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
    } else {
      sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
    }
  // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need
  // avoid the integer overflow here.
  val defaultMaxNumExecutorFailures = math.max(3,
    if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors))
}
  sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)

// other code ...

// judge
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
  finish(FinalApplicationStatus.FAILED,
    ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
    s"Max number of executor failures ($maxNumExecutorFailures) reached")
} else {
  // ...
}

对于Executor失败的原因,可能是OOM,也可能是心跳超时等等,Task的失败并不一定能导致Executor的失败

对于Task的失败,Spark还提供了参数 spark.task.maxFailures 来控制task的失败次数,其默认值是4,同一个Task失败的次数不能超过4次,否则Spark Job(Job是Spark任务,非DAG里的Job)就失败了。此参数无法限制Task总的失败次数,如果有多个Task失败,只要每个Task的失败次数不超过4次,Spark Job就是成功的!如下:

// TaskSetManager.scala

// numFailures是一个数组,大小为numTasks,存的是每个task的失败次数
if (numFailures(index) >= maxTaskFailures) {
  logError("Task %d in stage %s failed %d times; aborting job".format(
    index, taskSet.id, maxTaskFailures))
  abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
    .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
  return
}

一个Spark app根据DAGScheduler可分为多个Job,Stage或Task,但是任务的重试次数跟Job或Stage无关

参数总结

参数 默认值 备注 设置位置
yarn.resourcemanager.am.max-attempts(yarn.resourcemanager.am.max-retries) 2 控制AppMaster的重试 Yarn的RM
mapreduce.am.max-attempts 2 覆盖Yarn的默认AppMaster的重试次数 MRV2 App or hive
mapreduce.map.maxattempts 4 控制单个map任务的重试次数 MRV2 App or hive
mapreduce.reduce.maxattempts 4 控制单个reduce任务的重试次数 MRV2 App or hive
spark.yarn.maxAppAttempts none 覆盖Yarn的默认AppMaster的重试次数 Spark on Yarn App
spark.yarn.max.executor.failures numExecutors * 2 (twice the maximum number of executors if dynamicallocation is enabled), with a minimum of 3 控制Spark executor的失败重试次数 Spark on Yarn App
spark.task.maxFailures 4 控制Spark单个task的最大失败次数 Spark on Yarn App or Spark standalone App

以上代码基于Hadoop 2.6.0和Spark 2.0.0

欢迎阅读转载,转载请注明出处:https://my.oschina.net/kavn/blog/1543769

展开阅读全文
打赏
0
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部