文档章节

rabbitmq学习记录(八)消息发布确认机制

人觉非常君
 人觉非常君
发布于 2018/07/20 22:16
字数 1037
阅读 30
收藏 2

RabbitMQ服务器崩了导致的消息数据丢失,已经持久化的消息数据我们可以通过消息持久化来预防。但是,如果消息从生产者发送到vhosts过程中出现了问题,持久化消息数据的方案就无效了。 RabbitMQ为我们提供了两种解决方案:

方案一:通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;

实现方法以及测试结果如下: 生产者一:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer01 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

生产者二:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer02 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			int k = 2/0;// 触发rollback事件
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

消费者:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 队列名称
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

下面我们开始测试: 先运行测试类的main方法,接下来运行生产者1的main方法,结果如下: 再来运行运行生产者2的main方法,结果如下: 可以看到消费者只接收到了生产者1发送的消息,消息2的没有收到。至于消息有没发送到message broker,或者说 原因就是:在发送者发送消息过程中,执行

channel.txCommit();

之前,如果出现了什么问题,就行执行

channel.txRollback();

回滚事务。 但是事务影响性能比较严重,所以还是建议使用方案二。

方案二:通过将channel设置成confirm模式来实现; confirm:发送单条消息后确认

package com.example.demo.queue.confirm.confirm.single;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_single_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "Producer发出的信息:";
		channel.confirmSelect();
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		System.out.println("send msg : "+msg);
		if (channel.waitForConfirms()) {
			System.out.println("信息发送成功.");
		} else {
			System.out.println("信息发送失败.");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:批量发送消息后确认

package com.example.demo.queue.confirm.confirm.batch;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_batch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		channel.confirmSelect();
		for(int i=0;i<10;i++) {
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg["+i+"] : "+msg);
		}
		if (channel.waitForConfirms()) {
			System.out.println("msg send successfully");
		} else {
			System.out.println("msg send fail");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:发送信息,异步确认

package com.example.demo.queue.confirm.confirm.synch;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_synch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		channel.confirmSelect();
		// 存放信息的序列化
		SortedSet<Long> longTreeSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
		// 添加监听器
		channel.addConfirmListener(new ConfirmListener() {
			
			/**
			 * 收到消费者已经处理完消息之后发出的反馈,触发该方法
			 */
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
			
			/**
			 * 长时间没收到消费者已经处理完消息之后发出的反馈,触发该方法
			 */
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
		});
		while(true) {
			long seqNo = channel.getNextPublishSeqNo();
			channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes());
			longTreeSet.add(seqNo);
		}
	}
	
}

© 著作权归作者所有

共有 人打赏支持
人觉非常君
粉丝 7
博文 44
码字总数 30395
作品 0
浦东
私信 提问
java B2B2C Springboot电子商城系统-消息队列之 RabbitMQ

常见的消息队列 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码请加企鹅求求: :二一四七七七五六三三 目前业界有四款常用的消息队列,它们分别是RabbitMQ、Roc...

it菲菲
2018/12/14
0
0
(六)RabbitMQ消息队列-消息任务分发与消息ACK确认机制(PHP版)

在前面一章介绍了在PHP中如何使用RabbitMQ,至此入门的的部分就完成了,我们内心中一定还有很多疑问:如果多个消费者消费同一个队列怎么办?如果这几个消费者分任务的权重不同怎么办?怎么把...

Super_RD
2017/04/26
0
0
RabbitMQ与Redis队列对比

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

凯文加内特
2015/05/20
0
0
RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

Edwyn王
2015/05/20
0
0
rabbitmq——prefetch count

消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。 然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbi...

hncscwc
2014/01/24
0
1

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 白掌柜说了卖货不卖身

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @爱漫爱 :这是一场修行分享羽肿的单曲《Moony》 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :开不开心? 开心呀, 我又不爱睡懒觉…...

小小编辑
今天
7
0
大数据教程(11.7)hadoop2.9.1平台上仓库工具hive1.2.2搭建

上一篇文章介绍了hive2.3.4的搭建,然而这个版本已经不能稳定的支持mapreduce程序。本篇博主将分享hive1.2.2工具搭建全过程。先说明:本节就直接在上一节的hadoop环境中搭建了! 一、下载apa...

em_aaron
今天
2
0
开始看《JSP&Servlet学习笔记》

1:WEB应用简介。其中1.2.1对Web容器的工作流程写得不错 2:编写Servlet。搞清楚了Java的Web目录结构,以及Web.xml的一些配置作用。特别是讲了@WebServlet标签 3:请求与响应。更细致的讲了从...

max佩恩
今天
4
0
mysql分区功能详细介绍,以及实例

一,什么是数据库分区 前段时间写过一篇关于mysql分表的的文章,下面来说一下什么是数据库分区,以mysql为例。mysql数据库中的数据是以文件的形势存在磁盘上的,默认放在/mysql/data下面(可...

吴伟祥
今天
3
0
SQL语句查询

1.1 排序 通过order by语句,可以将查询出的结果进行排序。放置在select语句的最后。 格式: SELECT * FROM 表名 ORDER BY 排序字段ASC|DESC; ASC 升序 (默认) DESC 降序 1.查询所有商品信息,...

stars永恒
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部