文档章节

spring-data管理HbaseTemplate connection 坑之源码分析

DanierWei
 DanierWei
发布于 2017/07/13 21:12
字数 1331
阅读 457
收藏 0

一、背景:

  • 使用spring xml配置管理初始化HbaseTemplate,并使用HbaseTemplate API时存在bug,每次查询都会建立connection,这样会存在并发查询的时候大量时间浪费在connection建立上。

  • spring-data-hadoop版本为2.2.0.RELEASE,测试对比过 ConnectionFactory创建并使connection全局共享通过spring管理的方式,对应如下图1、图2对比,监控的指标分别为: 查询耗时、应用GC、机器与HBase服务网络情况。其中应用GC、机器与HBase服务网络情况都比较良好。查询耗时通过使用spring管理的connection查询非常不稳地。本文后面会从源码进行解析坑所在。同时对比了2.5.0.RELEASE源码都有类似的问题。2.3.0.RELEASE、2.4.0.RELEASE估计也没差~ 图1
    图2

  • 查询方式使用HBase原生api的方式,rowkey范围查询,满足条件的数据条数为29条,表大小2KW+。

    @Autowired
    private HbaseTemplate hbaseTemplate;
    
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(filterStartRowKey));
    scan.setStopRow(Bytes.toBytes(filterEndRowKey));
    List<Map<String, Object>> result = hbaseTemplate.find(tableName, scan, (resultSet, i) -> {
           List<Cell> cells = resultSet.listCells();
           Map<String, Object> valueByCloumn = cells.parallelStream()
                .collect(Collectors.toMap(cell -> Bytes.toString(CellUtil.cloneQualifier(cell)),cell -> Bytes.toString(CellUtil.cloneValue(cell))));
            return valueByCloumn;
});

二、分析:

spring管理接入步骤

  • 引入jar包
<dependency>
	<groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
    	<exclusion>
        	<groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </exclusion>
        <exclusion>
        	<groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
        	<groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
   </exclusions>
</dependency>
<dependency>
	<groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.2</version>
    	<exclusions>
        	<exclusion>
            	<artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
       </exclusions>
</dependency>
  • xml配置
<hdp:hbase-configuration id="hbaseConfigurationConf"
                         zk-quorum="xxxx"
                         zk-port="xxx"/>
<bean id="hTableFactory" class="org.apache.hadoop.hbase.client.HTableFactory"/>
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate" lazy-init="true">
    <property name="configuration" ref="hbaseConfigurationConf"/>
    <property name="tableFactory" ref="hTableFactory"/>
</bean>
  • 跟踪代码 hbaseTemplate.find(....),最终执行的方法为*HbaseTemplate的execute(...)*方法,获取connection的关键代码在HTableInterface table = this.getTable(tableName),两个奇葩点既引起bug的原因如下:</br>

    1、首先会判断传入的table是否已存在HTableInterface对象,通过HbaseSynchronizationManager类管理着这些全局的HTableInterface按照tableName分组,但是发现如果需要HbaseSynchronizationManager管理,需要调用他的bindResource方法,同时这个方法调用需要配置AOP,类似spring管理mysql事物配置,拦截器类为HbaseInterceptor,此类的作用为自动将HBase表绑定给当前线程,也就是说,每个在HBase上执行的DAO操作的类都会被HbaseInterceptor包装,因此一旦发现有在使用的表都将被绑定给当前线程,之后再使用这张表时就无需再初始化表了(理论上达到了HBase连接池的效果),调用结束后,表将被自己关闭。但是拦截器中的初始化table的时候,调用的方式还是this.getTable(tableName)逻辑一样

    2、进入下一步,有判断tableFactory是否为空,此对象为spring的xml注入的实例,此处见如上的配置始终是不为空,最终会调用createHTableInterface创建HTableInterface*, 继续跟踪其源码发现跟不注入tableFactory对应是一样的,始终会new HTable(....),HTable初始化的时候会创建connection,当查询完毕之后会触发table.close,也就将connection关闭了,下次在进入查询,会创建新的connection。*

    3、结合以上两点,无论是AOP方式管理连接池,还是初始化HbaseTemplate的方式,都会触发new Ttable(.....),当table.close()时,Table对像中的Connection都会被关闭。即spring data管理的HBase连接池并没达到想要的效果。

关键代码:
1、判断hasResource 有返回没有反之获取新的HTable实例
public static HTableInterface getHTable(String tableName, Configuration configuration, Charset charset, HTableInterfaceFactory tableFactory) {
        if(HbaseSynchronizationManager.hasResource(tableName)) {
            return (HTable)HbaseSynchronizationManager.getResource(tableName);
        } else {
            Object t = null;
            try {
                if(tableFactory != null) {
                    t = tableFactory.createHTableInterface(configuration, tableName.getBytes(charset));
                } else {
                    t = new HTable(configuration, tableName.getBytes(charset));
                }
                return (HTableInterface)t;
            } catch (Exception var6) {
                throw convertHbaseException(var6);
            }
        }
}
//
//
2、HTableFactory.createHTableInterface  
public class HTableFactory implements HTableInterfaceFactory {
    public HTableFactory() {
    }
    public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
        try {
            return new HTable(config, TableName.valueOf(tableName));
        } catch (IOException var4) {
            throw new RuntimeException(var4);
        }
    }
    public void releaseHTableInterface(HTableInterface table) throws IOException {
        table.close();
    }
}
//
//
3、new HTable(...)
public HTable(Configuration conf, TableName tableName) throws IOException {
        this.autoFlush = true;
        this.closed = false;
        this.defaultConsistency = Consistency.STRONG;
        this.tableName = tableName;
        this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
        if(conf == null) {
            this.connection = null;
        } else {
            this.connection = ConnectionManager.getConnectionInternal(conf);
            this.configuration = conf;
            this.pool = getDefaultExecutor(conf);
            this.finishSetup();
        }
    }
