
HBase Usage

政委007 · Published 2017/05/15 11:59

[toc]

Common HBase Shell Operations

list

    # list tables in the default namespace
    list
    # list all namespaces
    list_namespace
    # list the tables in a given namespace
    list_namespace_tables 'hbase'

create

    # create a table with one column family
    create 't5','cf1'
    # create a namespace
    create_namespace 'beh'
    # create a table inside a namespace
    create 'beh:t5','cf'
    # create a pre-split table: explicit split points, or a split algorithm
    create 't1','f1', SPLITS => ['10','20','30']
    create 't1','f1',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

put

    # insert a cell into a table
    put 't1','r1','cf1:c1','value1'
    # insert into a table under a namespace
    put 'beh:t1','r2','cf1:c2','value2'
    # specify an explicit timestamp
    put 'beh:t1','r3','cf1:c2','value2',1234567
    # attach extra operation attributes
    put 'beh:t1','r100','cf1:c2','value2',1234567, {ATTRIBUTES=>{'mykey'=>'myvalue'}}

get

    # get one row
    get 't1','r1'
    # get only the cf2 column family
    get 't1','r1','cf2'
    # get only columns c1 and c2 of cf2
    get 't1','r1',['cf2:c1','cf2:c2']
    # restrict by timestamp
    get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => 123456}
    # restrict by timestamp and number of versions
    get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => 12345, VERSIONS => 4}

scan

    # scan a whole table
    scan 't1'
    # scan specific columns
    scan 't1',{COLUMN => ['cf1:c1','cf2:c2']}
    # return at most 10 rows
    scan 't1',{LIMIT => 10}
    # set the start row
    scan 't1',{STARTROW => 'r20'}
    # apply filters
    scan 't1', FILTER=>"ColumnPrefixFilter('c2') AND ValueFilter(=,'substring:8')"

drop and delete

    # delete an entire row
    deleteall 't1','r1'
    # delete a single cell
    delete 't1','r2','cf1:c1'
    # drop a table (it must be disabled first)
    disable 't1'
    drop 't1'
    # truncate a table
    truncate 't1'

Starting and Stopping HBase

    # start the whole cluster
    start-hbase.sh
    # start individual daemons
    hbase-daemon.sh start master
    hbase-daemon.sh start regionserver
    # stop individual daemons, or the whole cluster
    hbase-daemon.sh stop regionserver
    hbase-daemon.sh stop master
    stop-hbase.sh

Common HBase Java APIs

package com.bonc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

@SuppressWarnings("deprecation")
public class ApiSample {
	private static HBaseAdmin admin;
	private static HTable table;
	private static byte[] tableName = Bytes.toBytes("a1");
	private static byte[] cf1 = Bytes.toBytes("cf1");
	private static byte[] cf2 = Bytes.toBytes("cf2");
	Configuration conf = HBaseConfiguration.create();
	{
		try {
			admin = new HBaseAdmin(conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {
		ApiSample sample = new ApiSample();
		// sample.listTable();
		// sample.listNameSpace();
		// sample.listTableWithNamespace();
		// // sample.createTable();
		// // sample.createTableWithSplit();
		// sample.editVersion();
		// sample.putData();
		// sample.getData();
		// sample.scanData();
		sample.scanDataWithFilter();
		sample.deleteData();
		sample.dropTable();
	}

	// list
	public void listTable() throws Exception {
		System.out.println("---------table------------------");
		TableName[] ts = admin.listTableNames();
		for (TableName tb : ts) {
			System.out.println(tb.getNameWithNamespaceInclAsString());
		}
	}

	// list namespace
	public void listNameSpace() throws Exception {
		System.out.println("---------namespace------------------");
		NamespaceDescriptor[] nss = admin.listNamespaceDescriptors();
		for (NamespaceDescriptor ns : nss) {
			System.out.println(ns.getName());
		}
	}

	// list the tables in the beh namespace
	public void listTableWithNamespace() throws Exception {
		System.out.println("---------table on beh------------------");
		TableName[] ts = admin.listTableNamesByNamespace("beh");
		for (TableName tb : ts) {
			System.out.println(tb.getNameWithNamespaceInclAsString());
		}
	}

	// create table a1 with column families cf1 and cf2
	public void createTable() throws Exception {
		System.out.println("---------create table------------------");
		HTableDescriptor t = new HTableDescriptor(tableName);
		t.addFamily(new HColumnDescriptor(cf1));
		t.addFamily(new HColumnDescriptor(cf2));
		admin.createTable(t);

	}

	// create pre-split table a2 with column families cf1 and cf2: start key 0, end key 9, 10 regions
	public void createTableWithSplit() throws Exception {
		System.out.println("---------create table with 10 regions------------------");
		HTableDescriptor t = new HTableDescriptor(Bytes.toBytes("a2"));
		t.addFamily(new HColumnDescriptor(cf1));
		t.addFamily(new HColumnDescriptor(cf2));
		admin.createTable(t, Bytes.toBytes(0), Bytes.toBytes(9), 10);

	}

	// put data into a1
	public void putData() throws Exception {
		System.out.println("---------put data to a1------------------");
		table = new HTable(conf, tableName);
		Put p1 = new Put(Bytes.toBytes("r1"));
		p1.add(cf1, Bytes.toBytes("c1"), Bytes.toBytes("v1"));
		p1.add(cf2, Bytes.toBytes("c2"), Bytes.toBytes("v666"));
		p1.add(cf1, Bytes.toBytes("c3"), Bytes.toBytes("v3"));
		Put p2 = new Put(Bytes.toBytes("r3"));
		p2.add(cf1, Bytes.toBytes("c1"), Bytes.toBytes("v4"));
		p2.add(cf2, Bytes.toBytes("c2"), Bytes.toBytes("v5"));
		p2.add(cf1, Bytes.toBytes("c3"), Bytes.toBytes("v6"));
		table.put(p1);
		table.put(p2);
		table.close();
	}

	// alter a1 so that column family cf2 keeps up to 3 versions
	public void editVersion() throws Exception {
		System.out.println("---------alter table to keep multiple versions------------------");
		HColumnDescriptor cf = new HColumnDescriptor(cf2);
		cf.setMaxVersions(3);
		admin.modifyColumn(tableName, cf);

	}

	// get data from a1
	public void getData() throws Exception {
		System.out.println("---------get data from a1------------------");
		table = new HTable(conf, tableName);
		Get get = new Get(Bytes.toBytes("r1"));

		printResult(table.get(get));
		System.out.println("---------get r1 only cf2 family-----------------");
		get.addFamily(cf2);
		printResult(table.get(get));
		System.out.println("----------get with max version----------------");
		get.setMaxVersions();
		printResult(table.get(get));

		table.close();
	}

	// scan
	public void scanData() throws Exception {
		System.out.println("---------scan data from a1------------------");
		table = new HTable(conf, tableName);
		Scan scan = new Scan();
		printResultScan(table.getScanner(scan));
		System.out.println("---------scan data from a1 start with row key r2------------------");
		scan.setStartRow(Bytes.toBytes("r2"));
		printResultScan(table.getScanner(scan));
		System.out.println("---------scan data from a1 start with row key r2 only get cf1 :c1 data-----------------");
		scan.addColumn(cf1, Bytes.toBytes("c1"));
		printResultScan(table.getScanner(scan));
		table.close();
	}

	/**
	 * Scan with filters. Overview:
	 * - row filtering: restrict the scan range, setRowPrefixFilter (same effect
	 *   as setting start/stop rows), or RowFilter
	 * - family filtering: addFamily(), or FamilyFilter
	 * - column filtering: addColumn(), ColumnPrefixFilter, ColumnPaginationFilter
	 * - value filtering: ValueFilter, SingleColumnValueFilter,
	 *   SingleColumnValueExcludeFilter
	 *
	 * @throws Exception
	 */
	public void scanDataWithFilter() throws Exception {
		System.out.println("---------scan data with row filter from a1------------------");
		table = new HTable(conf, tableName);
		Scan scan = new Scan();
		Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("r1")));

		scan.setFilter(filter);
		printResultScan(table.getScanner(scan));

		System.out.println("---------scan data with add family from a1------------------");
		scan.addFamily(cf1);
		printResultScan(table.getScanner(scan));
		System.out.println("---------scan data with cf Filter filter from a1------------------");
		scan.setFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("cf1"))));
		printResultScan(table.getScanner(scan));

		System.out.println("---------scan data with column prefix filter from a1------------------");
		FilterList list = new FilterList(Operator.MUST_PASS_ALL);
		list.addFilter(new ColumnPrefixFilter(Bytes.toBytes("c")));
		scan.setFilter(list);
		printResultScan(table.getScanner(scan));

		System.out.println("---------scan data with column pagination filter from a1------------------");
		// paginate columns within each row: ColumnPaginationFilter(limit, offset)
		// returns at most `limit` columns per row, starting at column `offset`
		list.addFilter(new ColumnPaginationFilter(2, 2));
		scan.setFilter(list);
		printResultScan(table.getScanner(scan));
		System.out.println("---------scan data with value  filter from a1------------------");
		list.addFilter(new ValueFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("v5"))));
		scan.setFilter(list);
		printResultScan(table.getScanner(scan));
		System.out.println("---------scan data with column  value  filter from a1------------------");
		list.addFilter(new SingleColumnValueFilter(cf1, Bytes.toBytes("c2"), CompareOp.GREATER, Bytes.toBytes("v5")));
		scan.setFilter(list);
		printResultScan(table.getScanner(scan));
		System.out.println("---------scan data with column  value include filter from a1------------------");
		list.addFilter(
				new SingleColumnValueExcludeFilter(cf1, Bytes.toBytes("c2"), CompareOp.GREATER, Bytes.toBytes("v5")));
		scan.setFilter(list);
		printResultScan(table.getScanner(scan));
		table.close();
	}

	public void deleteData() throws Exception {
		System.out.println("---------delete data from a1------------------");
		table = new HTable(conf, tableName);
		Delete delete = new Delete(Bytes.toBytes("r1"));
		table.delete(delete);
		System.out.println("---------get  r1 from a1------------------");
		Get get = new Get(Bytes.toBytes("r1"));
		printResult(table.get(get));
		table.close();
	}

	public void dropTable() throws Exception {
		System.out.println("---------delete table a1------------------");
		admin.disableTable(tableName);
		admin.deleteTable(tableName);
		admin.disableTable(Bytes.toBytes("a2"));
		admin.deleteTable(Bytes.toBytes("a2"));
		admin.close();
	}

	private void printResultScan(ResultScanner rs) throws Exception {
		Result r = null;
		while ((r = rs.next()) != null) {
			printResult(r);
		}
		rs.close();
	}

	private void printResult(Result rs) {
		if (!rs.isEmpty()) {
			for (KeyValue kv : rs.list()) {
				System.out.println(String.format("row:%s\t family:%s\t qualifier:%s\t value:%s\t timestamp:%s",
						Bytes.toString(kv.getRow()), Bytes.toString(kv.getFamily()), Bytes.toString(kv.getQualifier()),
						Bytes.toString(kv.getValue()), kv.getTimestamp()));
			}
		}
	}
}
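
The sample above relies on the HBaseAdmin and HTable constructors, which are deprecated from HBase 1.0 on. Below is a minimal sketch of the same put/get round trip with the Connection-based client API; the class name ModernApiSample is illustrative, and it assumes an HBase 1.x client and the a1 table used above.

package com.bonc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ModernApiSample {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		// a Connection is heavyweight and thread-safe: create one and share it
		try (Connection conn = ConnectionFactory.createConnection(conf);
				Table table = conn.getTable(TableName.valueOf("a1"))) {
			// write one cell
			Put put = new Put(Bytes.toBytes("r1"));
			put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("c1"), Bytes.toBytes("v1"));
			table.put(put);
			// read it back
			Result result = table.get(new Get(Bytes.toBytes("r1")));
			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("c1"))));
		}
	}
}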

Common HBase Secondary Index Approaches

Coprocessor

Approach

A coprocessor can hook into the operations that run before and after puts, deletes, and reads on a table, and those hooks can keep an index table in step with the main table: each write to the main table also writes an index row, and each delete removes one. The implementation below takes this approach; a client-side query sketch follows it.

Implementation

package com.bonc.hbase.coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class SecondaryIndexByComprocessorForGlobal extends BaseMasterAndRegionObserver {

	private static String INDEX_TABLE_NAME = "index.table.name";
	private static String INDEX_FAMILY_NAME = "index.family.name";
	private static String TABLE_CELL = "table.cell";
	private byte[] index_name;
	private byte[] index_family;
	private byte[] main_family;
	private byte[] main_qualifier;
	private Configuration conf;
	private Connection conn;

	/**
	 * Required configuration: the index table name, the index table column
	 * family, and the main-table column (family:qualifier) to index.
	 */
	@Override
	public void start(CoprocessorEnvironment ctx) throws IOException {

		conf = ctx.getConfiguration();
		index_name = Bytes.toBytes(conf.get(INDEX_TABLE_NAME));
		index_family = Bytes.toBytes(conf.get(INDEX_FAMILY_NAME));
		String cell = conf.get(TABLE_CELL);
		if (cell != null) {
			String[] s = cell.split(":");
			main_family = Bytes.toBytes(s[0]);
			main_qualifier = Bytes.toBytes(s[1]);
		}
		System.out.println("添加协处理器,建立索引的表:" + Bytes.toString(index_name) + " ,被索引的列 : " + cell);
		super.start(ctx);
	}

	private Connection getConnection() {
		if (conn == null || conn.isClosed()) {
			try {
				conn = ConnectionFactory.createConnection(conf);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return conn;
	}

	private Table getTable() throws IllegalArgumentException, IOException {
		return getConnection().getTable(TableName.valueOf(index_name));
	}

	@Override
	// Hook before table creation; the indexed columns are unknown at this point,
	// so the index table cannot be created here automatically.
	public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, HTableDescriptor desc,
			HRegionInfo[] regions) throws IOException {
		// TODO Auto-generated method stub
		super.preCreateTable(ctx, desc, regions);
	}

	@Override
	// Runs after a Put on the main table; note there is no transactional
	// guarantee between the main-table write and the index write.
	public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
			throws IOException {
		// write an index row for each indexed cell in this Put
		List<Cell> list = put.get(main_family, main_qualifier);
		if (list != null && !list.isEmpty()) {
			for (Cell cell : list) {
				putDataToIndex(cell);
			}
		}
		super.postPut(e, put, edit, durability);
	}

	@Override
	// Before a row is deleted from the main table, look up its indexed value
	// and delete the corresponding index row.
	public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
			Durability durability) throws IOException {
		byte[] row = delete.getRow();
		Result result = e.getEnvironment().getRegion().get(new Get(row));
		KeyValue kv = result.getColumnLatest(main_family, main_qualifier);
		if (kv != null) {
			byte[] value = kv.getValue();
			deleteDataFromIndex(Bytes.add(Bytes.toBytes(MD5Hash.getMD5AsHex(value).substring(0, 8)), value, row));
		}
		super.preDelete(e, delete, edit, durability);
	}

	@Override
	// When the main table is dropped, drop the index table as well.
	public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName)
			throws IOException {
		Admin admin = getConnection().getAdmin();
		TableName indexTable = TableName.valueOf(index_name);
		admin.disableTable(indexTable);
		admin.deleteTable(indexTable);
		admin.close();
		super.postDeleteTable(ctx, tableName);
	}

	private void deleteDataFromIndex(byte[] row) {
		Delete d = new Delete(row);
		try {
			Table table = getTable();
			table.delete(d);
			table.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}

	private void putDataToIndex(Cell cell) throws IOException {
		byte[] row = cell.getRow();
		byte[] value = cell.getValue();
		// index row key = first 8 hex chars of MD5(value) + value + main-table row key
		Put put = new Put(Bytes.add(Bytes.toBytes(MD5Hash.getMD5AsHex(value).substring(0, 8)), value, row));
		put.add(index_family, Bytes.toBytes("c"), row);
		Table table = getTable();
		table.put(put);
		table.close();
	}
}
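
Reading through the index follows from the row-key scheme above: compute the MD5 prefix of the value being looked up, scan the index table with that prefix, read the main-table row key out of the cf:c cell, then get the row from the main table. A minimal client-side sketch, assuming an HBase 1.x client and the beh:b1 / beh:b1_index tables and cf family from the loading step below; the QueryByIndexSample class and queryByValue method are illustrative, not part of the original code.

package com.bonc.hbase.coprocessor;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class QueryByIndexSample {
	// look up main-table rows whose indexed column equals `value`
	public static List<Result> queryByValue(Connection conn, byte[] value) throws Exception {
		// reproduce the index row key prefix: MD5(value)[0,8) + value
		byte[] prefix = Bytes.add(Bytes.toBytes(MD5Hash.getMD5AsHex(value).substring(0, 8)), value);
		List<Result> results = new ArrayList<Result>();
		try (Table index = conn.getTable(TableName.valueOf("beh:b1_index"));
				Table main = conn.getTable(TableName.valueOf("beh:b1"))) {
			Scan scan = new Scan();
			scan.setRowPrefixFilter(prefix);
			try (ResultScanner rs = index.getScanner(scan)) {
				for (Result r : rs) {
					// the main-table row key is stored in the cf:c cell
					byte[] mainRow = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("c"));
					results.add(main.get(new Get(mainRow)));
				}
			}
		}
		return results;
	}

	public static void main(String[] args) throws Exception {
		try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
			for (Result r : queryByValue(conn, Bytes.toBytes("v1"))) {
				System.out.println(r);
			}
		}
	}
}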

Loading the Coprocessor

    # create the main table and the index table
    create 'beh:b1','cf'
    create 'beh:b1_index','cf'
    # attach the coprocessor to the main table: jar path | class | priority | key=value args
    # (the jar must be readable by every region server; an HDFS path is typical)
    alter 'beh:b1', METHOD=>'table_att','coprocessor'=>'/Users/zwf/hbase-sample.jar|com.bonc.hbase.coprocessor.SecondaryIndexByComprocessorForGlobal|1001|index.table.name=beh:b1_index,index.family.name=cf,table.cell=cf:c1'
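
To confirm the attribute took effect, describe the table; to detach the coprocessor later, unset the table attribute (when it is the first coprocessor attached, it is registered under the name coprocessor$1):

    # verify: the coprocessor appears in the table attributes
    describe 'beh:b1'
    # detach the coprocessor
    alter 'beh:b1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'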

Testing
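
A quick smoke test from the shell, assuming the setup above: write a row into the main table and check that a matching index row appears (its row key is MD5('v1') truncated to 8 hex chars, then 'v1', then 'r1'), then delete the row and check that the index row is gone.

    # write to the main table; the coprocessor mirrors it into the index
    put 'beh:b1','r1','cf:c1','v1'
    scan 'beh:b1_index'
    # delete the main row; the preDelete hook removes the index row
    deleteall 'beh:b1','r1'
    scan 'beh:b1_index'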
