storm1.x介绍
storm1.x介绍
空中小鸟 发表于2个月前
storm1.x介绍
  • 发表于 2个月前
  • 阅读 5
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云实验室 1小时搭建人工智能应用,让技术更容易入门 免费体验 >>>   

storm目前1.x版本支持nimbus的高可用(其实也可以不需要高可用,因为nimbus是无状态的,只要运行的topology没有故障且没有新的任务需要提交到storm集群,那么也可以不要nimbus,因为是worker在运行任务,nimbus只是负责任务分配,资源调度且和supervisor保持心跳我们可以做好整个集群的监控即可,当nimbus挂了后,直接重启它,不会影响正在运行的topology)。 当我们将topology提交到storm集群的时候,如果你搭的环境是一套高可用的环境,首先需要找到leader nimbus节点,因为需要向leader节点提交我们的拓扑,当调用submitTopology的时候,首先会进行相关的配置校验,然后找到配置的nimbus的ip,循环找到为leader的ip地址将其构建成一个NimbusClient返回,然后开始提交jar,我们可以把这个过程抽象成,我们是thift client通过RPC向server提交jar,jar上传完成后,通知Nimbus上传任务已经完成,nimbus接收到jar后将jar重命名后保存到inbox目录下,然后进行配置检查,任务分配。 nimbus首先会检查它的配置信息,以及整个storm集群中可用的slots(可用理解为worker), 第一步:检查整个集群中可用的worker,然后根据配置的worker数分配 第二步:在zk中创建任务的心跳检测节点/storm/workbeats/,storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。任务将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。 第三步:开始分配任务,根据topology配置的task数和worker数,进行分配 3.1 如果task数目比worker数多 比如task 为4个,worker为2个 task1,task2,taks3,task4 worker1,woker2, 那么效果就是task1到worker1,task2到woker2,这样依次轮询 3.2 如果worker数比task数目多 比如task5个,worker 8 个 基本会轮询保证task不会全部分配到同一个worker上 第四步:supervisor下载分配给自己的topology下载完成后然后运行。

二、高可用测试 storm nimbus的单点问题解决了在1.版本,主要理由zk的Ledaer选举实现,其实主要是依靠分布式互斥锁来实现,我们可以将zk的一个数据节点代表一个锁,当多个客户端同时调用create()节点创建节点的时候,zookeeper会保证只会有一个客户端创建成功,那么我们就可以让这个创建成功的客户端让其持有锁,而其它的客户端则注册Watcher监听,当持有锁的客户端释放锁后,监听的客户端就会收到Watcher通知,然后再去试图获取锁。 1、直接停掉nimbus leader :已经运行的任务不会有任何问题     2、任务启动过程中,立刻停掉nimbus leader
如果此时新Leader还没选举出来,任务提交会失败 如果新的Leader已经产生了,任务提交成功 3、关闭worker所在机器的supervisor守护进程(默认3秒检测一次worker进程)--> Kill -9 $workerPid ---> 关闭Nimbus Leader --->worker没有重新启动/飘走-->1分钟后启动supervisor --->worker飘移走 4、storm的高可用利用分布式缓存API来实现数据的备份,实现文件在多个topology之间的共享。

三、storm的主要特点 1、水平扩展:可以直接通过加机器来提高整个集群处理任务的速率,不仅可以直接添加机器作为supervisor节点,而且可以动态调整并发度来改变运行某个topology的线程数 2、容错性比较好:当Nimbus挂了,可以通过高可用来选举出一个新的Leader(当然nimbus挂了也不是什么大问题,因为运行任务不在nimbus节点上),当supervisor挂了,nimbus会通过心跳检测到,然后会将运行在这个supervisor节点上的所有任务转移到其它节点上,当worker挂了,即运行任务的工作进程挂了,首先supervisor会尝试重启它,如果没启动成功,那么nimbus就会将运行在worker上的任务转移到其它可用的worker节点上运行。 3、消息的可靠性处理:storm默认提供3种类型, 仅仅一次:原生storm api无法实现,需要用的storm 高级部分trident实现
最多一次:也就是消息最多只发一次,不管这条消息有没有被成功处理 至少一次:消息如果处理失败后,会重发 4、节点的无状态:状态都保存在zk里面,当我们使用kill -9 $nimbus_pid的时候,不影响整个集群的运行。

四、运行Storm的相关术语 1、topology:拓扑,运行一组spout/bolt构成的计算逻辑组件的总称 2、spout:storm消息来源,一般的话是消息队列 3、bolt:处理消息逻辑的组件,它的上游可以是spout也可以是Bolt 4、tuple:运行的消息单元,比如句子 zhangsan is a man 这个消息就是一个tuple 5、Stream:流,由一条条消息组织的,抽象成流 6、Stream Grouping:流分组,定义各个计算组件之间流的连接,分组,分发策略等

五、看一个简单的例子(测试storm的容错性) 从消息队列Kafka接收消息,然后进行单词拆分,然后统计单词出现的次数.起一个线程从消息队列收消息,3个线程进行单词拆分,1个线程进行全局计数 1、如果nimbus挂了怎么办? 2、如果supervisor 挂了怎么办? 3、如果worker挂了怎么办? 4、节点挂了怎么办? 第一个问题:nimbus是无状态的,nimbus挂了不影响整个集群中已经在运行的topology的运行。 第二个问题:supervisor守护进程挂了,没有影响。 第三个问题:worker挂了怎么办? supervisor会尝试重启,如果没有启动成功,则发生worker转移,转移到其它可用的节点上运行。 第四个问题:Node节点挂了怎么办? 这个实际上也可用理解为worker挂了,也会发生worker转移。

六、storm的并发度 一个topology可用跑在多个worker进程上,自然可用想的在一个进程里面,我们可以跑多个线程,再一次加快数据的处理效率,默认情况下,如果我们不设置线程数,就是1,且默认情况下,一个线程对应一个task任务(task可以理解为spout/bolt实例) 其中,每一个worker默认起一个acker,也就是说每一个worker默认起一个线程来执行acker,而acker实际上就是一个特殊的bolt,所以在计算整个集群线程数的时候,需要加上acker

七、流分组 流分组定义了上层tuple如何路由到下一层tuple,storm里面默认有以下几种。注意,首先下层必须有多个线程,不然一个线程定义流分组没有意义。 1、shuffleGrouping : 轮询 2、filedGrouping:按照上一层的输出属性,相同属性的会被路由到同一个线程处理 3、allGrouping:广播分组,对于每一个tuple,所有的Bolt都会收到 4、globalGrouping:全局分组,只会到bolt里面其中task-id最小的那一个 5、noGrouping:不分组,随机 6、DirectGrouping:直接分组上一层可以直接指定想把该消息路由到下一层的哪个bolt 7、自定义分组 8、localorshuffleGrouping:如果上层bolt和下层bolt运行在同一个进程里面,优先在进程内通信,使用Disruptor有界队列

八、storm的可靠性测试 8.1: 消息的ack 当一个spout数据源发射了消息后,一个tuple经过了所有的Bolt处理之后,这个tuple才被认为是处理完了,而acker就是一个用来追踪这些消息的组件, acker内部类似于一个map,维护了消息id和顶层spoutid直接的关系
1、如果task挂了,一个tuple没有被ack,那么会在超时之后(30s)spout负责重发 2、如果acker挂了,那么由这个tuple追踪的所有的tuple都会超时,也会被重发 3、那么spout挂了,第三方消息源负责重发 8.2 如何实现可靠的spout storm的bolt和spout有很多接口和抽象类,我们可以实现ISpout接口,首先需要注意的是spout里面,nextTuple,ack,fail是运行在同一个线程里面,所有不要在nextTuple里面执行一些比较耗时的处理,默认30s超时,如果acker-30s还没有收到那么就会超时重发,且nextTuple不能阻塞,如果没有消息发射,他会sleep一下,释放cpu,而如果我们想实现可靠的spout,需要自己维护msgId和消息直接的对应关系,可以放在map,redis里面,并且自己实现fail函数进行消息重发。 8.3 如何实现可靠的bolt storm提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt都可以实现消息的可靠性处理,其中BaseBasicBolt,storm已经帮我们内部自己实现了消息的可靠处理,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack 。 而BaseRichBolt则需要我们自己维护msgid和tuple之间的关系并手动实现ack或者fail 下面来分析一下BaseBasicBolt是怎么实现了,当我们调用emit的时候,由于没有streamId和tuple之间的对应关系,storm会给我们自己生成一个默认的流id,最后会起一个BasicBoltExecutor,这里面的excetue方法自己给我们实现了ack和失败后的fail。 8.4 针对storm提交的3种消息保证语义,在ack的基础上来看一下如何实现 8.4.1、如何实现最多一次(下面几个条件只要有一个满足,都只能实现最多一次语义) 1)将ack的个数设置为0 2)Spout不实现可靠的消息处理 不带msgId或者不实现fail函数 3)bolt不把处理的消息发送给acker 8.4.2 如何实现至少一次 1)开启ack机制,即ack数目大于0(默认每个worker一个ack) 2)Spout 实现可靠性传输保证 Spout 发送消息时附带 message 的 ID 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数 3)Bolt 在处理成功或失败后需要调用相应的方法通知 Acker
8.4.3 实现仅仅一次 storm原生api不能做到,要实现仅仅一次,需要存储tuple的状态,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现

九、思考? 1、storm消息重发,重发的消息从哪来?重复的消息怎么处理? storm需要维护msgId和具体tuple之间的关系
2、如果一个bolt总是处理失败导致spout重发,维护的数据越来越多,最后OOM,怎么办?
storm在以前的版本里面可以设置max.pending数(前提条件是必须开启ack),即如果spout组件发现还有这么多消息没有给他ack,当达到这个阀值的时候,就不会在接着发tuple给bolt PS:这种设置方法个人感觉值得商榷,首先这个值应该设置多少,设置小了,那么整个集群的吞吐量上不去,设置大了很有可能导致内存溢出 3、多个bolt,其中一个bolt处理失败,其它bolt处理成功,重发的时候,处理成功的bolt如何回滚?

十、事物 storm事物在0.7版本后封装到trident中,虽然原生的事物api已经废弃,但是对于我们理解高级部分trident还是非常有帮助的。事物在storm里面主要分为2个节点,事物的处理阶段和提交阶段,事物的处理阶段是可以并行处理的,但是是提交阶段必须按照事物id依次排队处理。 storm里面事物主要分为3种,一种是普通的事物,一种是分区事物,一种是不透明事物(事物的更高级封装) 1、普通事物ITransactionSpout接口,事物处理阶段起多个线程,提交阶段起1个线程 Coordinator:初始化事物 Emitter:emitBatch接收初始化事物生成的事物元数据,发送出去 2、分区事物IPartitionTransactionSpout接口,主流的事物Spout Coordinator:isReady和返回分区个数 Emitter: emitPartitionBatchNew 发送新分区的元数据 emitPartitionBatch 事物处理失败后重发 3、不透明事物IOpaqueTransactionSpout,容错性最好的事物Spout 它不区分每一批是不是发送的tuple是一样的,因为即使某个分区不可用,它还会继续发送可用的分区,等其他不可用的分区可用后,这些分区里面的数据被放到下一个事物batch里面执行

共有 人打赏支持
粉丝 0
博文 39
码字总数 37052
×
空中小鸟
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: