文档章节

了解消息队列中间件——RabbitMQ

江左煤郎
 江左煤郎
发布于 01/13 13:42
字数 1769
阅读 21
收藏 2

1. 了解消息队列中间件

    1. 消息:指的是在应用之间传送的数据,比如json字符串、纯文本字符串等

    2. 消息队列中间件:指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下进行进程之间的通信。现在常用的消息中间件有RabbitMQ、ActiveMQ、Kafka等

    3. 消息队列中间件的消息传递模式:

  • P2P(point to point):即点对点模式,该模式基于队列,消息产生者生成消息并发送到队列中保存,消息消费者从消息队列中接收消息进行处理(和多线程间中的生产者/消费者模式相同)。
  • Pub/Sub模式(publish/subscribe):即发布/订阅模式,该模式中会定义一个内容节点(topic),被称为主题,主题就相当于消息队列,是消息传递的一个中介,消息发布者将消息发布到主题中,而消息订阅者在订阅这个主题后就可以从该主题中接收到发布者发布的消息。用于一对多广播消息

    4. 消息中间件的作用:

  • 解耦:将消息发送者和消息接收者之间解耦
  • 持久化存储:在某些情况下,处理数据的过程可能会失败导致数据丢失,而消息中间件就可以实现将消息数据持久化存储到本地磁盘,直到这些消息数据被处理完成,就可以避免数据丢失的情况。
  • 扩展性:消息发送者和消息接收者之间的解耦,提供了良好的扩展性
  • 缓冲削峰:大多数情况下,消息发送肯定要比消息接收者的处理要快的多,而消息中间件就提供了缓冲层的作用,在访问量剧增的情况下,对于服务端会产生极大压力,而这样的情况很少出现,不可能为这类峰值情况提供持续的资源,如果没有消息中间件,很有可能会因为超负荷请求导致系统崩溃,而在加入消息中间件作为缓冲层后,请求会被写入消息中间件中等待处理,而消息接收者则会从消息中间件中一个个取出消息进行处理,不会因为突然的巨量请求导致崩溃,应用仍然能正常运行
  • 可恢复性:当系统中的部分组件失效时(比如断电、程序异常终止),因为消息中间件的存在,不会丢失过多数据,如果一个处理消息的进程挂掉,进入消息中间件的消息仍然可以在系统恢复后进行处理
  • 顺序保证:消息中间件支持一定程度上数据处理的顺序性
  • 异步通信:消息中间件提供了异步处理机制

2. 在CentOS中安装部署单机RabbitMQ

    参考在Centos中RabbitMQ的安装步骤

1、首先安装erlang ,下载erlang的安装包到centos上,

wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
并且进行安装 rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
查看是够安装成功

[root@localhost Desktop]# erl

Erlang/OTP 19 [erts-8.0.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V8.0.3  (abort with ^G)

输入halt().退出erl
2、安装rabbitMQ 
第一种在线下载,先下载

rpm:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
下载完成后安装:

yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm 

3、开放端口

可以选择直接关闭防火墙,执行命令 

systemctl stop firewalld.service

或者

vim /etc/sysconfig/iptables

#添加一下内容
#RabbitMQ  
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 25672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 4369 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5671 -j ACCEPT
##RabbitMQ 
4、启动rabbit

service rabbitmq-service start
或者

/sbin/service rabbitmq-server start

5、访问 
游览器输入下列地址,即可进入RabbitMQ的管理界面:

http://localhost:15672/
 

3. RabbitMQ的Java客户端简单使用

    1. 创建Maven工厂,导入RabbitMQ的Java客户端的相关jar包:

  	<dependency>
  		<groupId>com.rabbitmq</groupId>
  		<artifactId>amqp-client</artifactId>
  		<version>5.0.0</version>
  	</dependency>

    2. 添加RabbitMQ的管理员账号:在CentOS中启动RabbitMQ后,执行指令,添加一个root用户,并且密码为123456

rabbitmqctl add_user root 123456

设置账户权限,开放所有权限

rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

设置账户为管理员角色

rabbitmqctl set_user_tags root administrator

    3. 消息产生者客户端代码:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class RabbitMQProducer {
	private static final String IP="192.168.10.128";
	private static final String USER="root";
	private static final String PASSWORD="123456";
	private static final int PORT=5672;
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=null;
		Channel channel=null;
		try {
			String exchange_demo="exchange_demo";//声明一个交换器名称
			String queue_demo="queue_demo";//声明一个队列名称
			String route_demo="route_demo";//声明一个路由键,用于绑定交换器和队列
			ConnectionFactory fac=new ConnectionFactory();//获取一个rabbitMQ连接池,并设置相关参数
			fac.setHost(IP);
			fac.setPassword(PASSWORD);
			fac.setUsername(USER);
			fac.setPort(PORT);
			//从连接池中获取一个rabbitMQ连接
			connection=fac.newConnection();
			channel=connection.createChannel();//创建一个频道
			channel.exchangeDeclare(exchange_demo, "direct",false,false,null);//创建一个type为direct,持久化的、非自动删除的交换器
			channel.queueDeclare(queue_demo, true, false, false, null);//创建一个持久化、非排他的、非自动删除的交换器
			channel.queueBind(queue_demo, exchange_demo, route_demo);//将交换器和队列通过路由键绑定
			//发送一条消息
			String message="Hello World";
			channel.basicPublish(exchange_demo, route_demo, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
		} finally {
			//关闭资源
			if(connection!=null){
				if(channel!=null){
					channel.close();//可以不用关闭,当connection关闭后,channel也会自动关闭
				}
				connection.close();
			}
		}
	}
}

    4. 消息消费者客户端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQConsumer {
	private static final String IP="192.168.10.128";
	private static final String USER="root";
	private static final String PASSWORD="123456";
	private static final int PORT=5672;
	private static final String QUEUE_NAME="queue_demo";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=null;
		Address[] address={new Address(IP,PORT)};
		try {
			String queue_demo="queue_demo";//声明一个队列名称
			ConnectionFactory fac=new ConnectionFactory();//获取一个rabbitMQ连接池,并设置相关参数
			fac.setPassword(PASSWORD);
			fac.setUsername(USER);
			//从连接池中获取一个rabbitMQ连接
			connection=fac.newConnection(address);
			final Channel channel=connection.createChannel();//创建一个频道
			channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
			Consumer con=new DefaultConsumer(channel){
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
						byte[] body) throws IOException {
					System.out.println("get message:"+new String(body,"utf-8"));
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			};
			channel.basicConsume(queue_demo, con);
			if(channel!=null){
				channel.close();
			}
		} finally {
			//关闭资源
			if(connection!=null){
				
				connection.close();
			}
		}
	}
}

 

