文档章节

五、Storm入门之Bolt

datapro
 datapro
发布于 2015/06/18 01:23
字数 1300
阅读 234
收藏 0

正如你已经看到的,bolts是一个Storm集群中的关键组件。你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子。

Bolt生命周期

Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出。实现一个bolt时,通常需要实现IRichBolt接口。Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机。然后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。

NOTE:要创建一个bolt对象,它通过构造器参数初始化成员属性,bolt被提交到集群时,这些属性值会随着一起序列化。

Bolt结构

Bolts拥有如下方法:

declareOutputFields(OutputFieldsDeclarer declarer)
{
    为bolt声明输出模式
}
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
{

   仅在bolt开始处理元组之前调用
}
execute(Tuple input){
处理输入的单个元组
}
cleanup(){
   在bolt即将关闭时调用
}

下面看一个例子,在这个例子中bolt把一句话分割成单词列表:

class SplitSentence implements IRichBolt {

   private OutputCollector collector;

   publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {

       this.collector = collector;

   }

   public void execute(Tuple tuple) {

       String sentence = tuple.getString(0);

       for(String word : sentence.split(" ")) {

           collector.emit(new Values(word));

       }

   }

   public void cleanup(){}

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

       declarer.declare(new Fields("word"));

   }}

正如你所看到的,这是一个很简单的bolt。值得一提的是在这个例子里,没有消息担保。这就意味着,如果bolt因为某些原因丢弃了一些消息——不论是因为bolt挂了,还是因为程序故意丢弃的——生成这条消息的spout不会收到任何通知,任何其它的spoutsbolts也不会收到。

然而在许多情况下,你想确保消息在整个拓扑范围内都被处理过了。

可靠的bolts和不可靠的bolts

正如前面所说的,Storm保证通过spout发送的每条消息会得到所有bolt的全面处理。基于设计上的考虑,这意味着你要自己决定你的bolts是否保证这一点。

拓扑是一个树型结构,消息(元组)穿过其中一条或多条分支。树上的每个节点都会调用ack(tuple)fail(tuple),Storm因此知道一条消息是否失败了,并通知那个/那些制造了这些消息的spout(s)。既然一个Storm拓扑运行在高度并行化的环境里,跟踪始发spout实例的最好方法就是在消息元组内包含一个始发spout引用。这一技巧称做锚定(译者注:原文为Anchoring)。修改一下刚刚讲过的SplitSentence,使它能够确保消息都被处理了。

class SplitSentence implenents IRichBolt {

   private OutputCollector collector;

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

       this.collector = collector;

   }

   public void execute(Tuple tuple) {

       String sentence = tuple.getString(0);

       for(String word : sentence.split(" ")) {

           collector.emit(tuple, new Values(word));

       }

       collector.ack(tuple);

   }

   public void cleanup(){}

   public void declareOutputFields(OutputFieldsDeclarer declarer){

       declar.declare(new Fields("word"));

   }}

锚定发生在调用collector.emit()时。正如前面提到的,Storm可以沿着元组追踪到始发spoutcollector.ack(tuple)collector.fail(tuple)会告知spout每条消息都发生了什么。当树上的每条消息都已被处理了,Storm就认为来自spout的元组被全面的处理了。如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。

NOTE:你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。

当然了spout需要考虑消息的失败情况,并相应的重试或丢弃消息。

NOTE:你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。

多数据流

一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。

多锚定

为了用bolt连接或聚合数据流,你需要借助内存缓冲元组。为了在这一场景下确保消息完成,你不得不把流锚定到多个元组上。可以向emit方法传入一个元组列表来达成目的。

...
List anchors = new ArrayList();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...

通过这种方式,bolt在任意时刻调用ackfail方法,都会通知消息树,而且由于流锚定了多个元组,所有相关的spout都会收到通知。

使用IBasicBolt自动确认

你可能已经注意到了,在许多情况下都需要消息确认。简单起见,Storm提供了另一个用来实现bolt的接口,IBasicBolt。对于该接口的实现类的对象,会在执行execute方法之后自动调用ack方法。

class SplitSentence extends BaseBasicBolt {

   public void execute(Tuple tuple, BasicOutputCollector collector) {

       String sentence = tuple.getString(0);

       for(String word : sentence.split(" ")) {

           collector.emit(new Values(word));

       }}

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

       declarer.declare(new Fields("word"));

   }
 }

NOTE:分发消息的BasicOutputCollector自动锚定到作为参数传入的元组。

本文转载自:http://ju.outofmemory.cn/entry/69331

datapro
粉丝 16
博文 37
码字总数 22063
作品 0
广州
高级程序员
私信 提问
加载中

评论(0)

Storm 入门的Demo教程

Storm介绍 Storm是Twitter开源的分布式实时大数据处理框架,最早开源于github,从0.9.1版本之后,归于Apache社区,被业界称为实时版Hadoop。随着越来越多的场景对Hadoop的MapReduce高延迟无法...

虚无境
2018/03/16
0
0
2018-11-01期 Storm组件概述

一、Storm简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。 Storm 很简单,可用于任...

JackmaSong
2018/11/02
0
0
Storm笔记整理(一):简介与设计思想

[TOC] 实时计算概述 有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统...

xpleaf
2018/04/12
0
0
探秘Hadoop生态13:初探Storm和入门实例

这位大侠,这是我的公众号:程序员江湖。 分享程序员面试与技术的那些事。 干货满满,关注就送。 Storm:最火的流式处理框架 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获...

你的猫大哥
2017/03/08
0
0
一篇文带你快速起步Apache Storm

作者介绍 杜亦舒,创业中,技术合伙人,喜欢研究分享技术。个人订阅号:性能与架构。 本文介绍了 Apache Storm 的基本原理和开发方法,包括一个 PDF 和 2 个示例的源码。 内容大纲: Storm是...

杜亦舒
2017/07/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

2020 年,CEO 们对管理的理解又要变了

摘要 看到「工作流」这一层,才能看到「执行力」和「创造力」这两个大词儿,到底是怎么实现的。 从疫情爆发开始,极客公园前沿社的企业家微信群就没消停过。 群里的话题从一开始的动用能力向...

osc_7cws6vmd
17分钟前
25
0
redisson集群转载

转载:https://www.jianshu.com/p/f302aa345ca8 参考:https://segmentfault.com/a/1190000016976564 前几天发的一篇文章《Redlock:Redis分布式锁最牛逼的实现》,引起了一些同学的讨论,也...

hexiaoming123
19分钟前
48
0
postman(待续)

postman

osc_n1fa9m3z
19分钟前
17
0
jmeter(待续)

jmeter

osc_15fvklrg
20分钟前
19
0
createSQLQuery().list()查出的时间没有时分秒

在开发过程中发现提取Oracle中Date类型的数据, 开始发现本地运行的时候有时分秒, 2020-04-09 13:26:20.0 部署在服务器就没有时分秒, 2020-04-09 找了代码和jab包都没有发现差异,在网上搜...

zb1021
20分钟前
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部