如何利用RabbitMQ生产一个简单的消息

原创
2020/05/08 22:31
阅读数 38

最近业务中有有这样一个场景,就是用户在商城下单之后,如果30分钟没有付款,那么就需要将这个订单处理掉,要么直接删除,要么直接标识为失效状态,为什么要这么做?

  • 1、库存,用户在下单之后,会锁定一个库存,如果用户一直不支付,那么就会占用库存,影响别的用户购买,
  • 2、随着业务的发展,用户量的增加,我们的订单数据会越来越多,那么我们要及时的清理无效的订单,提升系统的性能;

曾经的纯洁无瑕

首先说下,我曾经那些纯洁无瑕的想法,第一次看到这种需求的时候,如果要清理失效的订单,那我直接写一个定时任务,5分钟或者10分钟跑一次,删除过期的订单,增加库存。

定时任务确实可以解决上面的问题,但是存在一个现实的问题,那就是数据库压力,甚至影响整个系统的性能,如果你是几百、几千个订单还好,要是十几万、甚至上百万,那么此方法肯定是行不通的。

进阶版本

既然发现了上面的方法不行,那么就重新想办法,有什么办法可以不用查询数据库,就可以知道哪些订单快要过期,再这样的思考下,我用Redis做了一个消息队列,当生成订单的时候,生成一个延迟10分钟消息,10分钟结束的时候,就会从Redis的队列中将这个订单取出来,然后这样我就可以将这个订单删除,增加库存。

Redis做消息队列存在的问题

之前用一个全新的Redis做消息队列,存储的数据也不多,感觉还好,当随着业务增加,Redis存储超过90%的时候,大量的消息没有被消费,就是消息丢失很严重。那么这样肯定是不行的。

删除订单,增加库存这是不能有太多误差的事情,所以Redis消息队列已经不能满足我的需求,那么就需要可靠性高的消息队列,也就是我们这次要介绍的RabbitMQ。

RabbitMQ安装与面板介绍

这里我就不跟大家介绍如何安装RabbitMQ了,网上其实有很多这种教程,所以大家自行搜索吧。重点要跟大家说下,RabbitMQ的面板,我们的消息队列,以及消息都是可以在面板上看到的。我是用的MQ的版本是3.8,各个版本之间的面板多多少少可能有点不太一样。

第一次接触的话,我们不要想着全部我一次性都看懂,都知道是干嘛的,我觉得没必要,先熟悉最基础的,我在上面圈了两个地方Queue、Admin和Add a new queue 这个三个是最基本的,我们学习必须要用的东西。

  • Queue:这个就是我们声明的消息队列;
  • Admin:用户管理,RabbitMQ默认有一个用户是guest,但是RabbitMQ神奇的就是每个库都必须创建一个用户角色;
  • Add a new queue:这个就是创建一个新的队列,但是我们一般不这么直接创建,而是在代码中创建;

再来补充下Admin吧,首先要告诉大家一个基本的东西,就是如果想要声明一个队列,那么你必须要有一个库(比喻手法),队列存在于库中,可以想象下mysql,是不是得先有库,再有表。MQ也是这样的。

右边的Virtual Hosts就是创建库,Name就是库名,写的时候前面必须加/

如果细心的朋友可以看到第一张图中两个队列前面就是库名,标识队列存在与xiaoshuo库中。

一个简单的消息队列

当生产者生产出消息之后,发送到队列中,消费者监听到队列中有消息进行消费,那么我们本篇就先实现一个简单的消息队列。

代码

我们不使用SpringBoot框架,我们就从基本的写,人生原生的API,这样以后才懂得什么是怎么回事。

1、引入需要的pom文件

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>

2、连接RabbitMQ

/**
* @description: TODO MQ连接工厂
* @author: bingfeng
* @create: 2020-05-07 08:55
*/
public class MQConnectUtil {

public static Connection getConnection() throws IOException, TimeoutException {

// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 设置连接地址
factory.setHost("127.0.0.1");

// 设置端口
factory.setPort(5672);

// 选择 vhost
factory.setVirtualHost("/xiaoshuo");

// 设置用户名
factory.setUsername("bingfeng");

// 密码
factory.setPassword("123");

return factory.newConnection();
}
}

3、发送消息

/**
* @description: TODO 模拟发送消息
* @author: bingfeng
* @create: 2020-05-07 09:01
*/
public class Producer {

/**
* 队列名
*/
public static final String QUEUE_NAME = "simple_queue_test";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = MQConnectUtil.getConnection();

// 从连接中获取一个通道
Channel channel = connection.createChannel();

// 创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 消息内容
String msg = "你好,冰峰!";

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

System.out.println("发送消息:" + msg);

channel.close();
connection.close();
}
}

4、消费消息

/**
* @description: TODO 消费MQ
* @author: bingfeng
* @create: 2020-05-07 09:11
*/
public class Consumer {

public static final String QUEUE_NAME = "simple_queue_test";

public static void main(String[] args) throws Exception {

// 获取连接
Connection connection = MQConnectUtil.getConnection();

// 创建频道
Channel channel = connection.createChannel();

// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String msg = new String(body, StandardCharsets.UTF_8);

System.out.println("消费消息:" + msg);
}
};

// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

当我们发送一个消息之后,我们可以直接在面板看到队列中的消息数

点进来之后,拉到下面,Messages就是我们想要查看的消息数量,就是你想看几条消息填几就行了,填完之后点击下面的Get Messages,我们最近的消息就会显示在下面。

当我们消费了这个消息之后,队列中就没有这个消息了。

最后的话

今天就写到这,我打算把RabbitMQ从入门到项目中的实战用法全部一级一级分享给大家,一是防止自己以后忘记可以回来翻翻看,二是分享给大家有兴趣的朋友可以一起学习,后面我也会把我之前说的,订单过期删除的业务场景也会写一遍。

熟悉RabbitMQ的朋友,可能看下来解决写的东西很简单,但是毕竟也有很多人实际工作中并没有用过MQ,自己可能也没有了解过,对于没有了解过的朋友来说,我觉得入个门还是挺不错的。

有什么问题,欢迎大家下方solo。

SpringBoot中的异常处理与参数校验
@PostConstruct注解,你该好好看看



好文章,我在看

本文分享自微信公众号 - 一个程序员的成长(xiaozaibuluo)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部