文档章节

基于solr实现hbase的二级索引

m
 miscellanea
发布于 2015/08/24 19:42
字数 1920
阅读 223
收藏 1

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

   

一、目的

    了解hbase的都知道,由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想要实现关系型数据库那样可以随意组合的多条件查询、查询总记录数、分页等就比较麻烦了。想要实现这样的功能,我们可以采用两种方法:

  1. 使用hbase提供的filter,

  2. 自己实现二级索引,通过二级索引查询多符合条件的行健,然后再查询hbase。

    第一种方法不多说了,使用起来很方便,但是局限性也很大,hbase的filter是直接扫记录的,如果数据范围很大,会导致查询速度很慢。所以如果能先使用行健把记录缩小到一个较小范围,那么就比较适合,否则就不适用了。此外该方法不能解决获取总数的为。
    第二种是适用范围就比较广泛了,不过根据实现二级索引的方式解决的问题也不同。这里我们选择solr主要是因为solr可以很轻松实现各种查询(本来就是全文检索引擎)。

二、实现方法

    其实hbase结合solr实现方法还是比较简单的,重点在于一些实现细节上。将hbase记录写入solr的关键就在于hbase提供的Coprocessor,Coprocessor提供了两个实现:endpoint和observerendpoint相当于关系型数据库的存储过程,而observer则相当于触发器。说到这相信大家应该就明白了,我们要利用的就是observer。observer允许我们在记录put前后做一些处理,而我们就是通过postPut将记录同步写入solr(关于Coprocessor具体内容请自行查资料)。

    而写入solr这块就比较简单了,如果是单机就使用ConcurrentUpdateSolrServer,如果是集群就是用 CloudSolrServer。不过这里需要注意的是由于CloudSolrServer不像ConcurrentUpdateSolrServer那 样内置缓存,默认情况下hbase没写一条数据就会向solr提交一次,这样速度会非常慢(很可能hbase写完很久solr这边还在提交),因此要自己 实现一个缓存池,根据hbase的写入速度动态调整,并批量向solr提交。

三、实现代码

    实现方法弄清处置后代码就很容易写了。首先看下Coprocessor的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.uboxol.hbase.coprocessor;
 
import com.uboxol.model.VmMoney;
import com.uboxol.solr.SolrWriter;
import java.io.IOException;
 
/**
  * Created with IntelliJ IDEA.
  * User: guojing
  * Date: 14-10-24
  * Time: 上午11:08
  * To change this template use File | Settings | File Templates.
  */
public class SolrIndexCoprocessorObserver extends BaseRegionObserver {
     private static Logger log = Logger.getLogger(SolrIndexCoprocessorObserver. class );
 
     @Override
     public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
         String rowKey = Bytes.toString(put.getRow());
         try {
             Cell cellInnerCode = put.get(Bytes.toBytes( "data" ), Bytes.toBytes( "inner_code" )).get( 0 );
             String innerCode = new String(CellUtil.cloneValue(cellInnerCode));
 
             Cell cellNodeId = put.get(Bytes.toBytes( "data" ), Bytes.toBytes( "node_id" )).get( 0 );
             String nodeId = new String(CellUtil.cloneValue(cellNodeId));
 
             Cell cellPayType = put.get(Bytes.toBytes( "data" ), Bytes.toBytes( "pay_type" )).get( 0 );
             String payType = new String(CellUtil.cloneValue(cellPayType));
 
             Cell cellCts = put.get(Bytes.toBytes( "data" ), Bytes.toBytes( "cts" )).get( 0 );
             String cts = new String(CellUtil.cloneValue(cellCts));
 
             Cell cellTraSeq = put.get(Bytes.toBytes( "data" ), Bytes.toBytes( "tra_seq" )).get( 0 );
             String traSeq = new String(CellUtil.cloneValue(cellTraSeq));
 
             cts=cts.replace( "-" , "" );
             cts=cts.replace( " " , "" );
             cts=cts.replace( ":" , "" );
 
             VmMoney vm = new VmMoney();
             vm.setCts(cts);
             vm.setId( new Integer(id));
             vm.setInnerCode(innerCode);
             vm.setNodeId( new Integer(nodeId));
             vm.setPayType( new Integer(payType));
             vm.setRowKey(rowKey);
             vm.setTraSeq(traSeq);
 
             SolrWriter so = new SolrWriter();
             so.addDocToCache(vm);
         } catch (Exception ex){
             log.info( "write " +rowKey+ " to solr fail:" +ex.getMessage());
             ex.printStackTrace();
         }
     }
 
     @Override
     public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
         String rowKey = Bytes.toString(delete.getRow());
         try {
             SolrWriter so = new SolrWriter();
             so.deleteDoc(rowKey);
         } catch (Exception ex){
             log.info( "delete " +rowKey+ " from solr fail:" +ex.getMessage());
             ex.printStackTrace();
         }
     }
}

    里边代码很简单,就是在hbase记录写入后和删除后调用SolrWriter进行处理。下边看下SolrWriter的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.uboxol.solr;
 
