文档章节

RabbitMQ探索:结构分析与常用方法解释

玄影镜心
 玄影镜心
发布于 2016/12/16 15:57
字数 3129
阅读 810
收藏 1

前言:包的引入

首先,我们使用的是基于spring boot的rabbitMq,

Maven地址

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
        <relativePath />
    </parent>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

系统中会加入这样一个类库:

打开发现里面只有一个pom文件

打开以后我们看到:

三个jar包自上到下,分别是对Spring Boot的核心启动器(包含了自动配置、日志和YAML),为集成messaging api和消息协议提供支持,以及spring对rabbitmq的封装来支持AMQP协议

spring-rabbit提供了对RabbitMq客户端的封装。所以我们在研究spring-rabbit源码时候应该先对rabbitMq client有一个初步认识。

首先,我们将目光放在客户端client的包中,这个包定义了rabbitMq client所需要的一系列功能接口、实现类以及实际相关的操作类。

 

正文:

1:AMQP与AMQPImpl

AMQP接口及其实现类,定义了AMQP协议底层通讯需要的相关契约,在我使用时候可以忽略,如果对这方面有想深入可以去了解,再次我们仅列出相关代码,不在深究。

下面我们根据RabbitMq的使用流程来逐步进行分析。

 

2:ConnectionFactory

ConnectionFactory从名字看可以得知,它是一个用于获得Connection的工厂类。它定义了创建Connection需要的一系列默认参数,以及对参数的getter/setter方法。

常量类用于定义AMQP协议对接和系统内默认值,我们可以忽略,下面的私有变量是我们可以通过set方法进行配置的属性。这些属性直接与我们的系统配置和之后的调优相关,可以重点研究一下。

(上图为connection私有变量以及方法,此处并不完整,仅供参考)

与ConnectionFactory功能设计相关的一系列方法,对创建Connection提供了多种方法。此处我们不在深究。只要知道这个类最主要的功能——创建链接。

 

3:Connection

public interface Connection extends ShutdownNotifier {…}

Connection接口继承了ShutdownNotifier接口,ShutdownNotifier为连接添加一系列关闭通知的支持,包括添加关闭监听器等,在此我们可以先忽略,把目光放在Connection本身。

由上图可以看出,Connection首先定义了一系列获取相关参数的get方法。在我们需要获取参数的时候可以使用。

这个接口中最常用也最重要的两个功能:createChannel close

为了解Connection我们可以看一下他的实现类:AMQConnection,忽略其他类成员,主要看一下createChannel的实现。

1:首先,它调用私有方法ensureIsOpen(),这个方法名字很有迷惑性,它并不确保Connection处于开启状态,它所做的仅仅是判断连接是否已经被打开,如果连接处于关闭状态,会直接抛出一个AlreadyClosedException。

2:然后,判断他的成员变 _channelManager是否为null。

    _channelManager作为ChannelManager类的对象,用来对Connection中的Channel进行管理(通常我们不会创建多个Connection因为这是一种比较重要的资源,而每个Connection中都可以创建多个Channel。)

    _channelManager用volatile标记以确保多线程的可见,它在Connection被启动时候创建并传入参数,此处规定Connection中必须包含一个初始化的ChannelManager的对象,也就是说所有Channel都必须被置于ChannelManager中管理,否则不能被成功创建。(至于_channelManager什么时候会为null,我并没有深究)。

3:最后,调用ChannelManager的createChannel方法来创建Channel.

    此方法会给通道赋予一个编号(或者自定义编号),并返回Channel的实现类ChannelN。至此Channel创建完成,我们得到了一个可以用于绑定Queue的信道。

 

4:Channel

前文介绍的从Connection创建、启动到生成Channel这一系列动作,在我们实际应用中其实往往只需要几行代码就可以编写完成。 我们已经知道Channel相对Connection是更廉价的资源,通过对Channel进行操作,我们可以完成大部分需要的业务功能,所以这一章相对前面更重点一些。

         下面,我们先预览一下Channel接口的契约,由于方法过多,且分割开来看。

getChannelNumber : 获取该channel在channelManager中的序列。

getConnection:    获取包含此channel的Connection。

Close : 关闭当前channel

flowBlocked  : 该方法已经被弃用,不深究。

abort : 中止通道, 这个方法的实现通常是调用close方法(close方法五个参数boolean abort 设置为true),与close不同的是使用abort关闭通道时,此操作中的所有异常都会被丢弃。

addReturnListener:在通道上添加一个returnListener,来处理不能发送或未被送达的消息。)如果发送时设置“强制发送”或“立即发送” ,但是消息没有被接收到,就会回returnListener。

removeReturnListener : 移除一个returnListener,注意一个Channel允许有多个returnListener,移除时需要传入被移除对象的引用。

clearReturnListeners : 清除该channel上所有returnListener (其实是调用Collection<ReturnListener>的clear() )

