文档章节

基于redis的延迟消息队列设计

peachyy
 peachyy
发布于 2017/08/21 14:32
字数 1474
阅读 355
收藏 1

需求背景

  • 用户下订单成功之后隔20分钟给用户发送上门服务通知短信
  • 订单完成一个小时之后通知用户对上门服务进行评价
  • 业务执行失败之后隔10分钟重试一次

    类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。

队列设计

目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件。

开发前需要考虑的问题?

  • 及时性 消费端能按时收到
  • 同一时间消息的消费权重
  • 可靠性 消息不能出现没有被消费掉的情况
  • 可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
  • 可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
  • 高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
  • 消费端如何消费

    当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。

简单定义一个消息数据结构

 

private String topic;/***topic**/

private String id;/***自动生成 全局惟一 snowflake**/

private String bizKey;

private long delay;/***延时毫秒数**/

private int priority;//优先级

private long ttl;/**消费端消费的ttl**/

private String body;/***消息体**/

private long createTime=System.currentTimeMillis();

private int status= Status.WaitPut.ordinal();

运行原理:

  1. Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
  2. id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
  3. 使用timer实时监控zset有序列表中top 10的数据 。 如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
  4. 客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。

客户端

因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。

  1. 添加延时消息添加成功之后返回消费唯一ID POST /push {…..消息体}
  2. 删除延时消息 需要传递消息ID GET /delete?id=
  3. 恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
  4. 恢复单个延时消息 需要传递消息ID GET /reStore/id
  5. 获取消息 需要长连接 GET /get/topic

用nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。

目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。

消息可恢复

实现恢复的原理 正常情况下一般都是记录日志,比如mysqlbinlog等。

这里我们直接采用mysql数据库作为记录日志。

目前打算创建以下2张表:

  1. 消息表 字段包括整个消息体
  2. 消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip

定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key。

举个栗子

假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。

当然恢复单个任务也可以这么干。

关于高可用

分布式协调还是选用zookeeper吧。

如果有多个实例最多同时只能有1个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。

最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。

扩展

支持zset队列个数可配置 避免大数据带来高延迟的问题。

目前存在日志和redis元数据有可能不一致的问题 如mysql挂了,写日志不会成功。

设计图:

更多信息请关注公众号techxxx

笑笑笑技术圈

本文转载自:http://blog.seoui.com/2017/08/19/delayqueue/

peachyy

peachyy

粉丝 3
博文 19
码字总数 5450
作品 0
江北
产品经理
私信 提问
基于Dynomite的分布式延迟队列

最近看了Dyno-queues分布式延迟队列的源码,发现了一些不错的技巧,而本文是对Dyno-queues架构精华的总结。 本文是根据 https://medium.com/netflix-techblog/distributed-delay-queues-bas...

小程故事多
2018/12/05
0
0
基于 golang 和 redis 实现的简易队列 - gmq

概述 是基于提供的特性,使用语言开发的一个简单易用的队列;关于 redis 使用特性可以参考之前本人写过一篇很简陋的文章 Redis 实现队列;的灵感和设计是基于有赞延迟队列设计,文章内容清晰而...

domorecode
07/14
1K
0
也谈淘点点60s短信订单的架构设计

看到了这个 http://www.oschina.net/question/926166_2137672 然后有人写了博客还分析设计了一下 http://my.oschina.net/u/926166/blog/522227 本人最近对架构设计较感兴趣,下面是我的设计,...

xue777hua
2015/10/27
7.2K
52
【开源】.net 分布式架构之业务消息队列

开源QQ群: .net 开源基础服务 238543768 开源地址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ ## 业务消息队列 ## 业务消息队列是应用于业务的解耦和分离,应具备分布式,高可靠性,...

车江毅
2015/10/12
4.5K
6
消息队列的延迟基准测试

在一年半以前,我发表了一篇叫剖析消息队列的博客,里面分析了一些不同的消息系统,并且做了一些性能测试。这篇文章现在看起来有些天真,并且存在很多问题,但是这却是我第一次做这样的系统测...

oschina
2016/02/14
4.8K
4

没有更多内容

加载失败,请刷新页面

加载更多

golang-字符串-地址分析

demo package mainimport "fmt"func main() {str := "map.baidu.com"fmt.Println(&str, str)str = str[0:5]fmt.Println(&str, str)str = "abc"fmt.Println(&s......

李琼涛
今天
4
0
Spring Boot WebFlux 增删改查完整实战 demo

03:WebFlux Web CRUD 实践 前言 上一篇基于功能性端点去创建一个简单服务,实现了 Hello 。这一篇用 Spring Boot WebFlux 的注解控制层技术创建一个 CRUD WebFlux 应用,让开发更方便。这里...

泥瓦匠BYSocket
今天
8
0
从0开始学FreeRTOS-(列表与列表项)-3

FreeRTOS列表&列表项的源码解读 第一次看列表与列表项的时候,感觉很像是链表,虽然我自己的链表也不太会,但是就是感觉很像。 在FreeRTOS中,列表与列表项使用得非常多,是FreeRTOS的一个数...

杰杰1号
今天
9
0
Java反射

Java 反射 反射是框架设计的灵魂(使用的前提条件:必须先得到代表的字节码的 Class,Class 类 用于表示.class 文件(字节码)) 一、反射的概述 定义:JAVA 反射机制是在运行状态中,对于任...

zzz1122334
今天
7
0
聊聊nacos的LocalConfigInfoProcessor

序 本文主要研究一下nacos的LocalConfigInfoProcessor LocalConfigInfoProcessor nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/config/impl/LocalConfigInfoProcessor.java p......

go4it
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部