文档章节

Hbase总结(三)--使用spring-data-hadoop进行hbase的读写操作

ordiy
 ordiy
发布于 2017/03/09 19:52
字数 997
阅读 1218
收藏 5

1.新建一个maven项目引入spring-data-hadoop框架

在pom文件中添加以下依赖

<!-- hbase 的主要依赖-->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <!-- hbase pom start -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.2.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- hbase pom end -->

        <!-- hadoop config start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
            <scope>compile</scope>
        </dependency>

2.配置hbase的文件

将hbase的的配置放入到pom文件的profile中,注意hbase url是使用主机名host-name进行访问的

    <profile>
        <id>develop_new</id>
        <properties>
            <!--hbase测试 -->
            <hbase-hdfs-dir>hdfs://hdfs-hbase-master:9000/hbase</hbase-hdfs-dir>
            <zookeeper-znode-parent-dir>/hbase</zookeeper-znode-parent-dir>
            <hbase-zookper-list>192.168.10.52,192.168.10.53,192.168.10.54</hbase-zookper-list>
            <hbase-hbase-zookper-port>2181</hbase-hbase-zookper-port>
            <hadoop-tmp-dir>/tmp/hbase-root</hadoop-tmp-dir>
            <config-file-name>hbase-site.xml</config-file-name>
            <log4j2-log-level>debug</log4j2-log-level>

        </properties>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
    </profile>
    </profiles>

3.配置hbase的spring文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:hdp="http://www.springframework.org/schema/hadoop"
	xmlns:p="http://www.springframework.org/schema/p"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
	http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

	<context:property-placeholder location="hbase.properties"/>

	<!--扫描spring-data框架中的依赖-->
	<context:component-scan base-package="org.springframework.samples.hadoop.hbase"/>
	
	<hdp:configuration id="hadoopConfiguration">
		${hbase-hdfs-dir}
	</hdp:configuration>
	<!-- -->

	<hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}">
	</hdp:hbase-configuration>

	<!-- 配置hbase config bean-->
	<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
		<property name="configuration" ref="hbaseConfiguration"/>
	</bean>
	<!-- hadoop的配置文件-->
	<hdp:configuration resources="${config-file-name}"></hdp:configuration>
</beans>

4.使用实现数据查询方法

/**
 * @Description: 实现Hbase部分查询
 */
@Repository("nlocationInfoJDOImplNew")
public class LocationInfoJDOImplNew implements InitializingBean {
    private static final Logger log = LogManager.getLogger(LocationInfoJDOImplNew.class);
    @Autowired
    private HbaseTemplate hbaseTemplate;
    @Resource(name = "hbaseConfiguration")
    private Configuration config;
    private HBaseAdmin admin;

    private static final String cf_name = "lf";
    /**
     *从hbase中获取查询结果
     */
    public List<Object> queryLocDataByTable(List<String> rowkeyPrefix, Date startTime, Date endTime) {

        String tableName = "";
        String cf_name = "demo";
        byte[] cf_bytes = Bytes.toBytes(cf_name);

        tableName = "lo_20170309";
        Scan scan = new Scan();
        scan.setCaching(50);//全表扫描设置cache
        scan.addFamily(cf_name.getBytes());
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        try {
            //时间戳,过滤器
            scan.setTimeRange(startTime.getTime(), endTime.getTime());
        } catch (IOException e) {
            e.printStackTrace();
        }

        //批量增加过滤条件 ,使用行键的PrefixFilter进行过滤
        rowkeyPrefix.forEach(s -> {
            filterList.addFilter(new PrefixFilter(Bytes.toBytes(s)));
        });
        scan.setFilter(filterList);
        //java8 特性 , 可以通过该方法设定返回值来实现

        List<Object> entities = hbaseTemplate.find(tableName, scan, results -> {
            List<Object> objectList = new ArrayList<>();
            Iterator<Result> iterator = results.iterator(); //这个迭代器只能获取一次
            while (iterator.hasNext()) {
                //迭代获取数据
//                        SimpleLocationEntity temp = new SimpleLocationEntity();
                Result next = iterator.next();
                next.getFamilyMap(cf_bytes).forEach((bytes, values) -> {
                        byte[] row = next.getRow();
                        System.out.println("rowKey:" + Bytes.toString(next.getRow(), 0, 7) + Bytes.toIntUnsafe(row, 7));
                });
                objectList.add(next);
                log.debug("get one recode");
            }
            return objectList;
        });
        return entities;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Configuration configuration = config;
        admin = new HBaseAdmin(configuration);
    }
}

5.编写hbase的存储类

@Repository
public class LocationRepository  implements InitializingBean {
    private static final Logger log = LogManager.getLogger(LocationRepository.class);
    @Autowired
    private HbaseTemplate hbaseTemplate;
    @Resource(name = "hbaseConfiguration")
    private Configuration config;
    private HBaseAdmin admin;