另外几个不同用途监听器的方法此处忽略,有需求了请自行研究。

getDefaultConsumer和setDefauletConsumer这两个方法对Channel的私有变量defaultConsumer进行操作,允许channel设置一个默认的消费者以处理某些情况,但大多数时候并不需要使用这个方法,此处不再深究。

这一系列功能应该是我们使用最多,也是最需要了解的部分了。

basicQos:拥有三个重载方法,通过basicQos为客户设置同一时间点获得消息数的最大值。

在默认状态下,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息(轮询)。但如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。

例如:basicQos(1),这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会再将新的Message分发给它。()

basicPublish : 发布消息! 这是最最重要的方法了,我们所做的一切操作都是为了消息最后的发送,这个方法含有三个重载方法,主要包含对exchange、routingKey、是否持久化、是否独占和消息主体等一系列对象引用和配置。发送消息时候需要根据实际需求进行配置。

exchangeDeclare : 声明一个交换机! 所有message都是Channel---->exchange---->queue的,调用这个方法创建一个Exchange,这里的Exchange的类型直接决定了之后的消息分发机制。

exchangeDelete : 删除交换机,当参数中的ifUnused为true时,只允许exchange为使用时被删除。

exchangeBind:绑定两个exchange,应该是将一个源交换机的消息绑定到另一个交换机(我并没有使用过这个方法,因为我还没有想到这个方法的应用场景,知道有个这个功能即可,先不深究。)

exchangeUnbind: 解除交换机的绑定,exchangeBind的反向方法,不深究。

 

queueDeclare:声明一个队列!重要方法,共两个重载方法。我们只看参数最多的一个。

queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments){...}

String queue: 队列名称。

boolean durable是否持久化,为true时,该队列将在服务器重启后仍然存在(当RabbitMQ退出时,默认会将消息和队列都清除)。

boolean exclusive是否为独占队列,为true时,这个队列是独占队列,只被声明者使用。

boolean autoDelete是否自动删除,为true时,服务器将在这个队列不再使用时候删除它。

Map<String, Object> arguments:队列的其他属性,即Queue实例中的arguments参数,具体作用目前我并没有使用过,通常设置为null。

 

queueDelete : 删除队列。对指定name的队列进行删除操作,方法中两个布尔值分别为:是否在未使用时候删除和是否在队列为空时删除。

 

queueBind : 队列与交换机的绑定,也是常用的方法之一。所有队列都必须与一个交换机进行绑定,从而接收消息。

queueUnbind:解除queue与exchange的绑定。

queuePurge : 清除指定队列的内容。

basicGet : 从队列上检索消息,获取一个持有消息相应数据的bean :GetResponse。

basicAck :消息确认。RabbItMQ发送了一个消息给消费者后,会马上从内存中移除这个信息。为了保证消息永远不会丢失,RabbitMQ支持消息应答。消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。如果消费者没有发送应答,RabbitMQ会认为该信息没有被完全处理,然后将会重新转发给别的消费者。(消息应答默认是打开的),在消费者调用basicConsume方法时若设置为false,则需要调用basicAck手动返回应答

basicNack :拒绝一个或多个接收的消息

basicReject : 拒绝消息 (这两个没有看。)

客户端接收消息的模式默认是自动应答,但是通过设置autoAck为false可以让客户端主动应答消息。当客户端拒绝此消息或者未应答便断开连接时,就会使得此消息重新入队。

basicConsume : 启用消费者。消费者通常使用的基本方法,内部调用了Consumer的handleConsumeOk进行处理(通常在ConsumerDispatcher内实现),将一个Consumer的实现对象注册到Queue中。

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal,
 boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException{...}

String queue: 队列名称

boolean autoAck: 自动确认,为false时需要消费者调用basicAck进行显示确认

String consumerTag: 客户端生成的消费者标签,用于建立上下文。

boolean noLocal:如果不应该给此消费者传递消息,则设置为true

boolean exclusive: 是否独占

Map<String, Object> arguments : 其他配置参数

Consumer callback : 消费者对象的接口

 

basicCancel: 取消消费者。调用Consumer的handleCancelOk处理

basicRecover : 恢复消息。请求重新发送未确认的消息,当消费者断开连接未能及时给服务端返回消息确认时,可以调用此方法要求再次获取之前的消息。如果为true则消息将被重新排序和还可能交付给不同的消费者。 如果为false,则将消息重新发送给之前的消费者。

txSelect:在通道上开启事务,保证消息不会丢失。(和confirm模式不能同时使用,而且会带来大量的多余开销,导致吞吐量下降很多,故而不推荐。)

txConfirm:提交事务。

txRollback:回滚事务。

confirmSelect:启用通道的confirm模式。认情况下,发送端不关注发出去的消息是否被消费掉了。可设置channel为confirm模式,所有发送的消息都会被确认一次,用户可以自行根据server发回的确认消息查看状态。

