RabbitMQ示例Demo

原创
2019/03/22 19:01
阅读数 4W

生产者 发送消息 带上 routingkey (不需要bind 操作)

消费者接收消息 需要 bind routingkey,

生产者根据 routingkey 和消费者 bind 的rutingkey 来发送消息到 对应消费者的 队列中去。

bindkey 其实就是 routingkey ,不存在bindingkey 这个参数名称,

1. rabbitMQ windows下的安装

(1) 下载erlang 可根据自己的操作系统下载32位和64位的:http://www.erlang.org/downloads,下载后直接点击下一步进行默认路径和设置安装即可。

新建一个系统变量:变量名为ERLANG_HOME,变量值为安装Erlang的路径(路径中不要包含bin目录)

把ERLANG_HOME 追加到Path路径中

维护以上信息后,打开cmd 命令窗口键入:erl 查看 erl 是否安装成功

(2) 安装RabbitMQ 下载地址:https://www.rabbitmq.com/install-windows.html

安装完成后 cmd 命令到安装目录下的sbin 目录 键入命令: rabbitmq-plugins.bat enable rabbitmq_management  开启插件

键入: rabbitmq-server  启动MQ

浏览器http://localhost:15672/ 访问MQ的主页 初始化密码和用户名都是: guest  登录成功后我们会看到以下页面:

1.在eclipse 对rabbitMQ 做一个简单的demo以便我们对rabbitMQ的前期理解,编写并运行rabbitMQ的工作队列模式(workqueue):创建rabbitMQproducer 生产者,rabbitMGconsumer 消费者(可以有多个消费者):

(1)创建rabbitMGproducer项目引入ampq依赖:

    <dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>3.6.5</version>
    </dependency>

(2)rabbitMQproducer 生产者:

/**
 * 生产者
 * @author Administrator
 *
 */
public class rabbitMQproducer {
	private static String host = "127.0.0.1";
	private static String userName = "guest";
	private static String passWord = "guest";
	private static int port = 5672;
	/**
	 * @param args
	 */
	public static void main(String[] args) {

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(userName);
			factory.setPassword(passWord);
			Connection connect = factory.newConnection();
			Channel channel = connect.createChannel();
			DeclareOk declareOK = channel.queueDeclare("test", false, false, false, null);
			channel.basicPublish("", "test", null, "hello rabbit".getBytes("UTF-8"));
			channel.close();
			connect.close();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}

}

(3)消费者rabbitMGconsumer


/**
 * 消费者
 * @author Administrator
 *
 */
public class rabbitMGconsumer {


	private static String host = "127.0.0.1";
	private static String userName = "guest";
	private static String passWord = "guest";
	private static int port = 5672;
	/**
	 * @param args
	 */
	public static void main(String[] args) {

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(userName);
			factory.setPassword(passWord);
			Connection connect = factory.newConnection();
			Channel channel = connect.createChannel();
			DeclareOk declareOK = channel.queueDeclare("test", false, false, false, null);
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume("test", true, consumer);
			while (true) {
				Delivery deliver = consumer.nextDelivery();
				System.out.println("reciver messager:"+new String(deliver.getBody()));
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		
	}

}

(4)分别运行消费者和生产者,测试结果如下:

2.发布订阅模式,在这其中引入了exchange(交换器)的概念,消息发布者把消息发布到交换器,交换器再把消息根据routingKey发布到对应的队列,交换器exchangeType有以下类型: direct, topic, headers and fanout ;

(2.1)发布订阅模式exchangeType 为 :fanout 这种类型 exchange会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

2.1.1)创建exchangeType 类型为fanout的生产者rabbitMQProFanout

package com.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * 生产者
 * @author Administrator
 *
 */
public class rabbitMQProFanout {
	private static String host = "127.0.0.1";
	private static String userName = "guest";
	private static String passWord = "guest";
	private static int port = 5672;
	/**
	 * @param args
	 */
	public static void main(String[] args) {

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(userName);
			factory.setPassword(passWord);
			Connection connect = factory.newConnection();
			Channel channel = connect.createChannel();
			//声明 exchange 的type 为 fanout 广播模式
			channel.exchangeDeclare("my.fanout3","fanout",true);
			for (int i = 0; i < 100; i++) {
				channel.basicPublish("my.fanout3", "", null, "hellow my fanout".getBytes());
			}
			channel.close();
			connect.close();
		} catch (Exception e) {
			
		}
	}

}


