文档章节

RabbitMQ之SSM框架整合xml配置

木九天
 木九天
发布于 08/22 15:28
字数 1064
阅读 77
收藏 1

      前提:jdk1.8,本博客使用的是RabbitTemplate模版,用封装好的方法,不再使用 ==》入门级教学

      还有一个重点,自己一定要会使用rabbitmq服务器,自己创建exchange、queue等,不然使用该博客的话,会报错的。

      两种方法:topic模式以及延迟队列的使用

1、pom

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.5.16</version>
</dependency>

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.11.RELEASE</version>
</dependency>

2、application.properties

# rabbitmq 消息配置
rabbitmq.addresses=localhost:5672
rabbitmq.virtual-host=/
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.channel-cache-size=50
rabbitmq.concurrentConsumers=3
rabbitmq.maxConcurrentConsumers=10
# 确认方式 MANUAL 手动,AUTO 自动,NONE 自动确认
rabbitmq.acknowledgeMode=MANUAL
# 线程池数量 = 并发数 * 监听数
rabbitmq.task-executor.pool-size=100

3、spring-rabbit.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
   http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
   http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!--启用注解监听消息-->
    <rabbit:annotation-driven/>

    <!-- 配置连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="localhost"
                               port="5672"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}" />

    <!-- 定义mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 声明队列 -->
    <rabbit:queue name="topicqueue2" auto-declare="false" durable="true"></rabbit:queue>
    <rabbit:queue name="queue_seckill" auto-declare="false" durable="true"></rabbit:queue>
    <rabbit:queue name="dlx_delay_queue" auto-declare="false" durable="true">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="6000" value-type="java.lang.Long"/>
            <entry key="x-dead-letter-exchange" value="dlx_delay_exchange" />
            <entry key="x-dead-letter-routing-key" value="immediate_road" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:queue name="immediate" auto-declare="false" durable="true">
    </rabbit:queue>
    <!--producer-->

    <!-- 定义交换机绑定队列(通配符模式) #匹配一个或多个词  *匹配一个词 -->
    <rabbit:topic-exchange name="IExchange" id="IExchange">
        <rabbit:bindings>
            <rabbit:binding queue="topicqueue2" pattern="lazy.#"/>
            <rabbit:binding queue="queue_seckill" pattern="seckill.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--延迟队列-->
    <rabbit:direct-exchange name="dlx_delay_exchange" durable="true" auto-declare="false">
        <rabbit:bindings>
            <rabbit:binding queue="dlx_delay_queue" key="dlx_delay_road" />
            <rabbit:binding queue="immediate" key="immediate_road" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息对象json转换类 -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- 定义模版 -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />


    <!-- 定义消费者 -->
    <bean name="delayConsumer" class="com.platform.mq.DelayListener" />
    <bean name="seckillConsumer" class="com.platform.mq.SeckillHandler" />

    <!-- 定义消费者监听队列 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="seckillConsumer" queues="queue_seckill" />
        <rabbit:listener ref="delayConsumer" queues="immediate" />
    </rabbit:listener-container>

    <!--消息监听容器,配合注解监听消息-->
    <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--并发消费者数量-->
        <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers:3}"/>
        <!--最大数量-->
        <property name="maxConcurrentConsumers" value="${rabbitmq.maxConcurrentConsumers:10}"/>
        <!--消息转换-->
        <property name="messageConverter" ref="jsonMessageConverter"/>
        <!--任务线程池-->
        <property name="taskExecutor">
            <task:executor id="amqpTaskExecutor" pool-size="${rabbitmq.task-executor.pool-size:100}"/>
        </property>
        <!--手动确认-->
        <property name="acknowledgeMode" value="MANUAL"/>
    </bean>
</beans>

4、监听器

import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;

/**
 * @Author:MuJiuTian
 * @Description: RabbitMq延迟队列 https://blog.csdn.net/m912595719/article/details/83787486
 * ChannelAwareMessageListener(Message memssage,Channel channel) MessageListener(Message message)
 * @Date: Created in 下午4:17 2019/8/12
 */

