Pulsar Meetup 讲师博文推荐:林琳「Apache Pulsar 技术系列 - Pulsar事务实现原理」

04/25 22:55
阅读数 0

作者简介


 林琳                                                                         


腾讯云中间件专家工程师

Apache Pulsar PMC,《深入解析Apache Pulsar》作者。目前专注于中间件领域,在消息队列和微服务方向具有丰富的经验。负责 TDMQ的设计与开发工作,目前致力于打造稳定、高效和可扩展的基础组件与服务。


前言


在事务消息未出现前,Pulsar中支持的最高等级的消息传递保证,是通过Broker的消息去重机制,来保证Producer在单个分区上的消息只精确保存一次。当Producer发送消息失败后,即使重试发送消息,Broker也能确保消息只被持久化一次。但在Partitioned Topic的场景下,Producer没有办法保证多个分区的消息原子性。


当Broker 宕机时,Producer可能会发送消息失败,如果Producer没有重试或已用尽重试次数,则消息不会写入 Pulsar。在消费者方面,目前的消息确认是尽力而为的操作,并不能确保消息一定被确认成功,如果消息确认失败,这将导致消息重新投递,消费者将收到重复的消息, Pulsar 只能保证消费者至少消费一次。


类似地,Pulsar Functions 仅保证对幂等函数上的单个消息处理一次,即需要业务保证幂等。它不能保证处理多个消息或输出多个结果只发生一次。


举个例子,某个Function的执行步骤是:从Topic-A1、Topic-A2中消费消息,然后Function中对消息进行聚合处理(如:时间窗口聚合计算),结果存储到Topic-B,最后分别确认(ACK) Topic-A1和Topic-A2中的消息。该Function可能会在“输出结果到Topic-B”和“确认消息”之间失败,甚至在确认单个消息时失败。这将导致所有(或部分)Topic-A1、Topic-A2的消息被重新传递和重新处理,并生成新的结果,进而导致整个时间窗口的计算结果错误。


因此,Pulsar需要事务机制来保证精确一次的语义(Exactly-once),生产和消费都能保证精确一次,不会重复,也不会丢失数据,即使在Broker宕机或Function处理失败的情况下。


事务简介


Pulsar事务消息的设计初衷是用于保证Pulsar Function的精确一次语义,可以保证Producer发送多条消息到不同的Partition时,可以同时全部成功或者同时全部失败。也可以保证Consumer消费多条消息在时,可以同时全部确认成功或同时全部失败。当然,也可以把生产、消费都包含在同一个事务中,要么全部成功,要么全部失败。


我们以本小节开头处的Function场景为例,演示生产、消费在同一个事务中的场景:


首先,我们需要在broker.conf中启用事务。


transactionCoordinatorEnabled=true


然后,我们分别创建PulsarClient和事务对象。生产者和消费者API中都需要带上这个事务对象,才能确保它们在同一个事务中。


//创建client,并启用事务PulsarClient pulsarClient = PulsarClient.builder()        .serviceUrl("pulsar://localhost:6650")        .enableTransaction(true)        .build();// 创建事务Transaction txn = pulsarClient        .newTransaction()        .withTransactionTimeout(1, TimeUnit.MINUTES)        .build()        .get();        String sourceTopic = "public/default/source-topic";String sinkTopic = "public/default/sink-topic";//创建生产者和消费者Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)        .topic(sourceTopic)        .subscriptionName("my-sub")        .subscribe();Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)        .topic(sinkTopic)        .create();        // 从原Topic中消费一条消息,并发送到另外一个Topic中,它们在同一个事务内        Message<String> message = sourceConsumer.receive();sinkProducer.newMessage(txn).value("sink data").sendAsync();sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);// 提交事务txn.commit().get();



我们以本小节开头处的Function例子来说:


当未开启事务时,如果Function先把结果写入SinkTopic,但是消息确认失败(下图Step-4失败),这会导致消息被重新被投递(下图Step-1),Function会重新计算一个结果再发送到SinkTopic,这样就会出现一条数据被重复计算并投递了两次。


如果没有开启事务,Function会先确认消息,再把数据写入SinkTopic(先执行Step-4 再执行Step-3),此时如果写入SinkTopic失败,而SourceTopic的消息又已经被确认,则会造成数据丢失,最终的计算结果也不准确。


如果开启了事务,只要最后没有commit,前面所有的步骤都会被回滚,生产的消息、确认过的消息都被回滚,从而让整个流程可以重新再来一遍,不会重复计算,也不会丢失数据。整个时序图如下所示:



