文档章节

Storm【Storm-MongoDB接口】- 1: 简要介绍

止静
 止静
发布于 2014/10/16 14:30
字数 1042
阅读 568
收藏 1


阅读前提:

    其一:    您需要对于MongoDB有一个初步的了解。

    其二:    您需要对Storm本身有所了解


阅读建议:

                由于整个Storm接口系列包含了围绕Storm实时处理的框架的一系列接口,在一系列的接口文档之中,请对比Storm-hbase接口的博文


整体的Storn接口分为以下的几个class


1:MongoBolt.java

2 : MongoSpout.java

3 : MongoTailableCursorTopology.java

4 : SimpleMongoBolt.java


看代码说话:

package storm.mongo;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;

/**
 *
 * 注意在这里,没有实现批处理的调用,并且只是一个抽象类,对于Mongo的Storm交互做了一次封装
 *
 * @author Adrian Petrescu <apetresc@gmail.com>
 *
 */
public abstract class MongoBolt extends BaseRichBolt {
	private OutputCollector collector;
	
	// MOngDB的DB对象
	private DB mongoDB;
	
	
        //记录我们的主机,端口,和MongoDB的数据DB民粹
	private final String mongoHost;
	private final int mongoPort;
	private final String mongoDbName;

	/**
	 * @param mongoHost The host on which Mongo is running.
	 * @param mongoPort The port on which Mongo is running.
	 * @param mongoDbName The Mongo database containing all collections being
	 * written to.
	 */
	protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
		this.mongoHost = mongoHost;
		this.mongoPort = mongoPort;
		this.mongoDbName = mongoDbName;
	}
	
	@Override
	public void prepare(
			@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		
		this.collector = collector;
		try {
		
		        //prepare方法目前在初始化的过程之中得到了一个Mongo的连接
			this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void execute(Tuple input) {
	
	    
	        //注意我们在这里还有一个判断,判断当前是否该发射
	
		if (shouldActOnInput(input)) {
			String collectionName = getMongoCollectionForInput(input);
			DBObject dbObject = getDBObjectForInput(input);
			if (dbObject != null) {
				try {
					mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
					collector.ack(input);
				} catch (MongoException me) {
					collector.fail(input);
				}
			}
		} else {
			collector.ack(input);
		}
	}

	/**
	 * Decide whether or not this input tuple should trigger a Mongo write.
	 *
	 * @param input the input tuple under consideration
	 * @return {@code true} iff this input tuple should trigger a Mongo write
	 */
	public abstract boolean shouldActOnInput(Tuple input);
	
	/**
	 * Returns the Mongo collection which the input tuple should be written to.
	 *
	 * @param input the input tuple under consideration
	 * @return the Mongo collection which the input tuple should be written to
	 */
	public abstract String getMongoCollectionForInput(Tuple input);
	
	/**
	 * Returns the DBObject to store in Mongo for the specified input tuple.
	 * 
	 
	 拿到DBObject的一个抽象类
	 
	 
	 * @param input the input tuple under consideration
	 * @return the DBObject to be written to Mongo
	 */
	public abstract DBObject getDBObjectForInput(Tuple input);
	
	
	//注意这里随着计算的终结被关闭了。
	@Override
	public void cleanup() {
		this.mongoDB.getMongo().close();
	}

}


2 :

  

package storm.mongo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;

/**
* A Spout which consumes documents from a Mongodb tailable cursor.
*
* Subclasses should simply override two methods:
* <ul>
* <li>{@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
* <li>{@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns
* a Mongo document into a Storm tuple matching the declared output fields.
* </ul>
*
** <p>
* <b>WARNING:</b> You can only use tailable cursors on capped collections.
* 
* @author Dan Beaulieu <danjacob.beaulieu@gmail.com>
*
*/


// 在这里,抽象的过程中,依旧保持了第一层的Spout为一个抽象类,MongoSpout为abstract的一个抽象类,子类在继承这// 个类的过程之中实现特定的方法即可
// 这里还有一个类似Cursor的操作。


public abstract class MongoSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;
	
	private LinkedBlockingQueue<DBObject> queue;
	private final AtomicBoolean opened = new AtomicBoolean(false);
	
	private DB mongoDB;
	private final DBObject query;
	
	private final String mongoHost;
	private final int mongoPort;
	private final String mongoDbName;
	private final String mongoCollectionName;
	
	
	public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) {
		
		this.mongoHost = mongoHost;
		this.mongoPort = mongoPort;
		this.mongoDbName = mongoDbName;
		this.mongoCollectionName = mongoCollectionName;
		this.query = query;
	}
	
	class TailableCursorThread extends Thread {
		
		
		// 内部类 TailableCursorThread线程
		
		
		//注意在其中我们使用了LinkedBlockingQueue的对象,有关java高并发的集合类,请参考本ID的【Java集合类型的博文】博文。
		LinkedBlockingQueue<DBObject> queue;
		String mongoCollectionName;
		DB mongoDB;
		DBObject query;

		public TailableCursorThread(LinkedBlockingQueue<DBObject> queue, DB mongoDB, String mongoCollectionName, DBObject query) {
			
			this.queue = queue;
			this.mongoDB = mongoDB;
			this.mongoCollectionName = mongoCollectionName;
			this.query = query;
		}

		public void run() {
			
			while(opened.get()) {
				try {
					// create the cursor
					mongoDB.requestStart();
					final DBCursor cursor = mongoDB.getCollection(mongoCollectionName)
												.find(query)
												.sort(new BasicDBObject("$natural", 1))
												.addOption(Bytes.QUERYOPTION_TAILABLE)
												.addOption(Bytes.QUERYOPTION_AWAITDATA);
					try {
						while (opened.get() && cursor.hasNext()) {
		                    final DBObject doc = cursor.next();
		
		                    if (doc == null) break;
		
		                    queue.put(doc);
		                }
					} finally {
						try { 
							if (cursor != null) cursor.close(); 
						} catch (final Throwable t) { }
	                    try { 
	                    	mongoDB.requestDone(); 
	                    	} catch (final Throwable t) { }
	                }
					
					Utils.sleep(500);
				} catch (final MongoException.CursorNotFound cnf) {
					// rethrow only if something went wrong while we expect the cursor to be open.
                    if (opened.get()) {
                    	throw cnf;
                    }
                } catch (InterruptedException e) { break; }
			}
		};
	}
	
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		
		this.collector = collector;
		this.queue = new LinkedBlockingQueue<DBObject>(1000);
		try {
			this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query);
		this.opened.set(true);
		listener.start();
	}

	@Override
	public void close() {
		this.opened.set(false);
	}

	@Override
	public void nextTuple() {
		
		DBObject dbo = this.queue.poll();
		if(dbo == null) {
            Utils.sleep(50);
        } else {
            this.collector.emit(dbObjectToStormTuple(dbo));
        }
	}

	@Override
	public void ack(Object msgId) {
		// TODO Auto-generated method stub	
	}

	@Override
	public void fail(Object msgId) {
		// TODO Auto-generated method stub	
	}

	public abstract List<Object> dbObjectToStormTuple(DBObject message);

}







© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
加载中

评论(1)

javawangyu
javawangyu
storm从mongodb中load数据 然后分析,和直接在mongodb中进行数据分析 有什么优势吗?
storm client command

最近在研究实时日志分析,storm确实不错,以下是命令参数: storm help Syntax: storm jar topology-jar-path class 运行jar包中类的主函数和指定的参数 Commands: activate storm activate ...

China_OS
2014/02/22
1.7K
0
Apache Slider + Storm

Apache Slider + Storm 系统环境 安装如下组件,部署可用环境 JDK 1.7.0_79 Apache Zookeeper 3.4.* Apache Zookeeper Apache Hadoop 2.6.* Apache Hadoop Apache Storm 0.9.4 Apache Storm......

Yulong_
2016/09/21
512
0
storm启动卡着不动,求高手

刚把storm集群部好,结果启动时卡着不动。 执行:./storm nimbus 提示: [root@WY_CentOS_100G bin]# ./storm nimbus Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/src......

lja
2013/07/11
2.2K
1
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
791
1
Storm概念讲解和工作原理介绍

Strom的结构 Storm与传统关系型数据库 传统关系型数据库是先存后计算,而storm则是先算后存,甚至不存 传统关系型数据库很难部署实时计算,只能部署定时任务统计分析窗口数据 关系型数据库重...

张超
2015/04/26
2.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

每天AC系列(一):三数之和

1 题目 LeetCode第15题,难度中等,题目描述: 给定一个包含 n 个整数的数组 nums,判断 nums 中是否存在三个元素 a,b,c ,使得 a + b + c = 0 ?找出所有满足条件且不重复的三元组。 注意:答...

Blueeeeeee
48分钟前
56
0
OSChina 周四乱弹 —— 水果你们都没吃全

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @ 莱布妮子:分享五月天的单曲《温柔》@小小编辑 @cIouddyy @clouddyy 《温柔》- 五月天 手机党少年们想听歌,请使劲儿戳(这里) @FalconChe...

小小编辑
今天
99
1
聚合支付网站被黑客攻击 导致数据库被篡改的防御办法

2020春节即将来临,收到新聚合支付平台网站客户的求助电话给我们Sinesafe,反映支付订单状态被修改由原先未支付修改为已支付,导致商户那边直接发货给此订单会员了,商户和平台的损失较大,很多码...

网站安全
昨天
63
0
MySQL-基于SELECT查询的UPDATE查询

我需要检查(从同一张表)基于日期时间的两个事件之间是否存在关联。 一组数据将包含某些事件的结束日期时间,另一组数据将包含其他事件的开始日期时间。 如果第一个事件在第二个事件之前完成...

javail
昨天
70
0
将PostgreSQL数据库复制到另一台服务器

我正在将生产PostgreSQL数据库复制到开发服务器。 什么是最快,最简单的方法? #1楼 pg_dump the_db_name > the_backup.sql 然后将备份复制到您的开发服务器,并使用以下命令进行还原: ps...

技术盛宴
昨天
130
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部