文档章节

你想了解的Hive Query生命周期--钩子函数篇!

问津已非少年
 问津已非少年
发布于 2017/08/18 20:19
字数 2160
阅读 946
收藏 25
点赞 2
评论 3

前言

无论Hive Cli还是HiveServer2,一个HQl语句都要经过Driver进行解析和执行,粗略如下图:

hive arch|center

Driver处理的流程如下:

HQL解析(生成AST语法树) => 语法分析(得到QueryBlock) => 生成逻辑执行计划(Operator) => 逻辑优化(Logical Optimizer Operator) => 生成物理执行计划(Task Plan) => 物理优化(Task Tree) => 构建执行计划(QueryPlan) => 表以及操作鉴权 => 执行引擎执行

流程涉及HQL解析,HQL编译(语法分析,逻辑计划和物理计划,鉴权),执行器执行三个大的方面,在整个生命周期中,按执行顺序会有如下钩子函数:

Driver.run()之前的preDriverRun

该钩子函数由配置 hive.exec.driver.run.hooks 控制,多个钩子实现类以逗号间隔,钩子需实现 org.apache.hadoop.hive.ql.HiveDriverRunHook 接口,该接口描述如下:

public interface HiveDriverRunHook extends Hook {
  /**
   * Invoked before Hive begins any processing of a command in the Driver,
   * notably before compilation and any customizable performance logging.
   */
  public void preDriverRun(
    HiveDriverRunHookContext hookContext) throws Exception;

  /**
   * Invoked after Hive performs any processing of a command, just before a
   * response is returned to the entity calling the Driver.
   */
  public void postDriverRun(
    HiveDriverRunHookContext hookContext) throws Exception;
}

可以看出钩子还提供了 postDriverRun 方法供HQL执行完,数据返回前调用,这个在后面会说到

其参数在Hive里使用的是 HiveDriverRunHookContext 的默认实现类 org.apache.hadoop.hive.ql.HiveDriverRunHookContextImpl,里面提供了两个有用的参数,分别是HiveConf和要执行的Command,其调用信息如下:

HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
  driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
      HiveDriverRunHook.class);
  for (HiveDriverRunHook driverRunHook : driverRunHooks) {
      driverRunHook.preDriverRun(hookContext);
  }
} catch (Exception e) {
  errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
  SQLState = ErrorMsg.findSQLState(e.getMessage());
  downstreamError = e;
  console.printError(errorMessage + "\n"
      + org.apache.hadoop.util.StringUtils.stringifyException(e));
  return createProcessorResponse(12);
}

语法分析之前的preAnalyze

在Driver开始run之后,HQL经过解析会进入编译阶段的语法分析,而在语法分析前会经过钩子 HiveSemanticAnalyzerHookpreAnalyze 方法,该钩子函数由 hive.semantic.analyzer.hook 配置,钩子需实现 org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook 接口,接口描述如下:

public interface HiveSemanticAnalyzerHook extends Hook {
  public ASTNode preAnalyze(
    HiveSemanticAnalyzerHookContext context,
    ASTNode ast) throws SemanticException;

  public void postAnalyze(
    HiveSemanticAnalyzerHookContext context,
    List<Task<? extends Serializable>> rootTasks) throws SemanticException;
}

可以看出钩子类还提供了 postAnalyze 方法供语法分析完后调用,这个在后面会提到

其参数在Hive里使用的是 HiveSemanticAnalyzerHookContext 的默认实现类 org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl,里面提供了HQL对应的输入,输出,提交用户,HiveConf和客户端IP等信息,输入和输出的表及分区信息需要做完语法分析后才能得到,在 preAnalyze 里获取不到,其调用信息如下:

List<HiveSemanticAnalyzerHook> saHooks =
    getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
        HiveSemanticAnalyzerHook.class);

// Do semantic analysis and plan generation
if (saHooks != null) {
  HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
  hookCtx.setConf(conf);
  hookCtx.setUserName(userName);
  hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
  hookCtx.setCommand(command);
  for (HiveSemanticAnalyzerHook hook : saHooks) {
    tree = hook.preAnalyze(hookCtx, tree);
  }
  // 此处开始进行语法分析,会涉及逻辑执行计划和物理执行计划的生成和优化
  sem.analyze(tree, ctx);
  // 更新分析器以便后续的postAnalyzer钩子执行
  hookCtx.update(sem);
  for (HiveSemanticAnalyzerHook hook : saHooks) {
    hook.postAnalyze(hookCtx, sem.getRootTasks());
  }
} else {
  sem.analyze(tree, ctx);
}

语法分析之后的postAnalyze

preAnalyze 的分析可以看出,postAnalyze 与其属于同一个钩子类,因此配置也相同,不同的是它位于Hive的语法分析之后,因此可以获取到HQL的输入和输出表及分区信息,以及语法分析得到的Task信息,由此可以判断是否是需要分布式执行的任务,以及执行引擎是什么,具体的代码和配置可见上面的 preAnalyze 分析

