文档章节

Storm【Storm-Hbase接口】- 系列 1: 简单介绍

止静
 止静
发布于 2014/10/16 14:01
字数 967
阅读 276
收藏 0
package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*

   一个读取Hbase的Bolt,不断的从Hbase中读取表中的行KEY,和列,通过tuples来发送
 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {
	
	private static final long serialVersionUID = 1L;
	
	private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;
	
	private static transient HBaseConnector connector = null;
	private static transient HBaseConfiguration conf = null;
	private static transient HBaseCommunicator communicator = null;
	OutputCollector _collector;

	/*
	 * Constructor initializes the variables storing the hbase table information and connects to hbase
	 */
	public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {

		this.tableName = tableName;
		this.colFamilyName = colFamilyName;
		this.colName = colName;
		this.rowKeyField = rowKeyField;

		connector = new HBaseConnector();
		conf = connector.getHBaseConf(hbaseXmlLocation);
		communicator = new HBaseCommunicator(conf);
	}

	/*
	 * emits the value of the column with name @colName and rowkey @rowKey
	 * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {

		String rowKey = input.getStringByField(this.rowKeyField);
		columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
		collector.emit(tuple(rowKey, columnValue));
	}

	public void prepare(Map confMap, TopologyContext context,
			OutputCollector collector) {
		_collector = collector;
	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("rowKey", "columnValue"));
	}

	public Map<String, Object> getComponentConfiguration() {
		Map<String, Object> map = null;
		return map;
	}

	public void prepare(Map stormConf, TopologyContext context) {
	}
}

package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*
 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {
	
	private static final long serialVersionUID = 1L;
	
	private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;
	
	private static transient HBaseConnector connector = null;
	private static transient HBaseConfiguration conf = null;
	private static transient HBaseCommunicator communicator = null;
	OutputCollector _collector;

	/*
	 * Constructor initializes the variables storing the hbase table information and connects to hbase
	 */
	public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {

		this.tableName = tableName;
		this.colFamilyName = colFamilyName;
		this.colName = colName;
		this.rowKeyField = rowKeyField;

		connector = new HBaseConnector();
		conf = connector.getHBaseConf(hbaseXmlLocation);
		communicator = new HBaseCommunicator(conf);
	}

	/*
	 * emits the value of the column with name @colName and rowkey @rowKey
	 * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {

		String rowKey = input.getStringByField(this.rowKeyField);
	
		//通过指定我们的 表名,行键,列族,列名,直接通过communitor拿到列的值。
		columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
		collector.emit(tuple(rowKey, columnValue));
	}

	public void prepare(Map confMap, TopologyContext context,
			OutputCollector collector) {
		_collector = collector;
	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("rowKey", "columnValue"));
	}

	public Map<String, Object> getComponentConfiguration() {
		Map<String, Object> map = null;
		return map;
	}

	public void prepare(Map stormConf, TopologyContext context) {
	}
}

   

   Rowkey 

package storm.contrib.hbase.spouts;

import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;

/*

这个Spout主要是用来发射 Hbase的RowKey,rowkey的集合为自己设置的。
 * Spout emitting tuples containing the rowkey of the hbase table
 */
public class RowKeyEmitterSpout implements IRichSpout {
    
	private static final long serialVersionUID = 6814162766489261607L;
	public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public RowKeyEmitterSpout() {
        this(true);
    }

    public RowKeyEmitterSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
    
    public boolean isDistributed() {
        return _isDistributed;
    }
    
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        Thread.yield();
        final String[] words = new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word), UUID.randomUUID());
    }
    
    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

	public void activate() {
	}

	public void deactivate() {
	}

	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}

 


//  我们用来简单的测试系统的代码,测试接口是否正确

