文档章节

Storm工作原理(1)

林中漫步
 林中漫步
发布于 2016/06/18 11:43
字数 1691
阅读 131
收藏 5

1 Storm 是什么

     Storm是一个分布式的实时计算框架;按照作者 Nathan Marz 的说法,Storm对于实时计算的意义类似于Hadoop对于批处理的意义。

     Storm是流式计算框架、实时计算框架, 而Hadoop本质上是一个批处理框架、离线计算框架。

     Storm的核心代码用的是clojure,另一部分代码用python;开发用户可使用java开发topology。

2 Storm的工作流程

     Storm集群中有两种节点,一种是Nimbus(控制结点),另一种是Supervisor(工作节点)。它们是这样工作的:
      1. 客户端提交topology到Nimbus;
      2. Nimbus为该topology创建本地目录,将topology拆分为一个个的task;
      3. 在zookeeper上创建assignments节点,以存储task与supervisor节点中woker的对应关系;
      4. 在zookeeper上创建taskbeats节点来监控task的心跳,启动topology;
      5. Supervisor轮询zookeeper,认领分配给自己的tasks,启动多个woker进程,每个work创建相应的task线程;根据topology信息进行初始化task之间的连接,最终整个拓扑运行起来。

     Topology处理流程图: 输入图片说明

3 Zookeeper集群

     Storm使用zookeeper来协调整个集群,但是要注意的是storm并不用zookeeper来传递消息。所以zookeeper上的负载是非常低的,单个节点的zookeeper在大多数情况下都已经足够了, 但是如果你要部署大一点的storm集群, 那么你需要的zookeeper也要大一点。关于如何部署zookeeper,参考:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html

     部署zookeeper有些需要注意的地方:

  1. 对zookeeper做好监控非常重要, zookeeper是fail-fast的系统,只要出现什么错误就会退出。 更多细节见 http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_supervision
  2. 在storm的生产环境场景,要配置一个cron job来压缩zookeeper的数据和业务日志。zookeeper自己是不会去压缩这些的,所以你如果不设置一个cron job, 那么你很快就会发现磁盘不够用了,更多细节可查看:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_maintenance

4 Topology的程序结构

     和同样是计算框架的MapReduce相比,在MapReduce集群上运行的是Job,而在Storm集群上运行的是Topology。但是Job在运行完毕后会自行结束,Topology只能手动kill掉,否则会一直运行下去。

     Storm不处理计算结果的保存,这是应用代码需要负责的事情,如果数据不大,你可以简单地保存在内存里,也可以每次都更新数据库,也可以采用NoSQL存储。这部分事情完全交给应用开发者。

     Topology的结构图:

输入图片说明

4.1 Component

     上图中,Spout 和Bolt都是 Component。所以,Storm定义了一个名叫IComponent的总接口。 家族类图如下:绿色部分是我们最常用、比较简单的部分;红色部分是与事务相关的。 输入图片说明

4.2 Spout

     Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他Spout类来完成,也可以通过实现IRichSpout接口来实现:

public interface ISpout extends Serializable { 
  void open(Map conf, TopologyContext context, SpoutOutputCollector collector); 
  void close(); 
  void nextTuple(); 
  void ack(Object msgId); 
  void fail(Object msgId); 
} 
  • open() -- 初始化方法
  • close() -- 在该spout将要关闭时调用。但是不保证其一定被调用,因为在集群中supervisor节点,可以使用kill -9来杀死worker进程。只有当Storm是在本地模式下运行,如果是发送停止命令,可以保证close的执行。
  • ack(Object msgId) -- 成功处理tuple时回调的方法,通常情况下,此方法的实现是将消息队列中的消息移除,防止消息重放
  • fail(Object msgId) -- 处理tuple失败时回调的方法,通常情况下,此方法的实现是将消息放回消息队列中然后在稍后时间里重放。
  • nextTuple() -- 这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。调用此方法时,storm向spout发出请求, 让spout发出元组(tuple)到输出器(ouput collector)。这种方法应该是非阻塞的,所以spout如果没有元组发出,这个方法应该返回。nextTuple、ack 和fail 都在spout任务的同一个线程中被循环调用。 当没有元组的发射时,应该让nextTuple睡眠一个很短的时间(如一毫秒),以免浪费太多的CPU。

     继承了BaseRichSpout后,不用实现close()、activate()、 deactivate()、ack()、fail() 和 getComponentConfiguration(),只关心最基本核心的部分。      通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。

