文档章节

Elasticsearch学习总结六 使用Observer实现HBase到Elasticsearch的数据同步

winstone
 winstone
发布于 2017/06/06 21:53
字数 1470
阅读 1279
收藏 43
  •     最近在公司做统一日志收集处理平台,技术选型肯定要选择elasticsearch,因为可以快速检索系统日志,日志问题排查及功业务链调用可以被快速检索,公司各个应用的日志有些字段比如说content是不需要在es中作为存储的,当时考虑使用一种keyValue形式的数据库作存储,然后使用hbase的Rowkey作为es的docId,实现数据检索在es中,存储在hbase中,这样可以大大减轻es的存储压力。

  • 什么是 Observer

HBase 0.92 版本引入了协处理器(Coprocessor),可以使开发者将自己的代码嵌入到 HBase 中,其中协处理器分为两大块,一个是终端(Endpoint),另一个是本文将要介绍的观察者(Observer)。

Observer 有些类似于 MySQL 中的触发器(Trigger),它可以为 HBase 中的操作添加钩子,并在事件发生后实现自己的的业务逻辑。

  • Observer 主要分为三种:

RegionObserver:增删改查相关,例如 Get、Put、Delete、Scan 等 WALObserver:WAL 操作相关 MasterObserver:DDL-类型相关,例如创建、删除、修改数据表等

数据同步将会使用 RegionObserver 监听 Put 和 Delete 事件。

  • 如何实现自定义的的 Observer

每一个 Observer 都是一个 Jar 包。首先需要引入hbase-server包,并实现如BaseRegionObserver等 HBase 提供的相关接口,重写需要监听对应事件的方法。

实现数据同步功能可以重写postPut和putDelete方法监听 Put 和 Delete 事件。

下面就是一个最简单的例子,在这两个方法中分别得到 hbsae表名和 RowKey 分别对应着es中的indexName和docId

public class HbaseToEsObserver extends BaseRegionObserver {
    private static Client client = null;
    private static final Log LOG = LogFactory.getLog(HbaseToEsObserver.class);
    public static final String SEARCH_INDICE_PATTERN = "idx_%s_%s";
    /**
     * 读取HBase Shell的指令参数
     * @param env
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        EsConfig.clusterName = conf.get("es_cluster");
        EsConfig.nodeHost = conf.get("es_host");
        EsConfig.nodePort = conf.getInt("es_port", 9300);
        EsConfig.indexName = conf.get("es_index");
        EsConfig.typeName = conf.get("es_type");
        LOG.info("observer -- started with config: " + EsConfig.getInfo());
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        readConfiguration(env);
        client = EsSearchManager.getInstance().getClient();
    }
    
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,                       Durability durability) {
       try {
            LOG.debug("es 索引开始 begin");
            String indexId = new String(put.getRow());
            Map<byte[], List<Cell>> familyMap =  put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                    LOG.info("key="+key+"value="+value);
                }
            }
           //es中索引表的名称是idx_xxx_xxx
           String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
           String indexName = String.format(SEARCH_INDICE_PATTERN, EsConfig.indexName,tableName).toLowerCase();
           ElasticSearchUtil.addUpdateBuilderToBulk(client.prepareUpdate(indexName, EsConfig.typeName, indexId).setUpsert(json));
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }

public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
        try {
            String indexId = new String(delete.getRow());
            ElasticSearchUtil.addDeleteBuilderToBulk(client.prepareDelete(EsConfig.indexName, EsConfig.typeName, indexId));
            LOG.info("observer -- delete a doc: " + indexId);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }
  • 当日志hbase中一条条插入到hbase中的时候就会触发协处理器动作,为了减轻es服务器操作的压力我们批量操作es中的数据,先将索引数据存储到BulkRequestBuilder,当缓冲池中的索引数据为10条或者当提交间隔达到最大提交间隔的时候批量将索引数据发送到es服务器中。下面看下ElasticSearchUtil中的代码
public class ElasticSearchUtil {
    private static final Log LOG = LogFactory.getLog(ElasticSearchUtil.class);
    // 缓冲池容量
    private static final int MAX_BULK_COUNT = 10;
    // 最大提交间隔(秒)
    private static final int MAX_COMMIT_INTERVAL = 60 * 2;
    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static Lock commitIndexLock= new ReentrantLock();

    static {
        try {
           client = EsSearchManager.getInstance().getClient();
           bulkRequestBuilder = client.prepareBulk();
           bulkRequestBuilder.setRefresh(true);
           ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
           executor.scheduleWithFixedDelay(
                   new CommitIndexTimer(),
                   30 * 1000,
                   MAX_COMMIT_INTERVAL * 1000,
                   TimeUnit.MILLISECONDS);
        }catch(Exception e){
            LOG.error(e.getMessage());
         }
    }

    /**
     * 判断缓存池是否已满,批量提交
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            LOG.info("执行索引程序,当前池中待索引数量="+bulkRequestBuilder.numberOfActions());
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkResponse.hasFailures()) {
                LOG.info("es索引程序成功!");
                bulkRequestBuilder = client.prepareBulk();
            }
            if (bulkResponse.hasFailures()) {
                LOG.error("es索引异常:"+bulkResponse.buildFailureMessage());
            }
        }
    }

    /**
     * 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步
     * 定时执行
     */
    static class CommitIndexTimer implements Runnable {
        @Override
        public void run() {
            commitIndexLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitIndexLock.unlock();
            }
        }
    }
}

