文档章节

分布式场景下如何保证消息队列实现最终一致性

中关村的老男孩
 中关村的老男孩
发布于 06/17 17:05
字数 1096
阅读 28
收藏 0

考虑一个分布式场景中一个常见的场景:服务A执行某个数据库操作成功后,会发送一条消息到消息队列,现在希望只有数据库操作执行成功才发送这条消息。下面是一些常见的作法:

1. 先执行数据库操作,再发送消息

public void purchaseOrder() {
    orderDao.save(order);
    messageQueue.send(message);
}

有可能order新增成功,发送消息失败。最终形成不一致状态。

2. 先发送消息,再执行数据库操作

public void purchaseOrder() {
    messageQueue.send(message);
    orderDao.save(order);
}

有可能消息发送成功,而order新增失败,从而形成不一致状态。

3. 在数据库事务中,先发送消息,再执行数据库操作

@Transactional public void purchaseOrder() {
    messageQueue.send(message);
    orderDao.save(order);
}

这里同样无法保证一致性。如果数据库操作成功,然而消息已经发送了,无法进行回滚。

4. 在数据库事务中,先执行数据库操作,再发送消息

@Transactional public void purchaseOrder() {
    orderDao.save(order);
    messageQueue.send(message);
}

这种方案成功与否,取决于消息队列是否拥有应答机制和事务机制。

应答机制表示producer发送消息后,消息队列能够返回response从而证明消息是否插入成功。

如果消息队列拥有应答机制,将上面的代码改写为:

@Transactional public void purchaseOrder() {
    orderDao.save(order); try{
        kafkaProducer.send(message).get();
    } catch(Exception e) throw new RuntimeException("Fail to send message");
    }

这段代码表示如果发送发收到消息队列错误的response,就抛出一个RuntimeException。那么消息发送失败,能够造成数据库操作的回滚。这个方案看似可行,然而存在这样一种情况,如果消息发送成功,而消息队列由于网络原因没有即时返回response,此时消息发送方由于没有及时收到应答从而认为消息发送失败了,因此消息发送方的数据库事务回滚了,然而消息的确已经插入成功,从而造成了最终不一致性。

上面的不一致性可以通过消息的事务机制解决。

事务机制表示消息队列中的消息是否拥有状态,从而决定消费者是否消费该条消息。

Alibaba旗下的开源消息队列RocketMQ以高可用性闻名,它是最早支持事务消息的消息队列。Kafka从版本0.11开始也支持了事务机制。

RoketMQ的事务机制是将消息标记为Prepared状态或者Confirmed状态。处于Prepared状态的消息对consumer不可见。

而Kafka通过Transaction Marker将消息标记为Uncommited或Commited状态。Consumer通过配置isolation-levelread_committedread_uncommitted来决定对哪种类型的消息可见。

5. 消息队列不支持事务消息

如果消息队列不支持事务消息,那么我们的解决方案是,新增一张message表,并开启一个定时任务扫描这张message表,将所有状态为prepared的message发送给消息队列,发送成功后,将message状态置为confirmed。

代码如下:

@Transactional public void purchaseOrder() {
    orderDao.save(order);
    messageService.save(message);
}

此时插入order和插入message的逻辑处于同一个数据库事务,通过后台的定时程序不断扫描message表,因此一定能够保证消息被成功投递到消息消费方。

这个方案存在的一个问题是,有可能后台任务发送消息成功后宕机了,从而没有来得及将已发送的message状态置为confirmed。因此下一次扫描message表时,会重复发送该条消息。这就是at least once delivery

由于at least once delivery的特性,consumer有可能收到重复的数据。此时可以在consumer端建立一张message_consume表,来判断消息是否已经消费过,如果已经消费过,那么就直接丢弃该消息。

© 著作权归作者所有

中关村的老男孩
粉丝 39
博文 58
码字总数 135493
作品 0
海淀
架构师
私信 提问
保证分布式系统数据一致性的6种方案

问题的起源 在电商等业务中,系统一般由多个独立的服务组成,如何解决分布式调用时候数据的一致性? 具体业务场景如下,比如一个业务操作,如果同时调用服务 A、B、C,需要满足要么同时成功;...

huojiao2006
2017/03/06
0
0
一站式解决分布式 SOA 事务问题 - EasyTransaction

一、由来 这个框架是结合齐牛金融公司之前遇到的分布式事务场景以及 支付宝程立分享的一个PPT<大规模SOA系统的分布式事务处理>而设计实现,意在解决之前公司对于每个分布式事务场景中都自行重...

skyesx
2017/03/29
0
2
分布式之《保证分布式系统数据一致性的6种解决方案》

原文:http://weibo.com/ttarticle/p/show?id=2309403965965003062676 编者按:本文由「高可用架构后花园」群讨论整理而成。 有人的地方,就有江湖 有江湖的地方,就有纷争 问题的起源 在电商...

无信不立
2016/04/20
0
0
保证分布式系统数据一致性的6种方案(转载)

保证分布式系统数据一致性的6种方案(转载) 问题的起源 在电商等业务中,系统一般由多个独立的服务组成,如何解决分布式调用时候数据的一致性? 具体业务场景如下,比如一个业务操作,如果同时...

tantexian
2016/06/19
359
0
保证分布式系统数据一致性的6种方案

保证分布式系统数据一致性的6种方案问题的起源 在电商等业务中,系统一般由多个独立的服务组成,如何解决分布式调用时候数据的一致性? 具体业务场景如下,比如一个业务操作,如果同时调用服...

秋风醉了
2016/05/12
656
0

没有更多内容

加载失败,请刷新页面

加载更多

抽象同步队列AQS——AbstractQueuedSynchronizer锁详解

AQS——锁的底层支持 谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)! 类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资...

须臾之余
今天
3
0
springboot配置百度UEditor 富文本详解

富文本简介 UEditor是由百度web前端研发部开发所见即所得富文本web编辑器,具有轻量,可定制,注重用户体验等特点,开源基于MIT协议,允许自由使用和修改代码... 准备工作 ueditor需要单独文...

wotrd
昨天
4
0
mysql 5.7之my.cnf配置大全

[client]port = 3306socket = /tmp/mysql.sock[mysqld]###############################基础设置######################################Mysql服务的唯一编号 每个mysql服务...

Online_Reus
昨天
3
0
MAVEN打包时引入外部链接的包

1.项目引入了ORACLE的jar包,MAVEN配置如下 2.打jar包的时候需要指定下main入口函数mainClass <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> ......

Cobbage
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部