文档章节

Hbase增删查改工具类

gulf
 gulf
发布于 07/17 15:56
字数 787
阅读 101
收藏 0
package cn.hljmobile.tagcloud.service.data.repository;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.log4j.Logger;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Repository;

//https://github.com/alibaba/simplehbase
//https://zhuanlan.zhihu.com/p/22742173
@Repository
public class HbaseQueryDao {

	Logger logger = Logger.getLogger(HbaseQueryDao.class); 
	
	private String columnFamily = "columns";

	public String getColumnFamily() {
		return columnFamily;
	}

	public void setColumnFamily(String columnFamily) {
		this.columnFamily = columnFamily;
	}

	@Autowired
	private HbaseTemplate hbaseTemplate;

	public <T> T queryForBeanByRowKey(String tableName, String rowKey, final Class<T> beanType) {
		
		logger.info("-----------------------HbaseQueryDao.queryForBeanByRowKey-----------------------------------");
		logger.info("tableName:" + tableName + ",rowKey:" + rowKey);
		logger.info("-----------------------HbaseQueryDao.queryForBeanByRowKey-----------------------------------");
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<T>() {  
            public T mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes(columnFamily));
            	T t = beanType.newInstance();
            	BeanWrapper beanWrapper = new BeanWrapperImpl(t);
                for(Map.Entry<byte[], byte[]> entry : map.entrySet()){
                	beanWrapper.setPropertyValue(Bytes.toString(entry.getKey()),Bytes.toString(entry.getValue()));
                }
//                List<Cell> ceList = result.listCells();
//                
//                if (ceList != null && ceList.size() > 0) {  
//                    for (Cell cell : ceList) {
//                    	
//                    	String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),  
//                                cell.getQualifierLength());
//                    	String columnValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
//                    	if (beanWrapper.isWritableProperty(columnName)) {
//                    		beanWrapper.setPropertyValue(columnName,columnValue);
//                    	}
//                    	
//                    }
//                }

                return t;  
            }  
        });  
	}
	
	public Map<String, String> queryForMapByRowKey(String tableName, String rowKey) {
		
		logger.info("-----------------------HbaseQueryDao.queryForMapByRowKey-----------------------------------");
		logger.info("tableName:" + tableName + ",rowKey:" + rowKey);
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, String>>() {  
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  

            	/*Map<byte[], byte[]> mapColumn = result.getFamilyMap(Bytes.toBytes(columnFamily));
            	Map<String, String> map = new HashMap<String, String>();
                for(Map.Entry<byte[], byte[]> entry : mapColumn.entrySet()){
                	map.put(Bytes.toString(entry.getKey()),Bytes.toString(entry.getValue()));
                }*/
                
                Map<String, String> map = new HashMap<String, String>();
            	if(result.listCells()!=null && result.listCells().size()>0){  
            		Map<byte[], byte[]> mapColumn = result.getFamilyMap(Bytes.toBytes(columnFamily));
            		for(Map.Entry<byte[], byte[]> entry : mapColumn.entrySet()){
                    	map.put(Bytes.toString(entry.getKey()),Bytes.toString(entry.getValue()));
                    }
            	}
                return map;  
            }  
        });  
	}

	public List<Map<String, String>> queryForListByScanRange(String tableName, String startRow, String stopRow) {
		
		logger.info("-----------------------HbaseQueryDao.queryForListByScanRange-----------------------------------");
		logger.info("tableName:" + tableName + ",startRow:" + startRow+ ",stopRow:" + stopRow);
		
		Scan scan = new Scan();  
		if (startRow != null) {
			scan.setStartRow(Bytes.toBytes(startRow));
        } else {
        	scan.setStartRow(Bytes.toBytes(""));
        }
        if (stopRow != null) {
        	scan.setStopRow(Bytes.toBytes(stopRow));
        } else {
        	scan.setStopRow(Bytes.toBytes(""));
        }
        
        Filter pf = new PrefixFilter(Bytes.toBytes(startRow));
        scan.setFilter(pf);

        //scan.setBatch(batch)
        return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, String>>() {
			
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<String, String> map = new HashMap<String, String>();
            	String  rowkey = "";
            	if(result.listCells()!=null && result.listCells().size()>0){  
	            	for (Cell cell : result.listCells()) {  
	            		rowkey =Bytes.toString( cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());  
	            		String value =Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());  
	                    String family =  Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());  
	                    String quali = Bytes.toString( cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());  
	                    if(columnFamily.equals(family)) {
	                    	map.put(quali,value);
	                    }
//	                    System.out.println(family+"_"+quali+"_"+ value);  
	                }
	            	map.put("rowkey",rowkey);
            	}
                /*
                 Map<byte[], byte[]> mapColumn = result.getFamilyMap(Bytes.toBytes(columnFamily));
                 for(Map.Entry<byte[], byte[]> entry : mapColumn.entrySet()){
                	map.put(Bytes.toString(entry.getKey()), Bytes.toString(entry.getValue()));
                }*/
                return map;  
            }  
        });  
	}
	//用正则表达式模糊查询
	public List<Map<String, String>> queryForListByRegex(String tableName,String regex) {
		logger.info("-----------------------HbaseQueryDao.queryForListByRegex-----------------------------");
		logger.info("tableName:" + tableName + ",rowKey:" + regex);
		Scan scan = new Scan();  
		/*List<Filter> filters = new ArrayList<Filter>();   
		Filter filter1 = new RowFilter(CompareOp.EQUAL,new RegexStringComparator("kpi")); 
		filters.add(filter1); 
		Filter filter2 = new RowFilter(CompareOp.EQUAL,new RegexStringComparator("kqi")); 
		filters.add(filter2); 
		FilterList filterList = new FilterList(filters); 
		scan.setFilter(filterList);*/
		Filter filter = new RowFilter(CompareOp.EQUAL,new RegexStringComparator(regex)); 
		scan.setFilter(filter);
        return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, String>>() {
			
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<String, String> map = new HashMap<String, String>();
            	String  rowkey = "";
            	if(result.listCells()!=null && result.listCells().size()>0){  
	            	for (Cell cell : result.listCells()) {  
	            		rowkey =Bytes.toString( cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());  
	            		String value =Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());  
	                    String family =  Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());  
	                    String quali = Bytes.toString( cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());  
	                    if(columnFamily.equals(family)) {
	                    	map.put(quali,value);
	                    }
	                }
	            	map.put("rowkey",rowkey);
            	}
                return map;  
            }  
        });  
	}
	//查询除了regex以外的值
	public List<Map<String, String>> queryForListNotByRegex(String tableName,String regex) {
		logger.info("-----------------------HbaseQueryDao.queryForListByRegex-----------------------------");
		logger.info("tableName:" + tableName + ",rowKey:" + regex);
		Scan scan = new Scan();  
		Filter filter = new RowFilter(CompareOp.NOT_EQUAL,new RegexStringComparator(regex)); 
		scan.setFilter(filter);
        return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, String>>() {
			
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<String, String> map = new HashMap<String, String>();
            	String  rowkey = "";
            	if(result.listCells()!=null && result.listCells().size()>0){  
	            	for (Cell cell : result.listCells()) {  
	            		rowkey =Bytes.toString( cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());  
	            		String value =Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());  
	                    String family =  Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());  
	                    String quali = Bytes.toString( cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());  
	                    if(columnFamily.equals(family)) {
	                    	map.put(quali,value);
	                    }
	                }
	            	map.put("rowkey",rowkey);
            	}
                return map;  
            }  
        });  
	}
	
	
	
	

	public void save(String tableName, String rowKey, Map<String,String> rowMap) {

		for (Entry<String, String> en : rowMap.entrySet()) {
			hbaseTemplate.put(tableName, rowKey, columnFamily, en.getKey(), Bytes.toBytes(en.getValue()));
		}
	}
	
	public void delete(String tableName, final String rowKey) {
		
		  hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {  
		        public Boolean doInTable(HTableInterface table) throws Throwable {  
		            boolean flag = false;  
		            try{  
		                List<Delete> list = new ArrayList<Delete>();  
		                Delete d1 = new Delete(rowKey.getBytes());  
		                list.add(d1);  
		            	table.delete(list); 
		             flag = true;  
		            }catch(Exception e){  
		                e.printStackTrace();  
		            }  
		            return flag;  
		        }  
		    });  
	}
	
}

 