package storm.contrib.hbase.spouts;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class TestSpout implements IRichSpout {

	SpoutOutputCollector _collector;
	Random _rand;  
	int count = 0;


	public boolean isDistributed() {
		return true;
	}

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		_collector = collector;
		_rand = new Random();
	}

	public void nextTuple() {
		Utils.sleep(1000);
		String[] words = new String[] { "hello", "tiwari", "indore", "jayati"};
		Integer[] numbers = new Integer[] {
				1,2,3,4,5
		};

		if(count == numbers.length -1) {
			count = 0;
		}
		count ++;
		int number = numbers[count];
		String word = words[count];
		int randomNum = (int) (Math.random()*1000);
		_collector.emit(new Values(word, number));
	}


	public void close() {        
	}


	public void ack(Object id) {
	}

	public void fail(Object id) {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "number"));
	}

	public void activate() {
	}

	public void deactivate() {
	}

	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}

    比较简单,也就不做解释了,Storm-hbase的接口并没有像Storm-kafka的接口那样,自身去处理轮询,自身去处理连接的问题。只是简单的构造了一个Hbase的连接,在连接的过程之中,直接构造了一个Connector就可以了。 

     看代码即知道。

© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
加载中

评论(1)

forborn
forborn
你好 有整个项目的代码吗 跪求
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
2018/11/16
0
0
storm自定义分组与Hbase预分区结合节省内存消耗

Hbas预分区 在系统中向hbase中插入数据时,常常通过设置region的预分区来防止大数据量插入的热点问题,提高数据插入的效率,同时可以减少当数据猛增时由于Region split带来的资源消耗。大量的...

刘洋intsmaze
2017/03/31
0
0
2016大数据唯一完整版教程

大数据教程(2016版) 链接: https://pan.baidu.com/s/1qXT9WwG 密码: qrht 2015-12-22_linux 2015-12-24_linux 2015-12-29_MapReduce 2015-12-31_MapReduce 2016-01-05_MapReduce 2016-01-0......

大象分享
2017/02/15
1.2K
0
大数据处理 Hadoop、HBase、ElasticSearch、Storm、Kafka、Spark

场景 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条...

江河海流
2014/03/30
5.1K
0
基于Kafka+Storm构建流式计算卖家日志系统

作者介绍 曾昌荣,来自京东商家研发部京麦平台组,2016年加入京东,从事商家开放平台相关工作。 本文系作者原创投稿,未经 DBAplus社群 允许,不得转载和使用。 本文要给大家分享的是我们如何...

曾昌荣
2017/10/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何优雅地检测JavaScript中的空闲时间?

是否可以在JavaScript中检测“ 空闲 ”时间? 我的主要用例可能是预取或预加载内容。 空闲时间: 用户不活动或没有使用CPU的时间 #1楼 如果不使用jQuery,则仅使用普通JavaScript: var inac...

技术盛宴
39分钟前
31
0
获取枚举值的属性

我想知道是否可以获取枚举值而不是枚举本身的属性? 例如,假设我有以下枚举: using System.ComponentModel; // for DescriptionAttributeenum FunkyAttributesEnum{ [Description(...

javail
今天
78
0
concurrently 启动多个serve时命令行输出混乱

在script中配置中加入参数 -r "start": "npx concurrently -r \"npm:serve\" \"npm:web\"" 输出比较整洁 输出混乱, 会将有用的信息冲掉...

阿豪boy
今天
48
0
每天AC系列(三):电话号码的字母组合

1 题目 Leetcode第17题。 数字2-9映射字母,给出一个包含字符串的数字,列出字母的所有组合。 2 递归 拿到这样的题目想到了递归实现,创建一个Map映射对应的字母,然后把它传给递归函数,同时...

Blueeeeeee
今天
44
0
Kettle自定义jar包供javascript使用

我们都知道 Kettle 是用 Java 语言开发,并且可以在 JavaScript 里面直接调用 java 类方法。所以有些时候,我们可以自定义一些方法,来供 JavaScript 使用。 本篇文章有参考自:https://www...

CREATE_17
昨天
114
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部