文档章节

hbase0.98.9中实现endpoints

彭苏云
 彭苏云
发布于 2015/01/04 19:40
字数 812
阅读 292
收藏 0

「深度学习福利」大神带你进阶工程师,立即查看>>>

在我的前面一篇博客中,抄录了hbase官网的一段话,讲的是0.96.0以后版本中endpoint的实现机制由于引进了protobuf框架,有了比较彻底的改变。本文承接上面一篇博客,给出定制一个endpoint的过程。

下面是实现过程:

1、定义接口描述文件(该功能有protobuf提供出来

option java_package = "coprocessor.endpoints.generated";
option java_outer_classname = "RowCounterEndpointProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

个文件我直接拿的hbase提供的example中的例子。其中的语法应该有过类似经验的一看就清楚了,实在不清楚就请查查protobuf的帮助手册吧。

2、根据接口描述文件生成java接口类(该功能有protobuf提供出来)

有了接口描述文件,还需要生成java语言的接口类。这个需要借助protobuf提供的工具protoc。

$protoc --java_out=./ Examples.proto

简单解释下,protoc这个命令在你装了protobuf后就有了。Examples.proto这个是文件名,也就是刚才编写的那个接口描述文件。“--java_out”这个用来指定生成后的java类放的地方。

所以,这地方如果你没有装protobuf,你需要装一个,window和linux版都有,多说一句,如果你去装hadoop64位的编译环境的话,应该是要装protobuf。

3、实现接口

package coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

public class RowCounterEndpointExample extends RowCountService implements
		Coprocessor, CoprocessorService {
	private RegionCoprocessorEnvironment env;

	public RowCounterEndpointExample() {
	}

	@Override
	public Service getService() {
		return this;
	}

	@Override
	public void getRowCount(RpcController controller, CountRequest request,
			RpcCallback<CountResponse> done) {
		Scan scan = new Scan();
		scan.setFilter(new FirstKeyOnlyFilter());
		CountResponse response = null;
		InternalScanner scanner = null;
		try {
			scanner = env.getRegion().getScanner(scan);
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			byte[] lastRow = null;
			long count = 0;
			do {
				hasMore = scanner.next(results);
				for (Cell kv : results) {
					byte[] currentRow = CellUtil.cloneRow(kv);
					if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
						lastRow = currentRow;
						count++;
					}
				}
				results.clear();
			} while (hasMore);

			response = CountResponse.newBuilder().setCount(count).build();
		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {
				}
			}
		}
		done.run(response);
	}

	@Override
	public void getKeyValueCount(RpcController controller,
			CountRequest request, RpcCallback<CountResponse> done) {
		CountResponse response = null;
		InternalScanner scanner = null;
		try {
			scanner = env.getRegion().getScanner(new Scan());
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			long count = 0;
			do {
				hasMore = scanner.next(results);
				for (Cell kv : results) {
					count++;
				}
				results.clear();
			} while (hasMore);

			response = CountResponse.newBuilder().setCount(count).build();
		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {
				}
			}
		}
		done.run(response);
	}

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		if (env instanceof RegionCoprocessorEnvironment) {
			this.env = (RegionCoprocessorEnvironment) env;
		} else {
			throw new CoprocessorException("Must be loaded on a table region!");
		}
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		// TODO Auto-generated method stub

	}

}