我们只需要根据上面步骤,了解每一步具体做了什么,就能清楚整个事务的实现方式。在下面的小节中,我们将逐步介绍。


事务流程


在了解整个事务流程之前,我们先介绍Pulsar中事务的组件,常见的分布式事务中都会有TC、TM、RM等组件:


  1. TM:事务发起者。定义事务的边界,负责告知 TC,分布式事务的开始,提交,回滚。在Pulsar事务中,由每个PulsarClient来扮演这个角色。


  2. RM:每个节点的资源管理者。管理每个分支事务的资源,每一个 RM 都会作为一个分支事务注册在 TC。在Pulsar中定义了一个TopicTransactionBuffer和PendingAckHandle来分别管理生产、消费的资源。


  3. TC :事务协调者。TC用于处理来自Pulsar Client的事务请求以跟踪其事务状态的模块。每个TC都有一个 唯一id (TCID) 标识,TC之间独立维护自己的事务元数据存储。TCID 用于生成事务 ID,广播通知不同节点提交、回滚事务。


下面,我们以一个Producer来介绍整个事务的流程,图中灰色部分代表存储,现有内存和Bookkeeper两种存储实现:



  1. 选择TC。一个Pulsar集群中可能存在多个TC(默认16个),PulsarClient在创建事务时需要先选择用哪个TC,后续所有事务的创建、提交、回滚等操作都会发往这个TC。选择的规则很简单,由于TC的Topic是固定的,首先Lookup查看所有分区所在的Broker(每个分区就是一个TC),然后每次Client创建新事务,轮询选择一个TC即可。


  2. 开启事务。代码中通过pulsarClient.newTransaction()开启一个事务,Client会往对应的TC中发送一个newTxn命令,TC生成并返回一个新事务的ID对象,对象里保存了TC的ID(用于后续请求找节点)和事务的ID,事务ID是递增的,同一个TC生成ID不会重复。


  3. 注册分区。Topic有可能是分区主题,消息会被发往不同的Broker节点,为了让TC知道消息会发送到哪些节点(后续事务提交、回滚时TC需要通知这些节点),Producer在发送消息之前,会先往TC上注册分区信息。这样一来,后续TC就知道要通知哪些节点的RM来提交、回滚事务了。


  4. 发送消息。这一步和普通的消息发送没有太大的区别,不过消息需要先经过每个Broker上的RM,Pulsar中RM被定义为TopicTransactionBuffer,RM里面会记录一些元数据,最后消息还是会被写入原始的Topic中。此时虽然消息已经被写入了原始Topic,但消费者是不可见的,Pulsar中的事务隔离级别是Read Commit。


  5. 提交事务。Producer发送完所有的消息后,提交事务,TC会收到提交请求后,会广播通知RM节点提交事务,更新对应的元数据,让消息可以被消费者消费。


Setp-4中的消息是如何保证持久化到Topic中又不可见的呢?


每个Topic中都会保存一个maxReadPosition属性,用来标识当前消费者可以读取的最大位置,当事务还未提交之前,虽然数据已经持久化到Topic中,但是maxReadPosition是不会改变的。因此Consumer无法消费到未提交的数据。


消息已经持久化了,最后事务要回滚,这部分数据如何处理?


如果事务要回滚,RM中会记录这个事务为Aborted状态。每条消息的元数据中都会保存事务的ID等信息,Dispatcher中会根据事务ID判断这条消息是否需要投递给Consumer。如果发现事务已经结束,则直接过滤掉(内部确认掉消息)。


最后提交事务时如果部分成功、部分失败,如何处理?


TC中有一个名为TransactionOpRetryTimer的定时对象,所有未全部成功广播的事务都会交给它来重试,直到所有节点最终全部成功或超过重试次数。那这个过程不会出现一致性问题吗?首先我们想想,出现这种情况的场景是什么。通常是某些Broker节点宕机导致这些节点不可用,或是网络抖动导致暂时不可达。在Pulsar中如果出现Broker宕机,Topic的归属是会转移的,除非整个集群不可用,否则总是可以找到一个新的Broker,通过重试来解决。在Topic归属转移过程中,maxReadPosition没有改变,消费者也消费不到消息。即使整个集群不可用,后续等到集群恢复后,Timer还是会通过重试让事务提交。


如果事务未完成,会阻塞普通消息的消费吗?


