文档章节

Storm tuple发送机制中的重发

DJZhu
 DJZhu
发布于 2017/03/03 14:59
字数 1439
阅读 91
收藏 0

#从一个程序异常说起 最近的一个项目走到线下测试阶段,同事写了一堆测试数据进Kafka,我的代码负责通过KafkaSpout消费消息。结果出现一个很怪异的事情,对方每20秒写10000条消息进Kafka,我的Spout却读到了这样的消息:

1. 20秒统计一次进入Spout的消息数量,第一个20秒正常,有10000条整。(因为写入速度够快,20秒足够完成写入和读取工作)
2. 第二个20秒也正常,10000条消息整。
3. 第三个20秒就开始异常了,数据量在20000~30000之间。
4. 越往后数据量越来越多。

经过一顿排查终于发现问题出在Bolt没有ack()应答上级Spout,导致Spout重发消息。

#BaseRichBolt,BaseBasicBolt,BaseBatchBolt,BaseTransactionalBolt的差别 但是隐隐然记得当初在某个地方还看过介绍说某个Bolt是不需要用户手动ack的,但是这里却因为没有现实调用ack而出错,看来是Storm提供的多个Bolt实现在这方面是有不同点的。借这个机会,就来看看这些Bolt之间的异同点。 先来看看这几个Bolt的继承关系: 这里写图片描述

首先可见BaseTransactionalBolt其实是继承自BaseBatchBolt的一个拓展,有更本质区别的应该是初前者之外剩下的三个类。他们都继承自BaseComponent,并分别实现了IRichBoltIBasicBoltIBatchBolt接口。

##IComponent BaseRichBolt,BaseBasicBolt,BaseBatchBolt三个抽象类直接继承的BaseComponent其实并没有做什么事情。

# BaseComponent.java
package backtype.storm.topology.base;

import backtype.storm.topology.IComponent;
import java.util.Map;

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }    
}

而它所实现的IComponent核心则定义了两个方法,一个是上面被BaseComponent实现了的getComponentConfiguration(),负责返回Component的配置参数。一个是需要实现类自己定义的declareOutputFields()。方法中声明了该bolt/spout输出的字段个数,供下游使用,在该bolt中的execute方法中,emit发射的字段个数必须和声明的相同,否则报错:

Tuple created with wrong number of fields. Expected 2 fields but got 1 fields。 

当我们使用fieldsGrouping(id,fields)方法来做tuple分发时,Fields的定义也要根绝这里的fields声明来,否则会报错:

“Topology submission exception. (topology name='HelloStorm')
 <InvalidTopologyException InvalidTopologyException(msg:Component: [3] subscribes from
 stream: [default] of component [2] with non-existent fields: #{"ahah1"})>”

BaseComponent的定义很清晰简单,但是几个Bolt的差别却主要体现在他们实现的IxxxBolt接口。

##IBasicBolt vs IRichBolt 三个Bolt刚好实现了三个不同的IxxxBolt,而其中的IBasicBolt和IRichBolt都在backtype.storm.topology包内,可见他们两个还是比较相像的。 两者都有三个抽象方法:

/**
* 在Bolt启动前执行,提供Bolt启动环境配置的入口
*/
void prepare();
/**
* 每次调用处理一个输入的tuple,当然,也可以把tuple暂存起来批量处理。
* 但是!!!千万注意,所有的tuple都必须在一定时间内应答,可以是ack或者fail。否则,spout就会重发tuple。这也正是上文描述的意外事件的根本原因。
* 两个bolt不同的地方在于,`IBasicBolt`自动帮你ack,而`IRichBolt`需要你自己来做。
*/
void exexute();
/**
* 当组件关闭时被调用,但是当supervisor使用`kill -9`强制关闭worker进程时,不能保证这个方法一定会被执行。
*/
void cleanup();

##Spout重发策略的配置 现在我们知道了默认情况下,Spout如果在一定时间内没有发出去的tuple对应的ack,就会触发fail。那么这一套策略是怎么实现的呢?我们又可以做什么配置呢? 且先看看用来发射tuple的SpoutOutputCollector,他的类图很简单: 这里写图片描述

主要提供了以下几个方法:

public List<Integer> emit(String streamId, List<Object> tuple, Object messageId)
public List<Integer> emit(List<Object> tuple, Object messageId)
public List<Integer> emit(List<Object> tuple) 
public List<Integer> emit(String streamId, List<Object> tuple)

public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)
public void emitDirect(int taskId, List<Object> tuple, Object messageId)
public void emitDirect(int taskId, String streamId, List<Object> tuple) 
public void emitDirect(int taskId, List<Object> tuple) 

