hive MapRedTask 调度分析

原创
2017/04/28 10:20
阅读数 112

承接上一篇博客:hive Derver类源码解析 本文介绍最常用的mapred任务调度过程。作为个人的学习笔记,没有很好的描述博客,大家将就着看吧,如果有什么问题欢迎留言交流。 上篇博客中 driver 的execute方法中调度了TaskRunner的 run方法或者runQuential方法,下边来看看这两个方法具体干了什么

TaskRunner.run()

@Override
  public void run() {
 
      runSequential();
     }
  }
  public void runSequential() {
    //调用抽象类Task 的 executeTask()方法
      exitVal = tsk.executeTask();
       result.setExitVal(exitVal, tsk.getException());
  }

抽象类Task 的executeTask() 方法

public int executeTask() {
        //调用抽象方法execute
      int retval = execute(driverContext);     
  }

这里特殊关照了一下Task的子类MapRedTask,下边具体解析MapRedTask的execute方法

MapRedTask 的execute方法

 public int execute(DriverContext driverContext) {

      //判断是否是本地mapred模式
      if (!ctx.isLocalOnlyExecutionMode() &&
          conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
      }
    //判断是否允许子类提交任务,,如果允许,继续向下执行,
//否则会调用父类ExecDriver的execute方法。在这里我被可耻的截胡了。😭。
// 父类方法是通过构建一个jobclient来提交任务,就像我们写其他mapreduce一样,,
//而下边的代码是通过hadoop -jar <jar包> <主类>的方法来提交任务,本质上和父类提交的过程类似。
      runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
      if (!runningViaChild) {
        return super.execute(driverContext);
      }
       //简单关照一下这种方法提交mapreduce的方式,
      String jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;

      String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
          + planPath.toString() + " " + isSilent + " " + hiveConfArgs;

      executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
      //报告任务执行进度
    int exitVal = jobExecHelper.progressLocal(executor, getId());

    }
  

而ExecDriver中提交任务的方式

 JobClient jc = new JobClient(job);
    
      // Finally SUBMIT the JOB! 根据jobconf 生成RunningJob对象,
// 这是jobid已经存在
        //在底层调度到hadoop 的代码了,提交任务,到hadoop 
//,调用usergroupinfomation的doAs方法,生成job对象,提交任务,这里不会等到
      rj = jc.submitJob(job);
      // replace it back
      if (pwd != null) {
        HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
      }
       //循环获取任务执行进度
      returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager());

简单显示一下mapred方法中对任务处理

public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                  IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
        @Override
        public Job run() throws IOException, ClassNotFoundException, 
          InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }
展开阅读全文
打赏
1
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
1
分享
返回顶部
顶部