HBaseUtils工具 HBaseConf 备忘java

原创
2022/06/23 11:45
阅读数 166
HBaseUtils工具 HBaseConf 备忘

package com.github.zengfr.conuniframework.spider.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.context.annotation.Bean;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

//@Configuration
//@ConfigurationProperties(prefix = "hbase")
public class HBaseConf {

    private Map<String, String> config = new HashMap<>();

    public Map<String, String> getConfig() {
        return config;
    }

    public void setConfig(Map<String, String> config) {
        this.config = config;
    }

    public org.apache.hadoop.conf.Configuration configuration() {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
//此处可自己自定义和改造 拓展用
        //        configuration.set(HBASE_QUORUM, "81.68.xx.xx:2181");
//        configuration.set(HBASE_ROOTDIR, "/");
//        configuration.set(HBASE_ZNODE_PARENT, "/hbase");
        for (Map.Entry<String, String> map : config.entrySet()) {
            configuration.set(map.getKey(), map.getValue());
        }
        return configuration;
    }

    @Bean
    public Admin admin() {
        Admin admin = null;
        try {
            Connection connection = ConnectionFactory.createConnection(configuration());
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return admin;
    }
}

https://my.oschina.net/zengfr/

package com.github.zengfr.conuniframework.spider.hbase;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.*;

//@Service
public class HBaseUtils {

    @Autowired
    private Admin hbaseAdmin;

