文档章节

hbase0.98.9中实现endpoints

彭苏云
 彭苏云
发布于 2015/01/04 19:40
字数 812
阅读 275
收藏 0
点赞 0
评论 0

在我的前面一篇博客中,抄录了hbase官网的一段话,讲的是0.96.0以后版本中endpoint的实现机制由于引进了protobuf框架,有了比较彻底的改变。本文承接上面一篇博客,给出定制一个endpoint的过程。

下面是实现过程:

1、定义接口描述文件(该功能有protobuf提供出来

option java_package = "coprocessor.endpoints.generated";
option java_outer_classname = "RowCounterEndpointProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

个文件我直接拿的hbase提供的example中的例子。其中的语法应该有过类似经验的一看就清楚了,实在不清楚就请查查protobuf的帮助手册吧。

2、根据接口描述文件生成java接口类(该功能有protobuf提供出来)

有了接口描述文件,还需要生成java语言的接口类。这个需要借助protobuf提供的工具protoc。

$protoc --java_out=./ Examples.proto

简单解释下,protoc这个命令在你装了protobuf后就有了。Examples.proto这个是文件名,也就是刚才编写的那个接口描述文件。“--java_out”这个用来指定生成后的java类放的地方。

所以,这地方如果你没有装protobuf,你需要装一个,window和linux版都有,多说一句,如果你去装hadoop64位的编译环境的话,应该是要装protobuf。

3、实现接口

package coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

public class RowCounterEndpointExample extends RowCountService implements
		Coprocessor, CoprocessorService {
	private RegionCoprocessorEnvironment env;

	public RowCounterEndpointExample() {
	}

	@Override
	public Service getService() {
		return this;
	}

	@Override
	public void getRowCount(RpcController controller, CountRequest request,
			RpcCallback<CountResponse> done) {
		Scan scan = new Scan();
		scan.setFilter(new FirstKeyOnlyFilter());
		CountResponse response = null;
		InternalScanner scanner = null;
		try {
			scanner = env.getRegion().getScanner(scan);
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			byte[] lastRow = null;
			long count = 0;
			do {
				hasMore = scanner.next(results);
				for (Cell kv : results) {
					byte[] currentRow = CellUtil.cloneRow(kv);
					if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
						lastRow = currentRow;
						count++;
					}
				}
				results.clear();
			} while (hasMore);

			response = CountResponse.newBuilder().setCount(count).build();
		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {
				}
			}
		}
		done.run(response);
	}

	@Override
	public void getKeyValueCount(RpcController controller,
			CountRequest request, RpcCallback<CountResponse> done) {
		CountResponse response = null;
		InternalScanner scanner = null;
		try {
			scanner = env.getRegion().getScanner(new Scan());
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			long count = 0;
			do {
				hasMore = scanner.next(results);
				for (Cell kv : results) {
					count++;
				}
				results.clear();
			} while (hasMore);

			response = CountResponse.newBuilder().setCount(count).build();
		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {
				}
			}
		}
		done.run(response);
	}

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		if (env instanceof RegionCoprocessorEnvironment) {
			this.env = (RegionCoprocessorEnvironment) env;
		} else {
			throw new CoprocessorException("Must be loaded on a table region!");
		}
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		// TODO Auto-generated method stub

	}

}