public void reportError(Throwable error) 

我们发送消息用的emit()在这里提供了四种不同的实现方式,但是要注意的是,只有提供了messageId参数,Storm才会追踪这条消息是否发送成功。而当我们一路追踪KafkaSpout的tuple发射机制,会发现它的底层使用的是:

collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

即它用了自己自定义的一个对象(KafkaMessageId有两个字段,一个是partition一个是offset,可以定位消息以便重发)来作为MessageId。

来到这里,很自然的就会产生几个疑问,每条消息的过期时间怎么定呢?消息过期是不是全部都会被重发呢?重发后如果还不成功怎么办?这是另一个话题了,这里且不谈,只对涉及到的几个参数做一下交代:

topology.max.spout.pending		/*在一个spout task消息发送队列中最多可保存的未ack或者fail的数量,默认为null。如果超过这个值,storm便选择不再发送新数据,知道有消息ack或者fail,腾出队列空间为止。*/
topology.message.timeout.secs	/*消息过期时间,通过public void backtype.storm.Config.setMessageTimeoutSecs(int secs)设置。超时之后storm会调用fail()方法,可以自定义重发或者别的动作。*/

© 著作权归作者所有

DJZhu
粉丝 3
博文 26
码字总数 30529
作品 0
广州
程序员
私信 提问
storm翻译(2):Concepts

Concepts:概念 原文:http://storm.apache.org/documentation/Concepts.html 这个列表展示了storm中的主要概念和相关详细信息。这些概念是: Topologies Streams Spouts Bolts Stream group...

岩之有理
2015/02/10
169
0
六、Storm的高级原语之Transactional Topology

1、什么是Transactional Topology? ○ 是一个每个tuple仅被处理一次的框架 ○ 由Storm0.7引入,于Storm0.9被弃用,被triden取而代之 ○ 底层依靠spoutbolttopologystream抽象的一个特性 2、...

datapro
2015/06/24
376
0
Storm目录树、任务提交、消息容错、通信机制

Storm技术增强 注:学习本课程,请先学习Storm基础 课程目标: 通过本模块的学习,能够掌握Storm底层的通信机制、消息容错机制、storm目录树及任务提交流程。 课程大纲: 1、 Storm程序的并发...

飓风2000
2018/09/26
15
0
Storm【技术文档】 - Storm的Acker机制

基本概念的解析 对于Storm,有一个相对比较重要的概念就是 "Guarantee no data loss" -- 可靠性 很明显,要做到这个特性,必须要tracker 每一个data的去向和结果,Storm是如何做到的 -》 那就...

止静
2014/06/23
637
0
storm入门 第四章 消息的可靠处理

4.1 简介 storm可以确保spout发送出来的每个消息都会被完整的处理。本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。 4.2 ...

坏坏一笑
2014/12/03
38
0

没有更多内容

加载失败,请刷新页面

加载更多

JS基础-该如何理解原型、原型链?

JS的原型、原型链一直是比较难理解的内容,不少初学者甚至有一定经验的老鸟都不一定能完全说清楚,更多的"很可能"是一知半解,而这部分内容又是JS的核心内容,想要技术进阶的话肯定不能对这个...

OBKoro1
今天
6
0
高防CDN的出现是为了解决网站的哪些问题?

高防CDN是为了更好的服务网络而出现的,是通过高防DNS来实现的。高防CDN是通过智能化的系统判断来路,再反馈给用户,可以减轻用户使用过程的复杂程度。通过智能DNS解析,能让网站访问者连接到...

云漫网络Ruan
今天
14
0
OSChina 周一乱弹 —— 熟悉的味道,难道这就是恋爱的感觉

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @xiaoshiyue :好久没分享歌了分享张碧晨的单曲《今后我与自己流浪》 《今后我与自己流浪》- 张碧晨 手机党少年们想听歌,请使劲儿戳(这里)...

小小编辑
今天
2.9K
24
SpringBoot中 集成 redisTemplate 对 Redis 的操作(二)

SpringBoot中 集成 redisTemplate 对 Redis 的操作(二) List 类型的操作 1、 向列表左侧添加数据 Long leftPush = redisTemplate.opsForList().leftPush("name", name); 2、 向列表右......

TcWong
今天
46
0
排序––快速排序(二)

根据排序––快速排序(一)的描述,现准备写一个快速排序的主体框架: 1、首先需要设置一个枢轴元素即setPivot(int i); 2、然后需要与枢轴元素进行比较即int comparePivot(int j); 3、最后...

FAT_mt
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部