文档章节

hive Derver类源码解析

政委007
 政委007
发布于 2017/04/27 17:53
字数 1262
阅读 44
收藏 1

接上一篇博客hive 命令行提交sql 执行过程继续写很烂的文章,如果有意见请在下方留言。 Driver类主要编译sql,处理执行前,执行后的钩子函数, 运行执行计划 下边详细解析driver的几个方法

run 方法

  @Override
  public CommandProcessorResponse run(String command)
      throws CommandNeedRetryException {
    //false 表示命令没有编译成执行计划
    return run(command, false);
  }
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
        throws CommandNeedRetryException {
    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
    //返回cpr 和错误处理方法,省略这部分代码
}
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
    errorMessage = null;
    SQLState = null;
    downstreamError = null;
    //判断配置文件是否有效,如果无效直接退出
    if (!validateConfVariables()) {
      return createProcessorResponse(12);
    }
    //判断是否编译,确认编译状态
    stateLock.lock();
    try {
      if (alreadyCompiled) {
        if (driverState == DriverState.COMPILED) {
          driverState = DriverState.EXECUTING;
        } else {
          errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
          console.printError(errorMessage);
          return createProcessorResponse(12);
        }
      } else {
        driverState = DriverState.COMPILING;
      }
    } finally {
      stateLock.unlock();
    }

    // a flag that helps to set the correct driver state in finally block by tracking if
    // the method has been returned by an error or not.
    boolean isFinishedWithError = true;
    try {
      HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
          alreadyCompiled ? ctx.getCmd() : command);
      // Get all the driver run hooks and pre-execute them.
    //执行配置项hive.exec.driver.run.hooks 配置的钩子函数,实现了HiveDriverRunHook 接口
    //hook函数中可以获取执行计划,queryid,用户session等信息 ,在sql执行前后都会执行这部分钩子函数,如果执行失败,sql执行后不会调度,只有成功才会调度
      List<HiveDriverRunHook> driverRunHooks;
      try {
        driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
            HiveDriverRunHook.class);
        for (HiveDriverRunHook driverRunHook : driverRunHooks) {
            driverRunHook.preDriverRun(hookContext);
        }
      } catch (Exception e) {
        errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
        SQLState = ErrorMsg.findSQLState(e.getMessage());
        downstreamError = e;
        console.printError(errorMessage + "\n"
            + org.apache.hadoop.util.StringUtils.stringifyException(e));
        return createProcessorResponse(12);
      }

      // Reset the perf logger 重置性能分析日志,我看到的记录是在hiveserver2启动时的web ui界面中
      PerfLogger perfLogger = SessionState.getPerfLogger(true);
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);

      int ret;
      //开始编译命令 ,构建一个ParseDriver 来编译sql命令,生成执行计划,这部分不过多介绍。还没看懂
      if (!alreadyCompiled) {
        // compile internal will automatically reset the perf logger
        ret = compileInternal(command, true);
        if (ret != 0) {
          return createProcessorResponse(ret);
        }
      }
      //省略代码,事务相关东西
      
 //开始执行命令
      ret = execute(true);
     //省略代码,判端执行结果,构建返回对象,执行成功调度,(hive.exec.driver.run.hooks )钩子函数
    }
  }

execute 方法

主要作用: 构建jobname, 执行hive.exec.pre.hooks配置的钩子函数,循环执行根据plan生产的task任务。plan通过getRootTask方法获取一个任务列表,添加到DriverContext中,遍历执行任务,执行完成后会获取下一批任务,知道任务执行完成。 下边是一些删减后的代码

public int execute(boolean deferClose) throws CommandNeedRetryException {
     //获取jobname默认的长度50。配置项:hive.jobname.length
    int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
    //获取queryID
    String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
    //获取要执行的sql
    String queryStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING);
    //获取最多并行任务数 ,配置项:hive.exec.parallel.thread.number 默认:8
    maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);

    boolean executionError = false;
    try {
      
      resStream = null;
      SessionState ss = SessionState.get();
      HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(), ss.getUserIpAddress());
      hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
        //执行任务前的钩子函数 配置项:hive.exec.pre.hooks
      for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
       
      }

      int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
        + Utilities.getTezTasks(plan.getRootTasks()).size()
        + Utilities.getSparkTasks(plan.getRootTasks()).size();
      String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
       //设置jobname ,默认是sql 44个字符,+ taskid(stage-1)
        //String jobname = "queryID = " + plan.getQueryId() + " ; sql = " + plan.getQueryStr();//Utilities.abbreviate(queryStr, maxlen - 6);
      
      if (isInterrupted()) {
        return handleInterruption("before running tasks.");
      }
      DriverContext driverCxt = new DriverContext(ctx);
      driverCxt.prepare(plan);

      ctx.setHDFSCleanup(true);
      this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)

        //添加任务到driverCxt 中 只获取了第一次需要执行的任务,下边的while循环中还会获取子任务
      for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
        // This should never happen, if it does, it's a bug with the potential to produce
        // incorrect results.
        assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
        driverCxt.addToRunnable(tsk);
      }

      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
      // Loop while you either have tasks running, or tasks queued up ,循环获取任务,当childTask不为null 时继续执行下一层任务
        while (driverCxt.isRunning()) {

        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          queryDisplay.addTask(task);
          //开始执行任务,判断是否是mapred任务,根据任务类型执行任务和是否允许并行执行任务(hive.exec.parallel)调用任务的开始执行
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
          if (!runner.isRunning()) {
            break;
          }
        }

        // 循环等待任务完成
        TaskRunner tskRun = driverCxt.pollFinished();
        if (tskRun == null) {
          continue;
        }
        //当有任务完成时,做后续处理 ,获取执行退出状态,进行错误处理,执行错误退出时的钩子函数。
        
        //如果执行完成的任务有后续任务添加任务到driverCxt 中去执行
        if (tsk.getChildTasks() != null) {
          for (Task<? extends Serializable> child : tsk.getChildTasks()) {
            if (DriverContext.isLaunchable(child)) {
              driverCxt.addToRunnable(child);
            }
          }
        }
      }
       //执行整个sql执行完成后的钩子函数
      hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
      
    //这个sql执行完成后,后续处理,没看懂具体处理了什么,有兴趣可以自己研究一下
    return (0);
  }
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
      String jobname, int jobs, DriverContext cxt) throws HiveException {
    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
    }
    if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
      if (noName) {
        conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
      }
      conf.set("mapreduce.workflow.node.name", tsk.getId());
      Utilities.setWorkflowAdjacencies(conf, plan);
      cxt.incCurJobNo(1);
      console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
    }
    tsk.initialize(conf, plan, cxt);
    TaskResult tskRes = new TaskResult();
    //构建任务运行线程。如果是并行模式,并且是mapreduce 则启动线程执行,否则调用runSequential方法
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);

    cxt.launching(tskRun);
    // Launch Task
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
      // Launch it in the parallel mode, as a separate thread only for MR tasks
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in parallel");
      }
      tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
      tskRun.start();
      LOG.info("任务运行的jobid 是" + tsk.getJobID());
    } else {
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in serial mode");
      }
      tskRun.runSequential();
    }
    return tskRun;
  }

© 著作权归作者所有

政委007
粉丝 10
博文 15
码字总数 15843
作品 0
洛阳
程序员
私信 提问
Hive代码组织及架构简单介绍

hive三个主要组件 Serialzers/Deserialzers(trunk/serde) 此组件允许用户自己开发自定义序列化、反序列化文件解析器,hive自身也内置了一些序列化解析类。 MetaStore(trunk/metastore) 此组件...

扫大街的程序员
2013/11/03
2.9K
0
Hive中文注释乱码解决方案(2)

本文来自网易云社区 作者:王潘安 执行阶段 launchTask 回到Driver类的runInternal方法,看以下执行过程。在runInternal方法中,执行过程调用了execute方法。execute方法里面的内容很多,但是...

网易云
2018/10/16
0
0
Hive客户端脚本CLI分析及debug环境搭建

接触Hive,自然少不了它家的shell命令行模式,下面就来分析下,平常我们接触的这个脚本到底如何运行的。 研究的版本是 1.2.1.(先搭建好Hadoop环境) ---扩展阅读:有兴趣的看看别人写的 https:...

强子哥哥
2016/03/12
191
0
hive基本组件简单介绍

Hive 开发指南 三个主要组件: Serializers/Deserializers,用户可以自定义来解析自有的数据格式。 MetaStore Query Processor 一些其它组件: 命令行组件 Hive Server:提供被客户端使用的a...

枯藤KT
2016/08/03
99
0
震秦/hive-protobuf-serde

Apache Hive Protobuf SerDe ##项目说明: 该项目用于 Hive 解析读取 Protobuf 序列化文件的一个组件. ##开发背景: 因为项目的不确定性, 加字段是常见的. 因此需要对于加了字段不需要改 Scha...

震秦
2015/11/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
197
8
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
1
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
2
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部