© 著作权归作者所有

共有 人打赏支持
gulf
粉丝 12
博文 276
码字总数 156468
作品 0
广州
程序员
hbase 数据库简介安装与常用命令的使用

一:hbase 简介与架构功能 二:hbase 安装与配置 三:hbase 常见shell 命令操作 一:hbase 简介与架构功能 1.1 为什么要使用hbase 数据库 1.2 hbase 简介: 1.3 HBase与Hadoop的对比 1.4 HB...

flyfish225
04/12
0
0
基于HBase的实时历史数据库设计思路

实时历史数据库是实现工业海量数据采集存储的有效手段。HBase的写入性能优于查询性能,这个特点使其适合作为实时历史数据库开发的基础。 所有的开发是基于HBase的API进行,主要开发节点和数据...

什么都没想到
2017/10/23
0
0
HBase 常用Shell命令

1.进入hbase shell console $HBASE_HOME/bin/hbase shell 如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命...

zlfwmm
06/26
0
0
Hadoop(4)--Hbase

Hadoop 其它组成角色介绍--Hbase 在apache的官方网站上,对于Hbase的定义是他是Hadoop的第一个分布式、可扩展的大数据存储的数据库,他的目标是将非常大的表托管到一个集群中进行相应的存储,...

spark009
08/13
0
0
java设计模式求教(自己认为问题为此类)

