文档章节

深度剖析 Kafka/RocketMQ 顺序消息的一些坑

后端进阶
 后端进阶
发布于 05/08 14:06
字数 3087
阅读 2.9W
收藏 64

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

我不记得有多少人问过以下这个问题了:

我觉得这个问题问得很频繁,而且非常经典,在这里我就以 Kafka 为例子,说说我对 Kafka 顺序消息的一些理解吧,如有理解不对的地方麻烦留言指点一下。

通常我们在说顺序消费指的是生产者按照顺序发送,消费者按照顺序进行消费,听起来简单,但做起来却非常困难。

我们都知道无论是 Kafka 还是 RocketMQ,每个主题下面都有若干分区(RocketMQ 叫队列),如果消息被分配到不同的分区中,那么 Kafka 是不能保证消息的消费顺序的,因为每个分区都分配到一个消费者,此时无法保证消费者的消费先后,因此如果需要进行消息具有消费顺序性,可以在生产端指定这一类消息的 key,这类消息都用相同的 key 进行消息发送,kafka 就会根据 key 哈希取模选取其中一个分区进行存储,由于一个分区只能由一个消费者进行监听消费,因此这时候消息就具有消息消费的顺序性了。

生产端

但以上情况只是在正常情况下可以保证顺序消息,但发生故障后,就没办法保证消息的顺序了,我总结以下两点:

1、当生产端是异步发送时,此时有消息发送失败,比如你异步发送了 1,2,3 消息,2 消息发送异常重试发送,这时候顺序就乱了;

2、当 Broker 宕机出现问题,此时生产端有可能会把顺序消息发送到不同的分区,这时会发生短暂消息顺序不一致的现象。

针对以上两点,生产端必须保证单线程同步发送,这还好解决,针对第二点,想要做到严格的消息顺序,就要保证当集群出现故障后集群立马不可用,或者主题做成单分区,但这么做大大牺牲了集群的高可用,单分区也会另集群性能大大降低。

针对以上第二点,下面盘点一下 Kafka 集群中有哪些意外情况会打乱消息的顺序。

1、分区变更的情况

假设有集群中有两个分区的主题 A,生产端需要往分区 1 发送 3 条顺序消息,我们都知道生产端是根据消息 Key 取模计算决定消息发往哪个分区的,如果此时生产端发送第三条消息前,主题 A 增加了一个分区,生产端根据 Key 取模得出的分区号就不一样了,第三条消息路由到其它分区,结果就是这三条顺序消息就不在同一个分区了,此时就不能保证这三条消息的消费顺序了。

2、分区不变更

1.1、分区单副本

假设此时集群有两个分区的主题 A,副本因子为 1,生产端需要往分区 1 发送 3 条顺序消息,前两条消息已成功发送到分区 1,此时分区 1 所在的 broker 挂了(由于副本因子只有 1,因此会导致分区 1 不可用),当生产端发送第三条消息时发现分区 1 不可用,就会导致发送失败,然后尝试进行重试发送,如果此时分区 1 还未恢复可用,这时生产端会将消息路由到其它分区,导致了这三条消息不在同一个分区。

1.2、分区多副本

针对分区单副本情况,我们自然会想到将分区设置为多副本不就可以避免这种情况发生吗?多副本情况下,发送端同步发送,acks = all,即保证消息都同步到全部副本后,才返回发送成功,保证了所有副本都处在 ISR 列表中,如果此时其中一个 broker 宕机了,也不会导致分区不可用的情况,看起来确实避免了分区单副本分区不可用导致消息路由到其它分区的情况发生。

但我想说的是,还有一种极端的现象会发生,当某个 broker 宕机了,处在这个 broker 上的 leader 副本就不可用了,此时 controller 会进行该分区的 leader 选举,在选举过程中分区 leader不可用,生产端会短暂报 no leader 警告,这时生产端也会出现消息被路由到其它分区的可能。

消费端

当然,还有一个读者是这么问的:

以下分析假设生产端已经将顺序消息成功发送到同一个分区。

Kafka

kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。

1、每个线程维护一个 KafkaConsumer

这样相当于一个进程内拥有多个消费者,也可以说消费组内成员是有多个线程内的 KafkaConsumer 组成的。

但其实这个消费模型是存在很大问题的,从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大,项目中一般不用该线程模型去消费。

2、单 KafkaConsumer 实例 + 多 worker 线程

针对第一个线程模型的缺点,我们可采取 KafkaConsumer 实例与消息消费逻辑解耦,把消息消费逻辑放入单独的线程中去处理,线程模型如下:

从消费线程模型可看出,当 KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性,在公司内部使用的多线程消费模型就是用的单 KafkaConsumer 实例 + 多 worker 线程模型。

但这个消费模型由于消费逻辑是利用多线程进行消费的,因此并不能保证其消息的消费顺序,在这里我们可以引入阻塞队列的模型,一个 woker 线程对应一个阻塞队列,线程不断轮训从阻塞队列中获取消息进行消费,对具有相同 key 的消息进行取模,并放入相同的队列中,实现顺序消费, 消费模型如下:

但是以上两个消费线程模型,存在一个问题:

在消费过程中,如果 Kafka 消费组发生重平衡,此时的分区被分配给其它消费组了,如果拉取回来的消息没有被消费,虽然 Kakfa 可以实现 ConsumerRebalanceListener 接口,在新一轮重平衡前主动提交消费偏移量,但这貌似解决不了未消费的消息被打乱顺序的可能性?

因此在消费前,还需要主动进行判断此分区是否被分配给其它消费者处理,并且还需要锁定该分区在消费当中不能被分配到其它消费者中(但 kafka 目前做不到这一点)。

参考 RocketMQ 的做法:

在消费前主动调用 ProcessQueue#isDropped 方法判断队列是否已过期,并且对该队列进行加锁处理(向 broker 端请求该队列加锁)。

RocketMQ

RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已为你准备好了你的需求,它本身的消费模型就是单 consumer 实例 + 多 worker 线程模型,有兴趣的小伙伴可以从以下方法观摩 RocketMQ 的消费逻辑:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

RocketMQ 会为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理,ConsumeMessageService 有两个子接口:

// 并发消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 顺序消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

其中,ConsumeMessageConcurrentlyService 内部有一个线程池,用于并发消费,同样地,如果需要顺序消费,那么 RocketMQ 提供了 ConsumeMessageOrderlyService 类进行顺序消息消费处理。

经过对 Kafka 消费线程模型的思考之后,从 ConsumeMessageOrderlyService 源码中能够看出 RocketMQ 能够实现局部消费顺序,我认为主要有以下两点:

1)RocketMQ 会为每个消息队列建一个对象锁,这样只要线程池中有该消息队列在处理,则需等待处理完才能进行下一次消费,保证在当前 Consumer 内,同一队列的消息进行串行消费。

2)向 Broker 端请求锁定当前顺序消费的队列,防止在消费过程中被分配给其它消费者处理从而打乱消费顺序。

总结

经过这篇文章的分析后,尝试回答读者的问题:

1、生产端:

1)生产端必须保证单线程同步发送,将顺序消息发送到同一个分区(当然如果发生了文中所描述的 Kafka 集群中意外情况,还是有可能会打乱消息的顺序,因此无论是 Kafka 还是 RocketMQ 都无法保证严格的顺序消息);

2、消费端:

2)多分区的情况下:

如果想要保证 Kafka 在消费时要保证消费的顺序性,可以使用每个线程维护一个 KafkaConsumer 实例,并且是一条一条地去拉取消息并进行消费(防止重平衡时有可能打乱消费顺序);对于能容忍消息短暂乱序的业务(话说回来, Kafka 集群也不能保证严格的消息顺序),可以使用单 KafkaConsumer 实例 + 多 worker 线程 + 一条线程对应一个阻塞队列消费线程模型。

