文档章节

hbase0.98 coprocessor Endpoint 之 HelloWorld

Leo_Vip
 Leo_Vip
发布于 2014/05/15 20:33
字数 1322
阅读 3.8K
收藏 0

###介绍: 源博地址: http://www.cockybook.com/?p=35

HBase作为列族数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。比如,在旧版本的(<0.92)Hbase中,统计数据表的总行数,需要使用Counter方法,执行一次MapReduce Job才能得到。虽然HBase在数据存储层中集成了MapReduce,能够有效用于数据表的分布式计算。然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。于是,HBase在0.92之后引入了协处理器(coprocessors),实现一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器(谓词下推)以及访问控制等。 HBase协处理器的灵感来自于Jeff Dean 09年的演讲( P66-67)。

####hbase coprocessor 大类分为两种coprocessor分别是:

  1. RegionObserver :它是一种类似于传统数据库的触发器,提供了钩子函数:Get、Put、Delete、Scan等。
  1. Endpoint:是一个远程rpc调用,类似于webservice形式调用,但他不适用xml,而是使用的序列化框架是protobuf(序列化后数据更小),本文将介绍此种Coprocessor.

Endpoint 允许您定义自己的动态RPC协议,用于客户端与region servers通讯。Coprocessor 与region server在相同的进程空间中,因此您可以在region端定义自己的方法(endpoint),将计算放到region端,减少网络开销,常用于提升hbase的功能,如:count,sum等。

###我的环境

  • hadoop : 2.2
  • hbase-hadoop2 :0.98+
  • JDK:1.6 ##这里必须要1.6 要不然会出现不能加载jar包的现象。
  • 操作系统:CentOS 6.4