4、注册接口(Hbase功能,通过配置文件或者表模式方式注册

这部分,可以看hbase权威指南了,我就看这部分做的。

5、测试调用

package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ServiceException;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;
import util.HBaseHelper;

public class RowCounterEndpointClientExample {
	public static void main(String[] args) throws ServiceException, Throwable {
		Configuration conf = HBaseConfiguration.create();
		HBaseHelper helper = HBaseHelper.getHelper(conf);
		//helper.dropTable("testtable");
		//helper.createTable("testtable", "colfam1", "colfam2");
		System.out.println("Adding rows to table...");
		helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");

		HTable table = new HTable(conf, "testtable");

		final CountRequest request = CountRequest.getDefaultInstance();
		
		final Batch.Call<RowCountService, Long> call =new Batch.Call<RowCountService, Long>() {
			public Long call(RowCountService counter)
					throws IOException {
				ServerRpcController controller = new ServerRpcController();
				BlockingRpcCallback<CountResponse> rpcCallback = new BlockingRpcCallback<CountResponse>();
				counter.getRowCount(controller, request, rpcCallback);
				CountResponse response = rpcCallback.get();
				if (controller.failedOnException()) {
					throw controller.getFailedOn();
				}
				return (response != null && response.hasCount()) ? response
						.getCount() : 0;
			}
		};
		
		Map<byte[], Long> results = table.coprocessorService(
				RowCountService.class, null, null, call);
		
		for(byte[] b : results.keySet()){
			System.err.println(Bytes.toString(b) + ":" + results.get(b));
		} 
	}
}
彭苏云
粉丝 44
博文 204
码字总数 54255
作品 0
广州
高级程序员
私信 提问
加载中
请先登录后再评论。
用vertx实现高吞吐量的站点计数器

工具:vertx,redis,mongodb,log4j 源代码地址:https://github.com/jianglibo/visitrank 先看架构图: 如果你不熟悉vertx,请先google一下。我这里将vertx当作一个容器,上面所有的圆圈要...

jianglibo
2014/04/03
4.1K
3
SQLServer实现split分割字符串到列

网上已有人实现sqlserver的split函数可将字符串分割成行,但是我们习惯了split返回数组或者列表,因此这里对其做一些改动,最终实现也许不尽如意,但是也能解决一些问题。 先贴上某大牛写的s...

cwalet
2014/05/21
9.6K
0
Promises/A 和 when() 实现--When.js

When.js 是 cujojs 的轻量级的 Promises/A 和 when() 实现,从 wire.js 的异步核心和 cujojs 的 IOC 容器派生而来。包含很多其他有用的 Promiss 相关概念,例如联合多个 promiss、mapping 和...

匿名
2013/02/15
7.4K
0
Redis 分片实现--Redis Shard

redis-shard 是 Redis 分区的 Python API ,基于对 key 和 key tag 进行 CRC32 checksum 计算,可参考文章 http://antirez.com/post/redis-presharding.html . 该项目由知乎网开发。 使用限制...

匿名
2012/10/24
5.6K
0
iOS 的 Canvas 和 Audio 实现--Ejecta

Ejecta 是一个快速开源的 JavaScript、Canvas 和 音频实现,适用于 iOS 平台。你可以把它想象成一个只支持显示 Canvas 元素的浏览器,它像一个浏览器却无需浏览器,适用于游戏和动画开发。无...

匿名
2012/10/26
4.4K
0

没有更多内容

加载失败,请刷新页面

加载更多

HTML5 视频和音频的常用方法

HTML5 中为视频 video 和音频 audio 元素,提供了属性、方法和事件。这两个元素的常用属性上一节我们已经讲过了,本节我们来讲一下这两个元素的方法。 视频和音频的常用方法 HTML5 中为 vide...

凌兮洛
3分钟前
0
0
Git应用详解第一讲:Git分区、配置与日志

Git应用详解第一讲:Git分区、配置与日志 前言 曾经听到过这样一句话:不会「git」就不要敲代码了。细细品味确实有其中的道理,可能是当事人代码被强行覆盖后的叹息吧! 因此,为了避免这种情...

osc_jhlfbvu7
4分钟前
0
0
HashMap、HashSet、HashTable比较

1.HashMap和HashTable区别 线程安全:HashMap线程不安全。而HashTable通过让get/put上锁达到线程安全,不过代价很大。 HashMap允许key/value为null(但只能有一个null键),而HashTable不允许...

曦鱼violet
4分钟前
0
0
SHELL脚本编程练习答案(多版本)

练习: 1、编写脚本 systeminfo.sh,显示当前主机系统信息,包括:主机名,IPv4地址,操作系统版本,内核 版本,CPU型号,内存大小,硬盘大小 #!/bin/bashYELLOW='\e[1;33m'RED='\e[1;31m'...

osc_0cugk2ks
5分钟前
0
0
盘点 35 个 Apache 顶级项目,我拜服了…

Apache 软件基金会 Apache 软件基金会,全称:Apache Software Foundation,简称:ASF,成立于 1999 年 7 月,是目前世界上最大的最受欢迎的开源软件基金会,也是一个专门为支持开源项目而生...

Java技术栈
6分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部