import com.uboxol.model.VmMoney;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class SolrWriter {
     private static Logger log = Logger.getLogger(SolrWriter. class );
 
     public static String urlSolr = "" ;     //solr地址
     private static String defaultCollection = "" //默认collection
     private static int zkClientTimeOut = 0 ; //zk客户端请求超时间
     private static int zkConnectTimeOut = 0 ; //zk客户端连接超时间
     private static CloudSolrServer solrserver = null ;
 
     private static int maxCacheCount = 0 ;   //缓存大小,当达到该上限时提交
     private static Vector<VmMoney> cache = null ;   //缓存
     public  static Lock commitLock = new ReentrantLock();  //在添加缓存或进行提交时加锁
 
     private static int maxCommitTime = 60 ; //最大提交时间,s
 
     static {
         Configuration conf = HBaseConfiguration.create();
         urlSolr = conf.get( "hbase.solr.zklist" , "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181" );
         defaultCollection = conf.get( "hbase.solr.collection" , "collection1" );
         zkClientTimeOut = conf.getInt( "hbase.solr.zkClientTimeOut" , 10000 );
         zkConnectTimeOut = conf.getInt( "hbase.solr.zkConnectTimeOut" , 10000 );
         maxCacheCount = conf.getInt( "hbase.solr.maxCacheCount" , 10000 );
         maxCommitTime =  conf.getInt( "hbase.solr.maxCommitTime" , 60 * 5 );
 
         log.info( "solr init param" +urlSolr+ "  " +defaultCollection+ "  " +zkClientTimeOut+ "  " +zkConnectTimeOut+ "  " +maxCacheCount+ "  " +maxCommitTime);
         try {
             cache= new Vector<VmMoney>(maxCacheCount);
 
             solrserver = new CloudSolrServer(urlSolr);
             solrserver.setDefaultCollection(defaultCollection);
             solrserver.setZkClientTimeout(zkClientTimeOut);
             solrserver.setZkConnectTimeout(zkConnectTimeOut);
 
             //启动定时任务,第一次延迟10执行,之后每隔指定时间执行一次
             Timer timer= new Timer();
             timer.schedule( new CommitTimer(), 10 * 1000 ,maxCommitTime* 1000 );
         } catch (Exception ex){
             ex.printStackTrace();
         }
 
     }
 
     /**
      * 批量提交
      */
     public void inputDoc(List<VmMoney> vmMoneyList) throws IOException, SolrServerException {
         if (vmMoneyList == null || vmMoneyList.size() == 0 ) {
             return ;
         }
         List<SolrInputDocument> doclist= new ArrayList<SolrInputDocument>(vmMoneyList.size());
         for (VmMoney vm : vmMoneyList) {
             SolrInputDocument doc = new SolrInputDocument();
             doc.addField( "id" , vm.getId());
             doc.addField( "node_id" , vm.getNodeId());
             doc.addField( "inner_code" , vm.getInnerCode());
             doc.addField( "pay_type" , vm.getPayType());
             doc.addField( "rowkey" , vm.getRowKey());
             doc.addField( "cts" , vm.getCts());
             doc.addField( "tra_seq" , vm.getTraSeq());
 
             doclist.add(doc);
         }
         solrserver.add(doclist);
     }
 
     /**
      * 单条提交
      */
     public void inputDoc(VmMoney vmMoney) throws IOException, SolrServerException {
         if (vmMoney == null ) {
             return ;
         }
         SolrInputDocument doc = new SolrInputDocument();
         doc.addField( "id" , vmMoney.getId());
         doc.addField( "node_id" , vmMoney.getNodeId());
         doc.addField( "inner_code" , vmMoney.getInnerCode());
         doc.addField( "pay_type" , vmMoney.getPayType());
         doc.addField( "rowkey" , vmMoney.getRowKey());
         doc.addField( "cts" , vmMoney.getCts());
         doc.addField( "tra_seq" , vmMoney.getTraSeq());
 
         solrserver.add(doc);
 
     }
 
     public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
         if (rowkeys == null || rowkeys.size() == 0 ) {
             return ;
         }
         solrserver.deleteById(rowkeys);
     }
 
     public void deleteDoc(String rowkey) throws IOException, SolrServerException {
 
         solrserver.deleteById(rowkey);
     }
 
     /**
      * 添加记录到cache,如果cache达到maxCacheCount,则提交
      */
     public static void addDocToCache(VmMoney vmMoney) {
         commitLock.lock();
         try {
             cache.add(vmMoney);
             log.info( "cache commit maxCacheCount:" +maxCacheCount);
             if (cache.size() >= maxCacheCount) {
                 log.info( "cache commit count:" +cache.size());
                 new SolrWriter().inputDoc(cache);
                 cache.clear();
             }
         } catch (Exception ex) {
             log.info(ex.getMessage());
         } finally {
             commitLock.unlock();
         }
     }
 
     /**
      * 提交定时器
      */
     static class CommitTimer extends TimerTask {
         @Override
         public void run() {
             commitLock.lock();
             try {
                 if (cache.size() > 0 ) { //大于0则提交
                     log.info( "timer commit count:" +cache.size());
                     new SolrWriter().inputDoc(cache);
                     cache.clear();
                 }
             } catch (Exception ex) {
                 log.info(ex.getMessage());
             } finally {
                 commitLock.unlock();
             }
         }
     }
}

    SolrWriter的重点就在于addDocToCache方法和定时器CommitTimer,addDocToCache会在hbase每次插入数据时将记录插入缓存,并且判断是否达到上限,如果达到则将缓存内所用数据提交到solr,此外CommitTimer 则会每隔一段时间提交一次,以保证缓存内所有数据最终写入solr。
    其他一些辅助代码就不贴了,可自行到github查看:hbase-solr-coprocessor (代码仅作参考,由于业务不同不能直接运行)