###编写代码

  1. 首先你需要利用protobuf(网上自己搜google维护的目前发展到2.5版本) 工具成一个HelloWorld 序列化对象。

    ####HelloWorld.proto

    option java_package = "com.gzhdi.coprocessor.generated";
    option java_outer_classname = "ServerHelloworld";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    
    message HelloRequest {
      required bytes askWord = 10;
    }
    
    message HelloResponse {
      required bytes retWord = 10;
    }
    
    message AskRequest {
      required bytes ask = 100;
    }
    
    message AnsResponse {
      required bytes ans = 100;
    }
    
    service HelloWorld {
      rpc sendHello(HelloRequest)
        returns (HelloResponse);
    
      rpc question(AskRequest)
        returns (AnsResponse);
    }
    
    
  2. 使用命令生成代码,并拷贝到你的工程里边去,我的文件在工程下面放着呢,直接生成到工程里边。 这段代码就会生成一个HelloWorld.java文件.

    protoc.exe  --java_out=../src HelloWorld.proto
    
  3. 编写主要代码

    ####server端代码

    package com.gzhdi.copocessor;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    
    import com.google.protobuf.ByteString;
    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcController;
    import com.google.protobuf.Service;
    import com.gzhdi.coprocessor.generated.ServerHelloworld;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloRequest;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloResponse;
    
    public class HelloWorldEndPoint  extends ServerHelloworld.HelloWorld implements Coprocessor,CoprocessorService{
    
    	private RegionCoprocessorEnvironment env; 
    
    	@Override
    	public void sendHello(RpcController controller, HelloRequest request,
    			RpcCallback<HelloResponse> done) {
    		System.out.println("request HelloRequest:"+request.getAskWord());
    		HelloResponse resp=HelloResponse.newBuilder().setRetWord(ByteString.copyFromUtf8("hello world!!!")).build();
    
    		done.run(resp);
    	}
    
    	@Override
    	public void question(RpcController controller, AskRequest request,
    			RpcCallback<AnsResponse> done) {
    		System.out.println("request question:"+request.getAsk());
    		AnsResponse resp=AnsResponse.newBuilder().setAns(ByteString.copyFromUtf8("helloworld,"+request.getAsk().toStringUtf8())).build();
    		done.run(resp);
    	}
    
    	@Override
    	public Service getService() {
    		return this;
    	}
    
    	@Override
    	public void start(CoprocessorEnvironment env) throws IOException {
    		if (env instanceof RegionCoprocessorEnvironment) {  
    		      this.env = (RegionCoprocessorEnvironment)env;  
    		    } else {  
    		      throw new CoprocessorException("Must be loaded on a table region!");  
    		    }  
    	}
    
    	@Override
    	public void stop(CoprocessorEnvironment env) throws IOException {
    
    	}
    }
    

    ####client 端代码

    package com.gzhdi.copocessor;
    
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;
    
    import com.google.protobuf.ByteString;
    import com.google.protobuf.ServiceException;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloWorld;
    
    public class HelloWorldClient {
    
    	public static void main(String[] args) throws ServiceException, Throwable {
    		myclient();
    	}
    //如果你没有写好自己的例子可以跑跑hbase自带的小例子
    //	private static void example1() throws IOException, ServiceException,
    //			Throwable {
    //		System.out.println("begin.....");  
    //        long begin_time=System.currentTimeMillis();  
    //       Configuration config=HBaseConfiguration.create();  
    ////     String master_ip="192.168.150.128";  
    //       String master_ip="10.10.113.211";  
    //       String zk_ip="10.10.113.211";  
    //       String table_name="t1";  
    //       config.set("hbase.zookeeper.property.clientPort", "2181");   
    //       config.set("hbase.zookeeper.quorum", zk_ip);   
    //       config.set("hbase.master", master_ip+":600000");  
    //       
    //       HTable table = new HTable(config, table_name);
    //       final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
    //       Map results = table.coprocessorService(
    //           ExampleProtos.RowCountService.class, // the protocol interface we're invoking
    //           null, null,                          // start and end row keys
    //           
    //           new Batch.Call() {
    //        	   
    //               public Long call(Object counter) throws IOException {
    //                 BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
    //                     new BlockingRpcCallback();
    //                 ((ExampleProtos.RowCountService)counter).getRowCount(null, request, rpcCallback);
    //                 ExampleProtos.CountResponse response = rpcCallback.get();
    //                 System.out.println("count :::::"+response.getCount());
    //                 return response.hasCount() ? response.getCount() : 0;
    //               }
    //			
    //           });
    //	}
    
    	public static void myclient(){
    		// TODO Auto-generated method stub
    				System.out.println("begin.....");  
    		        long begin_time=System.currentTimeMillis();  
    		       Configuration config=HBaseConfiguration.create();  
    //		     String master_ip="192.168.150.128";  
    		       String master_ip="10.10.113.211";  
    		       String zk_ip="10.10.113.211";  
    		       String table_name="t1";  
    		       config.set("hbase.zookeeper.property.clientPort", "2181");   
    		       config.set("hbase.zookeeper.quorum", zk_ip);   
    		       config.set("hbase.master", master_ip+":600000");  
    
    		       final AskRequest req=AskRequest.newBuilder().setAsk(ByteString.copyFromUtf8("hello")).build();
    		       AnsResponse resp=null;
    		       try {
    				HTable table=new HTable(config,table_name);
    				Map<byte[], ByteString> re=table.coprocessorService(HelloWorld.class, null, null, new Batch.Call<HelloWorld, ByteString>() {
    
    					@Override
    					public ByteString call(HelloWorld instance) throws IOException {
    						ServerRpcController controller = new ServerRpcController();
    						BlockingRpcCallback<AnsResponse> rpccall=new BlockingRpcCallback<AnsResponse>();
    						instance.question(controller, req, rpccall);
    						AnsResponse resp=rpccall.get();
    
    
    						//result
    						System.out.println("resp:"+ resp.getAns().toStringUtf8());
    
    						return resp.getAns();
    					}
    
    				});
    			} catch (IOException e) {
    				e.printStackTrace();
    			} catch (ServiceException e) {
    				e.printStackTrace();
    			} catch (Throwable e) {
    				e.printStackTrace();
    			}  
    	}
    
    }
    
    
  4. 利用jdk 1.6打包(切记jdk1.6,因为hbase用1.6打包的) 导出hellworld.jar 包名随便起。

