spring-data管理HbaseTemplate connection 坑之源码分析
spring-data管理HbaseTemplate connection 坑之源码分析
DanierWei 发表于10个月前
spring-data管理HbaseTemplate connection 坑之源码分析
  • 发表于 10个月前
  • 阅读 166
  • 收藏 0
  • 点赞 0
  • 评论 0

移动开发云端新模式探索实践 >>>   

一、背景:

  • 使用spring xml配置管理初始化HbaseTemplate,并使用HbaseTemplate API时存在bug,每次查询都会建立connection,这样会存在并发查询的时候大量时间浪费在connection建立上。

  • spring-data-hadoop版本为2.2.0.RELEASE,测试对比过 ConnectionFactory创建并使connection全局共享通过spring管理的方式,对应如下图1、图2对比,监控的指标分别为: 查询耗时、应用GC、机器与HBase服务网络情况。其中应用GC、机器与HBase服务网络情况都比较良好。查询耗时通过使用spring管理的connection查询非常不稳地。本文后面会从源码进行解析坑所在。同时对比了2.5.0.RELEASE源码都有类似的问题。2.3.0.RELEASE、2.4.0.RELEASE估计也没差~ 图1
    图2

  • 查询方式使用HBase原生api的方式,rowkey范围查询,满足条件的数据条数为29条,表大小2KW+。

    @Autowired
    private HbaseTemplate hbaseTemplate;
    
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(filterStartRowKey));
    scan.setStopRow(Bytes.toBytes(filterEndRowKey));
    List<Map<String, Object>> result = hbaseTemplate.find(tableName, scan, (resultSet, i) -> {
           List<Cell> cells = resultSet.listCells();
           Map<String, Object> valueByCloumn = cells.parallelStream()
                .collect(Collectors.toMap(cell -> Bytes.toString(CellUtil.cloneQualifier(cell)),cell -> Bytes.toString(CellUtil.cloneValue(cell))));
            return valueByCloumn;
});

二、分析:

spring管理接入步骤

  • 引入jar包
<dependency>
	<groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
    	<exclusion>
        	<groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </exclusion>
        <exclusion>
        	<groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
        	<groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
   </exclusions>
</dependency>
<dependency>
	<groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.2</version>
    	<exclusions>
        	<exclusion>
            	<artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
       </exclusions>
</dependency>
  • xml配置
<hdp:hbase-configuration id="hbaseConfigurationConf"
                         zk-quorum="xxxx"
                         zk-port="xxx"/>
<bean id="hTableFactory" class="org.apache.hadoop.hbase.client.HTableFactory"/>
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate" lazy-init="true">
    <property name="configuration" ref="hbaseConfigurationConf"/>
    <property name="tableFactory" ref="hTableFactory"/>
</bean>
  • 跟踪代码 hbaseTemplate.find(....),最终执行的方法为*HbaseTemplate的execute(...)*方法,获取connection的关键代码在HTableInterface table = this.getTable(tableName),两个奇葩点既引起bug的原因如下:</br>

    1、首先会判断传入的table是否已存在HTableInterface对象,通过HbaseSynchronizationManager类管理着这些全局的HTableInterface按照tableName分组,但是发现如果需要HbaseSynchronizationManager管理,需要调用他的bindResource方法,同时这个方法调用需要配置AOP,类似spring管理mysql事物配置,拦截器类为HbaseInterceptor,此类的作用为自动将HBase表绑定给当前线程,也就是说,每个在HBase上执行的DAO操作的类都会被HbaseInterceptor包装,因此一旦发现有在使用的表都将被绑定给当前线程,之后再使用这张表时就无需再初始化表了(理论上达到了HBase连接池的效果),调用结束后,表将被自己关闭。但是拦截器中的初始化table的时候,调用的方式还是this.getTable(tableName)逻辑一样

    2、进入下一步,有判断tableFactory是否为空,此对象为spring的xml注入的实例,此处见如上的配置始终是不为空,最终会调用createHTableInterface创建HTableInterface*, 继续跟踪其源码发现跟不注入tableFactory对应是一样的,始终会new HTable(....),HTable初始化的时候会创建connection,当查询完毕之后会触发table.close,也就将connection关闭了,下次在进入查询,会创建新的connection。*

    3、结合以上两点,无论是AOP方式管理连接池,还是初始化HbaseTemplate的方式,都会触发new Ttable(.....),当table.close()时,Table对像中的Connection都会被关闭。即spring data管理的HBase连接池并没达到想要的效果。