    /**
     * 判断表是否存在
     *
     * @param tableName 表名
     * @return true/false
     */
    public boolean isExists(String tableName) {
        boolean tableExists = false;
        try {
            TableName table = TableName.valueOf(tableName);
            tableExists = hbaseAdmin.tableExists(table);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return tableExists;
    }

    /**
     * 创建表
     *
     * @param tableName    表名
     * @param columnFamily 列族
     * @return true/false
     */
    public boolean createTable(String tableName, List<String> columnFamily) {
        return createTable(tableName, columnFamily, null);
    }

    /**
     * 预分区创建表
     *
     * @param tableName    表名
     * @param columnFamily 列族
     * @param keys         分区集合
     * @return true/false
     */
    public boolean createTable(String tableName, List<String> columnFamily, List<String> keys) {
        if (!isExists(tableName)) {
            try {
                TableName table = TableName.valueOf(tableName);
                HTableDescriptor desc = new HTableDescriptor(table);
                for (String cf : columnFamily) {
                    desc.addFamily(new HColumnDescriptor(cf));
                }
                if (keys == null) {
                    hbaseAdmin.createTable(desc);
                } else {
                    byte[][] splitKeys = getSplitKeys(keys);
                    hbaseAdmin.createTable(desc, splitKeys);
                }
                return true;
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(tableName + "is exists!!!");
            return false;
        }
        return false;
    }

    /**
     * 删除表
     *
     * @param tableName 表名
     */
    public void dropTable(String tableName) throws IOException {
        if (isExists(tableName)) {
            TableName table = TableName.valueOf(tableName);
            hbaseAdmin.disableTable(table);
            hbaseAdmin.deleteTable(table);
        }
    }

    /**
     * 插入数据(单条)
     *
     * @param tableName    表名
     * @param rowKey       rowKey
     * @param columnFamily 列族
     * @param column            * @param value             * @return true/false
     */
    public boolean putData(String tableName, String rowKey, String columnFamily, String column,
                           String value) {
        return putData(tableName, rowKey, columnFamily, Arrays.asList(column),
                Arrays.asList(value));
    }

    /**
     * 插入数据(批量)
     *
     * @param tableName    表名
     * @param rowKey       rowKey
     * @param columnFamily 列族
     * @param columns           * @param values            * @return true/false
     */
    public boolean putData(String tableName, String rowKey, String columnFamily,
                           List<String> columns, List<String> values) {
        try {
            Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i = 0; i < columns.size(); i++) {
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)),
                        Bytes.toBytes(values.get(i)));
            }
            table.put(put);
            table.close();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 获取数据(全表数据)
     *
     * @param tableName 表名
     * @return map
     */
    public List<Map<String, String>> getData(String tableName) {
        List<Map<String, String>> list = new ArrayList<>();
        try {
            Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result result : resultScanner) {
                HashMap<String, String> map = new HashMap<>();
                //rowkey
                String row = Bytes.toString(result.getRow());
                map.put("row", row);
                for (Cell cell : result.listCells()) {
                    //列族
                    String family = Bytes.toString(cell.getFamilyArray(),
                            cell.getFamilyOffset(), cell.getFamilyLength());
                    //                    String qualifier = Bytes.toString(cell.getQualifierArray(),
                            cell.getQualifierOffset(), cell.getQualifierLength());
                    //                    String data = Bytes.toString(cell.getValueArray(),
                            cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
                list.add(map);
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

    /**
     * 获取数据(根据传入的filter     *
     * @param tableName 表名
     * @param filter    过滤器
     * @return map
     */
    public List<Map<String, String>> getData(String tableName, Filter filter, String startRow, long maxResultSize) {
        List<Map<String, String>> list = new ArrayList<>();
        try {
            Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            // 添加过滤器
            scan.setFilter(filter);
            if (startRow != null && startRow != "")
                scan.withStartRow(Bytes.toBytes(startRow));
            if (maxResultSize > 0)
                scan.setMaxResultSize(maxResultSize);
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result result : resultScanner) {
                HashMap<String, String> map = new HashMap<>();
                //rowkey
                String row = Bytes.toString(result.getRow());
                map.put("row", row);
                for (Cell cell : result.listCells()) {
                    //列族
                    String family = Bytes.toString(cell.getFamilyArray(),
                            cell.getFamilyOffset(), cell.getFamilyLength());
                    //                    String qualifier = Bytes.toString(cell.getQualifierArray(),
                            cell.getQualifierOffset(), cell.getQualifierLength());
                    //                    String data = Bytes.toString(cell.getValueArray(),
                            cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
                list.add(map);
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

    /**
     * 获取数据(根据rowkey     *
     * @param tableName 表名
     * @param rowKey    rowKey
     * @return map
     */
    public Map<String, String> getData(String tableName, String rowKey) {
        HashMap<String, String> map = new HashMap<>();
        try {
            Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                for (Cell cell : result.listCells()) {
                    //列族
                    String family = Bytes.toString(cell.getFamilyArray(),
                            cell.getFamilyOffset(), cell.getFamilyLength());
                    //                    String qualifier = Bytes.toString(cell.getQualifierArray(),
                            cell.getQualifierOffset(), cell.getQualifierLength());
                    //                    String data = Bytes.toString(cell.getValueArray(),
                            cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return map;
    }

    /**
     * 获取数据(根据rowkey,列族,列)
     *
     * @param tableName       表名
     * @param rowKey          rowKey
     * @param columnFamily    列族
     * @param columnQualifier      * @return map
     */
    public String getData(String tableName, String rowKey, String columnFamily,
                          String columnQualifier) {
        String data = "";
        try {
            Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                Cell cell = result.listCells().get(0);
                data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                        cell.getValueLength());
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return data;
    }

    /**
     * 删除数据(根据rowkey     *
     * @param tableName 表名
     * @param rowKey    rowKey
     */
    public void deleteData(String tableName, String rowKey) throws IOException {
        Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
        table.close();
    }

    /**
     * 删除数据(根据rowkey,列族)
     *
     * @param tableName    表名
     * @param rowKey       rowKey
     * @param columnFamily 列族
     */
    public void deleteData(String tableName, String rowKey, String columnFamily)
            throws IOException {
        Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addFamily(columnFamily.getBytes());
        table.delete(delete);
        table.close();
    }

    /**
     * 删除数据(根据rowkey,列族)
     *
     * @param tableName    表名
     * @param rowKey       rowKey
     * @param columnFamily 列族
     * @param column            */
    public void deleteData(String tableName, String rowKey, String columnFamily, String column)
            throws IOException {
        Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(columnFamily.getBytes(), column.getBytes());
        table.delete(delete);
        table.close();
    }

    /**
     * 删除数据(多行)
     *
     * @param tableName 表名
     * @param rowKeys   rowKey集合
     */
    public void deleteData(String tableName, List<String> rowKeys) throws IOException {
        Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
        List<Delete> deleteList = new ArrayList<>();
        for (String row : rowKeys) {
            Delete delete = new Delete(Bytes.toBytes(row));
            deleteList.add(delete);
        }
        table.delete(deleteList);
        table.close();
    }

    /**
     * 分区【10, 20, 30-> ( ,10] (10,20] (20,30] (30, )
     *
     * @param keys 分区集合[10, 20, 30]
     * @return byte二维数组
     */
    private byte[][] getSplitKeys(List<String> keys) {
        byte[][] splitKeys = new byte[keys.size()][];
        TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for (String key : keys) {
            rows.add(Bytes.toBytes(key));
        }
        int i = 0;
        for (byte[] row : rows) {
            splitKeys[i] = row;
            i++;
        }
        return splitKeys;
    }
}
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部