文档章节

rabbitmq学习记录(五)交换机Exchange-fanout

人觉非常君
 人觉非常君
发布于 07/20 19:48
字数 1029
阅读 2
收藏 1

之前学习的都是一条消息发给一个消费者,下面开始记录如何把一条信息发给多个消费者

这边我们用到了交换机Exchange

交换机模式:fanout

模式特点:生产者把消息发送给Exchange之后,Exchange则会把这些消息添加到与自己绑定的所有队列之中,监听这些队列的消费者就可以收到这些消息。

注:Exchange并不能保存信息,如果没有绑定的队列,那么生产者发送数据就会丢失,只有队列才能存储生产者的消息。

生产者:声明交换机后指定交换机模式fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

	// exchange名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	public static void main(String[] args) {
		Connection connection = null;
		Channel channel = null;
		try {
			// 获取连接
			connection = ConnectionUtil.getConnection();
			// 创建通道
			channel = connection.createChannel();
			// 声明交换机
			channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
			// 生产者发送的信息
			String msg = "msg from producer:";
			for(int i=0;i<10;i++) {
				msg = "msg from producer :" + i;
				System.out.println("send msg : "+msg);
				// 发送信息
				channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 关闭通道
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
			// 关闭连接
			try {
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
	}
	
}

 

消费者1:声明队列fanout_exchange_to_queue_01并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer01 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_01";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[1]:receive msg:"+msg);
					System.out.println("[1]:deal msg successful.");
				}
				
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
			
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

消费者2:声明队列fanout_exchange_to_queue_02并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer02 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_02";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[2]:receive msg:"+msg);
					System.out.println("[2]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

消费者3:声明队列fanout_exchange_to_queue_03,没有绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer03 {

	// 交换机名称
	private static final String EXCHANGE_NAME = "exchange_fanout"; 
	
	// 监听队列名称
	private static final String QUEUE_NAME = "fanout_exchange_to_queue_03";
	
	public static void main(String[] args) {
		try {
			// 获取连接
			Connection connection = ConnectionUtil.getConnection();
			// 创建通道
			final Channel channel = connection.createChannel();
			// 声明队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			// 绑定队列到交换机
//			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
			// 定义消费者
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
					String msg = new String(body,"UTF-8");
					System.out.println("[3]:receive msg:"+msg);
					System.out.println("[3]:deal msg successful.");
				}
			};
			// 接收信息
			channel.basicConsume(QUEUE_NAME, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

这次,涉及到了交换机,而在消费者类中中并没有声明交换机,所以需要先执行生产者类的main方法

可以看到生产者已经成功发送消息给交换机,交换机也成功创建了,但是消息数据丢失了,原因就是上面说的,交换机无法保存从生产者那接收到的信息,只有队列可以保存消息。

接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。

消费者1终端:

消费者2终端:

消费者3终端:

 

 

© 著作权归作者所有

共有 人打赏支持
人觉非常君
粉丝 4
博文 43
码字总数 29919
作品 0
浦东
RabbitMQ 的安装与工作模式

RabbitMQ 概念: 交换机(exchange type)把消息推送到队列的方法: fanout:不处理路由键,转发到所有绑定的队列上 direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发 topic:路由...

求学ing
2014/11/11
0
0
springboot之RabbitMQ详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最...

无语年华
05/30
0
0
RabbitMQ与SpringBoot整合

首先添加依赖 添加配置 在application.properties中添加以下配置 演示一 :direct模式交换机(exchange)模式 创建消息队列 消息发送者 消息接收者 注意:用户guest是rabbitMQ的默认用户 密码为g...

haoyuehong
07/10
0
0
Springboot 集成rabbitmq

RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlan...

江火似流萤
2017/11/02
0
0
Rabbitmq-server 分析

一、 发送端: 接收端: 验证: 在多个shell窗口中运行consumer,然后运行procedure,发现每个consumer 轮流接收相同队列中的消息。 二、 发送端: 接收端: (1) rabbitmq循环调度,将消息循...

meteor_hy
05/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

HTTPS is easy

HTTPS is easy https://www.troyhunt.com/https-is-easy/ HTTPS is easy! In fact, it's so easy I decided to create 4 short videos around 5 minutes each to show people how to enable ......

openthings
23分钟前
0
0
bugList 2

用户端: 1. 上传文件时,当选择:彩色-A3-双面时,第二个图片有bug 应改为 和第一个图片的类型相同 2. 确认打印时,三个下拉选目前有bug 应改为:根据后台配置的商家,group by计算出不同城...

勇恒
26分钟前
2
0
keras cnn 网咯 mnist 分类

搭建貌似比tf是简单很多。。。。。 from keras.datasets import mnistfrom keras.utils import np_utilsfrom keras.models import Sequentialfrom keras.layers import Dense, Activat......

阿豪boy
28分钟前
0
0
解决 /var/run/nginx.pid failed

nginx: [error] open() "/var/run/nginx.pid" failed (2: No such file or directory) sudo nginx -c /etc/nginx/nginx.conf nginx -s reload...

驛路梨花醉美
30分钟前
0
0
nginx负载均衡-ssl原理-生成ssl密钥对-nginx配置ssl

nginx负载均衡: 1.创建配置文件 vim /usr/local/nginx/conf/vhost/load.conf #添加以下内容: upstream qq_com #名字自定义,借助此模块定义多个IP,后面...

ZHENG-JY
30分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部