yarn 状态机

原创
2020/03/27 17:44
阅读数 400

每个有状态且存在复杂状态转换的对象都包含一个状态机

例如org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl 代表一个mr job,其内部就包含一个状态机:

public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, 
  EventHandler<JobEvent> {
      protected static final StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> stateMachineFactory 
          = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_INIT_FAILED,
              new InitFailedTransition()) //JobImpl 在NEW状态 下接收到 JOB_INIT_FAILED类型的事件后 进行InitFailedTransition转换, 转换之后 JobImpl的状态变为InitFailedTransition
          .addTransition(JobStateInternal.NEW, 
              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
              JobEventType.JOB_INIT,
              new InitTransition())  //JobImpl 在NEW状态 下接收到 JOB_INIT类型的事件后 进行InitTransition转换, 转换之后 JobImpl的状态可能为INITED、NEW 二者中的一个
           //略
          .installTopology();
          
      //JobImpl 持有的状态机
      private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
      
      //构造函数
      public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,...) {
          //略
          stateMachine = stateMachineFactory.make(this);  //将stateMachineFactory添加的各种Transition 生成状态机
          //略
      }
  }
 
状态机构建过程

状态机构建过程就是将之前stateMachineFactory添加的各种Transition 生成状态机,最核心的就是生成stateMachineTable。 

Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable = 
           new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

stateMachineTable 维护了状态之间的转化关系:

  1. 一个状态可以转成成哪些状态
  2. 一个状态可以接受哪些类型的事件
  3. 一个状态接受到事件之后做何种处理(Transition)
状态转换过程

状态转换过程就是事件处理过程

//step1: handler 处理事件
//org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl
public void handle(JobEvent event) {
         //略
         getStateMachine().doTransition(event.getType(), event);
         //略
}

//step2:
//org.apache.hadoop.yarn.state.StateMachineFactory.InternalStateMachine
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
         throws InvalidStateTransitonException  {
      currentState = StateMachineFactory.this.doTransition
          (operand, currentState, eventType, event);
      return currentState;
}

//step3:
//org.apache.hadoop.yarn.state.StateMachineFactory   
private STATE doTransition (OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
      throws InvalidStateTransitonException {
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
      = stateMachineTable.get(oldState);  //获取当前状态下可以相应的事件
    if (transitionMap != null) {
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition = transitionMap.get(eventType); //根据当前事件获取的转换
      if (transition != null) {
        return transition.doTransition(operand, oldState, event, eventType);   //执行转换
      }
}

 

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部