    public static final String gps_table_basic = "lo_";
    public static byte[] CF_INFO = Bytes.toBytes("lf");
    @Override
    public void afterPropertiesSet() throws Exception {
        admin = new HBaseAdmin(config);
    }
    /**
     * 数据发送到 Hbase数据库中
     * 1.自定义业务前缀+ (一天的总毫秒数- 当前的时间在一天中的毫秒数) //得到最新数据总是在最前面的hbase表
     * 2.使用map的层级结构自动映射为hbase的列族和列名
     */
    public byte[] saveObjSimpleFun(final HashMap<String, LinkedHashMap<String, Object>> srcData, final String businessID, final String tableName) {
        LocalDateTime now = LocalDateTime.now();
        byte[] execute = hbaseTemplate.execute(tableName, table -> {
            byte[] add = Bytes.add(Bytes.toBytes(businessID), Bytes.toBytes(Integer.MAX_VALUE - now.get(ChronoField.MILLI_OF_DAY)));
            Put putObj = new Put(add);
            srcData.forEach((cf_name, file_map)
                    -> file_map.forEach((filed_key, filed_value) -> {
                byte[] family = CF_INFO;
                log.debug("convert filed code is {}", filed_key);
                if (filed_value !=null ){

                    byte[] value = toBytes(filed_value);
                    if (value.length >0) {
                        putObj.addColumn(family, Bytes.toBytes(filed_key), value);
                    }
                }
            }));
            table.put(putObj);
            return add;
        });
        log.debug("end of save data to hbase .....");
        return execute;
    }

    public byte[] toBytes(Object obj){
        byte[] result = new byte[]{};
        if (obj!=null){
            if (obj instanceof String) {
                result = Bytes.toBytes((String)obj);
            }else if (obj instanceof Integer){
                result = Bytes.toBytes((Integer)obj);
            }else if(obj instanceof Date){
                result = Bytes.toBytes(obj.toString());
            }else if (obj instanceof Short){
                result = Bytes.toBytes((Short)obj);
            }else if (obj instanceof Long){
                result = Bytes.toBytes((Long)obj);
            }else if (obj instanceof Double){
                result = Bytes.toBytes((Double) obj);
            }else if (obj instanceof Float){
                result = Bytes.toBytes((Float) obj);
            }else if (obj instanceof Boolean){
                result = Bytes.toBytes((Boolean) obj);
            }else if (obj instanceof BigDecimal){
                result = Bytes.toBytes((BigDecimal)obj);
            }else if (obj instanceof ByteBuffer){
                result = Bytes.toBytes((ByteBuffer) obj);
            }else if (obj instanceof ArrayList){
                result = convertArrayList(obj);
            } else {
                System.out.println("invalid Data Type....." + obj.toString());
            }
        }
        return result;
    }

    private byte[] convertArrayList(Object object){
        ArrayList<?> arrayList = (ArrayList) object;
        final byte[] result = new byte[arrayList.size()];
        for (int i = 0; i < arrayList.size(); i++) {
            Object o = arrayList.get(i);
            if (o instanceof Integer) {
                result[i] = (byte) (int) o;
            }else {
                System.out.println("error convertArrayList");
            }
        }
        return result;
    }
}

6.项目源代码地址 spring-hbase-demo

© 著作权归作者所有

ordiy
粉丝 2
博文 23
码字总数 8614
作品 0
深圳
程序员
私信 提问
hive 与 hbase 结合

一、hive与hbase的结合 Hive会经常和Hbase结合使用,把Hbase作为Hive的存储路径,所以Hive整合Hbase尤其重要。使用Hive读取Hbase中的数据,可以使用HQL语句在HBase表上进行查询、插入操作;甚...

meteor_hy
2018/06/26
0
0
HBase原理之HBase MetaStore&Compaction剖析

1.概述 客户端读写数据是先从HBase Clienr获取RegionServer的元数据信息,比如Region地址信息。在执行数据写操作时,HBase会先写MetaStore,为什么会写到MetaStore。本篇文章将为读者剖析HBa...

HBase技术社区
2018/09/23
0
0
区分 hdfs hbase hive hbase适用场景

Hive 不想用程序语言开发MapReduce的朋友比如DB们,熟悉SQL的朋友可以使用Hive开离线的进行数据处理与分析工作。 注意Hive现在适合在离线下进行数据的操作,就是说不适合在挂在真实的生产环境...

八戒_o
2015/11/19
1K
0
Hbase高可用(HA)安装笔记

这是在前一篇Hadoop 3.1.1高可用(HA)集群安装笔记 基础上,进行的操作. 安装HBase vim conf/hbase-site.xml vim conf/regionservers nn01启动 访问web页面 master regionServer http://maste...

张shieppp
2018/09/28
257
0
一文让您全面了解清楚HBase数据库的所有知识点,值得收藏!

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一、HBase基本概念:列式数据...

李金泽
2018/03/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

分布式协调服务zookeeper

ps.本文为《从Paxos到Zookeeper 分布式一致性原理与实践》笔记之一 ZooKeeper ZooKeeper曾是Apache Hadoop的一个子项目,是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它...

ls_cherish
今天
4
0
redis 学习2

网站 启动 服务端 启动redis 服务端 在redis 安装目录下 src 里面 ./redis-server & 可以指定 配置文件或者端口 客户端 在 redis 的安装目录里面的 src 里面 ./redis-cli 可以指定 指定 连接...

之渊
昨天
2
0
Spring boot 静态资源访问

0. 两个配置 spring.mvc.static-path-patternspring.resources.static-locations 1. application中需要先行的两个配置项 1.1 spring.mvc.static-path-pattern 这个配置项是告诉springboo......

moon888
昨天
4
0
hash slot(虚拟桶)

在分布式集群中,如何保证相同请求落到相同的机器上,并且后面的集群机器可以尽可能的均分请求,并且当扩容或down机的情况下能对原有集群影响最小。 round robin算法:是把数据mod后直接映射...

李朝强
昨天
4
0
Kafka 原理和实战

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/bV8AhqAjQp4a_iXRfobkCQ 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。先后从事过电子商务、开放平...

vivo互联网技术
昨天
24
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部