生成执行计划之前的redactor钩子

这个钩子函数是在语法分析之后,生成QueryPlan之前,所以执行它的时候语法分析已完成,具体要跑的任务已定,这个钩子的目的在于完成QueryString的替换,比如QueryString中包含敏感的表或字段信息,在这里都可以完成替换,从而在Yarn的RM界面或其他方式查询该任务的时候,会显示经过替换后的HQL

该钩子由 hive.exec.query.redactor.hooks 配置,多个实现类以逗号间隔,钩子需继承 org.apache.hadoop.hive.ql.hooks.Redactor 抽象类,并替换 redactQuery 方法,接口描述如下:

public abstract class Redactor implements Hook, Configurable {

  private Configuration conf;
  
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public Configuration getConf() {
    return conf;
  }

  /**
   * Implementations may modify the query so that when placed in the job.xml
   * and thus potenially exposed to admin users, the query does not expose
   * sensitive information.
   */
  public String redactQuery(String query) {
    return query;
  }
}

其调用信息如下:

public static String redactLogString(HiveConf conf, String logString)
    throws InstantiationException, IllegalAccessException, ClassNotFoundException {

  String redactedString = logString;

  if (conf != null && logString != null) {
    List<Redactor> queryRedactors = getHooks(conf, ConfVars.QUERYREDACTORHOOKS, Redactor.class);
    for (Redactor redactor : queryRedactors) {
      redactor.setConf(conf);
      redactedString = redactor.redactQuery(redactedString);
    }
  }

  return redactedString;
}

Task执行之前的preExecutionHook

在执行计划QueryPlan生成完,并通过鉴权后,就会进行具体Task的执行,而Task执行之前会经过一个钩子函数,钩子函数由 hive.exec.pre.hooks 配置,多个钩子实现类以逗号间隔,该钩子的实现方式有两个,分别是:

一、实现 org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext 接口

该接口会传入 org.apache.hadoop.hive.ql.hooks.HookContext 的实例作为参数,而参数类里带有执行计划,HiveConf,Lineage信息,UGI,提交用户名,输入输出表及分区信息等私有变量,为我们实现自己的功能提供了很多帮助

接口描述如下:

public interface ExecuteWithHookContext extends Hook {

  void run(HookContext hookContext) throws Exception;
}

二、实现 org.apache.hadoop.hive.ql.hooks.PreExecute 接口

该接口传入参数包括SessionState,UGI和HQL的输入输出表及分区信息,目前该接口被标为已过时的接口,相比上面的ExecuteWithHookContext,该接口提供的信息可能不完全能满足我们的需求

其接口描述如下:

public interface PreExecute extends Hook {

  /**
   * The run command that is called just before the execution of the query.
   *
   * @param sess
   *          The session state.
   * @param inputs
   *          The set of input tables and partitions.
   * @param outputs
   *          The set of output tables, partitions, local and hdfs directories.
   * @param ugi
   *          The user group security information.
   */
  @Deprecated
  public void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, UserGroupInformation ugi)
    throws Exception;
}

该钩子的调用信息如下:

SessionState ss = SessionState.get();
HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(), ss.getUserIpAddress());
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);

for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
  if (peh instanceof ExecuteWithHookContext) {
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());

    ((ExecuteWithHookContext) peh).run(hookContext);

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
  } else if (peh instanceof PreExecute) {
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());

    ((PreExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
        Utils.getUGI());

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
  }
}

Task执行失败时的ON_FAILURE_HOOKS

Task执行完后,如果执行失败了,那么Hive会调用这个失败的Hook。该钩子由参数 hive.exec.failure.hooks 配置,多个钩子实现类以逗号间隔,钩子需实现 org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext 接口,该接口在上文已有描述。该钩子主要用于在任务执行失败时执行一些措施,比如统计等等

该钩子的调用信息如下:

hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
// Get all the failure execution hooks and execute them.
for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
  perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());

  ((ExecuteWithHookContext) ofh).run(hookContext);

  perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
}

Task执行完毕的postExecutionHook

这个钩子是在Task任务执行完毕后执行,如果Task失败,会先执行ON_FAILURE_HOOKS这个钩子,之后执行postExecutionHook,该钩子由参数 hive.exec.post.hooks 配置,多个钩子实现类以逗号间隔,该钩子的实现方式也有两个

一、实现 org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext 接口

这个与preExecutionHook一致

二、实现 org.apache.hadoop.hive.ql.hooks.PostExecute 接口

该接口传入参数包括SessionState,UGI,列级的LineageInfo和HQL的输入输出表及分区信息,目前该接口被标为已过时的接口,相比上面的ExecuteWithHookContext,该接口提供的信息可能不完全能满足我们的需求

