文档章节

storm原理剖析

loda0128
 loda0128
发布于 2017/03/20 23:05
字数 2942
阅读 471
收藏 26

为什么用Storm

storm是一个免费、开源的分布式实时计算框架。它让你更方便、可靠的处理实时发送的消息。如果你之前了解过hadoop,应该知道hadoop能很快速、方便的帮你完成批量数据处理,而storm可以认为是实时数据处理领域的hadoop。storm简单,虽然他是用jvm之上的clojure编写的,但是同样支持非jvm语言。

如果你不知道是否该使用storm,你可以先看看你有没有过这些需求:

  1. 实时数据分析
  2. 在线机器学习
  3. 实时计算
  4. 分布式rpc框架

如果你有其中某项需求,那么恭喜你,storm可以帮到你。storm性能好、可伸缩性强、容错能力好,并且能保证消息的可靠性。这些特点足以你拥有使用storm的理由。

介绍

要了解storm,首先需要了解这些概念:

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers

Topologies

Topology

storm中的实时处理的应用会被打包成topology,这个topology由一系列stream(数据流)、spout(数据流生产者)、bolt(数据处理逻辑)组成。类似hadoop中做mapreduce的job,有个区别就是mapreduce job会结束,而topology只要你不手动kill掉,它永远也不会结束。

Streams

Stream是数据流,有无穷无尽的tuple组成,而tuple则包含了用户发送的具体数据,比如整数、小数、字符串等,也可以包含自定义的数据类型,前提是你要为它实现序列化。

Spouts

Spout是数据流Stream的生产者。通常spout会从外部数据源(kafka等)读取数据tuple,并将它emit(发送)到topology中。

Spout中最主要的方法是nextTuple。nextTuple通常会生成一个新的tuple,然后emit到topology。由于storm会在一个线程中调用所有spout的nextTuple方法,所以千万不要让这个方法阻塞掉。尽量保持spout只处理数据的发送,不要让它处理业务逻辑。

Bolts

Bolt处理topology中所有的运算、业务逻辑,如果逻辑复杂,通常使用多个bolt也能很好的解决。bolt会订阅spout或者其他bolt发送的tuple,而整个应用可能会有多个spout和bolt,他们组成一起就会形成一个图装结构,也就是topology。

bolt中最主要的方法是execute,它会从订阅的spout或者bolt获取tuple,从tuple从取出数据,做响应的逻辑处理,然后生成新的tuple给emit出去。如果这个bolt是topology中最后一个bolt节点,就没有必要继续emit,而是自己来处理数据的归属。

Stream groupings

Stream groupings就是数据流分组,它定义了tuple该如何分发给bolt中不同的task。比如,一个topology中有ASpout和订阅了Apout的Abolt,为了保持并发量,给Abolt设置了4个task。数据流分组会决定Aspout发送出来的tuple,会怎样分配到4个task中

目前storm定义了八种不同的分组方式:

  1. Shuffle grouping:随机分组。随机分配给不同的task,保证最后每个task接受到的tuple数量均等
  2. Fields grouping:按字段分组。比如tuple中存在名为user-id的字段,那么所有该字段所在的tuple都会被分配到同一个task上。
  3. Partial Key grouping:部分key分组。同字段分组,唯一的区别是,它会在不同的task之间做负载均衡,保证tuple均匀分配。
  4. All grouping:全复制分组。将tuple复制后发给所有订阅的bolt,这种会导致网络传输量较大,小心使用。
  5. Global grouping:全局分组。将tuple发送给id最小的task。
  6. None grouping:不分组。目前实现上等同随机分组。
  7. Direct grouping:指向型分组。通过emitDirect(id,tuple)发给指定id的task。
  8. Local or shuffle grouping:本地或者随机分组。如果同一个work内有目标bolt的task,会在这几个task中做随机分发。其他情况下,采用随机分组方式。这种分组实现的目的是减少网络传输,尽量选择本地的task做随机分发,如果没有再选择远程task。

Reliability

storm提供了可靠的和不可靠的实时处理方式,需要自己通过api指定。通过追踪tuple树中的消息传递,spout可以保证一旦消息丢失或者传送超时,就会重发。具体可以参见后面描述。

Tasks

每个spout和bolt都可以被分解成多个task,运行在不同的线程中,通过并发执行保持高效。对应的api为TopologyBuilder的setSpout和setBolt方法。

Workers

topology可以运行在多个worker进程中,每个worker进程都是一个独立的jvm,每个进程里面运行着很多task

storm如何序列化

Storm采用Kryo作为序列化框架。默认情况下,Storm支持基本数据类型, strings, byte arrays, ArrayList, HashMap, HashSet,以及Clojure的集合类型。如果你希望在tuple中存储自定义数据类型,保证它能在topology中传递,你就需要注册自定义数据类型。

