文档章节

rabbitmq学习记录(五)交换机Exchange-fanout

人觉非常君
 人觉非常君
发布于 07/20 19:48
字数 1029
阅读 6
收藏 1

之前学习的都是一条消息发给一个消费者,下面开始记录如何把一条信息发给多个消费者

这边我们用到了交换机Exchange

交换机模式:fanout

模式特点:生产者把消息发送给Exchange之后,Exchange则会把这些消息添加到与自己绑定的所有队列之中,监听这些队列的消费者就可以收到这些消息。

注:Exchange并不能保存信息,如果没有绑定的队列,那么生产者发送数据就会丢失,只有队列才能存储生产者的消息。

生产者:声明交换机后指定交换机模式fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	// exchange名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	public static void main(String[] args) {
		Connection connection = null;
		Channel channel = null;
		try {
			// 获取连接
			connection = ConnectionUtil.getConnection();
			// 创建通道
			channel = connection.createChannel();
			// 声明交换机
			channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
			// 生产者发送的信息
			String msg = "msg from producer:";
			for(int i=0;i<10;i++) {
				msg = "msg from producer :" + i;
				System.out.println("send msg : "+msg);
				// 发送信息
				channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 关闭通道
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
			// 关闭连接
			try {
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
	}
	
}

 

消费者1:声明队列fanout_exchange_to_queue_01并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_01";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
				
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
			
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

消费者2:声明队列fanout_exchange_to_queue_02并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer02 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_02";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[2]:receive msg:"+msg);
					System.out.println("[2]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

消费者3:声明队列fanout_exchange_to_queue_03,没有绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer03 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_03";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
//			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[3]:receive msg:"+msg);
					System.out.println("[3]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

这次,涉及到了交换机,而在消费者类中中并没有声明交换机,所以需要先执行生产者类的main方法

可以看到生产者已经成功发送消息给交换机,交换机也成功创建了,但是消息数据丢失了,原因就是上面说的,交换机无法保存从生产者那接收到的信息,只有队列可以保存消息。

接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。

消费者1终端:

消费者2终端:

消费者3终端:

 

 

© 著作权归作者所有

共有 人打赏支持
人觉非常君
粉丝 6
博文 43
码字总数 29951
作品 0
浦东
私信 提问
springboot之RabbitMQ详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最...

无语年华
05/30
0
0
RabbitMQ 的安装与工作模式

RabbitMQ 概念: 交换机(exchange type)把消息推送到队列的方法: fanout:不处理路由键,转发到所有绑定的队列上 direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发 topic:路由...

求学ing
2014/11/11
0
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
09/07
0
0
RabbitMQ与SpringBoot整合

首先添加依赖 添加配置 在application.properties中添加以下配置 演示一 :direct模式交换机(exchange)模式 创建消息队列 消息发送者 消息接收者 注意:用户guest是rabbitMQ的默认用户 密码为g...

haoyuehong
07/10
0
0
Springboot 集成rabbitmq

RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlan...

江火似流萤
2017/11/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

PHP生成CSV之内部换行

当我们使用PHP将采集到的文件内容保存到csv文件时,往往需要将采集内容进行二次过滤处理才能得到需要的内容。比如网页中的换行符,空格符等等。 对于空格等处理起来都比较简单,这里我们单独...

豆花饭烧土豆
28分钟前
1
0
使用 mjml 生成 thymeleaf 邮件框架模板

发邮件算是系统开发的一个基本需求了,不过搞邮件模板实在是件恶心事,估计搞过的同仁都有体会。 得支持多种客户端 支持响应式 疼彻心扉的 outlook 多数客户端只支持 inline 形式的 css 布局...

郁也风
31分钟前
4
0
让哲学照亮我们的人生——读《医务工作者需要学点哲学》有感2600字

让哲学照亮我们的人生——读《医务工作者需要学点哲学》有感2600字: 作者:孙冬梅;以前读韩国前总统朴槿惠的著作《绝望锻炼了我》时,里面有一句话令我印象深刻,她说“在我最困难的时期,...

原创小博客
今天
3
0
JAVA-四元数类

public class Quaternion { private final double x0, x1, x2, x3; // 四元数构造函数 public Quaternion(double x0, double x1, double x2, double x3) { this.x0 = ......

Pulsar-V
今天
17
0
Xshell利用Xftp传输文件,使用pure-ftpd搭建ftp服务

Xftp传输文件 如果已经通过Xshell登录到服务器,此时可以使用快捷键ctrl+alt+f 打开Xftp并展示Xshell当前的目录,之后直接拖拽传输文件即可。 pure-ftpd搭建ftp服务 pure-ftpd要比vsftp简单,...

野雪球
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部