其接口描述如下:

public interface PostExecute extends Hook {

  /**
   * The run command that is called just before the execution of the query.
   *
   * @param sess
   *          The session state.
   * @param inputs
   *          The set of input tables and partitions.
   * @param outputs
   *          The set of output tables, partitions, local and hdfs directories.
   * @param lInfo
   *           The column level lineage information.
   * @param ugi
   *          The user group security information.
   */
  @Deprecated
  void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, LineageInfo lInfo,
      UserGroupInformation ugi) throws Exception;
}

该钩子的调用信息如下:

hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
// Get all the post execution hooks and execute them.
for (Hook peh : getHooks(HiveConf.ConfVars.POSTEXECHOOKS)) {
  if (peh instanceof ExecuteWithHookContext) {
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());

    ((ExecuteWithHookContext) peh).run(hookContext);

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
  } else if (peh instanceof PostExecute) {
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());

    ((PostExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
        (SessionState.get() != null ? SessionState.get().getLineageState().getLineageInfo()
            : null), Utils.getUGI());

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
  }
}

Task执行完毕结果返回之前的postDriverRun

该钩子在Task执行完毕,而结果尚未返回之前执行,与preDriverRun相对应,由于是同一个接口,这里不做详细描述

最后

至此,整个HQL执行生命周期中的钩子函数都讲完了,执行顺序和流程可梳理如下:

Driver.run()

=> HiveDriverRunHook.preDriverRun()(hive.exec.driver.run.hooks)

=> Driver.compile()

=> HiveSemanticAnalyzerHook.preAnalyze()(hive.semantic.analyzer.hook)

=> SemanticAnalyze(QueryBlock, LogicalPlan, PhyPlan, TaskTree)

=> HiveSemanticAnalyzerHook.postAnalyze()(hive.semantic.analyzer.hook)

=> QueryString redactor(hive.exec.query.redactor.hooks)

=> QueryPlan Generation

=> Authorization

=> Driver.execute()

=> ExecuteWithHookContext.run() || PreExecute.run() (hive.exec.pre.hooks)

=> TaskRunner

=> if failed, ExecuteWithHookContext.run()(hive.exec.failure.hooks)

=> ExecuteWithHookContext.run() || PostExecute.run() (hive.exec.post.hooks)

=> HiveDriverRunHook.postDriverRun()(hive.exec.driver.run.hooks)

欢迎阅读转载,转载请注明出处:https://my.oschina.net/u/2539801/blog/1514648

© 著作权归作者所有

共有 人打赏支持
问津已非少年
粉丝 15
博文 21
码字总数 33944
作品 0
海淀
程序员
加载中

评论(3)

人生如梦
人生如梦
666 刚好需要用这些功能
问津已非少年
问津已非少年

引用来自“柯激情”的评论

看到有很多的针对hive or spark的二次开发 都是暴力在门外做脏活,利用内部已有的逻辑可以事半功倍。
是的,Hive里除了这些钩子函数,还有一些事件监听器,可以据此实现很多功能
柯激情
柯激情
看到有很多的针对hive or spark的二次开发 都是暴力在门外做脏活,利用内部已有的逻辑可以事半功倍。
深入React的生命周期(上):出生(Mount)

深入React的生命周期(上):出生(Mount) 轻薄2017-11-054 阅读 Reactmount生命周期 前言 本文是对开源图书 React In-depth: An exploration of UI development 的归纳和增强。同时也融入了自己...

轻薄
2017/11/05
0
0
如何解释vue的生命周期才能令面试官满意?

当面试官问:“谈谈你对vue的生命周期的理解”,听到这句话你是不是心里暗自窃喜:这也太容易了吧,不就是beforeCreate、created、beforeMount、mounted、beforeUpdate、updated、beforeDes...

闰土大叔
04/14
0
0
Vue 2.0 浅谈--生命周期和钩子函数

前言 生命周期和钩子函数-介绍 啥也不说先上图 图-1为 Vue 1.0 生命周期图,图-2为 Vue 2.0 生命周期图,图-3为Vue 1.0 和 Vue 2.0 钩子函数比较 重点看 Vue 2.0 生命周期和钩子函数-具体 上代...

青棘
2017/06/06
0
0
深入React的生命周期(下):更新(Update)

深入React的生命周期(下):更新(Update) 轻薄2017-11-055 阅读 React生命周期Update 更新阶段 更新阶段会在三种情况下触发: 更改 :一个组件并不能主动更改它拥有的 属性,它的 属性是由它的...

轻薄
2017/11/05
0
0
05. react 初次见面---State&生命周期

