文档章节

Redis实现消息队列

voole
 voole
发布于 2016/08/07 00:02
字数 1215
阅读 2526
收藏 26

基于Redis消息队列-实现短信服务化

1.Redis实现消息队列原理

常用的消息队列有RabbitMQ,ActiveMQ,个人觉得这种消息队列太大太重,本文介绍下基于Redis的轻量级消息队列服务。 
一般来说,消息队列有两种模式,一种是发布者订阅模式,另外一种是生产者和消费者模式。Redis的消息队列,也是基于这2种原理的实现。 
发布者和订阅者模式:发布者发送消息到队列,每个订阅者都能收到一样的消息。 
生产者和消费者模式:生产者将消息放入队列,多个消费者共同监听,谁先抢到资源,谁就从队列中取走消息去处理。注意,每个消息只能最多被一个消费者接收。

2.Redis消息队列使用场景

在我们的项目中,使用消息队列来实现短信的服务化,任何需要发送短信的模块,都可以直接调用短信服务来完成短信的发送。比如用户系统登录注册短信,订单系统的下单成功的短信等。

3.SpringMVC中实现Redis消息队列

因为我们短信只需要发送一次,所以我们使用的是消息队列的生产者和消费者模式。

3.1引入Maven依赖

引入Redis相应的maven依赖,这里需要spring-data-redis和jedis

    //pom.xml
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>1.6.0.RELEASE</version>
    </dependency>

      <!-- jedis -->
      <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
             <version>2.5.1</version>
        </dependency>

3.2配置redis生成者消费者模式

    //applicationContext-redis.xml
    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:redis="http://www.springframework.org/schema/redis"
    xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
                            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd     
                            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                            http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop.xsd
                            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                            http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
    <description>spring-data-redis配置</description>

    <bean id="redisConnectionFactory"
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="${redis.host}"></property>
        <property name="port" value="${redis.port}"></property>
        <property name="usePool" value="true"></property>
    </bean>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory" ref="redisConnectionFactory"></property>
    </bean>

    <bean id="jdkSerializer"
        class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

    <bean id="smsMessageListener"
        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="smsMessageDelegateListener" />
        <property name="serializer" ref="jdkSerializer" />
    </bean>

    <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>

    <redis:listener-container>
        <redis:listener ref="smsMessageListener" method="handleMessage"
            serializer="jdkSerializer" topic="sms_queue_web_online" />
    </redis:listener-container>

    <!-- jedis -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="300" /> <!-- 最大能够保持idel状态的对象数  -->
        <property name="maxTotal" value="60000" /> <!-- 最大分配的对象数 -->
        <property name="testOnBorrow" value="true" /> <!-- 当调用borrow Object方法时,是否进行有效性检查 -->
    </bean>

    <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
        <constructor-arg index="0" ref="jedisPoolConfig" />
        <constructor-arg index="1" value="${redis.host}" />
        <constructor-arg index="2" value="${redis.port}" type="int" />
    </bean>

</beans>

主要的配置说明: 
1.序列化:一般我们向Redis发送一个消息定义的Java对象,这个对象需要序列化。这里使用JdkSerializationRedisSerializer:

<bean id="jdkSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

2.发送者:

<bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>

3.监听者:

    <bean id="smsMessageListener"
        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="smsMessageDelegateListener" />
        <property name="serializer" ref="jdkSerializer" />
    </bean>
    <redis:listener-container>
        <redis:listener ref="smsMessageListener" method="handleMessage"
            serializer="jdkSerializer" topic="sms_queue_web_online" />
    </redis:listener-container>

smsMessageListener:消息监听器 
redis:listener-container:定义消息监听,method:监听消息执行的方法,serializer:序列化,topic:监听主题(可以理解为队列名称)

3.3代码实现

1.定义短信消息对象SmsMessageVo

public class SmsMessageVo implements Serializable {
    //id
    private Integer smsId;

    //手机号
    private String mobile;

    //类型,1:验证码 2:订单通知
    private Byte type;

    //短信创建时间
    private Date createDate;

    //短信消息处理时间
    private Date processTime;

    //短信状态,1:未发送 2:发送成功 3:发送失败
    private Byte status;

    //短信内容
    private String content;

    //省略setter和getter方法
    ...

2.定义消息队列发送对象SendMessage

//SendMessage.java
public class SendMessage {