官网提供了两种注册方式。

  1. 配置文件
  2. Config对象的registerSerialization方法

任选一种方式,将自定义数据结构注册进去,就能使用FieldsSerializer来序列化(kryo提供),否则就要自己提供序列化方式。

如果tuple中存储的数据没有注册过,就会采用默认java序列化方案,如果它无法没java序列化方案处理,storm会抛出异常。为了性能考虑,如果存在自定义数据,最好使用storm提供的方案注册,采用kryo等优秀的序列化方案。否则,java序列化的性能开销非常大。

storm并发机制

要理解storm的并发机制,首先得理解下面几个概念:

  1. Workers:每个woker是topology中独立的jvm进程
  2. Executors:executors是woker中运行的线程,执行具体task
  3. Tasks:每个task相当于spout或者bolt实例

并发topology

如图所示,该topology设置了2个wokers,10个executors,12个task。这些资源会被平均分配。

其中,executors数量不能多于task数量,这样就保证了每个executor至少会分配到一个task。默认情况executors数量等于task数量。

如果用户希望改变task并行能力,可以通过改变executors数量来实现。之所以没有仅仅使用task来代表线程,而是引入executor,主要是考虑到在一个运行的topology中,task数量无法改变,因为一旦改变,可能导致Fields grouping这种分组方式出现bug。

举个例子,用户希望消息A分配到某个task,并且以后都由这个task接收,那采用Fields grouping可以将获取消息id,取n = hash(id) % task数量,n就是A要去的task编号。如果运行期间可以修改task数量,那么n可能会发生变化,带来的影响就是A会跑去另外一个task。很明显,这是不允许的。

随着executor的引入,用户可以根据自己的需求,在topology运行时调整task的并行能力,更加自由灵活。(不过jstorm取消了executor这个语义,转而采用task来代表任务和线程,主要考虑到storm这种模型的实现复杂性与收益微小性,往往大部分人采用默认配置)

storm可靠性保证

storm提供了三个级别的消息处理保障机制:

  1. 尽量保障消息发送
  2. 保障消息至少发送一次
  3. 保障消息发送并且仅发一次

尽量保障消息发送

这是最简单的模式,就是发送消息,丢了就不做处理。

保障消息至少发送一次

storm提供了一种api保证每个tuple都会被完整的处理。要保证storm的消息可靠性,就得保证spout和bolt两个角色的可靠性。

  • spout可靠性:在nextTuple中通过SpoutOutputCollector来emit消息的时候加上消息id,如
_collector.emit(new Values("field1", "field2", 3) , msgId);

