JStorm - ACK机制

原创
2017/03/16 16:17
阅读数 834

适用场景

为了确保每条数据仅会被正确的处理一次,JStorm提供了ack机制.
JStorm的ack机制可以追踪totology作业流中的每一个环节是否被正常处理,从而决定作为源头的spout是否需要重发tuple.
所以,该机制适用于对数据完整性与正确性有一定要求的topology,如果认为偶尔一两条数据的处理失败无伤大雅,则大可以取消该机制以获得性能上的提升.

使用方式

在描述具体的启停实现前,需要先熟悉两个Bolt : BaseBasicBolt, BaseRichBolt, 他们的区别在于继承了BaseBasicBolt的bolt,每一个tuple处理完会被自动ack, 而BaseRichBolt的子类则需要自己处理ack相关的逻辑。

启用ack

设置acker数量

首先需要确保topology的acker数量不能为0, 调用以下方法可以修改拓扑的acker数量:

conf.setNumAckers(1);

为spout发射的tuple指定msgId

spout在emit一个tuple的时候,必须为每一个tuple指定一个全局唯一的msgId作为状态监控的依据,如果未指定msgId,则该tuple不会被ack或fail,可以看下SpoutOutputCollector类里对相关方法的说明:

带有msgId的:
/**
* Emits a new tuple to the default output stream with the given message ID.
* When Storm detects that this tuple has been fully processed, or has
* failed to be fully processed, the spout will receive an ack or fail
* callback respectively with the messageId as long as the messageId was not
* null. If the messageId was null, Storm will not track the tuple and no
* callback will be received. The emitted values must be immutable.
*
* @return the list of task ids that this tuple was sent to
*/
public List<Integer> emit(List<Object> tuple, Object messageId) {
    return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
}
    
不带msgId的:
/**
* Emits a tuple to the default output stream with a null message id. Storm
* will not track this message so ack and fail will never be called for this
* tuple. The emitted values must be immutable.
*/
public List<Integer> emit(List<Object> tuple) {
    eturn emit(tuple, null);
}

bolt的处理

在满足前面两个条件的情况下,bolt任务直接继承BaseBasicBolt即可保证每一个tuple可以被正确监控。如果继承了BaseRichBolt,想自己处理ack的业务逻辑,也很简单,需要关注两点:
emit: 在emit下一个tuple的时候,需要anchor输入的tuple,具体为调用OutputCollector类中带anchor参数的emit方法.
ack: 在执行完毕后需要ack输入的tuple,可以直接参考下BaseBasicBolt里对ack的处理:

public void execute(Tuple input) {
    _collector.setContext(input);
    try {
        _bolt.execute(input, _collector);
        _collector.getOutputter().ack(input);
    } catch (FailedException e) {
        if (e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        _collector.getOutputter().fail(input);
    }
}

可以看到,在业务逻辑执行完毕后,调用ack()方法提交状态,如果发生异常,则调用fail()方法。 同时可以发现,继承BaseBasicBolt时,如果执行体抛出的不是FailedException的话,任务处理线程不会自动fail(),这时线程会发生什么?

停用ack

全部停用

要停止对整条topology任务流的追踪有两个方案,任选其一即可:

  1. 调用conf.setNumAckers(0),当acker num为0时,spout发出的tuple会被立即ack,storm不会追踪数据的处理结果.
  2. 当spout发射tuple时,不为其指定msgId.

部分停用

如果不关心某一bolt之后的处理结果,只想确保之前的任务流能被成功处理,则处于转折点的bolt需要继承BaseRichBolt,并且在emit新的tuple时,不与之前的tuple锚定,方式为调用OutputCollector的相关方法:

/**
* Emits a new unanchored tuple to the specified stream. Because it's
* unanchored, if a failure happens downstream, this new tuple won't affect
* whether any spout tuples are considered failed or not. The emitted values
* must be immutable.
*
* @param streamId the stream to emit to
* @param tuple the new output tuple from this bolt
* @return the list of task ids that this new tuple was sent to
*/
public List<Integer> emit(String streamId, List<Object> tuple) {
    return emit(streamId, (List) null, tuple);
}

实现原理

ack的实现原理十分精妙,作者大才...
Storm的ack机制是利用异或运算的特殊性质来实现的,异或运算相关性质:

a ^ a = 0
a ^ 0 = a
a ^ b ^ a = b
a ^ b = b ^ a
a ^ b ^ c = a ^ (b ^ c) = (a ^ b) ^ c;

在Storm中,每一个作业流都有一个初始状态值ack-val:0,每一个tuple在被发射的时候会有一个anchor操作,完成时有一个ack操作, 这两个操作实际上都是与ack-val进行异或操作,由异或的性质可以知道,如果所有tuple都被成功处理,则ack-val的最终值一定会等于初始值,也就是0:

ack-val ^ 1001(emit) ^ 1002(emit) ^ 1001(ack) ^ 1003(emit) ^ 1002(ack) ^ 1003(ack) = ack-val

这样通过检测ack-val的最终值就能得知任务流的执行结果,从而决定是ack还是fail.

需要关注的点

  1. 继承BaseBasicBolt时,如果方法抛出的异常不是FailedException,storm不会调用fail方法,task线程执行失败。
  2. bolt确认结果为fail时,spout会进行tuple重发,但上一轮处理过程中被持久化到第三方存储系统的数据无法回滚,需要自己增加额外的措施来保证数据不会被重复写入。JStorm自己实现了一套事务机制,还未研究。
  3. spout为了实现数据重发,需要自己维护一份缓存信息,处理ack和fail方法。需要有重试次数的限制,否则一直失败的消息会占用大量的内存。
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部