hive 命令行提交sql 执行过程

原创
2017/04/27 15:46
阅读数 662

承接上一篇文档:hive cli 启动 本文继续说查看hive源码,分析hive sql 执行过程。但不会详细解析sql 编译过程,因为我也不懂,还没看到😭。

processLine方法

//line : 要执行的命令 。 allowInterrupting : 命令是否可以被中断
 public int processLine(String line, boolean allowInterrupting) {
        SignalHandler oldSignal = null;
        Signal interruptSignal = null;
        //判断是否允许任务终止
        if (allowInterrupting) {
            //定义接收到的信号 (可以用Ctrl+c ,和kill -2两种方式触发信号 )
            interruptSignal = new Signal("INT");
            oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
                private final Thread cliThread = Thread.currentThread();
                private boolean interruptRequested;

                @Override
                public void handle(Signal signal) {
                    boolean initialRequest = !interruptRequested;
                    interruptRequested = true;

                    // Kill the VM on second ctrl+c 当按两次Ctrl+ c时 退出jvm
                    if (!initialRequest) {
                        console.printInfo("Exiting the JVM");
                        System.exit(127);
                    }

                    // Interrupt the CLI thread to stop the current statement and return
                    // to prompt
                    console.printInfo("Interrupting... Be patient, this might take some time.");
                    console.printInfo("Press Ctrl+C again to kill JVM");

                    // First, kill any running MR jobs 杀死所有正在运行的任务,HadoopJobExecHelper 这个类中保存了正在运行的hadoop job ,任务会在这个类的progress()方法中循环获取执行进度,并且保存了所有这个jvm 中执行的任务。
                    HadoopJobExecHelper.killRunningJobs();
                    TezJobExecHelper.killRunningJobs();
                    HiveInterruptUtils.interrupt();
                }
            });
        }

        try {
            int lastRet = 0, ret = 0;

            String command = "";
            for (String oneCmd : line.split(";")) {

                if (StringUtils.endsWith(oneCmd, "\\")) {
                    command += StringUtils.chop(oneCmd) + ";";
                    continue;
                } else {
                    command += oneCmd;
                }
                if (StringUtils.isBlank(command)) {
                    continue;
                }
                //执行命令,根据命令不同做出不同对应,包括对退出,执行sql文件,执行linux命令,和sql等命令处理
                ret = processCmd(command);
                //清理cli查询状态,包括需要查询的sql ,
                SessionState ss = SessionState.get();
                ss.setCommandType(null);
                command = "";
                lastRet = ret;
                boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
                if (ret != 0 && !ignoreErrors) {
                    CommandProcessorFactory.clean((HiveConf) conf);
                    return ret;
                }
            }
            CommandProcessorFactory.clean((HiveConf) conf);
            return lastRet;
        } finally {
            // Once we are done processing the line, restore the old handler
            if (oldSignal != null && interruptSignal != null) {
                Signal.handle(interruptSignal, oldSignal);
            }
        }

processCmd() 方法

public int processCmd(String cmd) {
        CliSessionState ss = (CliSessionState) SessionState.get();
        ss.setLastCommand(cmd);
        conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QueryPlan.makeQueryId());
        // Flush the print stream, so it doesn't include output from the last command
        ss.err.flush();
        String cmd_trimmed = cmd.trim();
        //按空格分隔命令
        String[] tokens = tokenizeCmd(cmd_trimmed);
        int ret = 0;
        //  quit或exit: 关闭回话,退出hive
        if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
            ss.close();
            System.exit(0);
            // 文件处理
        } else if (tokens[0].equalsIgnoreCase("source")) {
           
            //! 开头: 调用Linux系统的shell执行指令
        } else if (cmd_trimmed.startsWith("!")) {
        }  else { // local mode 本地模式:创建CommandProcessor, 执行用户指令
            try {
                //根据命令的第一个词语判断是set,reset,dfs,add,list,delete,reload,crypto等返回不同的命令处理器,其他返回一个Driver类
                CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
                //处理sql
                ret = processLocalCmd(cmd, proc, ss);
            } catch (SQLException e) {
                console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
                        org.apache.hadoop.util.StringUtils.stringifyException(e));
                ret = 1;
            }
        }

        return ret;
    }

processLocalCmd方法

作用: 处理sql命令,根据不同的命令处理器处理不同sql命令,主要区分处理器是否是Driver类,如果是,处理除了(set,reset,dfs,add,list,delete,reload,crypto)命令外的其他sql ,如果不是调用各自处理器的run方法

 int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
        int tryCount = 0;
        boolean needRetry;
        int ret = 0;
        //执行命令时如果抛出CommondNeedRetryException 异常 命令会被重复执行
        do {
            try {
                needRetry = false;
                if (proc != null) {
                    // //如果CommandProcessor是Driver实例
                    if (proc instanceof Driver) {
                        Driver qp = (Driver) proc;
                        //获取标准输出流,打印结果信息
                        PrintStream out = ss.out;
                        long start = System.currentTimeMillis();
                        //回显命令
                        if (ss.getIsVerbose()) {
                            out.println(cmd);
                        }
                        //能获取运行的命令,获取不到history的日志文件
                        qp.setTryCount(tryCount);
                        //driver实例运行用户指令,获取运行结果响应码
                        ret = qp.run(cmd).getResponseCode();
                        //如果执行失败,直接返回
                        if (ret != 0) {
                            qp.close();
                            return ret;
                        }
                        //// 统计指令的运行时间
                        // query has run capture the time
                        long end = System.currentTimeMillis();
                        double timeTaken = (end - start) / 1000.0;

                        ArrayList<String> res = new ArrayList<String>();
                        //打印查询结果的列名称
                        printHeader(qp, out);
                        // 打印查询结果
                        // print the results
                        int counter = 0;
                        try {
                            if (out instanceof FetchConverter) {
                                ((FetchConverter)out).fetchStarted();
                            }
                            while (qp.getResults(res)) {
                                for (String r : res) {
                                    out.println(r);
                                }
                                counter += res.size();
                                res.clear();
                                if (out.checkError()) {
                                    break;
                                }
                            }
                        } catch (IOException e) {
                            console.printError("Failed with exception " + e.getClass().getName() + ":"
                                    + e.getMessage(), "\n"
                                    + org.apache.hadoop.util.StringUtils.stringifyException(e));
                            ret = 1;
                        }

                        int cret = qp.close();
                        if (ret == 0) {
                            ret = cret;
                        }

                        if (out instanceof FetchConverter) {
                            ((FetchConverter)out).fetchFinished();
                        }

                        console.printInfo("Time taken: " + timeTaken + " seconds" +
                                (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
                    } else {
                        //如果proc不是Driver,也就是用户执行的是非SQL查询操作,直接执行语句,
                        String firstToken = tokenizeCmd(cmd.trim())[0];
                        String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

                        if (ss.getIsVerbose()) {
                            ss.out.println(firstToken + " " + cmd_1);
                        }
                        //非查询操作,执行执行sql
                        CommandProcessorResponse res = proc.run(cmd_1);
                        if (res.getResponseCode() != 0) {
                            ss.out.println("Query returned non-zero code: " + res.getResponseCode() +
                                    ", cause: " + res.getErrorMessage());
                        }
                        ret = res.getResponseCode();
                    }
                }
            } catch (CommandNeedRetryException e) {
                console.printInfo("Retry query with a different approach...");
                tryCount++;
                needRetry = true;
            }
        } while (needRetry);

        return ret;
    }
展开阅读全文
打赏
1
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
1
分享
返回顶部
顶部