四、部署

    这里重点说下hbase的Coprocessor部署的一些问题。部署步骤如下:

  • Coprocessor代码打成jar包,拷贝到所有hbase的region server上,注意jdk一定要1.6,高版本可能会导致无法加载

  • 将hbase的hbase.coprocessor.abortonerror设置成true,待确定Coprocessor运行正常后在改为false。此步骤非必要,但是如果Coprocessor有问题会导致所有region无法启动

  • 于我们实现的Coprocessor是region级的,所以不需要启动,直接通过hbase shell即可加载:

    1
    2
    3
    4
    5
    disable 'tablename'
     
    alter 'tablename' ,METHOD => 'table_att' , 'coprocessor' => 'jar包路径,本地使用file:///开头,hdfs上的则用hdfs:///开头|1001|参数,多个逗号隔开' 
     
    enable 'tablename'

五、总结

    这次hbase+solr的部署前后花了不少时间,其实理论方面都很简单,让人感觉轻而易举,但是实际实现的过程中就会遇到不少问题,就比如写入缓存之类的,如果不去测试,就很容易被忽略。


本文转载自:http://www.sxt.cn/info-5072-u-756.html

m
粉丝 6
博文 86
码字总数 22525
作品 0
海淀
私信 提问
加载中

评论(0)