关键代码:
1、判断hasResource 有返回没有反之获取新的HTable实例
public static HTableInterface getHTable(String tableName, Configuration configuration, Charset charset, HTableInterfaceFactory tableFactory) {
        if(HbaseSynchronizationManager.hasResource(tableName)) {
            return (HTable)HbaseSynchronizationManager.getResource(tableName);
        } else {
            Object t = null;
            try {
                if(tableFactory != null) {
                    t = tableFactory.createHTableInterface(configuration, tableName.getBytes(charset));
                } else {
                    t = new HTable(configuration, tableName.getBytes(charset));
                }
                return (HTableInterface)t;
            } catch (Exception var6) {
                throw convertHbaseException(var6);
            }
        }
}
//
//
2、HTableFactory.createHTableInterface  
public class HTableFactory implements HTableInterfaceFactory {
    public HTableFactory() {
    }
    public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
        try {
            return new HTable(config, TableName.valueOf(tableName));
        } catch (IOException var4) {
            throw new RuntimeException(var4);
        }
    }
    public void releaseHTableInterface(HTableInterface table) throws IOException {
        table.close();
    }
}
//
//
3、new HTable(...)
public HTable(Configuration conf, TableName tableName) throws IOException {
        this.autoFlush = true;
        this.closed = false;
        this.defaultConsistency = Consistency.STRONG;
        this.tableName = tableName;
        this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
        if(conf == null) {
            this.connection = null;
        } else {
            this.connection = ConnectionManager.getConnectionInternal(conf);
            this.configuration = conf;
            this.pool = getDefaultExecutor(conf);
            this.finishSetup();
        }
    }
//
//
4、table.close
public void close() throws IOException {
        if(!this.closed) {
            .....

            if(this.cleanupConnectionOnClose && this.connection != null) {
                this.connection.close();
            }

            this.closed = true;
        }
    }

三、解决:

  • 通过以上代码分析,spring-data管理HbaseTemplate有坑,解决办法:
    1、自己实现一个tableFactory,注入进去,功能为实现管理全局的connection保证不重复创建connection。

    2、简单的直接使用ConnectionFactory方式创建,同时也测试对比过,速度比较键定,见上面截图

public class HbaseUtils { private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class); private static Connection connection;

static {
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", "xxxxxx");
    configuration.set("zookeeper.znode.parent", "/hbase");
    try {
        connection = ConnectionFactory.createConnection(configuration);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public static List<Map<String, String>> scan(String tableName, Scan scan) {
    List<Map<String, String>> scanResult = Lists.newArrayList();
    if (connection == null) {
        logger.warn("scan error the connection is null");
        return scanResult;
    }

    Table table = null;
    ResultScanner rs = null;
    try {
        table = connection.getTable(TableName.valueOf(tableName));
        rs = table.getScanner(scan);
        Map<String, String> cellValueItem;
        for (Result r : rs) {
            cellValueItem = r.listCells()
                    .parallelStream()
                    .collect(Collectors.toMap(key -> Bytes.toString(CellUtil.cloneQualifier(key)),
                            value -> Bytes.toString(CellUtil.cloneValue(value))));
            scanResult.add(cellValueItem);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            try {
                //关闭的逻辑在这个方法里边,但是里边有一个变量cleanupConnectionOnClose控制了关闭与否,
                //这个变量初始化在new Table(....)里。当使用spring管理时,它始终为true,所以connection会被关闭。
                //当非spring管理的时候,是这么获取的,connection.getTable(TableName.valueOf(tableName));  
                //跟踪代码发现cleanupConnectionOnClose始终为false, 
                //并且HTable中的connection属性是通过参数赋值,
                //即为HbaseUtils初始化的connection对象。所以关键在于HTableInterface的创建方式。
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return scanResult;
    }
}

}

[spring-hadoop](https://github.com/spring-projects/spring-hadoop/blob/master/spring-hadoop-hbase/src/main/java/org/springframework/data/hadoop/hbase/HbaseTemplate.java)
标签: Spring Data HBase
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 1
博文 9
码字总数 5765
×
DanierWei
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: