文档章节

rabbitmq学习记录(八)消息发布确认机制

人觉非常君
 人觉非常君
发布于 07/20 22:16
字数 1037
阅读 10
收藏 2

RabbitMQ服务器崩了导致的消息数据丢失,已经持久化的消息数据我们可以通过消息持久化来预防。但是,如果消息从生产者发送到vhosts过程中出现了问题,持久化消息数据的方案就无效了。 RabbitMQ为我们提供了两种解决方案:

方案一:通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;

实现方法以及测试结果如下: 生产者一:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer01 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

生产者二:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer02 {

	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		try {
			channel.txSelect();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg : "+msg);
			int k = 2/0;// 触发rollback事件
			channel.txCommit();
		} catch (Exception e1) {
			channel.txRollback();
		}
		channel.close();
		connection.close();
	}
	
}

消费者:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 队列名称
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

下面我们开始测试: 先运行测试类的main方法,接下来运行生产者1的main方法,结果如下: 再来运行运行生产者2的main方法,结果如下: 可以看到消费者只接收到了生产者1发送的消息,消息2的没有收到。至于消息有没发送到message broker,或者说 原因就是:在发送者发送消息过程中,执行

channel.txCommit();

之前,如果出现了什么问题,就行执行

channel.txRollback();

回滚事务。 但是事务影响性能比较严重,所以还是建议使用方案二。

方案二:通过将channel设置成confirm模式来实现; confirm:发送单条消息后确认

package com.example.demo.queue.confirm.confirm.single;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_single_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "Producer发出的信息:";
		channel.confirmSelect();
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		System.out.println("send msg : "+msg);
		if (channel.waitForConfirms()) {
			System.out.println("信息发送成功.");
		} else {
			System.out.println("信息发送失败.");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:批量发送消息后确认

package com.example.demo.queue.confirm.confirm.batch;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_batch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		String msg = "msg from producer:";
		channel.confirmSelect();
		for(int i=0;i<10;i++) {
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			System.out.println("send msg["+i+"] : "+msg);
		}
		if (channel.waitForConfirms()) {
			System.out.println("msg send successfully");
		} else {
			System.out.println("msg send fail");
		}
		channel.close();
		connection.close();
	}
	
}

confirm:发送信息,异步确认

package com.example.demo.queue.confirm.confirm.synch;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class Producer {

	private static final String QUEUE_NAME = "message_confirm_synch_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		channel.confirmSelect();
		// 存放信息的序列化
		SortedSet<Long> longTreeSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
		// 添加监听器
		channel.addConfirmListener(new ConfirmListener() {
			
			/**
			 * 收到消费者已经处理完消息之后发出的反馈,触发该方法
			 */
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
			
			/**
			 * 长时间没收到消费者已经处理完消息之后发出的反馈,触发该方法
			 */
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple);
				if(multiple) {
					longTreeSet.headSet(deliveryTag+1).clear();
				} else {
					longTreeSet.remove(deliveryTag);
				}
			}
		});
		while(true) {
			long seqNo = channel.getNextPublishSeqNo();
			channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes());
			longTreeSet.add(seqNo);
		}
	}
	
}

© 著作权归作者所有

共有 人打赏支持
人觉非常君
粉丝 4
博文 43
码字总数 29919
作品 0
浦东
Spring Boot+RabbitMQ学习笔记

RabbitMQ RabbitMQ 是一个由 Erlang 语言开发的 的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息队列 先从基础开始,加...

听_风
06/07
0
0
RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

Edwyn王
2015/05/20
0
0
RabbitMQ与Redis队列对比

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

凯文加内特
2015/05/20
0
0
rabbitmq——prefetch count

消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。 然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbi...

hncscwc
2014/01/24
0
1
【转载】关于RabbitMQ的消息确认

RabbitMQ 将消息投递到客户端后,客户端如果没处理完这个消息就死掉了,这个消息还会不会存在?这取决于 RabbitMQ 的消息确认机制(Message acknowledgment)是否打开。 为了确保消息不会丢失...

摩云飞
2012/11/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

angular指令监听ng-repeat渲染完成后执行自定义事件方法

今天工作中遇到需要用到ng-repeat遍历渲染完后执行某个操作,angular本身并没有提供监听ng-repeat渲染完成的指令,所以需要自己创建自定义指令。 在ng-repeat模板实例内部会暴露出一些特殊属...

孟飞阳
47分钟前
1
0
URLEncoder和URLDecoder

public static void main(String[] args) { String str1 = "https://test1-life.pingan.com/ilifecore/productMall/loading.html?productId=8000000241&channelCode=XCX00001&productCode=00......

鬼才王
57分钟前
2
0
对象及变量的并发访问-第一篇

方法内部的变量为线程安全变量 “非线程安全”问题存在于“共享变量”中,如果是方法内部的私有变量,则不存在“非线程安全”问题,所得结果也就是“线程安全”的。 package chaprer3;/**...

简心
58分钟前
1
0
程序媛眼中的程序猿原来是这样子的!

一直都想写一篇关于描述程序员的文章,但是一直没能开头,一来因为文笔不好,更主要的原因是貌似对程序员既熟悉又不熟悉,很怕写出来的是以偏概全,给大家造成对程序员的既定印象,不过,管他...

Java小铺
今天
1
0
bean标签

声明:本栏目所使用的素材都是凯哥学堂VIP学员所写,学员有权匿名,对文章有最终解释权;凯哥学堂旨在促进VIP学员互相学习的基础上公开笔记。 bean标签 bean标签中的init-method属性,该属性...

凯哥学堂
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部