加上msgId就将emit出去的tuple打上了标识,一旦tuple在timeout(默认为30s)时间范围内被完全处理,系统就会调用ack(msgId),表示表示已经被完整的处理过了,否则就调用fail(msgId)做重发处理。(ack和fail方法都得自己实现,通常ack回复数据来源消息已经被处理,fail做重发操作

  • bolt可靠性:通常bolt都会读入tuple,接着取出数据,最后emit新的tuple。bolt要保证可靠性,首先 需要在emit时锚定读入的tuple和新生成的tuple。

    	_collector.emit(tuple, new Values(word));
    

    接着根据消息处理成功或者失败的情况分别做ack或者fail调用。

    	//_collector是SpoutOutputCollector的对象
    	if(success){
    		_collector.ack(tuple);
    	}else{
    		_collector.ack(tuple);
    	}
    

那么,storm是怎样保障消息的可靠性的呢?

要了解实现原理,首先得有tuple树的背景知识,通过下面一幅图来看看tuple树的处理流程。

tuple树

假设图中A是Aspout发送的tuple,BC是Bbolt发送的tuple,DE是Cbolt发送的tuple,Bbolt订阅Aspout,Cbolt订阅Bbolt。

  1. 当Aspout emit A时,会将A加入tuple树,当Bbolt接收到A,锚定新生成的tuple B和C时,会将B和C加入tuple树,最后ack(A),于是A在tuple树中标记为已处理。目前的状态就是上图左边所示。
  2. 当Cbolt接收到C以后,经过处理,会锚定新生成的D和E,接着ack(C),于是C也被标记为已处理。目前的状态就是上图右边所示。

storm的topology中会运行名为acker的任务,acker会监控这个tuple树,当发现tuple树中所有的tuple都被完整的处理过了,并且没有新的tuple生成,就会调用spout的ack方法,表示消息被成功处理。否则就调用fail方法。

当topology越大,tuple树也就越大,完整的在内存生成这个tuple树并跟踪它是不现实的。storm通过一个很巧妙的方式,来实现对tuple树的监控。 acker存储了一个64位的数字,名为ack val。当新加入一个tuple时,会生成一个64位随机数字作为id,让tuple被emit时,会将更新ack val为ack val xor id。当ack(tuple)时,也会按同样的方法更新ack val。根据xor的原理,如果acker发现最后ack val的值变为0,则说明所有生成的tuple都被ack,也就是tuple树中所有的tuple都已经被处理了。通过这种方式,storm保证了acker能高效的识别消息是否被完整的处理。

保障消息发送并且仅发一次

由storm的高级api Trident来保证消息不会丢失,并且不会多发。具体信息本章不会描述,会在后面继续补充。

storm高可用性(HA)

  • 如果woker挂了,supervisor会重新创建
  • 如果机器节点挂了,nimbus会把该节点上的task转移到其他节点
  • 如果nimbus或者supervisor挂了,重启就行了。nimbus和supervisor被设计成无状态,状态都被存到zookeeper里面了
  • 为防止nimbus挂掉,worker节点也挂掉,导致任务无法被nimbus转移到其他机器。nimbus也被设计成HA的,利用主从结构保证主节点挂了之后从节点一样能服务

学习资料

© 著作权归作者所有

上一篇: druid.io剖析
loda0128
粉丝 93
博文 96
码字总数 119400
作品 0
朝阳
程序员
私信 提问
Storm同步调用之DRPC模型探讨

  摘要:Storm的编程模型是一个有向无环图,决定了storm的spout接收到外部系统的请求后,spout并不能得到bolt的处理结果并将结果返回给外部请求。所以也就决定了storm无法提供对外部系统的同...

刘洋intsmaze
2017/09/28
0
0
Apache Storm 0.9.6/0.10.0 发布

Apache Storm 0.10.0 发布,此版本是个稳定版本,相比之前的 Beta 版本主要包括 bug 修复和改进: STORM-1108: Fix NPE in simulated time STORM-1106: Netty should not limit attempts to...

oschina
2015/11/06
4.7K
8
storm启动卡着不动,求高手

刚把storm集群部好,结果启动时卡着不动。 执行:./storm nimbus 提示: [root@WY_CentOS_100G bin]# ./storm nimbus Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/src......

lja
2013/07/11
2.2K
1
Storm概念讲解和工作原理介绍

Strom的结构 Storm与传统关系型数据库 传统关系型数据库是先存后计算,而storm则是先算后存,甚至不存 传统关系型数据库很难部署实时计算,只能部署定时任务统计分析窗口数据 关系型数据库重...

张超
2015/04/26
2.8K
0
Apache Slider + Storm

Apache Slider + Storm 系统环境 安装如下组件,部署可用环境 JDK 1.7.0_79 Apache Zookeeper 3.4.* Apache Zookeeper Apache Hadoop 2.6.* Apache Hadoop Apache Storm 0.9.4 Apache Storm......

Yulong_
2016/09/21
415
0

没有更多内容

加载失败,请刷新页面

加载更多

64.监控平台介绍 安装zabbix 忘记admin密码

19.1 Linux监控平台介绍 19.2 zabbix监控介绍 19.3/19.4/19.6 安装zabbix 19.5 忘记Admin密码如何做 19.1 Linux监控平台介绍: 常见开源监控软件 ~1.cacti、nagios、zabbix、smokeping、ope...

oschina130111
今天
13
0
当餐饮遇上大数据,嗯真香!

之前去开了一场会,主题是「餐饮领袖新零售峰会」。认真听完了餐饮前辈和新秀们的分享,觉得获益匪浅,把脑子里的核心纪要整理了一下,今天和大家做一个简单的分享,欢迎感兴趣的小伙伴一起交...

数澜科技
今天
7
0
DNS-over-HTTPS 的下一代是 DNS ON BLOCKCHAIN

本文作者:PETER LAI ,是 Diode 的区块链工程师。在进入软件开发领域之前,他主要是在做工商管理相关工作。Peter Lai 也是一位活跃的开源贡献者。目前,他正在与 Diode 团队一起开发基于区块...

红薯
今天
10
0
CC攻击带来的危害我们该如何防御?

随着网络的发展带给我们很多的便利,但是同时也带给我们一些网站安全问题,网络攻击就是常见的网站安全问题。其中作为站长最常见的就是CC攻击,CC攻击是网络攻击方式的一种,是一种比较常见的...

云漫网络Ruan
今天
12
0
实验分析性专业硕士提纲撰写要点

为什么您需要研究论文的提纲? 首先当您进行研究时,您需要聚集许多信息和想法,研究论文提纲可以较好地组织你的想法, 了解您研究资料的流畅度和程度。确保你写作时不会错过任何重要资料以此...

论文辅导员
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部