© 著作权归作者所有

共有 人打赏支持
江左煤郎
粉丝 26
博文 86
码字总数 220642
作品 0
西安
后端工程师
私信 提问
消息中间件—RabbitMQ(集群监控篇1)

摘要:任何没有监控的系统上线,一旦在生产环境发生故障,那么排查和修复问题的及时性将无法得到保证 一、为何要对消息中间件进行监控? 上线的业务系统需要监控,然而诸如消息队列、数据库、...

癫狂侠
2018/05/28
0
0
消息中间件—RabbitMQ(集群原理与搭建篇)

摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理 一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试...

癫狂侠
2018/05/25
0
0
消息中间件—RabbitMQ(初探篇)

文章摘要:本篇文章为RabbitMQ的入门文章,不像其他一些程序代码和应用实战性的文章会带着大家从一个“Hello World”的简单例子出发,在该篇幅中主要给大家讲下RabbitMQ消息队列的起源、为何...

癫狂侠
2018/05/23
0
0
玩转不同业务场景,这些RabbitMQ特性会是得力助

我是在解决分布式事务的一致性问题时了解到的RabbitMQ,当时主要是要基于RabbitMQ来实现我们分布式系统之间对有事务可靠性要求的系统间通信。 提到RabbitMQ,不难想到的几个关键字:消息中间...

java菜分享
2018/12/22
0
0
玩转不同业务场景,这些RabbitMQ特性会是得力助攻

我是在解决分布式事务的一致性问题时了解到的RabbitMQ,当时主要是要基于RabbitMQ来实现我们分布式系统之间对有事务可靠性要求的系统间通信。 简单总结了分布式系统中的消息中间件以及Rabbi...

2018/12/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

大数据教程(11.9)hive操作基础知识

上一篇博客分享了hive的简介和初体验,本节博主将继续分享一些hive的操作的基础知识。 DDL操作 (1)创建表 #建表语法CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name ...

em_aaron
今天
0
0
OSChina 周四乱弹 —— 我家猫真会后空翻

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @我没有抓狂 :#今天听这个# 我艇牛逼,百听不厌,太好听辣 分享 Led Zeppelin 的歌曲《Stairway To Heaven》 《Stairway To Heaven》- Led Z...

小小编辑
今天
2
0
node调用dll

先安装python2.7 安装node-gyp cnpm install node-gyp -g 新建一个Electron-vue项目(案例用Electron-vue) vue init simulatedgreg/electron-vue my-project 安装electron-rebuild cnpm ins......

Chason-洪
今天
3
0
scala学习(一)

学习Spark之前需要学习Scala。 参考学习的书籍:快学Scala

柠檬果过
今天
3
0
通俗易懂解释网络工程中的技术,如STP,HSRP等

导读 在面试时,比如被问到HSRP的主备切换时间时多久,STP几个状态的停留时间,自己知道有这些东西,但在工作中不会经常用到,就老是记不住,觉得可能还是自己基础不够牢固,知识掌握不够全面...

问题终结者
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部