public class DelayListener implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        try {
            JsonNode jsonData = MAPPER.readTree(message.getBody());
            System.out.println("延迟队列时间为:"+DateUtil.format(new Date(), DateFormat.getDateTimeInstance()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.platform.service.SeckillService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @Author:MuJiuTian
 * @Description: 秒杀消费者消费消息,监听执行业务逻辑处理
 * @Date: Created in 下午5:01 2019/8/14
 */
public class SeckillHandler implements MessageListener {

    @Autowired
    SeckillService seckillService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        try {
            //队列中继续执行秒杀
            JsonNode jsonData = MAPPER.readTree(message.getBody());
            String goodsId = jsonData.get("goodsId").asText();
            int productId = jsonData.get("productId").asInt();
            int userId    = jsonData.get("userId").asInt();
            int sellerNum = jsonData.get("sellerNum").asInt();
            //开始秒杀
            seckillService.seckillRedis(goodsId,productId,sellerNum,userId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5、实体类

public class Mail implements Serializable {
    private static final long serialVersionUID = -8140693840257585779L;
    private String mailId;
    private String country;
    private Double weight;


    public Mail() {
    }

    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    public String getMailId() {
        return mailId;
    }

    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "]";
    }
}

6、controller

/**
 * topic:通配符模式
 */
@GetMapping(value = "/test7")
public void test11(){
    Mail mail = new Mail("21","China",27.2);
    System.out.println("topic模式发送数据到消息队列"+DateUtil.format(new Date(), DateFormat.getDateTimeInstance()));
    rabbitTemplate.convertAndSend("IExchange","lazy.dtb",mail);
}



/**
 * 死信队列 long等待时间,目前测试为:自动消费
 */
@GetMapping(value = "/test8")
public void test13(long time) throws IOException {
    Mail mail = randomMail();
    System.out.println("延迟队列:dlx方式"+DateUtil.format(new Date(), DateFormat.getDateTimeInstance()));
    rabbitTemplate.convertAndSend("dlx_delay_exchange","dlx_delay_road", mail, message -> {
        message.getMessageProperties().setExpiration(time + "");
        return message;
    });
}


/**
 * 随机创建一个Mail实体对象,供接口测试
 */
public static Mail randomMail() {
    Mail mail = new Mail();
    mail.setMailId(new Random().nextInt(100)+"");
    mail.setCountry("China");
    mail.setWeight(new Random().nextDouble());
    return mail;
}

      ok!

© 著作权归作者所有

木九天

木九天

粉丝 187
博文 224
码字总数 171159
作品 0
海淀
程序员
私信 提问
SSM框架下配置rabbitMQ

一、准备工作: 第一步:下载erlang http://www.erlang.org/downloads/19.3 第二步:下载rabbitMQ http://www.rabbitmq.com/download.html rabbitMQ安装完成后,打开rabbitMQ控制台 输入:r...

嘴角轻扬30
2018/11/15
217
0
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
2018/09/13
417
0
SpringBoot笔记(十一)RabbitMQ

安装Erlang RabbitMQ基于Erlang,所以得先安装Erlang http://www.erlang.org/downloads 根据自己的系统选择下载,安装完了,配置一下path即可 windows默认安装路径: 验证: 有时候可能需要重...

世外大帝
2018/04/25
0
0
RabbitMQ集群整合SpringBoot2.x

RabbitMQ相信大家已经再熟悉不过了,作为业界四大主流消息中间件之一(Apache RocketMQ、Apache Kafka、Apache ActiveMQ、RabbitMQ),它具有非常好的性能和可靠性的集群模式,不仅仅在各大互...

阿神_
2018/07/24
0
0
在CentOS上安装rabbitmq

转自:http://flyingdutchman.iteye.com/blog/1887283 这文章写得很好,除了安装软件编译时间比较长之外,安装这个几乎没出现什么错误。现在去配置下rabbitmq,马上就可以使用了。 在本节中我...

mac_zhao
2014/09/28
201
0

没有更多内容

加载失败,请刷新页面

加载更多

怎样在磁盘上查找MySQL表的大小?这里有答案

导读 我想知道 MySQL 表在磁盘上占用多少空间,但看起来很琐碎。不应该在 INFORMATION_SCHEMA.TABLES 中提供这些信息吗?没那么简单! 我想知道 MySQL 表在磁盘上占用多少空间,但看起来很琐碎...

问题终结者
24分钟前
5
0
jQuery load() 方法实现加载远程数据

jQuery load() 方法是简单但强大的 AJAX 方法。load() 方法从服务器加载数据,并把返回的数据放入被选元素中。 语法: $(selector).load(URL,data,callback);必需的 URL 参数规定您希望加载的...

前端老手
26分钟前
5
0
Spring Boot缓存实战 Redis 设置有效时间和自动刷新缓存-2

问题 上一篇Spring Boot Cache + redis 设置有效时间和自动刷新缓存,时间支持在配置文件中配置,说了一种时间方式,直接扩展注解的Value值,如: @Override@Cacheable(value = "people#${s...

xiaolyuh
34分钟前
9
0
怎样在磁盘上查找MySQL表的大小?这里有答案

我想知道 MySQL 表在磁盘上占用多少空间,但看起来很琐碎。不应该在 INFORMATION_SCHEMA.TABLES 中提供这些信息吗?没那么简单! 我想知道 MySQL 表在磁盘上占用多少空间,但看起来很琐碎。不应...

Linux就该这么学
58分钟前
5
0
Redis

一、Redis支持的几种数据类型:字符串、List、SET、HASH、ZSET 二、Redis的缓存技术主要是为了降低关系数据库的负载并减少网站成本 三、在Redis里面,被MULTI命令和EXEC命令包围的所有命令会...

BobwithB
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部