到目前为止我们只学习了一种方法来更新UI。调用 ReactDOM.render( ) 方法来改变输出。 在前面博客中有一个时钟的例子代码: 将时钟封装为Clock组件 1. 将函数转换为类 可以通过5个步骤将函数...

秋季长青
01/12
0
0
Vue2学习小记-给Vue2路由导航钩子和axios拦截器做个封装

1.写在前面 最近在学习Vue2,遇到有些页面请求数据需要用户登录权限、服务器响应不符预期的问题,但是总不能每个页面都做单独处理吧,于是想到axios提供了拦截器这个好东西,再于是就出现了本...

劣者
2017/11/23
0
0
理解vue实例的生命周期和钩子函数

虽然经常使用mounted、created等几个钩子函数,但是并没有很系统的将它和生命周期关联一起去理解,这导致我在最近项目里踩了坑。所以现在花点时间,整理一下这方面的知识。 我的其他文章,也...

四小七
06/21
0
0
angular2 脏检查series1-Zone.js

angular2 脏检查总述 这系列文章将介绍angular2的脏值检查是如何工作的?如何比ng1更高效?带着上述问题,让我们一起来看看angular2这禽兽(谁让它叫angular,又那么生猛)干了什么。 什么是...

诺克船长
2017/11/29
0
0
探寻 webpack 插件机制

webpack 可谓是让人欣喜又让人忧,功能强大但需要一定的学习成本。在探寻 webpack 插件机制前,首先需要了解一件有意思的事情,webpack 插件机制是整个 webpack 工具的骨架,而 webpack 本身...

牧云云
04/18
0
0
vue开发看这篇文章就够了(上)

摘要:作者整理的内容非常全面了,学习Vue的同学值得收藏下这篇文章。 Vue -渐进式JavaScript框架 介绍 vue 中文网 vue github Vue.js 是一套构建用户界面(UI)的渐进式JavaScript框架 库和框...

segmentfault
01/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

pbgo: 基于Protobuf的迷你RPC/REST框架

https://www.oschina.net/p/pbgo

chai2010
29分钟前
0
0
rsync工具介绍、常用选项以及通过ssh同步

linux下的文件同步工具 rsync rsync是非常实用的一个同步工具,可以从a机器到b机器传输一个文件,也可以备份数据,系统默认没有这个工具,要使用命令 yum install -y rsync 安装。 rsync的命...

黄昏残影
44分钟前
0
0
OSChina 周四乱弹 —— 表妹要嫁人 舅妈叮嘱……

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @哈哈哈哈哈嗝:一定要听——The Pancakes的单曲《咁咁咁》 《咁咁咁》- The Pancakes 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :...

小小编辑
今天
194
4
流利阅读笔记30-20180719待学习

重磅:让人类得老年痴呆的竟是它? Lala 2018-07-19 1.今日导读 去年奥斯卡最佳动画长片《寻梦环游记》里有一句经典台词:“比死亡更可怕的,是遗忘”。在电影中,年迈的曾祖母会重复说一样的...

aibinxiao
今天
3
0
1.16 Linux机器相互登录

Linux机器之间以密码方式互相登录 运行命令#ssh [ip address],标准命令:#ssh [username]@ip, 如果没有写用户名,则默认为系统当前登录的用户 命令#w查看系统负载,可查看到连接到该主机的...

小丑鱼00
今天
0
0
about git flow

  昨天元芳做了git分支管理规范的分享,为了拓展大家关于git分支的认知,这里我特意再分享这两个关于git flow的链接,大家可以看一下。 Git 工作流程 Git分支管理策略   git flow本质上是...

qwfys
今天
2
0
Linux系统日志文件

/var/log/messages linux系统总日志 /etc/logrotate.conf 日志切割配置文件 参考https://my.oschina.net/u/2000675/blog/908189 dmesg命令 dmesg’命令显示linux内核的环形缓冲区信息,我们可...

chencheng-linux
今天
1
0
MacOS下给树莓派安装Raspbian系统

下载镜像 前往 树莓派官网 下载镜像。 点击 最新版Raspbian 下载最新版镜像。 下载后请,通过 访达 双击解压,或通过 unzip 命令解压。 检查下载的文件 ls -lh -rw-r--r-- 1 dingdayu s...

dingdayu
今天
1
0
spring boot使用通用mapper(tk.mapper) ,id自增和回显等问题

最近项目使用到tk.mapper设置id自增,数据库是mysql。在使用通用mapper主键生成过程中有一些问题,在总结一下。 1、UUID生成方式-字符串主键 在主键上增加注解 @Id @GeneratedValue...

北岩
今天
2
0
告警系统邮件引擎、运行告警系统

告警系统邮件引擎 cd mail vim mail.py #!/usr/bin/env python#-*- coding: UTF-8 -*-import os,sysreload(sys)sys.setdefaultencoding('utf8')import getoptimport smtplibfr......

Zhouliang6
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部