HBase 架构和 Java Api

原创
2016/11/24 17:36
阅读数 1.2K

HBase 架构

HBase是Hadoop的数据库,能够对大数据提供随机、实时读写访问。他是开源的,分布式的,多版本的,面向列的,存储模型。

在讲解的时候我首先给大家讲解一下HBase的整体结构,如下图

HBase Master是服务器负责管理所有的HRegion服务器,HBase Master并不存储HBase服务器的任何数据,HBase逻辑上的表可能会划分为多个HRegion,然后存储在HRegion Server群中,HBase Master Server中存储的是从数据到HRegion Server的映射。

一台机器只能运行一个HRegion服务器,数据的操作会记录在Hlog中,在读取数据时候,HRegion会先访问Hmemcache缓存,如果 缓存中没有数据才回到Hstore中上找,每一个列都会有一个Hstore集合,每个Hstore集合包含了很多具体的HstoreFile文件,这些文件是B树结构的,方便快速读取。

 

HBase数据物理视图如下:

Ø  Timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version numberØ  Row Key: 行键,Table的主键,Table中的记录按照Row Key排序

Ø  Column Family:列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中可以由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。

Java来操作HBase数据

如果hbase shell已经设置在环境变量中的话,可以直接输入以下命令进入
 hbase shell
0)查看基本信息
 version  
 status  
 whoami
表的管理

1)查看有哪些表  
list  

2)查结构     describe 'my_test'
查数据内容:  scan 'my_test',{LIMIT => 5}

获取某个key下所有的列簇:列:

get 'test_USERPERHOURINFO_SPARK','20170228#20170228112348'

ps: "LIMIT" 大写

封装一个Java 操作HBase 1.1.7 的Api,代码如下:

/**
 * Created by eric on 2016/11/24
 * 基于 HBase 1.1.7版本 HBase集群 的 java api 封装
 */

package com.bw.util;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.*;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.util.Bytes;

class HBaseUtil {
    private static Configuration conf = null;
    /**
     * 初始化配置 静态初始化块
     */
    static {
        InputStream propInStream = null;
        try {
            propInStream = new FileInputStream(new File("./src/main/resources/config/config.properties"));
        } catch (FileNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        Properties prop = new Properties();
        //load(InputStream inStream)方法从.properties属性文件对应的文件输入流中,加载属性列表到Properties类对象
        try{
            prop.load(propInStream);
        }catch(IOException e){
            System.err.println(e.getMessage());
        }finally{
            try {
                propInStream.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //Hbase读取配置文件中的内容
        //prop.getProperty方法是分别是获取属性信息。
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort",prop.getProperty("hbase.zookeeper.property.clientPort"));
        HBASE_CONFIG.set("hbase.zookeeper.quorum",prop.getProperty("hbase.zookeeper.quorum"));
        HBASE_CONFIG.set("hbase.master.port",prop.getProperty("hbase.master.port"));
        HBASE_CONFIG.set("zookeeper.znode.parent",prop.getProperty("hbase.zookeeper.znode.parent"));
        conf = HBaseConfiguration.create(HBASE_CONFIG);
    }


    /**
     * 创建数据库表
     */

    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        try {
            if (hAdmin.tableExists(tableName)) {
                System.out.println(tableName + "表已存在");
                conn.close();
                System.exit(0);
            } else {
                // 新建一个表描述
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                // 在表描述里添加列族
                for (String columnFamily : columnFamilys) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                    hColumnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);//设置压缩类型
                    tableDesc.addFamily(hColumnDescriptor);
                }
                // 根据配置好的表描述建表
                hAdmin.createTable(tableDesc);
                System.out.println("创建" + tableName + "表成功");
            }
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 插入或者更新单条数据
     * @param tableName 表名
     * @param rowKey 行键
     * @param family 列簇
     * @param qualifier 限定符(列键名)
     * @param value 列键值
     * @return true: 插入成功; false: 插入失败
     * @throws IOException
     */
    public static boolean saveData(String tableName, String rowKey, String family, String qualifier,
                                   String value) throws IOException{
        try {
            Connection conn = ConnectionFactory.createConnection(conf);
            HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));//参数是 行键

            put.addColumn(
                    Bytes.toBytes(family),
                    Bytes.toBytes(qualifier),
                    Bytes.toBytes(value)
            );

            hTable.put(put);

            hTable.close();
            conn.close();
            return true;
        } catch (IOException e){
            e.printStackTrace();
        }
        return  false;
    }