getNextPublishSeqNo:查看下一个要发送的消息的序号。

waitForConfirms:等待所有消息发送并确认。 

messageCount:返回队列中即将发送的消息数量。

consumerCount:返回队列中的消费者数量。

以上是channel接口定义的全部方法内容,具体实现可以查看ChannelN。

 

5:Consumer与QueueingConsumer

从接口的方法名可以知晓,这个接口定义了一系列消费者处理消息后的回调方法,通常我们并不直接使用这些方法。

QueueingConsumer:

nextDelivery : 等待下一个消息传递并返回。

QueueingConsumer类含有一个内部类Delivery,它抽象了一个消息实体,我们对消费者的大部分处理都是操作delivery来完成的。

_body是一个byte[],保存我们的消息主体。

_envelope是类Envelope的实例对象,它封装了一系列于AMQP的参数和方法。可以使用getEnvelope()获取当前的信息。

_properties记录了一系列与请求本身相关的基本属性,并未这些参数提供了一系列get方法,以供获取。

清单:

<完>

© 著作权归作者所有

玄影镜心
粉丝 9
博文 103
码字总数 53409
作品 0
西安
高级程序员
私信 提问
1、RabbitMQ 入门秘籍,三分钟带你快速了解RabbitMQ

一、前言 刚开始接触RabbitMQ的时候,有些概念那理解起来简直是像风像雨又像雾,晦涩难懂。这篇文章用尽可能浅显的语言来解释RabbitMQ的入门知识。毕竟是入门课程,并没有对很多概念进行深入...

极客慧
2018/11/26
206
0
Centos安装rabbitmq的php扩展

前边我们介绍过怎么安装rabbitmq,但是想用php来调用rabbitmq可是没那么简单了,整整搞了一个下午才搞定.... 主要是两个包 1.rabbitmq-c的包 2.amqp的包 下载 首先是rabbitmq-c-0.4.1.tar.gz包...

mac_zhao
2014/09/26
47
0
openstack 最简单的 RabbitMQ 监控方法

先来看张图: 这是 Nova 的架构图,我们可以看到有两个组件处于架构的中心位置:数据库和Queue。数据库保存状态信息,而几乎所有的 nova-* 服务都直接依赖于 Queue 实现服务之间的通信和调用...

zhongbeida_xue
2018/05/09
0
0
OpenStack中RabbitMQ RPC 调用研究

这两天研究了一下,OpenStack的工作原理,并着重调研了一下RabbitMQ在OpenStack中扮演的角色。 首先,OpenStack中模块Volume Control、Network Controller、ComputeController以及Scheduler...

icheer
2013/08/27
418
0
三、RabbitMQ如何实现AMQ协议(读书笔记)

AMQP作为一种RPC传输机制 RabbitMQ作为一种AMPQ代理服务器,提供了一套严格的通信方式,核心部分的通信几乎都使用了RPC(远程过程调用)模式。 启动会话 AMQP协议定义,当客户端要与RabbitM...

XuePeng77
03/12
88
0

没有更多内容

加载失败,请刷新页面

加载更多

【2019年8月版本】OCP 071认证考试最新版本的考试原题-第5题

choose the best answer The CUSTOMERS table has a CUST_LAST_NAME column of data type VARCHAR2. The table has two rows whose COST_LAST_MANE values are Anderson and Ausson. Which q......

oschina_5359
37分钟前
3
0
电脑怎样制作流程图?分享绘制流程图方法

流程图的绘制可以用很多方法来实现,小编经常使用电脑对流程图进行绘制,即简单又便利,相信很多朋友都因为不知道怎样绘制流程图而选择了放弃,今天这篇文章希望可以让大家重拾绘制流程图的信...

干货趣分享
39分钟前
4
0
Elasticsearch 7.x 之文档、索引和 REST API 【基础入门篇】

前几天写过一篇《Elasticsearch 7.x 最详细安装及配置》,今天继续最新版基础入门内容。这一篇简单总结了 Elasticsearch 7.x 之文档、索引和 REST API。 什么是文档 文档Unique ID 文档元数据...

泥瓦匠BYSocket
43分钟前
5
0
TL665x-EasyEVM开发板处理器、flash、RAM

TL665x-EasyEVM是广州创龙基于SOM-TL665x核心板研发的一款TI C66x多核定点/浮点高性能DSP开发板,采用核心板+底板方式,底板尺寸为200mm*106.65mm,采用4*50pin和1*80pin B2B工业级连接器,稳...

Tronlong创龙
47分钟前
3
0
DevExpress Report-XRTable绑定数据

将从跳转前的页面(A)中获取传入的数据(dtOrd、BatchID、ModelID),绑定到Report报表对应的控件 ,代码如下: this.xrtBatchID.Text = sBatchID; this.xrtModel.Text ...

_Somuns
48分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部