文档章节

Twitter Storm进阶初步,Storm能做什么

震秦
 震秦
发布于 2014/04/13 13:08
字数 1127
阅读 2.7K
收藏 55

本篇Blog是一个简单的Storm入门例子,目的让读者明白Storm是怎样的运行机制。以及后续会放出的几篇Storm高级特性以及最终将Storm融入Hadoop 2.x的YARN中。目的读者是已经进阶大数据的Hadoop,Spark用户,或者了解Storm想深入理解Storm的读者用户。

项目Pom(Storm jar没有提交到Maven中央仓库,需要在项目中加入下面的仓库地址):

<repositories>
    <repository>
        <id>central</id>
        <name>Maven Repository Switchboard</name>
        <layout>default</layout>
        <url>http://maven.oschina.net/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>clojars</id>
        <url>https://clojars.org/repo/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>1.13</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.3.3</version>
    </dependency>

    <dependency>
        <groupId>org.clojure</groupId>
        <artifactId>clojure</artifactId>
        <version>1.5.1</version>
    </dependency>

    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
    </dependency>

    <dependency>
        <groupId>storm</groupId>
        <artifactId>libthrift7</artifactId>
        <version>0.7.0</version>
    </dependency>
</dependencies>

下面是一个Storm的HelloWord的例子,代码有删减,熟悉Storm的读者自然能把代码组织成一个完整的例子。

public static void main(String[] args) {
	Config conf = new Config();
	conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm");
	conf.put(Config.STORM_CLUSTER_MODE, "local");
	//conf.put("storm.local.mode.zmq", "false");
	conf.put("storm.zookeeper.root", "/storm");
	conf.put("storm.zookeeper.session.timeout", 50000);
	conf.put("storm.zookeeper.servers", "nowledgedata-n15");
	conf.put("storm.zookeeper.port", 2181);
	//conf.setDebug(true);
	//conf.setNumWorkers(2);
	
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("words", new TestWordSpout(), 2); 
	
	builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
	       .shuffleGrouping("words");
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("test", conf, builder.createTopology());
}
  • Config.STORM_LOCAL_DIR是配置一个本地路径,Storm会在这个路径写入一些配置信息和临时数据。
  • Config.STORM_CLUSTER_MODE是运行模式,local和distributed两个选项,即本地模式和分布式模式。本地模式在运行时时多线程模拟的,开发测试用;分布式模式在分布式集群下是多进程的,真正的分布式。
  • Storm的Spout和Blot高可用是通过ZooKeeper协调的,storm.zookeeper.root是一个ZooKeeper地址,并且有对应的端口号
  • Debug是测试模式,有更详细的日志信息。

TestWordSpout是一个Storm自带的例子,用来随机的产生<code>new String[] {"nathan", "mike", "jackson", "golda", "bertels"};</code>列表中的字符串,用来提供数据源。

其中DefaultStringBolt的源码:

OutputCollector collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}

public void execute(Tuple tuple) {
	log.info("rev a message: " + tuple.getString(0));
	collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    collector.ack(tuple);
}

运行日志:

10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson

数据由一个Storm叫做喷嘴(Spout,也相当一个水龙头,能产生数据的来源端)产生,然后传递给后端一连串的的Blot,最终被转换和消费。而Spout和Blot都是并行的,并行度都可以自己设置(本地运行是靠多线程模拟的)。如:

builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)

喷嘴TestWordSpout的并行度是2,DefaultStringBolt的并行度是5.

从日志可以看出,数据经过喷嘴到达预先定于的一个Blot,打印了日志。我测试代码设置的并行度是5,日志中统计,确实是5个线程:

  1. Thread-29-exclaim2
  2. Thread-31-exclaim2
  3. Thread-26-exclaim2
  4. Thread-33-exclaim2
  5. Thread-35-exclaim2

关于Storm是是什么?http://storm.incubator.apache.org/http://www.ibm.com/developerworks/cn/opensource/os-twitterstorm/ 有详细的介绍。

借用OSC网友的话说,Hadoop就是商场里自动升降式的电梯,用户需要排队等待,选按楼层,然后到达;而Storm就像是自动扶梯,扶梯预先设置好运行后,来人就立即运走,目的地是明确的。

Storm按我的理解,Storm和Hadoop是完全不同的,设计上也没有半点拟合的部分。Storm更像是我之前介绍过的Spring Integration,是一个数据流系统。它能把数据按照预设定的流程,把数据做各种转换,传递,分解,合并,最后数据到达后端存储。只不过Storm是可以分布式,而且分布式的能力也是可以自己设置。

