redis(4)、基于redis 构建异步消息系统
redis(4)、基于redis 构建异步消息系统
haoran_10 发表于1年前
redis(4)、基于redis 构建异步消息系统
  • 发表于 1年前
  • 阅读 88
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 学生专属云服务套餐 10元起购>>>   

 

 

一般消息队列有两种场景

  • 生产者消费者模式 :多个生产者生产消息放在消息队列里,多个消费者同时监听消息队列,谁先抢到消息,谁先处理。每个消息只能被消费一次。
  • 发布者订阅者模式:发布者发布消息到消息队列里,多个监听者同时监听该消息队列,都会同时收到同一份消息。即每个消息被每个监听者消费一次。

 

一、构建生产者消费者模式

(1)构建生产者消费者模式,可以使用list去实现

主要使用LPUSH或者RPUSH插入数据,使用LPOP或者RPOP取出数据并且删除。



 

 生产者伪代码为:

string key = "like_list";
string msg = "user_1";
redis.lpush(key,msg);

 消费者伪代码为:

string key = "like_list";
while(true){
     string msg = redis.brpop(BLOCK_TIMEOUT, key );
     if (msg == null) continue;
     processMsg(msg );
}

(2)构建具有优先级的生产者消费者模式,使用sorted sets实现

使用ZADD插入带有SCORE的数据,也就是优先级参数。使用ZREVRANGE 取出数据,并且zrem删除数据,只有在取出成功,并且删除成功的情况下,认为该消息被正确取出。

sorted sets 没有像list中的pop命令一样,取出并删除。


二、构建发布者订阅者模式

使用PUBSUB构建发布者订阅者模式

PUBLISH channel message   往channel中发布消息

SUBSCRIBE channel [channel ...] 客户端订阅某个channgel,可以同时订阅多个channgel

UNSUBSCRIBE [channel [channel ...]] 客户端取消订阅channel

 

发布者伪代码:

string channel = "order_mq";
string msg = "user_1_1001";
redis.publish(channel,msg);

 

订阅者伪代码:

string channel = "order_mq";
redis.subscribe(channel);
while(true){
    redis.onmessage(new MessageListsner(){
              public void onMessage(string channel,string msg){
                       doProcessMsg(msg);
              }
    });
}

 


三、结束语

(1)、基于redis实现的消息系统,不同于activeMQ或者其他同类的消息中间件产品,没有提供持久化等一些特性,所以消息一旦丢失,就不能重现。所在监听者要启动早于生产者。

(2)、基于redis实现的消息系统,也没有提供配置多个任务去同时处理消息,在程序中可以采用线程池,多线程的方式去处理消息。

 

共有 人打赏支持
粉丝 27
博文 88
码字总数 80846
×
haoran_10
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: