文档章节

rabbitmq学习记录(五)交换机Exchange-fanout

人觉非常君
 人觉非常君
发布于 07/20 19:48
字数 1029
阅读 2
收藏 1

之前学习的都是一条消息发给一个消费者,下面开始记录如何把一条信息发给多个消费者

这边我们用到了交换机Exchange

交换机模式:fanout

模式特点:生产者把消息发送给Exchange之后,Exchange则会把这些消息添加到与自己绑定的所有队列之中,监听这些队列的消费者就可以收到这些消息。

注:Exchange并不能保存信息,如果没有绑定的队列,那么生产者发送数据就会丢失,只有队列才能存储生产者的消息。

生产者:声明交换机后指定交换机模式fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	// exchange名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	public static void main(String[] args) {
		Connection connection = null;
		Channel channel = null;
		try {
			// 获取连接
			connection = ConnectionUtil.getConnection();
			// 创建通道
			channel = connection.createChannel();
			// 声明交换机
			channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
			// 生产者发送的信息
			String msg = "msg from producer:";
			for(int i=0;i<10;i++) {
				msg = "msg from producer :" + i;
				System.out.println("send msg : "+msg);
				// 发送信息
				channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 关闭通道
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
			// 关闭连接
			try {
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
	}
	
}

 

消费者1:声明队列fanout_exchange_to_queue_01并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_01";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
				
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
			
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

消费者2:声明队列fanout_exchange_to_queue_02并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer02 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_02";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[2]:receive msg:"+msg);
					System.out.println("[2]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

消费者3:声明队列fanout_exchange_to_queue_03,没有绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer03 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_03";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
//			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[3]:receive msg:"+msg);
					System.out.println("[3]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

这次,涉及到了交换机,而在消费者类中中并没有声明交换机,所以需要先执行生产者类的main方法

可以看到生产者已经成功发送消息给交换机,交换机也成功创建了,但是消息数据丢失了,原因就是上面说的,交换机无法保存从生产者那接收到的信息,只有队列可以保存消息。

接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。

消费者1终端:

消费者2终端:

消费者3终端:

 

 

© 著作权归作者所有

共有 人打赏支持
人觉非常君
粉丝 6
博文 43
码字总数 29951
作品 0
浦东
RabbitMQ 的安装与工作模式

RabbitMQ 概念: 交换机(exchange type)把消息推送到队列的方法: fanout:不处理路由键,转发到所有绑定的队列上 direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发 topic:路由...

求学ing
2014/11/11
0
0
springboot之RabbitMQ详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最...

无语年华
05/30
0
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
09/07
0
0
RabbitMQ与SpringBoot整合

首先添加依赖 添加配置 在application.properties中添加以下配置 演示一 :direct模式交换机(exchange)模式 创建消息队列 消息发送者 消息接收者 注意:用户guest是rabbitMQ的默认用户 密码为g...

haoyuehong
07/10
0
0
Rabbitmq-server 分析

一、 发送端: 接收端: 验证: 在多个shell窗口中运行consumer,然后运行procedure,发现每个consumer 轮流接收相同队列中的消息。 二、 发送端: 接收端: (1) rabbitmq循环调度,将消息循...

meteor_hy
05/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

6. Python3源码—List对象

6.1. List对象 List对象是“变长对象”。 6.1.1. Python中的创建 Python中List对象最重要的创建方法为PyList_New,如下Python语句最终会调用到PyList_New: test = [1, 2, 3, 4, 5] 6.1.2. ...

Mr_zebra
21分钟前
1
0
nginx屏蔽指定接口(URL)

Step1:需求 web平台上线后,需要屏蔽某个服务接口,但又不想重新上线,可以采用nginx屏蔽指定平台接口的办法 Step2:具体操作 location /dist/views/landing/UNIQUE_BEACON_URL { re...

Linux_Anna
29分钟前
2
0
tomcat高并发配置调优

作者:Joker-pan 原文:https://blog.csdn.net/u011622226/article/details/72510385?utm_source=copy --------------------- tomcat 解压就使用的,配置都没动过,肯定不能支持高并发了; ...

imbiao
48分钟前
4
0
mysql 联结,级联查询总结区分

其实我对 数据库的级联或者联结查询一直都是会用,项目能查询出来自己想要的结果即可。 毕竟SQL使用复杂的查询毕竟比较少,而且不难使用。 至于区分他们,我还真的有点模糊。 在看 《SQL必知...

之渊
今天
3
0
区块链入门教程分享区块链POW证明代码实现demo

兄弟连区块链入门教程分享区块链POW证明代码实现demo 这里强调一下区块链的协议分层 应用层 合约层 激励机制 共识层 网络层 数据层 上 一篇主要实现了区块链的 数据层,数据层主...

兄弟连区块链入门教程
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部