文档章节

spring-data管理HbaseTemplate connection 坑之源码分析

DanierWei
 DanierWei
发布于 2017/07/13 21:12
字数 1331
阅读 1535
收藏 0

一、背景:

  • 使用spring xml配置管理初始化HbaseTemplate,并使用HbaseTemplate API时存在bug,每次查询都会建立connection,这样会存在并发查询的时候大量时间浪费在connection建立上。

  • spring-data-hadoop版本为2.2.0.RELEASE,测试对比过 ConnectionFactory创建并使connection全局共享通过spring管理的方式,对应如下图1、图2对比,监控的指标分别为: 查询耗时、应用GC、机器与HBase服务网络情况。其中应用GC、机器与HBase服务网络情况都比较良好。查询耗时通过使用spring管理的connection查询非常不稳地。本文后面会从源码进行解析坑所在。同时对比了2.5.0.RELEASE源码都有类似的问题。2.3.0.RELEASE、2.4.0.RELEASE估计也没差~ 图1
    图2

  • 查询方式使用HBase原生api的方式,rowkey范围查询,满足条件的数据条数为29条,表大小2KW+。

    @Autowired
    private HbaseTemplate hbaseTemplate;
    
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(filterStartRowKey));
    scan.setStopRow(Bytes.toBytes(filterEndRowKey));
    List<Map<String, Object>> result = hbaseTemplate.find(tableName, scan, (resultSet, i) -> {
           List<Cell> cells = resultSet.listCells();
           Map<String, Object> valueByCloumn = cells.parallelStream()
                .collect(Collectors.toMap(cell -> Bytes.toString(CellUtil.cloneQualifier(cell)),cell -> Bytes.toString(CellUtil.cloneValue(cell))));
            return valueByCloumn;
});

二、分析:

spring管理接入步骤

  • 引入jar包
<dependency>
	<groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
    	<exclusion>
        	<groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </exclusion>
        <exclusion>
        	<groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
        	<groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
   </exclusions>
</dependency>
<dependency>
	<groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.2</version>
    	<exclusions>
        	<exclusion>
            	<artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
       </exclusions>
</dependency>
  • xml配置
<hdp:hbase-configuration id="hbaseConfigurationConf"
                         zk-quorum="xxxx"
                         zk-port="xxx"/>
<bean id="hTableFactory" class="org.apache.hadoop.hbase.client.HTableFactory"/>
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate" lazy-init="true">
    <property name="configuration" ref="hbaseConfigurationConf"/>
    <property name="tableFactory" ref="hTableFactory"/>
</bean>
  • 跟踪代码 hbaseTemplate.find(....),最终执行的方法为*HbaseTemplate的execute(...)*方法,获取connection的关键代码在HTableInterface table = this.getTable(tableName),两个奇葩点既引起bug的原因如下:</br>

    1、首先会判断传入的table是否已存在HTableInterface对象,通过HbaseSynchronizationManager类管理着这些全局的HTableInterface按照tableName分组,但是发现如果需要HbaseSynchronizationManager管理,需要调用他的bindResource方法,同时这个方法调用需要配置AOP,类似spring管理mysql事物配置,拦截器类为HbaseInterceptor,此类的作用为自动将HBase表绑定给当前线程,也就是说,每个在HBase上执行的DAO操作的类都会被HbaseInterceptor包装,因此一旦发现有在使用的表都将被绑定给当前线程,之后再使用这张表时就无需再初始化表了(理论上达到了HBase连接池的效果),调用结束后,表将被自己关闭。但是拦截器中的初始化table的时候,调用的方式还是this.getTable(tableName)逻辑一样

    2、进入下一步,有判断tableFactory是否为空,此对象为spring的xml注入的实例,此处见如上的配置始终是不为空,最终会调用createHTableInterface创建HTableInterface*, 继续跟踪其源码发现跟不注入tableFactory对应是一样的,始终会new HTable(....),HTable初始化的时候会创建connection,当查询完毕之后会触发table.close,也就将connection关闭了,下次在进入查询,会创建新的connection。*

    3、结合以上两点,无论是AOP方式管理连接池,还是初始化HbaseTemplate的方式,都会触发new Ttable(.....),当table.close()时,Table对像中的Connection都会被关闭。即spring data管理的HBase连接池并没达到想要的效果。

关键代码:
1、判断hasResource 有返回没有反之获取新的HTable实例
public static HTableInterface getHTable(String tableName, Configuration configuration, Charset charset, HTableInterfaceFactory tableFactory) {
        if(HbaseSynchronizationManager.hasResource(tableName)) {
            return (HTable)HbaseSynchronizationManager.getResource(tableName);
        } else {
            Object t = null;
            try {
                if(tableFactory != null) {
                    t = tableFactory.createHTableInterface(configuration, tableName.getBytes(charset));
                } else {
                    t = new HTable(configuration, tableName.getBytes(charset));
                }
                return (HTableInterface)t;
            } catch (Exception var6) {
                throw convertHbaseException(var6);
            }
        }
}
//
//
2、HTableFactory.createHTableInterface  
public class HTableFactory implements HTableInterfaceFactory {
    public HTableFactory() {
    }
    public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
        try {
            return new HTable(config, TableName.valueOf(tableName));
        } catch (IOException var4) {
            throw new RuntimeException(var4);
        }
    }
    public void releaseHTableInterface(HTableInterface table) throws IOException {
        table.close();
    }
}
//
//
3、new HTable(...)
public HTable(Configuration conf, TableName tableName) throws IOException {
        this.autoFlush = true;
        this.closed = false;
        this.defaultConsistency = Consistency.STRONG;
        this.tableName = tableName;
        this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
        if(conf == null) {
            this.connection = null;
        } else {
            this.connection = ConnectionManager.getConnectionInternal(conf);
            this.configuration = conf;
            this.pool = getDefaultExecutor(conf);
            this.finishSetup();
        }
    }
