可靠消息一致性的奇淫技巧

原创
2019/05/13 08:30
阅读数 14

点击上方"田守枝的技术博客",关注我


"可靠消息最终一致性"是为了解决Producer端的消息发送与本地事务执行的原子性问题,是一种柔性事务,属于异步确保型,软状态,最终一致。

问题典型场景是:本地往DB中插入一条记录,同时往MQ中发送一条消息,必须保证二者同时成功或者同时失败。由于DB和MQ是不同的系统,可能插入DB成功,但是发消息到MQ中失败;也可能插入DB失败,但是发送消息到MQ成功。如何保证二者的一致性,就成为了我们要解决的问题。

本文深入讲解如何实现可靠消息一致性的各种实现方案,让你一次爽个够。包括:

  • 本地事务表

  • RocketMQ中的事务消息

  • Binlog订阅解析

  • Kafka中如何实现(彩蛋)

1 本地事务表

要解决Producer端的消息发送与本地事务执行的原子性问题,一个典型的思路是,我们先将消息暂存到一个地方,在本地事务执行完成之前,这个消息对消费者是不可见的。只有当本地事务确认执行成功后,消费者才可以消费到这条消息。

下面用伪代码演示这个过程:


        步骤说明:

  • 先预发送一条消息,注意这里使用了"预发送这个关键字,预发送的消息,消费者是看不到,因此不会消费。

  • 执行本地业务,例如往数据库插入一条记录。

  • 根据本地事务执行的结果,确认是提交这条消息,还是回滚这条消息。只有提交后的消息,消费者才能进行消费。

        显然,如果我们可以保证上述每个步骤都可以正确的执行,那么本地事务的执行与发送消息的行为将可以保持一致。然后事实总是残酷的,这一套流程充满了挑战。

1.1 如何实现预发送

        大多数MQ,消息发送后,就可以直接被消费者消费了,然而我们并不想这样,需要等到本地事务也执行成功。

        一种很直观的思路是,我们先将这个消息,找一个地方暂存起来。例如在数据库中建立一个表,将消息存入到这个表中,称之为”本地事务表”。在这个表中,可以有一个state字段表示消息的状态,在预发送阶段,我们将其标记为UNKONWN

1.2 如何确认或者回滚

        我们可以根据本地事务执行的结果,修改本地事务表中状态字段的值。如果本地事务执行成功,我们可以将本地事务表中的状态字段改为LOCAL_COMMIT;如果执行失败,我们可以将其改为LOCAL_ROLLBACK

        另外,我们通过一个异步的线程,不断的从这个表中,查询状态为LOCAL_COMMIT的消息,将其发送到MQ中。异步线程发送消息到MQ中,也可能成功,或者失败:

发送MQ成功:

此时,从发送端来说,整个事务已经结束,将其标记GLOBAL_COMMIT,接下来就是消费端进行消费。

发送MQ失败:

这个时候需要进行重试,直到成功。如果你想限制一个最大重试次数,可以在这个表中添加一个retries字段,每重试一次,就+1,当超过次数阈值后,就不再发送。你也可以指定一个消息的超时时间,当超过时间阈值后,也不再发送。对于发送失败的消息,将其状态标记为MESSAGE_ERROR。还可以事务表中添加一个cause字段,表示是什么原因导致的发送失败。

1.3 如何避免状态丢失

前面提到预发送消息阶段,会将本地事务表的状态字段设置为UNKONWN。在本地事务执行之后,将其改为LOCAL_COMMIT或者LOCAL_ROLLBACK。

然而,有可能本地事务执行之后,更改本地消息表中消息状态的行为失败了。这种情况下,消息就一直处于UNKONWN状态,而异步线程只会发送状态为LOCAL_COMMIT的消息到MQ中,这个消息会一直被忽略,也就是产生了消息状态丢失。解决方案

方案一:扩大事务边界

        将预发送消息、执行本地事务、修改本地事务表消息状态三个操作合并到一个事务中。

        在第一步预发送消息之前就开启事务,在第三步执行结束之后提交或者回滚事务,因为所有操作位于同一个事务中,从而保证,本地事务表中的消息记录,与业务操作产生的记录,总是同时成功或者失败,且状态一致。

方案二:合并事务状态

        显然,你还可以更进一步,消息不需要有预发送的状态,直接和正常的数据库操作合并到一个事务中写入到数据库,状态直接就是LOCAL_COMMIT,之后异步线程发送的逻辑不变。

方案三:对PREPARED状态消息也进行检查

        方案一、二的特点在于,只在业务方法执行的时候,只进行一次判断事务是否可以提交,之后异步线程发送消息的时候,只检查LOCAL_SUCCESS状态的消息发送到MQ中,这可以满足大部分场景了。

        然而,有时可能会有更复杂的场景。例如,有一个业务逻辑很复杂,业务的发起方A,除了操作本地数据库,可能需要进行RPC调用查询业务B,以获得一些MQ消息中必须要包含的一些信息。然而,B可能当前还不能提供这些信息,需要等待一段时间才能提供。A希望将这条消息保存下来,等到B可以提供足够多信息的时候再发送。这个时候,方案一、二就不满足了,我们需要继续进行改进。

        具体策略是:在原始方案的基础上,让异步线程除了发送LOCAL_SUCCESS状态的消息之外,还对PREPARED状态的消息进行检查。当然你需要设置个过滤条件,如一个PREPARED状态的消息的创建时间,必须与当前时间比较的差值大于某个时间阈值时,才去尝试去查询这个消息的正确状态应该是什么。

        设置阈值,主要是为了避免与新事务一开始插入消息的PREPARED消息状态混淆,这些新的PREPARED消息可能立即就会修改为LOCAL_SUCCESS。只有那些长时间处于PREPARED状态的消息,才有可能是因为本地事务执行成功,但是更新消息状态失败而导致的。

        我们可以总结出,方案三的最大特点是:当前条件满足的情况下,立即判断可以发送消息;如果当前条件不满足,还可以异步的确定是否满足消息发送的条件。显然提供了极大的灵活性。而方案一二,只支持前者。

2 RocketMQ事务消息

         Apache RocketMQ 4.3版本中引入了事务消息。与我们前面使用本地事务表要解决的问题相同。都是为了为了解决Producer端发送消息本地事务执行的原子性问题。

与本地事务表的思路一致,都是将消息先找一个地方暂存起来,只不过暂存的地方不一样,RocketMQ将消息暂存到了内部的主题中。

        为了支持事务消息,RocketMQ引入了Half Topic 以及Operation Topic 两个内部队列来存储事务消息推进状态。其中:

  • Half Topic对应队列中存放着prepare消息,也就是预发送的消息,消息会不直接发到目标Topic,因此消费者不可见,实现暂存

  • Operation Topic对应的队列则存放了prepare message对应的commit/rollback消息,消息体中则是prepare message对应的offset。

关于这两个topic,我们在后文会详细介绍其作用。

下面是RocketMQ中一个事务消息的发送流程,不够清楚,可以放大看一下每个步骤:


图片来源:阿里巴巴中间件官微

接下来,对上述部分流程合并进行说明。

2.1 事务生产者预发送消息

通过TransactionMQProducer发送事务消息,这个producer会在一条普通的Message中加上一些元数据,标识这是一条预先发送的事务消息。broker端在发现这是一条事务消息的时候,会将其存储到Half Topic中。另外必须要制定producer group,以便在发送者失败甚至宕机的情况下,回查其他同一个producer group中的实例查询事务状态。

2.2 执行本地事务

在发送prepare消息成功后,需要执行本地事务。这是需要实现RocketMQ提供的一个TransactionListener接口的方法来完成(非Java读者不必在意,语言是次要的,关键是思路):


        其中:

  • executeLocalTransaction方法:用于执行本地事务,显然就是我们的业务逻辑代码,操作数据库,或者干一些其他事情。

  • checkLocalTransaction方法:用于检查事务状态,关于这个方法的作用我们将后文讲解。

这两个方法都返回了一个表示本地事务消息的执行状态LocalTransactionState,事务生产者会将其上报给broker。状态总共有三种,见下文分析:

public enum LocalTransactionState {    COMMIT_MESSAGE,    ROLLBACK_MESSAGE,    UNKNOW,}

2.3 本地事务状态的处理

不管客户端返回的是哪一种状态,生产者拿到这个状态,都会将这个状态报告给broker。broker在处理时,发现这是一个报告事务状态的消息,首先会判断状态值,进行相应的处理。

2.3.1 COMMIT & ROLLBACK状态

broker会把收到事务消息的状态后,会记录在内部主题Operation Topic中,消息体中则是prepare message对应在Half Topic中的offset。如下图所示:


图片来源:阿里巴巴中间件官微

此外,broker还会有一个内部服务,消费Operation Topic中的消息,具体来说:

  • 如果是rollback消息,broker将从Half Topic中删除该prepare消息不进行下发。

  • 如果是commit消息,broker将会把这个消息从取出来发送到原始的目标Topic中,此时consumer端可以消费

细心的读者发现了,图中这两个队列的长度刻意画的不相等。其实是为了说明,在一些异常情况,可能上报事务消息状态失败,因此OperationTopic中没有记录,二者的差值可能就是UNKOWN未确认中间状态的消息,需要进行特殊处理。

2.3.2 UNKNOW状态

        如果消息是UNKNOW中间状态,那么说明目前还不能确定事务的状态,broker需要主动询问客户端producer。以下场景,可能会出现UNKNOW中间状态:

  • 异常状态:如果执行本地事务过程中,执行端挂掉,或者超时

  • 有意为之:回顾前面本地事务表方案讲解时,在一些特殊场景下,需要等待一段时间满足特定场景,才能将消息给消费者进行消费。所以我们主动返回UNKNOW。

       由于UNKNOW中间状态的消息,并不会提交到Operation Topic中,因此Half Topic与Operation Topic这两个内部主题中,服务端通过比对两个主题的差值来找到尚未提交的超时事务,进行回查。

回查意味着,业务方必须提供一个方法让rocketmq来回调。前面我们看到TransactionListener接口有2个方法,另外一个方法checkLocalTransaction就是用于回查。

我们需实现这个方法,rocketmq会把我们之前发送的消息当做参数传入。我们可以根据消息中的内容,反查之前的业务记录信息,确定状态。

最后小提一点,broker主动询问客户端producer事务状态,是依赖于broker与producer端的双向通信能力来完成的也就是broker会主动给客户端producer发请求。双向通信能力是基于rocketmq-remoting模块基础上完成的。

2.4 小结

        从上述事务消息设计中可以看到,RocketMQ事务消息较好的解决了事务的最终一致性问题,事务发起方仅需要关注本地事务执行以及实现回查接口给出事务状态判定等实现,而且在上游事务峰值高时,可以通过消息队列,避免对下游服务产生过大压力。RocketMQ官网上提供了事务消息的完整使用案例,读者可以自行参考。

      当然,事情不可能是那么的美好,以下是RocketMQ事务消息使用限制:

  • 事务消息没有延迟和批量支持,即不能使用延迟消息的特性和批量发送消息的特性。

  • 为了避免多次检查单个消息并导致Half Topic消息累积,默认将单个消息的检查次数限制为15次。

  • 在broker的配置中,由参数“transactionTimeout”配置检查事务消息的固定周期。

  • 可以多次检查或消费事务消息。

  • 将事务消息提交到用户的目标topic的可能会失败。RocketMQ自身的高可用性机制确保了高可用性。如果要确保事务性消息不会丢失且事务完整性得到保证,建议使用同步双写机制

  • 事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许后向查询。MQ Server按其生产者ID查询客户端。

3 订阅数据库binlog

在之前的方案中,不论是本地事务表,还是RocketMQ中的事务消息,对业务都有一定的侵入性。

事实上,我们可以通过另外一种方案来实现可靠消息的发送。在这个方案中:

  • 先把本地事务执行完成。本地事务中的每个数据库更新操作,如INSERT、UPDATE等都会产生binlog event,这些event会在本地事务提交成功后,才会生成。

  • 接着,我们通过一个binlog订阅组件,来订阅数据库中的记录变更。订阅到的binlog event,必然都是已经成功执行的本地事务的信息。可以放心的根据这些event解析出相应的信息,发送消息到MQ中即可。

事实上,笔者认为这个方案更加优雅。目前开源的binlog订阅组件有很多,各种语言的实现都有:java、go、python等,首推的还是阿里巴巴开源的canal,服务端使用java编写,支持多语言客户端。

4 Kafka中的事务消息

从Kafka 0.11开始,KafkaProducer支持另外两种模式:幂等生产者( idempotent producer)和事务生产者(transactional producer)。

  • 幂等生产者:将Kafka的交付语义从"at least once”加强到"exactly once"。特别是生产者重试将不再引入重复。

  • 事务生产者:允许应用程序以原子方式同时发送消息发送到多个主题和分区

这里我们看到了Kafka中的事务消息实际上与RocketMQ中的事务消息是截然不同的概念,类似于数据库事务的原子性。

如果你希望在Kafka中使用类似于RocketMQ的事务消息,那么只能自己做了,可以在Kafka之前加一个代理,由这个代理暂存事务消息,条件满足后,再发送到目标Topic中供业务方消费。

识别二维码关注我


这里只是介绍了事务消息,RocketMQ中还有很多其他高级特性,如重试队列、延迟队列、死信队列等,这些特性Kafka也不支持。

厮大的新书《深入理解Kafka:核心设计与事件原理》中,详细的介绍了如何在Kafka中实现这些高级特性,难得的一本好书。另外也有对应的掘金小册,电子版,更方便。


往期精彩

详解HTTP 与TCP中Keep-Alive机制的区别

TCP粘包、拆包与通信协议详解

异地多活场景下的数据同步之道

数据库中间件详解

深入理解数据库编程中的超时设置

分布式事务概述

史上最详细mybatis与spring整合教程

源码剖析 Mybatis 映射器(Mapper)工作原理

剖析Spring多数据源

本文参考

http://rocketmq.apache.org/docs/transaction-example/

里程碑 | Apache RocketMQ 正式开源分布式事务消息

本文分享自微信公众号 - 田守枝的技术博客(tianshouzhi_blog)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部