基于solr实现hbase的二级索引

基于solr实现hbase的二级索引 [X] 目的: 由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想要实现关系型数据库那样可以随意组合的、、等就比较麻烦了。想要实现这样的功能,我们可...

白石
2015/01/16
4.6K
7
为了实现在线库的复杂查询,你还在双写吗?

一、在线库不支持在线复杂查询 做在线业务的开发者经常会碰到这样的难题:在线数据库上面运行稍微复杂点的查询,在线业务就挂了!不管是单机数据库如MySQL、PG,还是分布式数据库,HBase、M...

阿里云云栖社区
2019/11/19
0
0
Lily Hbase Indexer 结合 solr实现Hbase的二级索引

环境:CDH5.4 描述:使用CDH 的 key-value store index 服务实现 Hbase的二级索引,当在hbase put 数据后 索引数据没有 立即保存到solr里,而当solr服务重启后,之前在hbase添加的数据的索引...

czmabc
2015/12/17
2.7K
2
HBase应用与发展之HBase RowKey与索引设计

RowKey设计可以说是一个非常基础的话题,因为每一个HBase的使用/开发人员,都是从表/RowKey设计着手的。但细究起来,RowKey设计也有很多难点,尤其是如何与应用特点很好的结合起来。 这篇演讲...

HBase技术社区
2018/09/16
0
0
hadoop组件---面向列的开源数据库(六)--使用sql访问hbase的组件--phoenix全面了解和安装

phoenix简介 我们在之前得文章中已经学习了thrift 以及使用 thrift 对hbase进行访问。 hadoop组件—面向列的开源数据库(三)—hbase的接口thrift简介和安装 hadoop组件—面向列的开源数据库(...

张小凡vip
03/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

20171113曾英特《网络欺诈防范》实验报告

一、实验名称 网络欺诈防范 二、实验目的 理解常用网络欺诈背后的原理,以提高防范意识,并提出具体防范方法。 三、实验内容 1.简单应用SET工具建立冒名网站 2.Ettercap工具的dns_spoof 3...

osc_mzickfah
今天
25
0
IdentityServer4(8)- 使用密码认证方式控制API访问(资源所有者密码授权模式)

一.前言 本文已经更新到 .NET Core 2.2 OAuth 2.0 资源所有者密码模式允许客户端向令牌服务发送用户名和密码,并获取代表该用户的访问令牌。 除了通过无法浏览器进行交互的应用程序之外,通常...

osc_p23q7y3z
今天
19
0
Hail_Hydra2—Beta冲刺日志(5)

这个作业属于哪个课程 2020春-S班(福州大学) 这个作业的要求在那里 团队作业第六次——beta冲刺+事后诸葛亮 团队名称 Hail Hydra(九头蛇) 这个作业的目标 Beta冲刺5 作业正文 作业正文 其他参...

osc_y8c6tkvz
今天
19
0
Tomcat9 Error: Could not find or load main class org.apache.catalina.startup.Bootstrap

今天因为同事要求,希望安装一个Tomcat9给他,结果发现一个问题: 首先我登录的是tomcat官网,选择source包下载,wget http://apache.fayea.com/tomcat/tomcat-9/v9.0.0.M26/src/apache-tomc...

osc_htns3spg
今天
18
0
SpringSecurity使用json登陆

一、创建项目并导入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency> <dependency> <groupId>org.spr......

osc_oa9f94a9
今天
23
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部