3)单分区的情况下:

由于单分区不存在重平衡问题,以上两个线程模型的都可以保证消费的顺序性。

另外如果是 RocketMQ,使用 MessageListenerOrderly 监听消费可保证消息消费顺序。

很多人也有这个疑问:既然 Kafka 和 RocketMQ 都不能保证严格的顺序消息,那么顺序消费还有意义吗?

一般来说普通的的顺序消息能够满足大部分业务场景,如果业务能够容忍集群异常状态下消息短暂不一致的情况,则不需要严格的顺序消息。

如果你对文章还有什么疑问和补充或者发现文中有错误的地方,欢迎关注我的公众号「后端进阶」留言给我,我们一起探讨。

作者简介

作者张乘辉,擅长消息中间件技能,负责公司百万 TPS 级别 Kafka 集群的维护,作者维护的公号「后端进阶」不定期分享 Kafka、RocketMQ 系列不讲概念直接真刀真枪的实战总结以及细节上的源码分析;同时作者也是阿里开源分布式事务框架 Seata Contributor,因此也会分享关于 Seata 的相关知识;当然公号也会分享 WEB 相关知识比如 Spring 全家桶等。内容不一定面面俱到,但一定让你感受到作者对于技术的追求是认真的!

公众号:后端进阶

技术博客:https://objcoding.com/

GitHub ID:https://github.com/objcoding/

公众号「后端进阶」,专注后端技术分享!

© 著作权归作者所有

后端进阶
粉丝 134
博文 46
码字总数 90233
作品 0
广州
程序员
私信 提问
加载中

评论(5)

xdev
xdev
1、分区变更问题,可以自己实现分区算法,保证消息始终到指定分区。2、一般生产环境都是多幅本,否则没有高可用可言。3、分区多副本,leader不可用kafka还能够接收生产端数据?能不能有真实案例或者分析可分享。4、每个线程维护一个 KafkaConsumer性能问题不能归结为kafka问题把,那是业务处理问题,业务可以多节点不一定要要靠增加线程量把。在说如果担心kafka集群,也也可以增加集群数量,把分区改为单分区多副本topic。当然这种不时最解。5、针对数据被rebalance,消息无法保证顺序问题,如果可以提交offset会报错,这时业务数据处理主动回滚掉(采用同步批量commit offset)。总之要保证绝对顺序确实不易。
后端进阶
后端进阶 博主
第三个问题,文中已经说明白了,分区在选举leader时 分区 leader 副本不可用,生产端会重试并可能会将消息路由到其它分区;第四个问题,文中没有说是kafka的问题,因为kafka消费客户端并不提供多线程消费逻辑,需要用户自行实现,具体你可以看看RocketMQ的多线程消费模型,如果topic是单分区,无论你增加多少副本增加多少机器,都不能提高性能;第五个问题,只要你消费时没有将该分区进行锁定,在极端情况有可能出现消费顺序被打乱情况,可以看看RocketMQ顺序消费的逻辑实现,kafka assign 模式可以避免重平衡,也许可以解决这个问题。总之,无论RocketMQ和Kafka在多分区情况下,都不能保证严格的顺序消息。感谢你的深入思考,可以关注我的公众号并加我好友,进一步深入讨论。
xdev
xdev
感谢你的回复,互相交流会加深我对kafka理解,您说的问题三,我回答的1应该可以解决吧,我没有测试过。我回答的4是想说,如果服务器节点数足够多,分区leader会在不同节点,单个KafkaConsumer不会减少socket数,如果多个分区leader在一个节点上,socket数过多可以增加消费端节点数,而不是说一定得靠增加线程数,只是这种方式不是最优。针对topic多分区多副本使用方式,有可能导致乱序问题,我的意思是使用另一种模式:多topic单分区,多副本;分区leader挂了,也不会发送到其他分区,因为只有一个分区(和kafka assign 模式理论上原理一样)。使用种方式来避免问题不是来提高性能。从数据拉取角度看,单 KafkaConsumer 实例肯定不如多KafkaConsumer 实例。就目前我使用过程来看,性能基本上都是在消费者这边。多 worker,又是锁什么的太复杂,牺牲多线程调度性能(在分区数远大于worker数下,但这个时候多worker数性能又少于多KafkaConsumer模式,所以那种方式最佳分场景讨论不是一个定数),降低复杂度,在一定条件下也不是不可以。
深漂
深漂
kafka assign 模式是不是可以规避这个重新均衡导致分区重新分配问题?使用这种模式消费者不用加入group分组
后端进阶
后端进阶 博主
理论上是可以的规避这个重平衡问题的,感谢你的补充🙏
推送消息为什么使用RocketMQ,而不使用Kafka?