2.1.2)创建exchangeType 类型为fanout的消费者rabbitMQConFanout ,获取。

public class rabbitMQConFanout {
	private static String host = "127.0.0.1";
	private static String userName = "guest";
	private static String passWord = "guest";
	private static int port = 5672;
	/**
	 * @param args
	 */
	public static void main(String[] args) {

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(userName);
			factory.setPassword(passWord);
			Connection connect = factory.newConnection();
			Channel channel = connect.createChannel();
			//声明exchange
			DeclareOk result = channel.exchangeDeclare("my.fanout3", "fanout",true);
			//声明 queue
			channel.queueDeclare("my.fanout.quene3", false, false, false, null);
			//绑定 queue 到 exchange
			channel.queueBind("my.fanout.quene3", "my.fanout3", "");
//			//获取消息
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume("my.fanout.quene3", false, consumer);
			channel.basicQos(1);
			while (true) {
				Delivery deliver = consumer.nextDelivery();
				System.out.println("reciver messager:"+new String(deliver.getBody()));
				channel.basicAck(deliver.getEnvelope().getDeliveryTag(), true);
				Thread.sleep(500);
				
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

注意:如果在没有消费者的情况下,生产者的消息将会丢失。

(2.2)创建exchangeType 类型为 Topic 的生产者 和消费者 ,topic的规则就是模糊匹配,比如 routingKey 是 log.topic 和warn.topic ,bindingKey 可以是 *.topic 其中 * 代表 一个单词  # 代表多个单词

    (2.2.1)生产者 rabbitMQProTopics


public class rabbitMQProTopics {
	private static String host = "127.0.0.1";
	private static String userName = "guest";
	private static String passWord = "guest";
	private static int port = 5672;
	/**
	 * @param args
	 */
	public static void main(String[] args) {

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(userName);
			factory.setPassword(passWord);
			Connection connect = factory.newConnection();
			Channel channel = connect.createChannel();
			//创建exchangeType 类型为 topic 
			channel.exchangeDeclare("my.topic","topic", true);
			channel.basicPublish("my.topic", "log.topic", null, "hellow my topic log".getBytes());
			channel.basicPublish("my.topic", "warn.topic", null, "hellow my topic warn".getBytes());
			channel.close();
			connect.close();
		} catch (Exception e) {
		}
	}

}

 (2.2.2) 消费者 rabbitMQConTopics

public class rabbitMQConTopic {
	private static String host = "127.0.0.1";
	private static String userName = "guest";
	private static String passWord = "guest";
	private static int port = 5672;
	/**
	 * @param args
	 */
	public static void main(String[] args) {

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(userName);
			factory.setPassword(passWord);
			Connection connect = factory.newConnection();
			Channel channel = connect.createChannel();
			//声明exchangeType 类型为 topic 名称为my.topc
			channel.exchangeDeclare("my.topic","topic", true);
			
			//声明queue
			channel.queueDeclare("my.topic.queue", false, false, false, null);
			
			//exchangeType 和 routingKey 绑定 (*匹配一个单词,#匹配多个单词)
			channel.queueBind("my.topic.queue", "my.topic", "*.topic");
			//获取消息
			QueueingConsumer consumer = new QueueingConsumer(channel);
			String result = channel.basicConsume("my.topic.queue", false, consumer);
			while (true) {
				Delivery deliver = consumer.nextDelivery();
				System.out.println("reciver messager:"+new String(deliver.getBody()));
				channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
			}
		} catch (Exception e) {
		}
	}

}

 分别启动 消费者 和 生产者 控制台打印结果 :

至此 在eclipse 中 单纯的对 rabbitMQ 有个简单的了解,下面我们就开始在 springboot 当中集成对rabbitMQ 的使用

 

展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
返回顶部
顶部