4.3 Bolt

     Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口等来完成。

  • prepare() -- 此方法和Spout中的open方法类似,在集群中一个worker中的task初始化时调用。 它提供了bolt执行的环境。
  • declareOutputFields() -- 用于声明当前Bolt发送的Tuple中包含的字段(field),和Spout中类似。
  • cleanup() -- 同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
  • execute() -- 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送是通过emit方法来完成的。execute接受一个 tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。

     Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。如果你确实要反馈失败,可以抛出FailedException。      通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现 IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动实现了collector.emit.ack(inputTuple)。

5 Topology的运行方式

     在开始创建项目之前,了解Storm的操作模式(operation modes)是很重要的。 Storm有两种运行方式:

  • 本地运行的提交方式:
LocalCluster cluster = new LocalCluster(); 
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); 
Thread.sleep(2000); 
cluster.shutdown(); 
  • 分布式提交方式:
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); 

     需要注意的是,在编写完Topology代码之后,需要打包成jar,然后放到Nimbus上运行。打包的时候,不需要把依赖的storm.jar打进去,否则运行时会报错。因为在集群模式下,topology是依赖集群环境而执行的(见storm.yaml 配置文件)。

     运行命令如下: storm jar StormTopology.jar mainclass [args]

© 著作权归作者所有

共有 人打赏支持
林中漫步
粉丝 98
博文 55
码字总数 33266
作品 0
深圳
架构师
私信 提问
storm-环境搭建和第一个topology

从原理到操作,还是有点距离 :) 基于 Linux ubuntu 3.13.0-24-generic ------------- 预备工作 * java * python(>=2.6) * zeromq * jzmq * zookeeper 下载(不需安装) wget https://github......

深蓝苹果
2014/06/10
0
0
熟悉storm/spark源码 开发

项目属性 工作模式 : 网上工作 技能要求 : 1. 可以对storm/spark 源码进行二次开发! 2. 熟悉storm/spark源码,底层实现原理,细节,调优方案! 项目描述 1、公司一个用storm/spark 源码开发...

水中月10
2016/10/07
1
0
Storm概念讲解和工作原理介绍

Strom的结构 Storm与传统关系型数据库 传统关系型数据库是先存后计算,而storm则是先算后存,甚至不存 传统关系型数据库很难部署实时计算,只能部署定时任务统计分析窗口数据 关系型数据库重...

张超
2015/04/26
0
0
Apache Storm简介及安装部署

Apache Storm是一个分布式的、可靠的、容错的实时数据流处理框架。它与Spark Streaming的最大区别在于它是逐个处理流式数据事件,而Spark Streaming是微批次处理,因此,它比Spark Streaming...

风火数据
2018/07/20
0
0
storm 原理简介及单机版安装指南(转)

本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的、高容错的实时计算系统。 Storm对于实时计算的的意义相当于Hadoop对于批处理的意义。Hadoop为我们提供...

Jacos
2014/12/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

欧拉公式

欧拉公式表达式 欧拉公式的几何意 cosθ + j sinθ 是个复数,实数部分也就是实部为 cosθ ,虚数部分也就是虚部为 j sinθ ,对应复平面单位圆上的一个点。 根据欧拉公式和这个点可以用 复指...

sharelocked
27分钟前
2
0
burpsuite无法抓取https数据包

1.将浏览器和burpsuite的代理都设置好 2.在浏览器地址栏输入: http://burp 3.下载下面的证书,并将证书导入浏览器 cacert.der

Frost729
52分钟前
1
0
JeeSite4.x 消息管理、消息推送、消息提醒

实现统一的消息推送接口,包含PC消息、短信消息、邮件消息、微信消息等,无需让所有开发者了解消息是怎么发送出去的,只需了解消息发送接口即可。 所有推送消息均通过 MsgPushUtils 工具类发...

ThinkGem
今天
6
0
OpenML

https://www.openml.org/search?type=data

shengjuntu
今天
2
0
java强引用,软引用,弱引用和虚引用

先来简要说一下这四种引用的特性: 强引用:如果一个对象具有强引用,那垃圾回收器绝不会回收它 软引用:如果一个对象只具有软引用,则内存空间足够,垃圾回收器就不会回收它 弱引用:在垃圾...

woshixin
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部