文档章节

storm-相关知识点总结

zhizhuode
 zhizhuode
发布于 2017/07/03 15:41
字数 3402
阅读 53
收藏 0

一、Storm topology提交到集群分析 storm目前1.x版本支持nimbus的高可用(其实也可以不需要高可用,因为nimbus是无状态的,只要运行的topology没有故障且没有新的任务需要提交到storm集群,那么也可以不要nimbus,因为是worker在运行任务,nimbus只是负责任务分配,资源调度且和supervisor保持心跳我们可以做好整个集群的监控即可,当nimbus挂了后,直接重启它,不会影响正在运行的topology)。 当我们将topology提交到storm集群的时候,如果你搭的环境是一套高可用的环境,首先需要找到leader nimbus节点,因为需要向leader节点提交我们的拓扑,当调用submitTopology的时候,首先会进行相关的配置校验,然后找到配置的nimbus的ip,循环找到为leader的ip地址将其构建成一个NimbusClient返回,然后开始提交jar,我们可以把这个过程抽象成,我们是thift client通过RPC向server提交jar,jar上传完成后,通知Nimbus上传任务已经完成,nimbus接收到jar后将jar重命名后保存到inbox目录下,然后进行配置检查,任务分配。 nimbus首先会检查它的配置信息,以及整个storm集群中可用的slots(可用理解为worker), 第一步:检查整个集群中可用的worker,然后根据配置的worker数分配 第二步:在zk中创建任务的心跳检测节点/storm/workbeats/,storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。任务将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。 第三步:开始分配任务,根据topology配置的task数和worker数,进行分配 3.1 如果task数目比worker数多 比如task 为4个,worker为2个 task1,task2,taks3,task4 worker1,woker2, 那么效果就是task1到worker1,task2到woker2,这样依次轮询 3.2 如果worker数比task数目多 比如task5个,worker 8 个 基本会轮询保证task不会全部分配到同一个worker上 第四步:supervisor下载分配给自己的topology下载完成后然后运行。

二、高可用测试 storm nimbus的单点问题解决了在1.版本,主要理由zk的Ledaer选举实现,其实主要是依靠分布式互斥锁来实现,我们可以将zk的一个数据节点代表一个锁,当多个客户端同时调用create()节点创建节点的时候,zookeeper会保证只会有一个客户端创建成功,那么我们就可以让这个创建成功的客户端让其持有锁,而其它的客户端则注册Watcher监听,当持有锁的客户端释放锁后,监听的客户端就会收到Watcher通知,然后再去试图获取锁。 1、直接停掉nimbus leader :已经运行的任务不会有任何问题     2、任务启动过程中,立刻停掉nimbus leader
如果此时新Leader还没选举出来,任务提交会失败 如果新的Leader已经产生了,任务提交成功 3、关闭worker所在机器的supervisor守护进程(默认3秒检测一次worker进程)--> Kill -9 $workerPid ---> 关闭Nimbus Leader --->worker没有重新启动/飘走-->1分钟后启动supervisor --->worker飘移走 4、storm的高可用利用分布式缓存API来实现数据的备份,实现文件在多个topology之间的共享。

三、storm的主要特点 1、水平扩展:可以直接通过加机器来提高整个集群处理任务的速率,不仅可以直接添加机器作为supervisor节点,而且可以动态调整并发度来改变运行某个topology的线程数 2、容错性比较好:当Nimbus挂了,可以通过高可用来选举出一个新的Leader(当然nimbus挂了也不是什么大问题,因为运行任务不在nimbus节点上),当supervisor挂了,nimbus会通过心跳检测到,然后会将运行在这个supervisor节点上的所有任务转移到其它节点上,当worker挂了,即运行任务的工作进程挂了,首先supervisor会尝试重启它,如果没启动成功,那么nimbus就会将运行在worker上的任务转移到其它可用的worker节点上运行。 3、消息的可靠性处理:storm默认提供3种类型, 仅仅一次:原生storm api无法实现,需要用的storm 高级部分trident实现
最多一次:也就是消息最多只发一次,不管这条消息有没有被成功处理 至少一次:消息如果处理失败后,会重发 4、节点的无状态:状态都保存在zk里面,当我们使用kill -9 $nimbus_pid的时候,不影响整个集群的运行。

四、运行Storm的相关术语 1、topology:拓扑,运行一组spout/bolt构成的计算逻辑组件的总称 2、spout:storm消息来源,一般的话是消息队列 3、bolt:处理消息逻辑的组件,它的上游可以是spout也可以是Bolt 4、tuple:运行的消息单元,比如句子 zhangsan is a man 这个消息就是一个tuple 5、Stream:流,由一条条消息组织的,抽象成流 6、Stream Grouping:流分组,定义各个计算组件之间流的连接,分组,分发策略等