Storm的这种特性很适合大数据类的ETL系统开发。

© 著作权归作者所有

震秦
粉丝 192
博文 34
码字总数 35126
作品 0
西安
架构师
私信 提问
加载中

评论(8)

震秦
震秦 博主
ETL一般也都是实时的。可以理解为分布式ETL,也可以理解为分布式消息。适应场景为有连续数据流,需要把数据做转化后向后端存储,转发的应用。
xwqfudimo
xwqfudimo
??
xwqfudimo
xwqfudimo
博主,可不可以理解为实时版的ETL工具(比如kettle)。
刘晓飞

引用来自“影之歌”的评论

这个? 怎么进阶了?…… 表示没看出来……

引用来自“震秦”的评论

不好意思,震惊到你了。 后续还有几篇,直到把Storm融合到YARN中。

哈哈、。。。。[53]

震秦
震秦 博主

引用来自“影之歌”的评论

这个? 怎么进阶了?…… 表示没看出来……

不好意思,震惊到你了。 后续还有几篇,直到把Storm融合到YARN中。

散关清渭
散关清渭
这个? 怎么进阶了?…… 表示没看出来……
java10001
java10001
可以对比一下ali的jstorm
深蓝苹果
深蓝苹果
对storm这种增量机制下,底层存储的结构不够了解,有相关文章推荐吗
Apache Storm 的历史及经验教训

Apache Storm 最近成为了ASF的顶级项目,这对于该项目和我个人而言是一个重大的里程碑。很难想像4年前Storm只是我脑海中的一个想法,但现在却成为了一个有着大社区支持并被无数企业使用的繁荣...

run_mei
2014/10/14
1.2W
10
使用 Twitter Storm 处理实时的大数据

使用 Twitter Storm 处理实时的大数据 流式处理大数据简介 IBM DW/M. Tim Jones, 独立作家, 顾问 简介: Storm 是一个开源的、大数据处理系统,与其他系统不同,它旨在用于分布式实时处理且与...

IBMdW
2012/12/06
6.4K
3
Apache Storm 0.9.7 发布,分布式实时计算

Apache Storm 0.9.7 发布了,Apache Storm 的前身是 Twitter Storm 平台,目前已经归于 Apache 基金会管辖。 Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像...

开源中国股侠
2016/09/08
814
1
Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli
2015/06/18
0
0
Apache Storm简介及安装部署

Apache Storm是一个分布式的、可靠的、容错的实时数据流处理框架。它与Spark Streaming的最大区别在于它是逐个处理流式数据事件,而Spark Streaming是微批次处理,因此,它比Spark Streaming...

风火数据
2018/07/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周四乱弹 —— 水果你们都没吃全

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @ 莱布妮子:分享五月天的单曲《温柔》@小小编辑 @cIouddyy @clouddyy 《温柔》- 五月天 手机党少年们想听歌,请使劲儿戳(这里) @FalconChe...

小小编辑
31分钟前
91
1
聚合支付网站被黑客攻击 导致数据库被篡改的防御办法

2020春节即将来临,收到新聚合支付平台网站客户的求助电话给我们Sinesafe,反映支付订单状态被修改由原先未支付修改为已支付,导致商户那边直接发货给此订单会员了,商户和平台的损失较大,很多码...

网站安全
昨天
63
0
MySQL-基于SELECT查询的UPDATE查询

我需要检查(从同一张表)基于日期时间的两个事件之间是否存在关联。 一组数据将包含某些事件的结束日期时间,另一组数据将包含其他事件的开始日期时间。 如果第一个事件在第二个事件之前完成...

javail
昨天
70
0
将PostgreSQL数据库复制到另一台服务器

我正在将生产PostgreSQL数据库复制到开发服务器。 什么是最快,最简单的方法? #1楼 pg_dump the_db_name > the_backup.sql 然后将备份复制到您的开发服务器,并使用以下命令进行还原: ps...

技术盛宴
昨天
130
0
[译]软件架构师之路

今天给大家带来一篇自己翻译的干货《软件架构师之路》。本周Github上升很快的项目。其内容对致力于成为软件架构师(不论前后端)的同学应该都会有极大的帮助。 项目地址: 中文地址 https://...

gamedilong
昨天
79
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部