Hive Driver 类源码解析

原创
2017/04/27 17:53
阅读数 115

接上一篇博客《hive 命令行提交 sql 执行过程》,继续写这篇很烂的文章,如果有意见请在下方留言。Driver 类主要负责编译 sql,处理执行前、执行后的钩子函数,并运行执行计划。下边详细解析 Driver 的几个方法。

run 方法

  /**
   * Runs a command that is supplied as plain text.
   *
   * <p>Delegates to {@code run(String, boolean)} with the "already compiled" flag off.
   *
   * @param command the command text to run
   * @return the response produced by the two-argument overload
   * @throws CommandNeedRetryException propagated from the two-argument overload
   */
  @Override
  public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
    // false: the command has not been compiled into an execution plan yet.
    final boolean alreadyCompiled = false;
    return run(command, alreadyCompiled);
  }
/**
 * Runs a command by handing it to {@code runInternal} and keeping the response in
 * {@code cpr}.
 *
 * <p>NOTE: this is an abridged excerpt. The code that returns {@code cpr} and handles
 * errors was left out by the article's author, so no return statement is visible here.
 *
 * @param command the command text
 * @param alreadyCompiled passed through unchanged to {@code runInternal}
 * @throws CommandNeedRetryException declared by this method and by {@code runInternal}
 */
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
        throws CommandNeedRetryException {
    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
    // Omitted in this excerpt: returning cpr and the error-handling logic.
}
/**
 * Worker behind {@code run}: clears the previous error fields, validates the configuration,
 * updates the driver state under {@code stateLock}, runs the pre-run driver hooks, compiles
 * the command when it is not already compiled, and finally calls {@code execute(true)}.
 *
 * <p>NOTE: this is an abridged excerpt. Transaction handling, result handling and the
 * closing clauses of the outer {@code try} are not shown.
 *
 * @param command the command text; compiled here when {@code alreadyCompiled} is false
 * @param alreadyCompiled true when the caller compiled the command beforehand; in that case
 *        the driver must be in {@code DriverState.COMPILED} or an error response is returned
 * @return a response built by {@code createProcessorResponse}; the visible early exits use
 *         code 12 or the non-zero result of {@code compileInternal}
 * @throws CommandNeedRetryException declared by this method and by {@code execute}
 */
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
    // Clear error details left over from a previous run.
    errorMessage = null;
    SQLState = null;
    downstreamError = null;
    // Validate the configuration variables; exit right away when they are invalid.
    // NOTE(review): 12 is a magic response code; its meaning is not shown in this excerpt.
    if (!validateConfVariables()) {
      return createProcessorResponse(12);
    }
    // Check whether the command was already compiled and move the driver state accordingly.
    stateLock.lock();
    try {
      if (alreadyCompiled) {
        if (driverState == DriverState.COMPILED) {
          driverState = DriverState.EXECUTING;
        } else {
          errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
          console.printError(errorMessage);
          return createProcessorResponse(12);
        }
      } else {
        driverState = DriverState.COMPILING;
      }
    } finally {
      stateLock.unlock();
    }

    // a flag that helps to set the correct driver state in finally block by tracking if
    // the method has been returned by an error or not.
    boolean isFinishedWithError = true;
    try {
      HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
          alreadyCompiled ? ctx.getCmd() : command);
      // Get all the driver run hooks and pre-execute them.
    // These are the hooks configured through hive.exec.driver.run.hooks; they implement
    // the HiveDriverRunHook interface.
    // Author's note: a hook can read the execution plan, query id, user session and so on.
    // These hooks run both before and after the SQL executes; the post-run call is made
    // only when execution succeeds, not when it fails.
      List<HiveDriverRunHook> driverRunHooks;
      try {
        driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
            HiveDriverRunHook.class);
        for (HiveDriverRunHook driverRunHook : driverRunHooks) {
            driverRunHook.preDriverRun(hookContext);
        }
      } catch (Exception e) {
        // Any hook failure is recorded in the error fields, printed, and ends the run.
        errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
        SQLState = ErrorMsg.findSQLState(e.getMessage());
        downstreamError = e;
        console.printError(errorMessage + "\n"
            + org.apache.hadoop.util.StringUtils.stringifyException(e));
        return createProcessorResponse(12);
      }

      // Reset the perf logger. Author's note: the records I have seen from it are shown in
      // the HiveServer2 web UI.
      PerfLogger perfLogger = SessionState.getPerfLogger(true);
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);

      int ret;
      // Compile the command. Author's note: a ParseDriver is built to compile the SQL and
      // produce the execution plan; this part is not covered in detail (not fully understood yet).
      if (!alreadyCompiled) {
        // compile internal will automatically reset the perf logger
        ret = compileInternal(command, true);
        if (ret != 0) {
          return createProcessorResponse(ret);
        }
      }
      // Omitted in this excerpt: transaction-related code.
      
 // Execute the command.
      ret = execute(true);
     // Omitted in this excerpt: checking the execution result, building the response object,
     // and calling the (hive.exec.driver.run.hooks) hooks after a successful run.
    }
  }