会。假设我们开启事务,发送了几条事务消息,但是并未提交或回滚事务。此时继续往Topic中发送普通消息,由于事务消息一直没有提交,maxReadPosition不会变化,消费者会消费不到新的消息,会阻塞普通消息的消费。这是符合预期的行为,为了保证消息的顺序。而不同Topic之间不会相互影响,因为每个Topic都有自己的maxReadPosition。


事务的实现


我们可以把事务的实现分为五部分:环境、TC、生产者RM、消费者RM、客户端。由于生产和消费资源的管理是分开的,因此我们会分别介绍。


环境设定


事务协调者的设置,需要从Pulsar集群的初始化时开始,我们在第一章中有介绍如何搭建集群,第一次需要执行一段命令,初始化ZooKeeper中的集群元数据。此时,Pulsar会自动创建一个SystemNamespace,并在里面创建一个Topic,完整的Topic如下所示:


persistent://pulsar/system/transaction_coordinator_assign



这是一个PartitionedTopic,默认有16个分区,每个分区就是一个独立的TC。我们可以通过--initial-num-transaction-coordinators参数来设置TC的数量。


TC与RM


接下来,我们看看服务端的事务组件,如下图所示:



  • TransactionMetadataStoreService 是Broker上事务的总体协调者,我们可以认为它是TC。


  • TransactionMetadataStore 被TC用来保存事务的元数据,如:新创建的事务,Producer注册上来的分区。这个接口有两个实现类,一个是把数据保存到Bookkeeper的实现,另外一个则直接把数据保存在内存中。


  • TransactionTimeoutTracker 服务端用于追踪超时的事务。


  • 各种Provider,它们都属于工厂类,无需特别关注。


  • TopicTransactionBuffer 生产者的RM,当事务消息被发送到Broker,RM作为代理会记录一些元数据,然后把消息存入原始Topic。内部包含了TopicTransactionBufferRecover和TransactionBufferSnapshotService,RM的元数据会被结构化为快照并定时刷盘,这两个对象分别负责快照的恢复和快照的保存。由于生产消息是以Topic为单位,因此每个Topic/Partition都会有一个。


  • PendingAckHandle 消费者的RM,由于消费是以订阅为单位的,因此每个订阅都有一个。


由于线上环境通常会使用持久化的事务,因此下面的原理都基于持久化实现。


所有事务相关的服务,在BrokerService启动时会初始化。TC主题中,每个Partition都是一个Topic,TransactionMetadataStoreService在初始化时,会根据当前Broker纳管的TC Partition,从Bookkeeper中恢复之前持久化的元数据。每个TC会保存以下元数据:


  • newTransaction。新建一个事务,返回一个唯一的事务ID对象。


  • addProducedPartitionToTxn。注册生产者要发送消息的Partition信息,用于后续TC通知对应节点的RM提交/回滚事务。


  • addAckedPartitionToTxn。注册消费者要消费消息的Partition信息,用于后续TC通知对应节点的RM提交/回滚事务。


  • endTransaction。结束一个事务,可以是提交、回滚或者超时等。


我们在初始化PulsarClient时,如果设置了enableTransaction=true,则Client初始化时,还会额外初始化一个TransactionCoordinatorClient。由于TC的Tenant、Namespace以及Topic名称都是固定的,因此TC客户端可以通过Lookup发现所有的Partition信息并缓存到本地,后续Client创建事务时,会轮询从这个缓存列表中选取下一个事务要使用的TC。


Producer事务管理


接下来我们会开启一个事务:


// 创建事务Transaction txn = pulsarClient        .newTransaction()        .withTransactionTimeout(1, TimeUnit.MINUTES)        .build()        .get();



上面这段代码中,会发送一个newTxn给某个TC,并得到一个Transaction对象。


开启事务时,TransactionCoordinatorClient会从缓存中选取一个TC,然后往选定的TC所在的Broker发送一个newTxn命令,命令的结构定义如下所示:


message CommandNewTxn {    required uint64 request_id = 1;    optional uint64 txn_ttl_seconds = 2 [default = 0];    optional uint64 tc_id = 3 [default = 0];}


由于命令中包含了TCID,因此即使多个TC被同一个Broker纳管也没有问题。Broker会根据TCID找到对应的TC并处理请求。


Producer发送消息之前,会先发送一个AddPartitionToTxn命令给Broker,只有成功后,才会继续发送真实的消息。事务消息到达Broker后,传递给TransactionBuffer进行处理。期间Broker必定会对消息进行去重校验,通过校验后,数据会保存到TransactionBuffer里,而TransactionBuffer只是一个代理(会保存一些元数据),它最终会调用原始Topic保存消息,TransactionBuffer在初始化时,构造方法需要传入原始Topic对象。我们可以把TransactionBuffer看作是Producer端的RM。


TransactionBuffer中会保存两种信息,一种是原始消息,直接使用Topic保存。另外一种是快照,快照中保存了Topic名称,最大可读位置信息(避免Consumer读到未提交的数据)、该Topic中已经中断(aborted)的事务列表。


其中,中断的事务,是由TC广播告知其他Broker节点的,TransactionBuffer接到信息后,会直接在原始Topic中写入一个abortMarker,标记事务已经中断,然后更新内存中的列表。abortMarker也是一条普通的消息,但是消息头中的元数据和普通消息不一样。这些数据保存在快照中,主要是为了Broker重启后数据能快速恢复。如果快照数据丢失,TopicTransactionBufferRecover会从尾到头读取Topic中的所有数据,每遇到一个abortMarker都会更新内存中的中断列表。如果有了快照,我们只需要从快照处的起点开始读即可恢复数据。


Consumer事务管理


消费者需要在消息确认时带上事务对象,标识使用事务Ack:


consumer.acknowledge(message, txn);


服务端每个订阅都有一个PendingAckHandle对象用于管理事务Ack信息,我们可以认为它是管理消费者数据的RM。当Broker发现消息确认请求中带有事务信息,则会把这个请求转交给对应的PendingAckHandle处理。


所有开启了事务的消息确认,不会直接修改游标上的MarkDeleted位置,而是先持久化到一个额外的Ledger中,Broker内存中也会缓存一份。这个Ledger由pendingAckStore管理,我们可以认为是Consumer RM的日志。


事务提交时,RM会调用消费者对应的Subscription,执行刚才所有的消息确认操作。同时,也会在日志Ledger中写入一个特殊的Marker,标识事务需要提交。在事务回滚时,也会先在日志中记录一个AbortMarker,然后触发Message重新投递。


pendingAckStore中保存的日志是redo log,该组件在初始化时,会先从日志Ledger中读取所有redo log,从而在内存中重建先前的消息确认信息。因为消息确认是幂等操作,如果Broker不慎宕机,只需要把redo log中的操作重新执行一遍。当订阅中的消息被真正确认掉后,pendingAckStore中对应的redo log也可以被清理了。清理方式很简单,只需要移动pendingAckStore中Ledger的MarkDelete位置即可。


再谈TC


所有的事务提交、回滚,由于Client端告知TC,或者由于超时TC自动感知。TC的日志中保存了Producer的消息要发往哪些Partition,也保存了Consumer会Ack哪些Partition。RM分散在每个Broker上,记录了整个事务中发送的消息和要确认的消息。当事务结束时,TC则以TCID为key,找到所有的元数据,通过元数据得知需要通知哪些Broker上的RM,最后发起广播,通知这些Broker上的RM,事务需要提交/回滚。


尾声


Pulsar中的设计细节非常多,由于篇幅有限,作者会整理一系列的文章进行技术分享,敬请期待。如果各位希望系统性地学习Pulsar,可以购买作者出版的新书《深入解析Apache Pulsar》。


活动推荐

林琳作为本次 Pulsar Meetup 深圳 2024 大会讲师之一,诚邀大家参与本次大会。

图片

由 AscentStream 谙流科技和腾讯云中间件联合主办的 Pulsar Meetup 深圳 2024 将于 2024年04月27日 14:00-18:00 在深圳腾讯大厦 2 楼多功能厅,精彩呈现,期待大家多多报名!

同时我们也开展同步直播,大家可以关注 AscentStream 谙流科技视频号、腾讯云中间件、示说、上海开源信息技术协会、开源江湖、和Powerdata等视频号等参与观看。

大会同时准备了 Pulsar 相关书籍和各类礼品,在线上和线下同期发送,希望大家积极参与,踊跃提问。

扫码报名

本次活动采用线下演讲+线上直播形式,线下人数限制在 200 人,先报先得,赶紧来参加吧。

扫描下方二维码,免费报名线下活动,也可从我们合作伙伴处报名。

扫码报名

添加小助手,给您拉入大会交流群。

添加小助手入群

大会扩展阅读


  1. 邀请函 | Pulsar Meetup 深圳 2024

  2. Pulsar Meetup 深圳 2024 讲师和议题介绍

  3. Pulsar Meetup 深圳 2024 会务介绍





本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部