文档章节

Hive源码剖析之HiveServer2服务启动过程

 冷血狂魔
发布于 2016/08/27 12:56
字数 1865
阅读 421
收藏 1

一、前言

    前段时间用Hive JDBC出现了阻塞问题,客户端一直处于等待状态,为了解决该问题,花了几天研究了Hive源码,于是准备写成系列博文,与大家分享其中的心得。有不当之处,请大家更正。

二、背景

    我们生活在一个数据的时代,我们在经意与不经意间留下了数据。带着手机出门,不经意间我们留下了计步数据和活动轨迹数据,我们网购商品,不经意间留下个人偏好数据,发个朋友圈,留下了各种图片和文字数据……我们每个人都是数据贡献者。

    数据的暴增给数据的存储、分析和查询带来了很多难点,于是谷歌提出了很多解决方案,并发表了三篇意义重大的论文Google File System、Google MapReduce、Google Bigtable,基于这三篇论文,便产生了Hadoop和HBase。Hadoop是基于GFS和MapReduce理论的开源实现,解决大文件的存储与分析问题,HBase是基于Bigtable理论的开源实现,解决海量数据的实时检索。

    MapReduce对于Hadoop而言,既是算法模型,也是框架。要使用MapReduce必须先把业务转化为适合该算法模型,并基于该框架编程。对于框架,笔者这样认为,框架提高了开发效率的同时,也限定了人的思维范围和编程范围。房子是一个框架结构,住在房子里的人,活动范围只限于活动范围之间,开发框架亦是如此,Struts2是MVC框架,开发人员只能按照它所限定的模式开发,这何尝不是一种禁锢呢?作为一个软件架构者,了解框架实现的细节,才能走出禁锢。这也是笔者坚持看各种框架源码的原因之一。

    编写出好的MapReduce程序本身不是轻而易举的事,所有就有了Hive,它能把Sql指令,转化为MapReduce任务,让开发人员用Sql的方式去使用Hadoop的运算能力。我们在使用这些工具的同时,也不得不赞叹这些杰出科学家奇思妙想。

三、分享重点

    HiveServer2服务启动过程

四、源码

    1、开一段程序,入口肯定是main方法

public static void main(String[] args) {
    //设置加载配置
	HiveConf.setLoadHiveServer2Config(true);
	try {
		ServerOptionsProcessor oproc = new ServerOptionsProcessor(
				"hiveserver2");
		ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
		String initLog4jMessage = LogUtils.initHiveLog4j();
		LOG.debug(initLog4jMessage);
		HiveStringUtils
				.startupShutdownMessage(HiveServer2.class, args, LOG);
		LOG.debug(oproc.getDebugMessage().toString());
        //这里便是要启动服务了
		oprocResponse.getServerOptionsExecutor().execute();
	} catch (LogInitializationException e) {
		LOG.error("Error initializing log: " + e.getMessage(), e);
		System.exit(-1);
	}
}

    2、ServerOptionsExecutor接口有三个实现类

    DeregisterOptionExecutor:将HiveServer2实例从zookeeper中移除

    HelpOptionExecutor:打印help参数

    ServerOptionsExecutor:启动HiveServer2服务

    这里很显然,我们应该看ServerOptionsExecutor类

static class StartOptionExecutor implements ServerOptionsExecutor {
	@Override
	public void execute() {
		try {
            //启动服务
			startHiveServer2();
		} catch (Throwable t) {
			LOG.fatal("Error starting HiveServer2", t);
			System.exit(-1);
		}
	}
}

    3、startHiveServer2方法,重要的地方给出了必要的注释