execute 方法

主要作用:构建 jobname,执行 hive.exec.pre.hooks 配置的钩子函数,循环执行根据 plan 生成的 task 任务。plan 通过 getRootTasks 方法获取一个任务列表,添加到 DriverContext 中,遍历执行任务,执行完成后会获取下一批任务,直到任务执行完成。下边是一些删减后的代码:

/**
 * Executes the compiled plan: reads job-related settings, prepares the pre-execution hook
 * context, queues the plan's root tasks in a {@code DriverContext}, then loops launching
 * runnable tasks and queuing the launchable children of finished tasks.
 *
 * <p>NOTE: this is an abridged excerpt. The hook loop body, the per-task result handling,
 * the post-execution processing and the closing clauses of the {@code try} are not shown.
 *
 * @param deferClose not used in the visible part of this excerpt
 * @return the result of {@code handleInterruption} when interrupted before running tasks;
 *         otherwise 0 in the visible code
 * @throws CommandNeedRetryException declared by this method; no throw site is visible here
 */
public int execute(boolean deferClose) throws CommandNeedRetryException {
     // Maximum job name length. Author's note: default 50, config key hive.jobname.length.
    int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
    // Query id.
    String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
    // The SQL text to execute.
    String queryStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING);
    // Maximum number of parallel tasks. Author's note: config key
    // hive.exec.parallel.thread.number, default 8.
    maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);

    boolean executionError = false;
    try {
      
      resStream = null;
      SessionState ss = SessionState.get();
      HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(), ss.getUserIpAddress());
      hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
        // Pre-execution hooks, config key hive.exec.pre.hooks. The loop body is omitted in
        // this excerpt.
      for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
       
      }

      // Total count of MR, Tez and Spark tasks reachable from the plan's root tasks.
      int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
        + Utilities.getTezTasks(plan.getRootTasks()).size()
        + Utilities.getSparkTasks(plan.getRootTasks()).size();
      String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
       // Job name. Author's note: by default the first 44 characters of the SQL, plus the
       // task id, e.g. (stage-1).
        //String jobname = "queryID = " + plan.getQueryId() + " ; sql = " + plan.getQueryStr();//Utilities.abbreviate(queryStr, maxlen - 6);
      
      if (isInterrupted()) {
        return handleInterruption("before running tasks.");
      }
      DriverContext driverCxt = new DriverContext(ctx);
      driverCxt.prepare(plan);

      ctx.setHDFSCleanup(true);
      this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)

        // Queue tasks in driverCxt. Only the root tasks are added here; the while loop
        // below adds child tasks as their parents finish.
      for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
        // This should never happen, if it does, it's a bug with the potential to produce
        // incorrect results.
        assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
        driverCxt.addToRunnable(tsk);
      }

      // NOTE(review): perfLogger is not declared in the visible excerpt; presumably it is
      // obtained in omitted code. Confirm against the full source.
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
      // Loop while you either have tasks running, or tasks queued up. Finished tasks that
      // have child tasks feed the next layer of tasks into the loop.
        while (driverCxt.isRunning()) {

        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          queryDisplay.addTask(task);
          // Start the task. launchTask checks whether it is a map-reduce task and whether
          // parallel execution (hive.exec.parallel) is enabled to decide how to start it.
          // NOTE(review): noName is not declared in the visible excerpt; presumably it is
          // set in omitted code. Confirm against the full source.
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
          if (!runner.isRunning()) {
            break;
          }
        }

        // Poll for a finished task; loop again when none is available yet.
        TaskRunner tskRun = driverCxt.pollFinished();
        if (tskRun == null) {
          continue;
        }
        // Omitted in this excerpt: follow-up for a finished task, i.e. reading its exit
        // status, handling errors and running the on-failure hooks.
        
        // If the finished task has child tasks, queue the launchable ones in driverCxt.
        // NOTE(review): tsk is not declared in this scope in the visible excerpt; presumably
        // the omitted code obtains it from tskRun. Confirm against the full source.
        if (tsk.getChildTasks() != null) {
          for (Task<? extends Serializable> child : tsk.getChildTasks()) {
            if (DriverContext.isLaunchable(child)) {
              driverCxt.addToRunnable(child);
            }
          }
        }
      }
       // Hooks that run after the whole SQL statement has finished executing.
      hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
      
    // Omitted in this excerpt: post-processing after the SQL finishes. Author's note: I did
    // not work out exactly what it does; study it yourself if you are interested.
    return (0);
  }