    /**
     * 读取一个限定符的值
     * @param tableName
     * @param rowKey
     * @param family
     * @param qualifier
     * @return
     * @throws IOException
     */
    public static String getValueByQualifier(String tableName, String rowKey, String family, String qualifier)
    throws IOException{
       try {
           Connection conn = ConnectionFactory.createConnection(conf);
           HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
           Get get = new Get(Bytes.toBytes(rowKey));
           get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
           Result res = hTable.get(get);

           hTable.close();
           conn.close();

           return Bytes.toString(CellUtil.cloneValue(res.listCells().get(0)));
       } catch (IOException e) {
           e.printStackTrace();
       }

        return null;
    }

    /**
     * 获取一个列簇的一个限定符的值
     * @param tableName
     * @param rowKey
     * @param family
     * @return
     * @throws IOException
     */
    public static Map<String, String> getValueByFamily (String tableName, String rowKey, String family)
            throws IOException{
        Map<String, String> result = null;
       try {
           Connection conn = ConnectionFactory.createConnection(conf);
           HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
           Get get = new Get(Bytes.toBytes(rowKey));
           get.addFamily(Bytes.toBytes(family));

           Result res = hTable.get(get);
           List<Cell> cs = res.listCells();
           result = cs.size() > 0 ? new HashMap<String, String>() : result;

           for (Cell cell : cs) {
               result.put(
                       Bytes.toString(CellUtil.cloneFamily(cell)),
                       Bytes.toString(CellUtil.cloneValue(cell))
               );
           }
           hTable.close();
           conn.close();
       } catch (IOException e) {
            e.printStackTrace();
       }

        return result;
    }

    /**
     *获取一个列簇的所有限定符的值
     * @param tableName
     * @param rowKey
     * @return
     * @throws IOException
     */
    public static Map<String, Map<String, String>> getValueByFamilyAll(String tableName, String rowKey)
            throws IOException{
        Map<String, Map<String, String>> results = null ;

        try {
            Connection conn = ConnectionFactory.createConnection(conf);
            HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));

            Result res = hTable.get(get);
            List<Cell> cs = res.listCells();
            results = cs.size() > 0 ? new HashMap<String, Map<String, String>> () : results;

            for (Cell cell : cs) {
                String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
                if (results.get(familyName) == null)
                {
                    results.put(familyName, new HashMap<String,  String> ());
                }
                results.get(familyName).put(
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell))
                );
            }

            hTable.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return results;
    }

    /**
     * 删除一个限定符
     * @param tableName
     * @param rowKey
     * @param family
     * @param qualifier
     * @return
     */
    public static boolean del(String tableName, String rowKey, String family, String qualifier) {
        try {
            Connection conn = ConnectionFactory.createConnection(conf);
            HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
            Delete del = new Delete(Bytes.toBytes(rowKey));

            if (qualifier != null) {
                del.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            } else if (family != null) {
                del.addFamily(Bytes.toBytes(family));
            }
            hTable.delete(del);
            hTable.close();
            conn.close();

            return true;
        } catch (IOException e) {
            e.printStackTrace();
        }

        return false;
    }

    /**
     * 删除一行
     * @param tableName
     * @param rowKey
     * @return
     */
    public static boolean delRowKey(String tableName, String rowKey) {
        return del(tableName, rowKey, null, null);
    }

    /**
     * 删除一行下的一个列簇
     * @param tableName
     * @param rowKey
     * @param family
     * @return
     */
    public static boolean delFamily(String tableName, String rowKey, String family) {
        return del(tableName, rowKey, family, null);
    }

    /**
     * 通过rowkey获取一条数据
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过rowkey创建一个get对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 输出结果
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println(
                    "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                            "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
                            "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
                            "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                            "时间戳:" + cell.getTimestamp());
        }
        // 关闭资源
        table.close();
        conn.close();
    }

    /**
     * 全表扫描
     */
    public static void scanTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 扫描全表输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            for (Cell cell : result.rawCells()) {
                System.out.println(
                        "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                                "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
                                "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
                                "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                                "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
        results.close();
        table.close();
        conn.close();
    }

    /**
     * 删除多条数据
     */
    public static void delRows(String tableName, String[] rows) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除多条数据
        List<Delete> list = new ArrayList<Delete>();
        for (String row : rows) {
            Delete delete = new Delete(Bytes.toBytes(row));
            list.add(delete);
        }
        table.delete(list);
        // 关闭资源
        table.close();
        conn.close();
    }

    /**
     * 删除数据库表
     */
    public static boolean deleteTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {
            // 失效表
            hAdmin.disableTable(tableName);
            // 删除表
            hAdmin.deleteTable(tableName);
            conn.close();
            return true;
        } else {
            conn.close();
            return false;
        }
    }

}

 

展开阅读全文
打赏
1
52 收藏
分享
加载中
更多评论
打赏
0 评论
52 收藏
1
分享
返回顶部
顶部