大神们好,我目前开发一个历史记录的功能,准备把历史记录都存到hbase中取,大数据组给我一个jar包,包中有一个我需要使用的类,我实例化这个类的对象时就会创建hbase和zookeeper的链接什么的...

msgyg
06/29
0
2

没有更多内容

加载失败,请刷新页面

加载更多

MySQL 到底支不支持事务嵌套?

最近开发中遇到了使用MySQL,多次开启事务,出现了数据错乱问题,伪代码如下: begin; # 操作1 begin; # 操作2 rollback; 执行完后出现了操作1的数据真正写入,只有操作2的数据回滚...

宇润
53分钟前
7
0
fastDfs应用(安装过程待写)

1.效果 2.安装 2.1 导入已经安装好fastDFS的镜像 2.1.1 导入镜像 2.1.2 更改系统兼容性 2.1.3 开机 2.1.4 修改 一下内容 2.1.4.1 修改系统的ip 原来系统ip...

Lucky_Me
57分钟前
5
0
5. Python3源码—字符串(str)对象

5.1. 字符串对象 字符串对象是“变长对象”。 5.1.1. Python中的创建 Python中字符串(strs)对象最重要的创建方法为PyUnicode_DecodeUTF8Stateful,如下Python语句最终会调用到PyUnicode_D...

Mr_zebra
今天
4
0
第十章:路由网关(Zuul)进阶:过滤器、异常处理

第十章:路由网关(Zuul)进阶:过滤器、异常处理 简单介绍了关于Zuul的一些简单使用以及一些路由规则的简单说明。而对于一个统一网关而言,需要处理各种各类的请求,对不同的url进行拦截,或者...

DemonsI
今天
4
0
nginx屏蔽指定接口(URL)

Step1:需求 web平台上线后,需要屏蔽某个服务接口,但又不想重新上线,可以采用nginx屏蔽指定平台接口的办法 Step2:具体操作 location /dist/views/landing/UNIQUE_BEACON_URL { re...

Linux_Anna
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部