private static void startHiveServer2() throws Throwable {
	long attempts = 0, maxAttempts = 1;
	while (true) {
		LOG.info("Starting HiveServer2");
		HiveConf hiveConf = new HiveConf();
		maxAttempts = hiveConf
				.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
		HiveServer2 server = null;
		try {
			server = new HiveServer2();
            //初始化
			server.init(hiveConf);
            //启动
			server.start();
           //省略了一些代码
			break;
		} catch (Throwable throwable) {
			throwable.printStackTrace();
            //出现异常就停止服务
			if (server != null) {
				try {
					server.stop();
				} catch (Throwable t) {
					LOG.info(
							"Exception caught when calling stop of HiveServer2 before retrying start",
							t);
				} finally {
					server = null;
				}
			}
            //如果抛出异常,并尝试启动超过了配置的最大尝试次数,抛出错误,启动失败
			if (++attempts >= maxAttempts) {
				throw new Error("Max start attempts " + maxAttempts
						+ " exhausted", throwable);
			} else {
                //60秒后再次尝试启动
				LOG.warn("Error starting HiveServer2 on attempt "
						+ attempts + ", will retry in 60 seconds",
						throwable);
				try {
					Thread.sleep(60L * 1000L);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}
}

    4、接下来我们看看server的初始化做了哪些事情

@Override
public synchronized void init(HiveConf hiveConf) {
    //实例化clientService实例,该实例用于把用户请求转化并传递给Driver
	cliService = new CLIService(this);
	addService(cliService);
	if (isHTTPTransportMode(hiveConf)) {
		thriftCLIService = new ThriftHttpCLIService(cliService);
	} else {//默认情况是Thrift二进制服务
		thriftCLIService = new ThriftBinaryCLIService(cliService);
	}
    //添加进服务列表
	addService(thriftCLIService);
	super.init(hiveConf);
	// 省略获取配置信息……
	
	// 启动web UI,改web UI用于查看正在运行的Hive任务,默认端口10002
	try {
		        //省略大量获取的配置的代码
                。。。。。。
				webServer = builder.build();
                //添加查询运行任务数据的servlet
				webServer.addServlet("query_page", "/query_page",
						QueryProfileServlet.class);
			}
		}
	} catch (IOException ie) {
		throw new ServiceException(ie);
	}
	// Add a shutdown hook for catching SIGTERM & SIGINT
	final HiveServer2 hiveServer2 = this;
    //添加钩子
	Runtime.getRuntime().addShutdownHook(new Thread() {
		@Override
		public void run() {
			hiveServer2.stop();
		}
	});
}

    初始化主要是获取了一些配置参数,并且告诉程序,要启动这些服务CLIService、thriftCLIService 、webServer

    5、下面是重头戏,启动服务了,紧接着第3步中server.start()看下来,start方法

@Override
public synchronized void start() {
	super.start();//调用父类start方法
	if (webServer != null) {
		try {
			webServer.start();//启动web服务
			LOG.info("Web UI has started on port " + webServer.getPort());
		} catch (Exception e) {
			LOG.error("Error starting Web UI: ", e);
			throw new ServiceException(e);
		}
	}
}

    start方法,做了两件事情,就是调用父类start方法,启动web服务

    6、下面我跟进去看看,父类CompositeService中定义的start方法

@Override
public synchronized void start() {
	int i = 0;
	try {
		for (int n = serviceList.size(); i < n; i++) {
			Service service = serviceList.get(i);
			service.start();
		}
		super.start();
	} catch (Throwable e) {
		stop(i);
		throw new ServiceException("Failed to Start " + getName(), e);
	}
}

这里仅仅是把第4步中添加的服务列表中的服务进行了启动,下面我们我们说服务中的重点,HiveServer2怎么提供远程服务,请看ThriftBinaryCLIService

    7、ThriftBinaryCLIService中的run方法

@Override
public void run() {
	try {
		// 定义处理请求的线程池
		String threadPoolName = "HiveServer2-Handler-Pool";
		ExecutorService executorService = new ThreadPoolExecutor(
				minWorkerThreads, maxWorkerThreads, workerKeepAliveTime,
				TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
				new ThreadFactoryWithGarbageCleanup(threadPoolName));


		//省略定义传输协议、配置参数代码
         ........
		int maxMessageSize = hiveConf
				.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
		int requestTimeout = (int) hiveConf.getTimeVar(
				HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT,
				TimeUnit.SECONDS);
		int beBackoffSlotLength = (int) hiveConf
				.getTimeVar(
						HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
						TimeUnit.MILLISECONDS);
		TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(
				serverSocket)
				.processorFactory(processorFactory)
				.transportFactory(transportFactory)
				.protocolFactory(new TBinaryProtocol.Factory())
				.inputProtocolFactory(
						new TBinaryProtocol.Factory(true, true,
								maxMessageSize, maxMessageSize))
				.requestTimeout(requestTimeout)
				.requestTimeoutUnit(TimeUnit.SECONDS)
				.beBackoffSlotLength(beBackoffSlotLength)
				.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
				.executorService(executorService);

		// TCP Server
		server = new TThreadPoolServer(sargs);
		server.setServerEventHandler(serverEventHandler);
		server.serve();
		String msg = "Started "
				+ ThriftBinaryCLIService.class.getSimpleName()
				+ " on port " + portNum + " with " + minWorkerThreads
				+ "..." + maxWorkerThreads + " worker threads";
		LOG.info(msg);
	} catch (Throwable t) {
		LOG.fatal("Error starting HiveServer2: could not start "
				+ ThriftBinaryCLIService.class.getSimpleName(), t);
		System.exit(-1);
	}
}

    说明:这里的ExecutorService线程池的定义中用了SynchronousQueue队列,该队列的可以认为是长度为1的阻塞队列,当线程池满,并且没有空闲线程,便会阻塞。TThreadPoolServer的特点是,客户端只要不从服务器上断开连接,就会一直占据服务器的一个线程,所以出现了本文中开头出现的阻塞问题,解决办法,如果服务器内存允许,可以适当加大线程池长度,或者增加hive节点,在配合负载均衡。

    8、特别说明

    Thrift是RPC界的利器,Facebook的杰作,可以轻松实现夸语言的服务调用,支持的语言有C++, Java, Go,Python, PHP, Haskell, C#,  JavaScript, Node.js等等

   (1)、模型接口
       服务的调用接口以及接口参数model、返回值model 
   (2)、Tprotocol协议定义 
         将数据(model)编码 、解码 。 
  ( 3)、Ttramsport传输层定义
        编码后的数据传输(简单socket、http) 
   (5)、Tserver服务类型
        服务的Tserver类型,实现了几种rpc调用(单线程、多线程、非阻塞IO)

五、后记

    本文中的描述了HiveServer2的启动过程中,启动的服务,那么客户端是如何与服务端交互的呢?为什么通过JDBC的方式就可以去Hive上执行任务了呢?Hive JDBC除了实现JDBC标准接口外,还做了哪些事情呢?敬请期待《Hive源码剖析之Hive JDBC》

快乐源于分享。

   此博客乃作者原创, 转载请注明出处

 

© 著作权归作者所有

共有 人打赏支持
粉丝 96
博文 43
码字总数 51095
作品 0
杭州
程序员
私信 提问
Hive2.x、HiveServer、HiveServer2简述及Beeline使用

Hive2.x hive2.x特性 LLAP(Live Long and Process)Hive2.1进行了极大的性能优化。在Hive2.x开启LLAP与Apache Hive1.x进行对比测试,其性能提升约25倍。 支持使用HPL/SQL的存储过程,Hive2...

PeakFang-BOK
2018/10/10
0
0
hive(03)、数据仓库Hive Web UI的配置使用

Hive有一个基于web界面的东西,主要用于查看当前HiveServer2服务链接的会话、服务日志、配置参数等信息,这个服务更像是一个hive提供的监控服务,更加方便对hive的使用情况进行监控,本文我们...

MaxBill
2018/01/15
0
0
Configuring Hive High Availability

Hive从0.14开始,使用Zookeeper实现了HiveServer2的HA功能(ZooKeeper Service Discovery),Client端可以通过指定一个nameSpace来连接HiveServer2,而不是指定某一个host和port。本文描述了...

candon123
2017/12/13
0
0
CentOS 6.9 中 搭建 Hive

解压 hive 安装包 2. 安装 mysql 3. 安装 mysql-connector 4. 建立 mysql-connector 链接 5. 启动 mysql 验证 mysql 服务 是否启动成功 6. 修改 mysql 数据库的 root 用户的密码 7. 配置 Hi...

自东土大唐而来
2018/01/16
0
0
Hive远程模式安装(1.00)

hiveserver2和metastore分离部署,元数据存储采用mysql,mysql与metastore分离部署。 mysql: 部署在ctrl节点 hiveserver2: 部署在ctrl和data01节点 metastore: 部署在data02和data03节点 be...

phacks
2016/09/11
12
0

没有更多内容

加载失败,请刷新页面

加载更多

输入两个整数序列,第一个序列表示栈的压入顺序,请判断第二个序列是否可能为该栈的弹出顺序。

import java.util.Stack; public class Solution { public boolean IsPopOrder(int [] pushA,int [] popA) { if(pushA.length==0||popA.length==0) return false; Stack......

南桥北木
37分钟前
1
0
互联网浪潮下,Java程序员如何追赶技术革新的脚步?

一:时代背景 身处互联网行业的我们一直处在变革的最前端,受到行业发展浪潮的洗礼,不停歇地追赶着技术革新的脚步。特别是近几年来, 互联网架构不断演化,经历了从集中式架构到分布式架构,...

老道士
44分钟前
1
0
flink系列(9)-flink启动流程分析

连续写了几天的flink StreamGraph的代码,今天闲来说一下flink的启动

yiduwangkai
57分钟前
1
0
取变量的地址赋值给另一个变量,C通过,C++编译出错

取变量的地址赋值给另一个变量,C通过。正常运行,C++编译出错。 代码如下: #include <stdio.h>int main(int argc, char *argv[]){int x = 3;int *p = &x;int y = p;/*c ...

SamXIAO
今天
1
0
利用隐写术实施攻击

尽管隐写术是一种低频攻击途径,但网络犯罪分子已经开始利用它结合社交媒体的普遍性和快速传播性来传递恶意有效负载。 低调但有效的隐写技术虽然是旧把戏,但将代码隐藏在看似正常的图像中,...

Linux就该这么学
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部