然后将项目打成jar包,提交到hdfs中,然后使用 HBase Shell 创建一个表,将这个 Observer 挂到该表中:

create 'businessslog','info'
disable 'businessslog'

alter 'businessslog',METHOD =>'table_att','coprocessor' => 'hdfs://hadoop26:9000/observer.jar|com.github.hbase.observer.HbaseToEsObserver|1001|es_cluster=myes,es_type=loginfo,es_index=test,es_port=9300,es_host=114.55.253.15'

enable 'businessslog'		
describe 'businessslog'

最后使用 describe 'businessslog' 命令就可以查看协处理器是否挂载成功,使用命令挂载协处理器还是有点麻烦,为此 封装了hbase创建表的时候自动建立协处理器的代码如下,不用在使用麻烦的命令建立协处理器了,直接调用Java 方法创建,方便了许多

 public void createTableWithCoprocessor(String tableName,String oberverName,String path,Map<String,String> map, String...familyColumn) throws Exception {
        TableName table = TableName.valueOf(tableName);
        Admin admin = getConn().getAdmin();
        boolean isExists = admin.tableExists(table);
        if(isExists){
            return ;
        }else{
            try {
                HTableDescriptor htd = new HTableDescriptor(table);
                for (String fc : familyColumn) {
                    HColumnDescriptor hcd = new HColumnDescriptor(fc);
                    htd.addFamily(hcd);
                }
                admin.createTable(htd);
                admin.disableTable(table);
                HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
                for (String fc : familyColumn) {
                    HColumnDescriptor hcd = new HColumnDescriptor(fc);
                    hTableDescriptor.addFamily(hcd);
                }
                hTableDescriptor.addCoprocessor(oberverName, new Path(path), Coprocessor.PRIORITY_USER, map);
                admin.modifyTable(table, hTableDescriptor);
                admin.enableTable(table);
                admin.close();
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

总结: es:可以实现复杂快速查询,但是不适合存储海量数据(针对一些大字段,不存储) hbase:可以实现海量数据存储,但是不适合进行复杂查询 es+hbase可以实现海量数据的复杂快速查询,在这里es可以认为是hbase的二级索引

es中还需要将mapping映射配置正确,确保某些大字段建立索引 不存储,这里就在赘述,如上就可以实现当检索的时候还是在es中查询,当查询具体能容的时候再去hbase根据rowkey也就是es中的docId定位具体日志内容。

以上总结了部分代码,详细的代码请查看github地址 https://github.com/winstonelei/BigDataTools ,包括了一些大数据组件的基本操作,包含了hbase,hadoop,es,hive等

© 著作权归作者所有

winstone
粉丝 38
博文 14
码字总数 17575
作品 0
南京
程序员
私信 提问
加载中

评论(6)

CREATE_17
CREATE_17
博主,读取HBase Shell的指令参数那段代码,EsConfig类怎么实现的,还有EsConfig.nodePort = conf.getInt("es_port", 9300);这个的意思为如果hhbase shell里面不指定es_port的话,就默认为9300吗,期待您的回复,谢谢!
蛮_com
蛮_com
ElasticSearch API 自带有 bulkProcessor
winstone
winstone 博主

引用来自“阿cat”的评论

有没有入门到跑路教程?
哈哈,还没有入门,还在外面呢
MGL_TECH
MGL_TECH
有没有入门到跑路教程?
winstone
winstone 博主

引用来自“文星”的评论

如果把日志内容存储到HBase中,那么如果利用ES来模糊查询content中的内容呢?
_source字段默认是存储的,所以在mapping的时候将这个字段排除,但是检索index依然可以索引该字段的,搜索是完全可以的,到时候再根据索引的docId去hbase中具体检索完整原始内容
文星
文星
如果把日志内容存储到HBase中,那么如果利用ES来模糊查询content中的内容呢?
ElasticSearch,Solr的store问题

是这样,最近在研究对HBase中的数据建立全文索引。在研究Solr和ElasticSearch。在ElasticSearch上为Hbase建立索引的时候,遇到了问题。是mapping的store选项。那个默认应该是不进行存储的,存...

紫色的水瓶
2014/12/19
1K
1
开源数据同步神器——canal

前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将m...

IT米粉
01/10
0
0
Java搜索引擎选择: Elasticsearch与Solr(转)

Elasticsearch简介 Elasticsearch是一个实时的分布式搜索和分析引擎。它可以帮助你用前所未有的速度去处理大规模数据。 它可以用于全文搜索,结构化搜索以及分析,当然你也可以将这三者进行组...

easonjim
2017/11/13
0
0
搜索引擎选择: Elasticsearch与Solr

搜索引擎选型调研文档 Elasticsearch简介* Elasticsearch是一个实时的分布式搜索和分析引擎。它可以帮助你用前所未有的速度去处理大规模数据。 它可以用于全文搜索,结构化搜索以及分析,当然...

开源中国首席码农
2016/11/15
1K
4
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
2018/04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

只需一步,在Spring Boot中统一Restful API返回值格式与统一处理异常

统一返回值 在前后端分离大行其道的今天,有一个统一的返回值格式不仅能使我们的接口看起来更漂亮,而且还可以使前端可以统一处理很多东西,避免很多问题的产生。 比较通用的返回值格式如下:...

晓月寒丶
今天
58
0
区块链应用到供应链上的好处和实际案例

区块链可以解决供应链中的很多问题,例如记录以及追踪产品。那么使用区块链应用到各产品供应链上到底有什么好处?猎头悬赏平台解优人才网小编给大家做个简单的分享: 使用区块链的最突出的优...

猎头悬赏平台
今天
27
0
全世界到底有多少软件开发人员?

埃文斯数据公司(Evans Data Corporation) 2019 最新的统计数据(原文)显示,2018 年全球共有 2300 万软件开发人员,预计到 2019 年底这个数字将达到 2640万,到 2023 年达到 2770万。 而来自...

红薯
今天
61
0
Go 语言基础—— 通道(channel)

通过通信来共享内存(Java是通过共享内存来通信的) 定义 func service() string {time.Sleep(time.Millisecond * 50)return "Done"}func AsyncService() chan string {retCh := mak......

刘一草
今天
57
0
Apache Flink 零基础入门(一):基础概念解析

Apache Flink 的定义、架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速...

Vincent-Duan
今天
50
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部