RabbitMQ工作队列之公平分发消息与消息应答(ACK)

原创
2020/05/13 22:39
阅读数 115

上篇文章中,我们讲了工作队列轮询的分发模式,该模式无论有多少个消费者,不管每个消费者处理消息的效率,都会将所有消息平均的分发给每一个消费者,也就是说,大家最后各自消费的消息数量都是一样多的。由此也就引发我们今天要介绍的公平分发模式。

消息应答(ACK)

消息丢失

我们之前的所有代码,如果消息队列将消息分发给消费者,那么就会从队列中删除,如果在我们处理任务的过程中,处理失败或者服务器宕机,那么这条消息肯定得不到执行,就会出现丢失。

我们所设想的如果任务在处理的过程中,如果服务器宕机等原因造成消息未被正常消费,那么必须分发给其他的消费者再次进行消费,这样及时服务器宕机也不会丢失任何的消息了。

ACK

所以ACK,就是消息应答机制,我们之前写的代码都是开启了自动应答,所以如果我们的消息没被正常消费,就会丢失。

要想确保消息不丢失,就必须将ACK自动应答关闭掉,在我们处理消息的流程中,如果消息正常被处理,那么最后进行手动应答,告诉队列我们正常消费了消息。

超时

RabbitMQ它是没有我们平常所见到的超时时间限制的,只要当消费者服务宕机,消息才会被重新分发,哪怕处理这条消息需要花费很长的时间。

公平分发模式

缺陷

我们提供多个消费者,目的就是为了提高系统的性能,提升系统处理任务的速度,如果将消息平均的分发给每个消费者,那么处理消息快的服务是不是会空闲下来,而处理慢的服务可能会阻塞等待处理,这样的场景是我们不愿意看到的。所以有了今天要说的分发模式,公平分发

能者多劳

所谓的公平分发,其实用能者多劳描述更为贴切,根据名字就可以知道,谁有能力处理更多的任务,那么就交给谁处理,防止消息的挤压。

那么想要实现公平分发,那么必须要将自动应答改为手动应答。这是公平分发的前提。

代理

消息生产者

public class Send {

public static final String QUEUE_NAME = "test_word_queue";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

// 获取连接
Connection connection = MQConnectUtil.getConnection();

// 创建通道
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i = 0; i < 10; i++) {

String msg = "消息:" + i;

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

Thread.sleep(i * 20);

System.out.println(msg);
}

channel.close();
connection.close();
}
}

消费者1

我们在消费者中设置了channel.basicQos(1);这样一个参数,这个意思就是表示,此消费者每次最多只接收一条消息进行处理,只有将消息处理结束,手动应答之后,下一条消息才会被分发进来。

public class Consumer1 {

public static final String QUEUE_NAME = "test_word_queue";

public static void main(String[] args) throws Exception {

// 获取连接
Connection connection = MQConnectUtil.getConnection();

// 创建频道
Channel channel = connection.createChannel();

// 一次仅接受一条未经确认的消息
channel.basicQos(1);

// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String msg = new String(body, StandardCharsets.UTF_8);

System.out.println("消费者[1]-内容:" + msg);

Thread.sleep(2 * 1000);

// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};

// 监听队列,将自动应答方式改为false,关闭自动应答机制
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

消费者2

public class Consumer2 {

public static final String QUEUE_NAME = "test_word_queue";

public static void main(String[] args) throws Exception {

// 获取连接
Connection connection = MQConnectUtil.getConnection();

// 创建频道
Channel channel = connection.createChannel();

// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

// 定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String msg = new String(body, StandardCharsets.UTF_8);

System.out.println("消费者[2]-内容:" + msg);

Thread.sleep(1000);

// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};

// 监听队列,需要将自动应答方式改为false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

消费结果

那么结果就会像我们之前预想的那样,由于消费者2消费消息花费的时间比消费者1更少,所以消费者2处理的消息的数量要比消费者1处理的消息的数量要多。这里我就不贴图了,大家可以敲代码进行尝试。


今天的文章到这里就结束了,下篇呢,会给介绍介绍另外一种模式,发布订阅模式

日拱一卒,功不唐捐


今日推荐

如何利用RabbitMQ生产一个简单的消息

RabbitMQ如何高效的消费消息




好文章,我在看


本文分享自微信公众号 - 一个程序员的成长(xiaozaibuluo)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部