五、看一个简单的例子(测试storm的容错性) 从消息队列Kafka接收消息,然后进行单词拆分,然后统计单词出现的次数.起一个线程从消息队列收消息,3个线程进行单词拆分,1个线程进行全局计数 1、如果nimbus挂了怎么办? 2、如果supervisor 挂了怎么办? 3、如果worker挂了怎么办? 4、节点挂了怎么办? 第一个问题:nimbus是无状态的,nimbus挂了不影响整个集群中已经在运行的topology的运行。 第二个问题:supervisor守护进程挂了,没有影响。 第三个问题:worker挂了怎么办? supervisor会尝试重启,如果没有启动成功,则发生worker转移,转移到其它可用的节点上运行。 第四个问题:Node节点挂了怎么办? 这个实际上也可用理解为worker挂了,也会发生worker转移。

六、storm的并发度 一个topology可用跑在多个worker进程上,自然可用想的在一个进程里面,我们可以跑多个线程,再一次加快数据的处理效率,默认情况下,如果我们不设置线程数,就是1,且默认情况下,一个线程对应一个task任务(task可以理解为spout/bolt实例) 其中,每一个worker默认起一个acker,也就是说每一个worker默认起一个线程来执行acker,而acker实际上就是一个特殊的bolt,所以在计算整个集群线程数的时候,需要加上acker

七、流分组 流分组定义了上层tuple如何路由到下一层tuple,storm里面默认有以下几种。注意,首先下层必须有多个线程,不然一个线程定义流分组没有意义。 1、shuffleGrouping : 轮询 2、filedGrouping:按照上一层的输出属性,相同属性的会被路由到同一个线程处理 3、allGrouping:广播分组,对于每一个tuple,所有的Bolt都会收到 4、globalGrouping:全局分组,只会到bolt里面其中task-id最小的那一个 5、noGrouping:不分组,随机 6、DirectGrouping:直接分组上一层可以直接指定想把该消息路由到下一层的哪个bolt 7、自定义分组 8、localorshuffleGrouping:如果上层bolt和下层bolt运行在同一个进程里面,优先在进程内通信,使用Disruptor有界队列

八、storm的可靠性测试 8.1: 消息的ack 当一个spout数据源发射了消息后,一个tuple经过了所有的Bolt处理之后,这个tuple才被认为是处理完了,而acker就是一个用来追踪这些消息的组件, acker内部类似于一个map,维护了消息id和顶层spoutid直接的关系
1、如果task挂了,一个tuple没有被ack,那么会在超时之后(30s)spout负责重发 2、如果acker挂了,那么由这个tuple追踪的所有的tuple都会超时,也会被重发 3、那么spout挂了,第三方消息源负责重发 8.2 如何实现可靠的spout storm的bolt和spout有很多接口和抽象类,我们可以实现ISpout接口,首先需要注意的是spout里面,nextTuple,ack,fail是运行在同一个线程里面,所有不要在nextTuple里面执行一些比较耗时的处理,默认30s超时,如果acker-30s还没有收到那么就会超时重发,且nextTuple不能阻塞,如果没有消息发射,他会sleep一下,释放cpu,而如果我们想实现可靠的spout,需要自己维护msgId和消息直接的对应关系,可以放在map,redis里面,并且自己实现fail函数进行消息重发。 8.3 如何实现可靠的bolt storm提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt都可以实现消息的可靠性处理,其中BaseBasicBolt,storm已经帮我们内部自己实现了消息的可靠处理,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack 。 而BaseRichBolt则需要我们自己维护msgid和tuple之间的关系并手动实现ack或者fail 下面来分析一下BaseBasicBolt是怎么实现了,当我们调用emit的时候,由于没有streamId和tuple之间的对应关系,storm会给我们自己生成一个默认的流id,最后会起一个BasicBoltExecutor,这里面的excetue方法自己给我们实现了ack和失败后的fail。 8.4 针对storm提交的3种消息保证语义,在ack的基础上来看一下如何实现 8.4.1、如何实现最多一次(下面几个条件只要有一个满足,都只能实现最多一次语义) 1)将ack的个数设置为0 2)Spout不实现可靠的消息处理 不带msgId或者不实现fail函数 3)bolt不把处理的消息发送给acker 8.4.2 如何实现至少一次 1)开启ack机制,即ack数目大于0(默认每个worker一个ack) 2)Spout 实现可靠性传输保证 Spout 发送消息时附带 message 的 ID 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数 3)Bolt 在处理成功或失败后需要调用相应的方法通知 Acker
8.4.3 实现仅仅一次 storm原生api不能做到,要实现仅仅一次,需要存储tuple的状态,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现