//
//
4、table.close
public void close() throws IOException {
        if(!this.closed) {
            .....

            if(this.cleanupConnectionOnClose && this.connection != null) {
                this.connection.close();
            }

            this.closed = true;
        }
    }

三、解决:

  • 通过以上代码分析,spring-data管理HbaseTemplate有坑,解决办法:
    1、自己实现一个tableFactory,注入进去,功能为实现管理全局的connection保证不重复创建connection。

    2、简单的直接使用ConnectionFactory方式创建,同时也测试对比过,速度比较键定,见上面截图

public class HbaseUtils { private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class); private static Connection connection;

static {
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", "xxxxxx");
    configuration.set("zookeeper.znode.parent", "/hbase");
    try {
        connection = ConnectionFactory.createConnection(configuration);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public static List<Map<String, String>> scan(String tableName, Scan scan) {
    List<Map<String, String>> scanResult = Lists.newArrayList();
    if (connection == null) {
        logger.warn("scan error the connection is null");
        return scanResult;
    }

    Table table = null;
    ResultScanner rs = null;
    try {
        table = connection.getTable(TableName.valueOf(tableName));
        rs = table.getScanner(scan);
        Map<String, String> cellValueItem;
        for (Result r : rs) {
            cellValueItem = r.listCells()
                    .parallelStream()
                    .collect(Collectors.toMap(key -> Bytes.toString(CellUtil.cloneQualifier(key)),
                            value -> Bytes.toString(CellUtil.cloneValue(value))));
            scanResult.add(cellValueItem);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            try {
                //关闭的逻辑在这个方法里边,但是里边有一个变量cleanupConnectionOnClose控制了关闭与否,
                //这个变量初始化在new Table(....)里。当使用spring管理时,它始终为true,所以connection会被关闭。
                //当非spring管理的时候,是这么获取的,connection.getTable(TableName.valueOf(tableName));  
                //跟踪代码发现cleanupConnectionOnClose始终为false, 
                //并且HTable中的connection属性是通过参数赋值,
                //即为HbaseUtils初始化的connection对象。所以关键在于HTableInterface的创建方式。
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return scanResult;
    }
}

}

[spring-hadoop](https://github.com/spring-projects/spring-hadoop/blob/master/spring-hadoop-hbase/src/main/java/org/springframework/data/hadoop/hbase/HbaseTemplate.java)

© 著作权归作者所有

DanierWei
粉丝 1
博文 9
码字总数 5765
作品 0
杭州
私信 提问
spring-data-hadoop与HBase结合示例

测试数据准备 关于测试数据的准备可以参考之前写的一篇文章 HBase单机模式和伪分布式模式安装和配置 中 HBase简单操作 这节。 示例程序 pom.xml 要想使用 Spring for Apache Hadoop 与 HBas...

JackieYeah
2016/09/09
2.6K
0
Spring for Apache Hadoop 2.2 GA 发布

Spring for Apache Hadoop 2.2 GA 发布,重要更新内容如下: Remove support for running with JDK 6, Java 7 or later is now required Improvements to the HDFS writer to support syncab......

oschina
2015/06/12
1K
6
Redis scan命令的一次坑

Redis作为当前服务架构不可或缺的Cache,其支持丰富多样的数据结构,Redis在使用中其实也有很多坑,本次博主遇到的坑或许说是Java程序员会遇到的多一点,下面就听博主详细道来。 线上服务堵塞...

薛定谔的旺
01/05
188
0
由for update引发的血案

微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。 老司机倾囊相授,带你一路进阶,来不及解释了快上车! 公司的某些业务用到了数据库的悲观...

后端进阶
05/17
731
6
关于MyBatis sqlSession的一点整理

原文地址:关于MyBatis sqlSession的一点整理 工作中,需要学习一下MyBatis sqlSession的产生过程,翻看了mybatis-spring的源码,阅读了一些mybatis的相关doc,对mybatis sqlSession有了一些...

Realfighter
2015/01/04
31.5K
1

没有更多内容

加载失败,请刷新页面

加载更多

rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0
简述TCP的流量控制与拥塞控制

1. TCP流量控制 流量控制就是让发送方的发送速率不要太快,要让接收方来的及接收。 原理是通过确认报文中窗口字段来控制发送方的发送速率,发送方的发送窗口大小不能超过接收方给出窗口大小。...

鏡花水月
今天
10
0
OSChina 周日乱弹 —— 别问,问就是没空

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @tom_tdhzz :#今日歌曲推荐# 分享容祖儿/彭羚的单曲《心淡》: 《心淡》- 容祖儿/彭羚 手机党少年们想听歌,请使劲儿戳(这里) @wqp0010 :周...

小小编辑
今天
1K
11
golang微服务框架go-micro 入门笔记2.1 micro工具之micro api

micro api micro 功能非常强大,本文将详细阐述micro api 命令行的功能 重要的事情说3次 本文全部代码https://idea.techidea8.com/open/idea.shtml?id=6 本文全部代码https://idea.techidea8....

非正式解决方案
今天
5
0
Spring Context 你真的懂了吗

今天介绍一下大家常见的一个单词 context 应该怎么去理解,正确的理解它有助于我们学习 spring 以及计算机系统中的其他知识。 1. context 是什么 我们经常在编程中见到 context 这个单词,当...

Java知其所以然
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部