/**
 * Prepares a single task and starts it, either in parallel or in the calling thread.
 *
 * <p>Steps, in order: record the task start in the session's Hive history (when a session
 * exists); for map-reduce tasks that are not {@code ConditionalTask}s, set the job name
 * (only when {@code noName} is true), the workflow node name and adjacencies, bump the job
 * counter and print progress; initialize the task; register the runner with the driver
 * context; then start it.
 *
 * @param tsk the task to launch
 * @param queryId query id recorded in the Hive history
 * @param noName when true, the Hadoop job name is set to {@code jobname + "(" + id + ")"}
 * @param jobname base job name
 * @param jobs total job count shown in the progress message
 * @param cxt driver context that tracks the job counter and the launched runner
 * @return the runner wrapping {@code tsk}
 * @throws HiveException declared by this method; no throw site is visible here
 */
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
    String jobname, int jobs, DriverContext cxt) throws HiveException {
  if (SessionState.get() != null) {
    String taskClassName = tsk.getClass().getName();
    SessionState.get().getHiveHistory().startTask(queryId, tsk, taskClassName);
  }

  boolean countsAsJob = tsk.isMapRedTask() && !(tsk instanceof ConditionalTask);
  if (countsAsJob) {
    if (noName) {
      String hadoopJobName = jobname + "(" + tsk.getId() + ")";
      conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, hadoopJobName);
    }
    conf.set("mapreduce.workflow.node.name", tsk.getId());
    Utilities.setWorkflowAdjacencies(conf, plan);
    cxt.incCurJobNo(1);
    String progress = "Launching Job " + cxt.getCurJobNo() + " out of " + jobs;
    console.printInfo(progress);
  }

  tsk.initialize(conf, plan, cxt);

  // Wrap the task in a runner and register it with the context before it starts.
  TaskRunner runner = new TaskRunner(tsk, new TaskResult());
  cxt.launching(runner);

  // Parallel mode is used only when it is enabled in the configuration AND the task is a
  // map-reduce task; everything else runs sequentially.
  boolean parallel =
      HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask();
  if (!parallel) {
    if (LOG.isInfoEnabled()) {
      LOG.info("Starting task [" + tsk + "] in serial mode");
    }
    runner.runSequential();
    return runner;
  }

  // Launch it in the parallel mode, as a separate thread only for MR tasks
  if (LOG.isInfoEnabled()) {
    LOG.info("Starting task [" + tsk + "] in parallel");
  }
  runner.setOperationLog(OperationLog.getCurrentOperationLog());
  runner.start();
  LOG.info("任务运行的jobid 是" + tsk.getJobID());
  return runner;
}
展开阅读全文
打赏
1
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
1
分享
返回顶部
顶部