//
//
4、table.close
public void close() throws IOException {
        if(!this.closed) {
            .....

            if(this.cleanupConnectionOnClose && this.connection != null) {
                this.connection.close();
            }

            this.closed = true;
        }
    }

三、解决:

  • 通过以上代码分析,spring-data管理HbaseTemplate有坑,解决办法:
    1、自己实现一个tableFactory,注入进去,功能为实现管理全局的connection保证不重复创建connection。

    2、简单的直接使用ConnectionFactory方式创建,同时也测试对比过,速度比较键定,见上面截图

public class HbaseUtils { private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class); private static Connection connection;

static {
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", "xxxxxx");
    configuration.set("zookeeper.znode.parent", "/hbase");
    try {
        connection = ConnectionFactory.createConnection(configuration);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public static List<Map<String, String>> scan(String tableName, Scan scan) {
    List<Map<String, String>> scanResult = Lists.newArrayList();
    if (connection == null) {
        logger.warn("scan error the connection is null");
        return scanResult;
    }

    Table table = null;
    ResultScanner rs = null;
    try {
        table = connection.getTable(TableName.valueOf(tableName));
        rs = table.getScanner(scan);
        Map<String, String> cellValueItem;
        for (Result r : rs) {
            cellValueItem = r.listCells()
                    .parallelStream()
                    .collect(Collectors.toMap(key -> Bytes.toString(CellUtil.cloneQualifier(key)),
                            value -> Bytes.toString(CellUtil.cloneValue(value))));
            scanResult.add(cellValueItem);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            try {
                //关闭的逻辑在这个方法里边,但是里边有一个变量cleanupConnectionOnClose控制了关闭与否,
                //这个变量初始化在new Table(....)里。当使用spring管理时,它始终为true,所以connection会被关闭。
                //当非spring管理的时候,是这么获取的,connection.getTable(TableName.valueOf(tableName));  
                //跟踪代码发现cleanupConnectionOnClose始终为false, 
                //并且HTable中的connection属性是通过参数赋值,
                //即为HbaseUtils初始化的connection对象。所以关键在于HTableInterface的创建方式。
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return scanResult;
    }
}

}

[spring-hadoop](https://github.com/spring-projects/spring-hadoop/blob/master/spring-hadoop-hbase/src/main/java/org/springframework/data/hadoop/hbase/HbaseTemplate.java)

© 著作权归作者所有

共有 人打赏支持
DanierWei
粉丝 1
博文 9
码字总数 5765
作品 0
杭州
spring-data-hadoop与HBase结合示例

测试数据准备 关于测试数据的准备可以参考之前写的一篇文章 HBase单机模式和伪分布式模式安装和配置 中 HBase简单操作 这节。 示例程序 pom.xml 要想使用 Spring for Apache Hadoop 与 HBas...

JackieYeah
2016/09/09
1K
0
关于MyBatis sqlSession的一点整理

原文地址:关于MyBatis sqlSession的一点整理 工作中,需要学习一下MyBatis sqlSession的产生过程,翻看了mybatis-spring的源码,阅读了一些mybatis的相关doc,对mybatis sqlSession有了一些...

Realfighter
2015/01/04
0
1
分布式事务系列(1.2)Spring的事务体系

1 系列目录 - 分布式事务系列(开篇)提出疑问和研究过程- 分布式事务系列(1.1)Spring事务管理器PlatformTransactionManager源码分析- 分布式事务系列(1.2)Spring事务体系- 分布式事务系...

乒乓狂魔
2015/05/18
0
4
jdbc-jdbcTemplate-hibernate-jpa-springDataJpa系列(一)

1 需要解决的疑惑 目前jdbc、jdbcTemplate、hibernate、jpa、spring之间有或多或少的关系。在使用它们的时候有着各种各样的配置,初学者很容易分不清到底各自都做了什么事情,如果对自己要求...

乒乓狂魔
2015/04/21
0
12
Spring具体事务处理器的实现

1.Spring的事务处理中,通用的事务处理流程框架是由抽象事务管理器AbstractPlatformTransactionManager来提供的,而具体的底层事务处理实现,由PlatformTransactionManager的具体实现类来实现...

紫萱格主
2015/05/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MySQL面试题集锦

什么是数据库索引?索引有哪几种类型?什么是最左前缀原则?索引算法有哪些?有什么区别? 索引是对数据库表中一列或多列的值进行排序的一种结构。一个非常恰当的比喻就是书的目录页与书的正...

老道士
25分钟前
0
0
使用 LogStash 归集日志

elastic 官网: https://www.elastic.co/ 为了便于集中查看多台主机的业务日志,使用 Filebeat, Redis, Logstash的方式进行收集: (1) Filebeat 监控日志文件的变化, 将新增部分写入redis中, 每...

ouhoo
28分钟前
0
0
java序列化(六) - protostuff序列化

添加依赖 <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.5.9</version> </de......

晨猫
35分钟前
0
0
Ceph学习笔记1-Mimic版本多节点部署

特别说明: 本方法也可以用于单节点部署,只部署一个Monitor(只是会形成单点故障而已),最低要求是使用两个分区创建2个OSD(因为默认最小副本是2);如果不需要使用CephFS,则可以不部署M...

LastRitter
37分钟前
0
0
923. 3Sum With Multiplicity - LeetCode

Question 923. 3Sum With Multiplicity Solution 题目大意: 给一个int数组A和一个目标值target,求满足下面两个条件的组合个数,其中i,j,k分别为数组的索引 i<j<k target = A[i] + A[j] + A[k...

yysue
39分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部