九、思考? 1、storm消息重发,重发的消息从哪来?重复的消息怎么处理? storm需要维护msgId和具体tuple之间的关系
2、如果一个bolt总是处理失败导致spout重发,维护的数据越来越多,最后OOM,怎么办?
storm在以前的版本里面可以设置max.pending数(前提条件是必须开启ack),即如果spout组件发现还有这么多消息没有给他ack,当达到这个阀值的时候,就不会在接着发tuple给bolt PS:这种设置方法个人感觉值得商榷,首先这个值应该设置多少,设置小了,那么整个集群的吞吐量上不去,设置大了很有可能导致内存溢出 3、多个bolt,其中一个bolt处理失败,其它bolt处理成功,重发的时候,处理成功的bolt如何回滚?

十、事物 storm事物在0.7版本后封装到trident中,虽然原生的事物api已经废弃,但是对于我们理解高级部分trident还是非常有帮助的。事物在storm里面主要分为2个节点,事物的处理阶段和提交阶段,事物的处理阶段是可以并行处理的,但是是提交阶段必须按照事物id依次排队处理。 storm里面事物主要分为3种,一种是普通的事物,一种是分区事物,一种是不透明事物(事物的更高级封装) 1、普通事物ITransactionSpout接口,事物处理阶段起多个线程,提交阶段起1个线程 Coordinator:初始化事物 Emitter:emitBatch接收初始化事物生成的事物元数据,发送出去 2、分区事物IPartitionTransactionSpout接口,主流的事物Spout Coordinator:isReady和返回分区个数 Emitter: emitPartitionBatchNew 发送新分区的元数据 emitPartitionBatch 事物处理失败后重发 3、不透明事物IOpaqueTransactionSpout,容错性最好的事物Spout 它不区分每一批是不是发送的tuple是一样的,因为即使某个分区不可用,它还会继续发送可用的分区,等其他不可用的分区可用后,这些分区里面的数据被放到下一个事物batch里面执行

© 著作权归作者所有

zhizhuode
粉丝 1
博文 40
码字总数 39020
作品 0
海淀
高级程序员
私信 提问
加载中

评论(0)

Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli
2015/06/18
0
0
这篇Storm入门,你不可能看不懂

前言 只有光头才能变强。 文本已收录至我的GitHub精选文章,欢迎Star:https://github.com/ZhongFuCheng3y/3y 听说过大数据的同学应该都听说过Storm吧?其实我现在负责的系统用的就是Storm,...

Java_3y
05/11
0
0
花了几天入门Storm,上了一版,全是Bug

前言 只有光头才能变强。 文本已收录至我的GitHub精选文章,欢迎Star:https://github.com/ZhongFuCheng3y/3y 听说过大数据的同学应该都听说过Storm吧?其实我现在负责的系统用的就是Storm,...

Java3y
05/11
0
0
[电子书]Mastering Apache Storm PDF下载

阿里云智能数据库事业部 OLAP 平台团队现向 2021 届海内外院校应届毕业生(毕业时间在2020年11月-2021年10月之间)招聘大数据系统工程师,有机会转正,有意请加 iteblog 微信内推。 本书于2...

电子书
2017/08/30
0
0
亿级流量场景下,大型架构设计实现【2】---storm篇

正文前先来一波福利推荐: 福利一: 百万年薪架构师视频,该视频可以学到很多东西,是本人花钱买的VIP课程,学习消化了一年,为了支持一下女朋友公众号也方便大家学习,共享给大家。 福利二:...

osc_s8kmhvea
2019/03/17
4
0

没有更多内容

加载失败,请刷新页面

加载更多

Python基础-04元组

4.元组     元组的主要特性为: 1.元组在创建之后,具有不可以更改的特性,因此不能直接给元组的元素赋值 2.元组的元素类型可以为任意类型,如字典、字符串、列表等 3.元组常用于在程序的整...

osc_b2jll5m6
38分钟前
22
0
怎么在流程图中插入超链接?迅捷画图带你两步解决!

怎么在流程图中插入超链接?如何在WORD中插入超链接很多人都知道,但是对于陌生的流程图,很多人在进行流程图展示和讲解的时候,都会选择提前将需要的网页打开,然后手动进行更换。 这种手动...

真不莲
38分钟前
19
0
直播中音视频处理的一般流程

数据采集→数据编码→数据传输(流媒体服务器) →解码数据→播放显示 1、数据采集: 摄像机及拾音器收集视频及音频数据,此时得到的为原始数据 涉及技术或协议: 摄像机:CCD、CMOS 拾音器:声...

图玩智能科技
39分钟前
27
0
IntelliJ中的main函数和System.out.println()快捷键

https://blog.csdn.net/shijiebei2009/article/details/44726433

诗书易经
39分钟前
19
0
python 数据可视化实战(1)折线图绘制

  本篇博客新开一个数据分析后的数据可视化的例子讲解,每一篇博客是一个例子。   这节课学习如何绘制一个折线图。题目如下:   代码如下: import matplotlib.pyplot as pltimport m...

osc_xdc1vjza
39分钟前
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部