4、注册接口(Hbase功能,通过配置文件或者表模式方式注册

这部分,可以看hbase权威指南了,我就看这部分做的。

5、测试调用

package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ServiceException;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;
import util.HBaseHelper;

public class RowCounterEndpointClientExample {
	public static void main(String[] args) throws ServiceException, Throwable {
		Configuration conf = HBaseConfiguration.create();
		HBaseHelper helper = HBaseHelper.getHelper(conf);
		//helper.dropTable("testtable");
		//helper.createTable("testtable", "colfam1", "colfam2");
		System.out.println("Adding rows to table...");
		helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");

		HTable table = new HTable(conf, "testtable");

		final CountRequest request = CountRequest.getDefaultInstance();
		
		final Batch.Call<RowCountService, Long> call =new Batch.Call<RowCountService, Long>() {
			public Long call(RowCountService counter)
					throws IOException {
				ServerRpcController controller = new ServerRpcController();
				BlockingRpcCallback<CountResponse> rpcCallback = new BlockingRpcCallback<CountResponse>();
				counter.getRowCount(controller, request, rpcCallback);
				CountResponse response = rpcCallback.get();
				if (controller.failedOnException()) {
					throw controller.getFailedOn();
				}
				return (response != null && response.hasCount()) ? response
						.getCount() : 0;
			}
		};
		
		Map<byte[], Long> results = table.coprocessorService(
				RowCountService.class, null, null, call);
		
		for(byte[] b : results.keySet()){
			System.err.println(Bytes.toString(b) + ":" + results.get(b));
		} 
	}
}

© 著作权归作者所有

共有 人打赏支持
彭苏云
粉丝 41
博文 204
码字总数 54255
作品 0
广州
高级程序员
Kubernetes的Endpoints

在之前的博文中,我们演示过如何通过ceph来实现kubernetes的持久存储,以使得像mysql这种有状态服务可以在kubernetes中运行并保存数据。这看起来很美妙,然而在实际的生产环境使用中,通过分...

msj0905
06/27
0
0
Kubernetes核心原理(二)之Controller Manager

1. Controller Manager简介 Controller Manager作为集群内部的管理控制中心,负责集群内的Node、Pod副本、服务端点(Endpoint)、命名空间(Namespace)、服务账号(ServiceAccount)、资源定...

huwh_
2017/07/21
0
0
consule服务注册和发现 安装 部署

安装部署参考 http://blog.sina.com.cn/s/blog_8ea8e9d50102wwlf.html http://www.techweb.com.cn/network/system/2016-01-28/2270190.shtml api连接参考 https://github.com/JoergM/consul......

tianshuai369
2017/04/10
0
0
Prometheus Operator

原文:https://github.com/coreos/prometheus-operator/blob/master/Documentation/design.md 设计(Design) 本文描述了 Prometheus Operator 引入的自定义资源定义之间的设计和交互。 Prom...

weixin_38975685
04/24
0
0
[WCF-Discovery]如何利用”发现代理”实现可用服务的实时维护?

上面的内容大部分是围绕着Ad-Hoc模式展开介绍的。Managed模式和Ad-Hoc不同之处在于可用服务的终结点通过发现代理来统一管理。客户端在进行可用目标服务探测和解析的时候不再需要发送广播请求...

长平狐
2012/09/04
463
1
Botposter.com集群ETCD2.3.7升级至3.0实录

7月1日,为庆祝我党生日,ETCD隆重发布了3.0版本。Botposter.com也在第一时间对集群进行了升级。本文是升级过程的记录与总结(文中假设读者已经使用或测试过ETCD V2,如有不妥请见谅)。 Bo...

Jesse_Joygenio
2016/07/05
375
4
Kubernetes核心概念之Service详解

Service是k8s中非常重要的组成单元,作用是作为代理把在POD中容器内的服务发布出去,提供一套简单的发现机制和服务代理,也就是运维常说的‘前端’概念,那么它如何实现代理功能以及自动伸缩...

奋斗的寒霜
06/25
0
0
SpringBoot 整合 oauth2(五)实现 jwt 及 扩展

什么是jwt,即 json web token。JWT是一种用于双方之间传递安全信息的简洁的、URL安全的表述性声明规范。也是一种token,但是和token有一些不同。 jwt优点: 自包含 防篡改 可自定义扩展 JW...

FantJ
05/22
0
0
深入Spring Boot:Spring Context 的继承关系和影响

原文出处:hengyunabc 前言 对于一个简单的Spring boot应用,它的spring context是只会有一个。 非web spring boot应用,context是AnnotationConfigApplicationContext web spring boot应用,...

hengyunabc
2017/12/30
0
0
Gitea 发布 v1.1 版本,支持 Git-LFS,两步验证等大量改进

我们很高兴的宣布Gitea 发布了 1.1.0 版本。在这个版本中,我们关闭了 126 工单,同时合并了 348 合并请求。你可以从 下载页面 根据你所处的平台和架构下载预编译版本。更多安装详情请参考 ...

lunny
2017/03/10
975
3

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Android 复制和粘贴功能

做了一回搬运工,原文地址:https://blog.csdn.net/kennethyo/article/details/76602765 Android 复制和粘贴功能,需要调用系统服务ClipboardManager来实现。 ClipboardManager mClipboardM...

她叫我小渝
21分钟前
0
0
拦截SQLSERVER的SSL加密通道替换传输过程中的用户名密码实现运维审计(一)

工作准备 •一台SQLSERVER 2005/SQLSERVER 2008服务 •SQLSERVER jdbc驱动程序 •Java开发环境eclipse + jdk1.8 •java反编译工具JD-Core 反编译JDBC分析SQLSERVER客户端与服务器通信原理 SQ...

紅顏為君笑
38分钟前
4
0
jQuery零基础入门——(六)修改DOM结构

《jQuery零基础入门》系列博文是在廖雪峰老师的博文基础上,可能补充了个人的理解和日常遇到的点,用我的理解表述出来,主干出处来自廖雪峰老师的技术分享。 在《零基础入门JavaScript》的时...

JandenMa
54分钟前
0
0
linux mint 1.9 qq 安装

转: https://www.jianshu.com/p/cdc3d03c144d 1. 下载 qq 轻聊版,可在百度搜索后下载 QQ7.9Light.exe 2. 去wine的官网(https://wiki.winehq.org/Ubuntu) 安装 wine . 提醒网页可以切换成中...

Canaan_
今天
0
0
PHP后台运行命令并管理运行程序

php后台运行命令并管理后台运行程序 class ProcessModel{ private $pid; private $command; private $resultToFile = ''; public function __construct($cl=false){......

colin_86
今天
1
0
数据结构与算法4

在此程序中,HighArray类中的find()方法用数据项的值作为参数传递,它的返回值决定是否找到此数据项。 insert()方法向数组下一个空位置放置一个新的数据项。一个名为nElems的字段跟踪记录着...

沉迷于编程的小菜菜
今天
1
1
fiddler安装和基本使用以及代理设置

项目需求 由于开发过程中客户端和服务器数据交互非常频繁,有时候服务端需要知道客户端调用接口传了哪些参数过来,这个时候就需要一个工具可以监听这些接口请求参数,已经接口的响应的数据,这种...

银装素裹
今天
0
0
Python分析《我不是药神》豆瓣评论

读取 Mongo 中的短评数据,进行中文分词 对分词结果取 Top50 生成词云 生成词云效果 看来网上关于 我不是药神 vs 达拉斯 的争论很热啊。关于词频统计就这些,代码中也会完成一些其它的分析任...

猫咪编程
今天
0
0
虚拟机怎么安装vmware tools

https://blog.csdn.net/tjcwt2011/article/details/72638977

AndyZhouX
昨天
1
0
There is no session with id[xxx]

参考网页 https://blog.csdn.net/caimengyuan/article/details/52526765 报错 2018-07-19 23:04:35,330 [http-nio-1008-exec-8] DEBUG [org.apache.shiro.web.servlet.SimpleCookie] - Found......

karma123
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部