###部署

  1. 将包helloworld.jar 放在 %HBASE_HOME/lib/ 下就可以了。

  2. 重新启动hbase

  3. 验证

     [root@hdp22 ~ Desktop]# hbase shell
    hbase(main):001:0> import com.gzhdi.copocessor.HelloWorldEndPoint
    => Java::ComGzhdiCopocessor::HelloWorldEndPoint    //如果打印出这句话就说明包已经加载完毕
    
  4. 向指定表添加endpoint

    hbase(main):002:0> create 't1','f1'
    0 row(s) in 6.5290 seconds
    => Hbase::Table - t1   //创建表t1
    
    hbase(main):003:0> alter 't1','coprocessor'=>'|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'
    Updating all regions with the new schema...
    0/1 regions updated.
    1/1 regions updated.
    Done.
    0 row(s) in 2.5960 seconds
    
    hbase(main):005:0> describe 't1'
    DESCRIPTION                                                                                                              ENABLED                                                          
     't1', {TABLE_ATTRIBUTES => {coprocessor$1 => '|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'}, {NAME => 'f1', DATA_BLO true                                                             
     CK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERS                                                                  
     IONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE                                                                  
      => 'true'}                                                                                                                                                                              
    1 row(s) in 0.0940 seconds
    
    //OK 成功了
    

###调用 现在就可以使用你的客户端代码调用该服务了,需要制定zookeeper地址和表名(因为服务是针对表的)。

###参考资料 http://blog.csdn.net/yangzongzhao/article/details/24306775 https://blogs.apache.org/hbase/entry/coprocessor_introduction#comment-1337006507000 http://blog.csdn.net/hljlzc2007/article/details/12652243 http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/coprocessor/package-summary.html http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/package-summary.html

© 著作权归作者所有

Leo_Vip

Leo_Vip

粉丝 10
博文 10
码字总数 12766
作品 0
广州
高级程序员
私信 提问
加载中

评论(4)

Leo_Vip
Leo_Vip 博主

引用来自“国宝熊猫”的评论

0楼主 求助呀,肿么部署到单个表。 我把打好的jar包 上传到了hdfs上,然后指定路径添加进去。 方式1: 先disable表,添加,后enable表的时候就不能enable成功了。 方式2:直接按你的创建表,添加协处理器就不成功一直0/1 regions updated.
这个。。。 帮不了你,你只能从日志查看是什么错误。可以看下源码,这块代码是什么意思。希望对你有帮助
国宝熊猫
0楼主 求助呀,肿么部署到单个表。 我把打好的jar包 上传到了hdfs上,然后指定路径添加进去。 方式1: 先disable表,添加,后enable表的时候就不能enable成功了。 方式2:直接按你的创建表,添加协处理器就不成功一直0/1 regions updated.
Leo_Vip
Leo_Vip 博主

引用来自“林福源”的评论

为何编译出来的ServerHelloWorld里含有unknownfields?因为这个导入eclipse时会报错,客户端无法导出可执行的jar包呀,急需解决~
你看看phoenix 里边很多协处理器,这个也是摸索出来的,可能版本变更了吧
林福源
为何编译出来的ServerHelloWorld里含有unknownfields?因为这个导入eclipse时会报错,客户端无法导出可执行的jar包呀,急需解决~
Hbase0.98 的observer coprocessor

@磊_子 你好,想跟你请教个问题: 你做过Hbase0.98 的observer coprocessor的demo吗?能不能分享下?困惑中呢。。。呵呵

Starseve
2015/07/02
82
1
Hbase coprocessor

基于hadoop-2.5.2以及hbase-1.0搭建的环境。出现一个问题:在使用coprocessor endpoint获取HBase数据的时候,单机上查询数据的速度在1S以内(一个region),但是在三台机子上的分布式环境中,...

xugen12
2015/08/30
172
0
hbase协处理的部署

1.hbase的Coprocessor的简介 HBase是一个分布式的存储体系,数据按照RowKey分成不同的Region,再分配给RegionServer管理。但是RegionServer只承担了存储的功能,如果Region能拥有一部分的计算...

岁月留痕
2015/12/21
189
0
【问题】The coprocessor thread stopped itself due to scan timeout or scan threshold

Kylin执行查询语句的时候报错如下: Error while executing SQL "select t.hotelidm,t.livedt, d.dayofweek,sum(rns) from tableT t join TableD d on t.livedt = d.daYno group by t.hotel......

巧克力黒
2017/03/20
0
0
Hbase 学习(三)Coprocessors

Coprocessors 之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服务端执行,他有点儿想我们熟悉的存储过程,传一些参数进...

2k10
2015/05/11
138
0

没有更多内容

加载失败,请刷新页面

加载更多

MBTI助你成功,让你更了解你自己

MBTI助你成功,让你更了解你自己 生活总是一个七日接着又一个七日,相信看过第七日的小伙伴,很熟悉这段开场白,人生是一个测试接着又一个测试,上学的时候测试,是为了证明你的智力,可谓从...

蛤蟆丸子
37分钟前
49
0
Android实现App版本自动更新

现在很多的App中都会有一个检查版本的功能。例如斗鱼TV App的设置界面下: 当我们点击检查更新的时候,就会向服务器发起版本检测的请求。一般的处理方式是:服务器返回的App版本与当前手机安...

shzwork
昨天
63
0
npm 发布webpack插件 webpack-html-cdn-plugin

初始化一个项目 npm init 切换到npm源 淘宝 npm config set registry https://registry.npm.taobao.org npm npm config set registry http://registry.npmjs.org 登录 npm login 登录状态......

阿豪boy
昨天
87
0
java基础(16)递归

一.说明 递归:方法内调用自己 public static void run1(){ //递归 run1(); } 二.入门: 三.执行流程: 四.无限循环:经常用 无限递归不要轻易使用,无限递归的终点是:栈内存溢出错误 五.递...

煌sir
昨天
63
0
REST接口设计规范总结

URI格式规范 URI中尽量使用连字符”-“代替下划线”_”的使用 URI中统一使用小写字母 URI中不要包含文件(脚本)的扩展名 URI命名规范 文档(Document)类型的资源用名词(短语)单数命名 集合(Co...

Treize
昨天
69
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部