文档章节

轻松搞定RabbitMQ(三)——消息应答与消息持久化

白志华
 白志华
发布于 2015/10/18 10:56
字数 1264
阅读 80
收藏 3

       这个官网的第二个例子中的消息应答和消息持久化部分。我把它摘出来作为单独的一块儿来分享。


Message acknowledgment(消息应答)

       执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。基于现在的代码,一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。

       但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。

       为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ可以删除它了。

       如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会不丢失任何消息了。

       没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

       消息应答是默认打开的。我们明确地把它们关掉了(autoAck=true)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

       修改一下Worker.java

channel.basicQos(1);//保证一次只分发一个
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			      channel.basicAck(envelope.getDeliveryTag(), false);
			    }
			  }
			};
       我们还是运行1个生产者,2个消费者,在消息处理过程中,人为让一个消费者挂掉,然后会看到剩下的任务都会被另外的消费者执行。

       运行结果如下:


       如果你关闭了自动消息应答,手动也未设置应答,这是一个很简单错误,但是后果却是极其严重的。消息在分发出去以后,得不到回应,所以不会在内存中删除,结果RabbitMQ会越来越占用内存,最终的结果,你懂得。。。


Message durability(消息持久化)

       我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍将失去!
       当RabbitMQ退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。 
    

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
       尽管这行代码是正确的,但他不会在我们当前的设置中起作用。因为我们已经定义了一个名叫hello的未持久化的队列。RabbitMQ不允许使用不同的参数设定重新定义已经存在的队列,并且会向尝试如此做的程序返回一个错误。一个快速的解决方案——就是声明一个不同名字的队列,比如task_queue。

       (当然,我们也可以登录到RabbitMQ的服务管理页面,RabbitMQ默认的端口是5672,管理页面默认端口是15672,页面地址为:http://localhost:15672,使用是用户名和密码登录。RabbitMQ的默认密码和用户名都是guest。点开“queue”那栏,可以看到队列列表,点击“hello”杜列,会展开队列的详细信息。把页面拉到最后,有一项“Delete / purge”,点开,点击“Delete”按钮,就可以把队列删除掉了。 然后再运行代码的时候,就会创建一个支持持久化的hello队列。)

       上述的代码需要在生产者和消费者都要作出同样的修改。

       在这一点上我们确信task_queue的队列不会丢失,即使RabbitMQ服务重启。现在我们需要将消息标记为持久性的——通过设置 MessageProperties(实现BasicProperties)为PERSISTENT_TEXT_PLAIN。

       现在你可以启动RabbitMQ服务器,执行一次生产者NewTask的程序,然后关闭服务器,再重新启动服务器,运行消费者Work做下实验。可以发现消费者依旧可以读出消息来。说明在RabbitMQ服务器关闭后,消息和队列信息都已经做了持久化。再次启动后,会重新加载到服务器中,消费者运行后,就可以正常的从队列中获取消息了。

本文转载自:http://blog.csdn.net/xiaoxian8023/article/details/48710653

共有 人打赏支持
白志华
粉丝 32
博文 265
码字总数 57524
作品 0
长沙
程序员
私信 提问
RabbitMQ 实战教程(二) 工作队列

原文出处:梁桂钊的博客 工作队列,又称任务队列,主要思想是避免立即执行资源密集型任务,并且必须等待完成。相反地,我们进行任务调度,我们将一个任务封装成一个消息,并将其发送到队列。...

lianggui
09/29
0
0
RabbitMQ实战2.消息轮询、响应、持久化

继上篇 RabbitMQ实战1.消息代理 消息轮询分配 如果生产者投递的消息需要运行相当长的时间,且有多个消费者在处理消息,那么RabbitMQ是怎么分配消息的? 新建 new_task.py 新建 worker.py 在三...

章鱼喵_
08/09
0
0
RabbitMQ生产端消息可靠性投递方案分析

导文: 1.什么是RabbitMQ 2.Java开发技术大杂烩(三)之电商项目优化、rabbitmq、Git、OSI、VIM、Intellj IDEA、HTTP、JS、Java 之前在上面2篇文章中,讲到过RabbitMQ的安装,基本概念和用法。...

cmazxiaoma
11/07
0
0
Rabbitmq-server 分析

一、 发送端: 接收端: 验证: 在多个shell窗口中运行consumer,然后运行procedure,发现每个consumer 轮流接收相同队列中的消息。 二、 发送端: 接收端: (1) rabbitmq循环调度,将消息循...

meteor_hy
05/31
0
0
你不知道的RabbitMQ集群架构全解

你不知道的RabbitMQ集群架构全解 RabbitMQ系列文章 RabbitMQ在Ubuntu上的环境搭建 深入了解RabbitMQ工作原理及简单使用 RabbitMQ交换器Exchange介绍与实践 RabbitMQ事务和Confirm发送方消息确...

王磊的博客
07/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

我为什么坚持写作

说写作可能是抬高了自己,目前来说只能是写东西、记录东西、表达观点和情感。 在俞敏洪的公众号上看到过一篇文章,里面讲了一个观点,大概是说写作不求能写出伟大的作品,只是把自己的生活、...

Bob2100
58分钟前
1
0
中国公有云三巨头,同时支持Rancher Kubernetes平台

华为云容器引擎(CCE)、阿里云K8S容器服务(ACK)和腾讯云K8S引擎(TKE),中国公有云三巨头正式全面支持Rancher Kubernetes平台。 Rancher正式宣布扩大对中国领先Kubernetes服务的支持,华...

RancherLabs
今天
0
0
【NLP】【八】基于keras与imdb影评数据集做情感分类

【一】本文内容综述 1. keras使用流程分析(模型搭建、模型保存、模型加载、模型使用、训练过程可视化、模型可视化等) 2. 利用keras做文本数据预处理 【二】环境准备 1. 数据集下载:http:...

muqiusangyang
今天
1
0
nginx 解决session一致性

session 粘滞性 每个请求按访问ip的hash结果分配,这样每个访客固定访问一个后端服务器,可以解决session的问题。 upstream backserver {ip_hash;server 192.168.0.14:88;server 192.1...

zhu_kai1
今天
2
0
使用Cloud application Studio在C4C UI里创建下拉列表(dropdown list)

在Cloud Application Studio里新建一个Code List Data Type: 维护Value和描述信息,以及在ABSL里使用的constant值。 保存之后,上述维护的信息会存储到一个名为.codelist 的文件里。激活之后...

JerryWang_SAP
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部