    private RedisTemplate<String, Object> redisTemplate;


    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }



    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }



    public void sendMessage(String channel, Serializable message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

3.发送消息

    String smsContent = templateToContent(template.getContent(),
                    regMsgCode);
    SmsMessageVo smsMessageVo = new SmsMessageVo();
    smsMessageVo.setMobile(mobile);
    smsMessageVo.setType((byte) SmsType.VERIFICATION.getType());
    smsMessageVo.setChannelId(1);
    smsMessageVo.setContent(smsContent);
    smsMessageVo.setCreateDate(new Date());
    smsMessageVo.setStatus((byte) SmsSendStatus.TO_SEND.getType());
    smsMessageVo.setTemplateId(1);

    //先把待发送的短信存入数据库
    SmsQueue smsQueue = new SmsQueue();
    BeanUtils.copyProperties(smsQueue, smsMessageVo);
    smsQueueService.addSmsQueue(smsQueue);

    //异步发送短信到redis队列
    sendMessage.sendMessage(Constants.REDIS_QUEUE_SMS_WEB, smsMessageVo);
    //Constants.REDIS_QUEUE_SMS_WEB = "sms_queue_web_online",和applicationContext-redis中topic配置一样

4.监听消息

//SmsMessageDelegateListener.java
@Component("smsMessageDelegateListener")
public class SmsMessageDelegateListener {

    @Autowired
    private SmsQueueService smsQueueService;

    //监听Redis消息
    public void handleMessage(Serializable message){
        if(message instanceof SmsMessageVo){
            SmsMessageVo messageVo = (SmsMessageVo) message;

            //发送短信
            SmsSender smsSender = SmsSenderFactory.buildEMaySender();
            smsSender.setMobile(messageVo.getMobile());
            smsSender.setContent(messageVo.getContent());
            boolean sendSucc = false;
            //判断短信类型
            //验证码短信
            if(messageVo.getType() == (byte)SmsType.VERIFICATION.getType()){
                sendSucc = smsSender.send();
            }


            if(!sendSucc){
                return;
            }

            // 异步更新短信表状态为发送成功
            final Integer smsId = messageVo.getSmsId();
            Executor executor = Executors.newSingleThreadExecutor();
            executor.execute(new Runnable() {
                public void run() {
                    SmsQueue smsQueue = new SmsQueue();
                    smsQueue.setSmsId(smsId);
                    smsQueue.setStatus((byte)SmsSendStatus.SEND.getType());
                    smsQueue.setProcessTime(new Date());
                    smsQueueService.updateSmsQueue(smsQueue);
                }
            });

        }
    }
}

4.总结

下面使用一张流程图,来总结Redis消息队列和短信服务。 
这里写图片描述

本文转载自:http://blog.csdn.net/JaCman/article/details/51246449

共有 人打赏支持
voole

voole

粉丝 13
博文 79
码字总数 3761
作品 1
海淀
加载中

评论(2)

热血鸣人
这个是发布者和订阅者模式,根本不是生产者消费者模式。发布消息后,所有的订阅者都能接收到消息。
老唱片之桥-
老唱片之桥-
怎么调用呢 有demo吗
RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

Edwyn王
2015/05/20
0
0
RabbitMQ与Redis队列对比

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

凯文加内特
2015/05/20
0
0
转:Redis消息队列的若干实现方式

微信号:neihanrukou Redis消息队列的若干实现方式 最近忙着用Redis实现一个消息通知系统,今天大概总结了一下技术细节,其中演示代码如果没有特殊说明,使用的都是PhpRedis扩展来实现的。 内...

fzxu_05
2013/03/04
0
0
用redis实现消息队列(实时消费+ack机制)

消息队列 首先做简单的引入。 MQ主要是用来: 解耦应用、 异步化消息 流量削峰填谷 目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。 网上的资源对各种情况都有详...

美的让人心动
05/03
0
0
Kafka与常见消息队列的对比

Kafka与常见消息队列的对比 RabbitMQ Erlang编写 支持很多的协议:AMQP,XMPP, SMTP, STOMP 非常重量级,更适合于企业级的开发 发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持...

无声胜有声
2017/07/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

kernel version does not match DSO version

错误信息: kernel version 384.11 does not match DSO version 384.130.0 原因是: cuda driver版本太低,不匹配DSO 简单有效的修复方法,升级nvidia driver, 步骤如下: 1. google seach ...

刘小米
今天
0
0
maven坐标和依赖

一、maven坐标详解 <groupId>com.fgt.club</groupId><artifactId>club-common-service-facade</artifactId><version>3.0.0</version><packaging>jar</packaging> maven的坐标元素说......

老韭菜
今天
1
0
springmvc-servlet.xml配置表功能解释

问:<?xml version="1.0" encoding="UTF-8" ?> 答: xml version="1.0"表示是此xml文件的版本是1.0 encoding="UTF-8"表示此文件的编码方式是UTF-8 问:<!DOCTYPE beans PUBLIC "-//SPRING//......

隐士族隐逸
今天
1
0
基于TP5的微信的公众号获取登录用户信息

之前讲过微信的公众号自动登录的菜单配置,这次记录一下在TP5项目中获取自动登录的用户信息并存到数据库的操作 基本的流程为:微信设置自动登录的菜单—>访问的URL指定的函数里获取用户信息—...

月夜中徘徊
今天
0
0
youTrack

package jetbrains.teamsys.license.runtime; 计算lis package jetbrains.ring.license.reader; 验证lis 安装后先不要生成lis,要把相关文件进行替换 ring-license-checker-1.0.41.jar char......

max佩恩
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部