Kafka一般适用日志传输,而RocketMQ适用非日志的可靠性传输(日志传输也可以)。接下来我们将讨论它俩区别,你们就知道哪种情况用RocketMQ。其实,RocketMQ是阿里开发的,大家都知道,阿里的...

Ivy_Xu
04/01
0
0
说说MQ之RocketMQ(一)

原文出处:Valleylord RocketMQ 是出自 A 公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好,目前,RocketMQ 的文档仍然不够丰...

Valleylord
2018/10/12
0
0
说说MQ之RocketMQ(三)

原文出处:Valleylord RocketMQ 的主备模式 按之前所说,只有 RocketMQ 的多主多从异步复制是可以生产使用的,因此只在这个场景下测试。另外,消息采用 Push 顺序模式消费。 假设集群采用2主...

Valleylord
2018/10/12
0
0
说说MQ之RocketMQ(二)

原文出处:Valleylord RocketMQ 的 Java API RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供...

Valleylord
2018/10/12
0
0
rocketMq和kafka的性能对比和原理

根据阿里巴巴中间件团队对rocketMq,kafka和rabbitMq的发送消息性能的测试,在单机同步发送的场景下,Kafka>RocketMQ>RabbitMQ。如下图: Kafka的吞吐量高达17.3w/s, RocketMQ吞吐量在11.6w...

osc_awm6njne
2019/01/09
44
0

没有更多内容

加载失败,请刷新页面

加载更多

一文教你学会数据库异地备份

云栖号快速入门:【点击查看更多云产品快速入门】 不知道怎么入门?这里分分钟解决新手入门等基础问题,可快速完成产品配置操作! 数据库备份DBS提供数据库异地备份能力,满足PolarDB、RDS、...

osc_l7zl78wt
9分钟前
17
0
js中对象的定义是指什么

主要内容--乐字节java直播课 事件 事件 (Event) 是 JavaScript 应用跳动的心脏 ,进行交互,使网页动起来。当我们与浏览器中 Web 页面进行某些类型的交互时,事件就发生了。事件可能是用户在...

osc_6ls9vwji
10分钟前
3
0
TypeError: __init__() got an unexpected keyword argument ¨serialized_options¨

TypeError: __init__() got an unexpected keyword argument 'serialized_options' TypeError: __init__() got an unexpected keyword argument 'serialized_options' 这是由于包版本和pytho......

osc_r0irdqn7
11分钟前
15
0
读博难,大神来支招:DeepMind科学家Sebastian Ruder提出十条实用建议 - 知乎

如何找到合适的研究方向?如何维持饱满的科研热情?如何通过读博实现长期规划?……DeepMind 科学家 Sebastian Ruder 来支招。 选自http://ruder.io,作者:Sebastian Ruder,机器之心编译,...

osc_cpolwgcv
12分钟前
13
0
Java收入不再最低,Python被TypeScript击败,2020全球开发者调查报告出炉 - 知乎

Stack Overflow 2020 年度全球开发者调查报告出炉。报告显示,JavaScript 连续八年成为最常用的编程语言,而在最受开发者喜爱的编程语言榜单中,Python 排名第三,较去年下降一位,被 TypeSc...

osc_j3111wl4
13分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部