文档章节

rabbitmq学习记录(八)消息发布确认机制

人觉非常君
 人觉非常君
发布于 07/20 22:16
字数 1037
阅读 17
收藏 3

RabbitMQ服务器崩了导致的消息数据丢失,已经持久化的消息数据我们可以通过消息持久化来预防。但是,如果消息从生产者发送到vhosts过程中出现了问题,持久化消息数据的方案就无效了。 RabbitMQ为我们提供了两种解决方案:

方案一:通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;

实现方法以及测试结果如下: 生产者一:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer01 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

生产者二:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer02 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			int k = 2/0;// 触发rollback事件
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

消费者:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 队列名称
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

下面我们开始测试: 先运行测试类的main方法,接下来运行生产者1的main方法,结果如下: 再来运行运行生产者2的main方法,结果如下: 可以看到消费者只接收到了生产者1发送的消息,消息2的没有收到。至于消息有没发送到message broker,或者说 原因就是:在发送者发送消息过程中,执行

channel.txCommit();

之前,如果出现了什么问题,就行执行

channel.txRollback();

回滚事务。 但是事务影响性能比较严重,所以还是建议使用方案二。

方案二:通过将channel设置成confirm模式来实现; confirm:发送单条消息后确认

package com.example.demo.queue.confirm.confirm.single;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_single_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "Producer发出的信息:";
		channel.confirmSelect();
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		System.out.println("send msg : "+msg);
		if (channel.waitForConfirms()) {
			System.out.println("信息发送成功.");
		} else {
			System.out.println("信息发送失败.");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:批量发送消息后确认

package com.example.demo.queue.confirm.confirm.batch;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_batch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		channel.confirmSelect();
		for(int i=0;i<10;i++) {
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg["+i+"] : "+msg);
		}
		if (channel.waitForConfirms()) {
			System.out.println("msg send successfully");
		} else {
			System.out.println("msg send fail");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:发送信息,异步确认

package com.example.demo.queue.confirm.confirm.synch;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_synch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		channel.confirmSelect();
		// 存放信息的序列化
		SortedSet<Long> longTreeSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
		// 添加监听器
		channel.addConfirmListener(new ConfirmListener() {
			
			/**
			 * 收到消费者已经处理完消息之后发出的反馈,触发该方法
			 */
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
			
			/**
			 * 长时间没收到消费者已经处理完消息之后发出的反馈,触发该方法
			 */
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
		});
		while(true) {
			long seqNo = channel.getNextPublishSeqNo();
			channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes());
			longTreeSet.add(seqNo);
		}
	}
	
}

© 著作权归作者所有

共有 人打赏支持
人觉非常君
粉丝 6
博文 43
码字总数 29951
作品 0
浦东
Spring Boot+RabbitMQ学习笔记

RabbitMQ RabbitMQ 是一个由 Erlang 语言开发的 的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息队列 先从基础开始,加...

听_风
06/07
0
0
RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

Edwyn王
2015/05/20
0
0
RabbitMQ与Redis队列对比

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

凯文加内特
2015/05/20
0
0
rabbitmq——prefetch count

消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。 然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbi...

hncscwc
2014/01/24
0
1
【转载】关于RabbitMQ的消息确认

RabbitMQ 将消息投递到客户端后,客户端如果没处理完这个消息就死掉了,这个消息还会不会存在?这取决于 RabbitMQ 的消息确认机制(Message acknowledgment)是否打开。 为了确保消息不会丢失...

摩云飞
2012/11/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

springboot 上传图片(多张和单张都可以)

@RequestMapping("/upload")@ResponseBodypublic String handleFileUpload(HttpServletRequest request) { MultipartHttpServletRequest params = ((MultipartHttpServletRequest) req......

MrBoyce
4分钟前
1
0
Android Studio 3.X打开DDMS

Android Studio更新到3.0版本以后Tools菜单变成了这样了 DDMS入口找不到了 在设置里找到SDK的路径 打开tools文件夹 双击打开 monitor.bat文件 会出来一个黑色命令行框,稍等一下,熟悉的DDM...

lanyu96
9分钟前
1
0
js 进一法、四舍五入法、舍去法 保留两位小数

function format_number(num,len) { len = len > 0 && len <= 20 ? len : 2; var result = parseFloat(num); num=isNaN(result)?0:result; var numpow=Math.pow(10,len)......

xiaogg
16分钟前
0
0
Node.js中的不安全跳转如何防御详解

Node.js中的不安全跳转如何防御详解 导语: 早年在浏览器大战期间,有远见的Chrome认为要运行现代Web应用,浏览器必须有一个性能非常强劲的Java引擎,于是Google自己开发了一个高性能的开源的...

开元中国2015
17分钟前
1
0
国内首个区块链试验区在海南成立

据新华社报道,10月8日,海南自贸区(港)区块链试验区正式在海南生态软件园授牌设立,这也是目前为止国内第一个区块链试验区。 该试验区位于海南生态软件园,与试验区同一天成立还有2家研究...

问题终结者
18分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部