文档章节

RabbitMQ Java 基本API

BakerZhu
 BakerZhu
发布于 2018/07/01 21:40
字数 3058
阅读 184
收藏 2

3 月,跳不动了?>>>

API

一、exchangeDeclare 交换器声明

    /**
     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange
     * @param type the exchange type
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
     * @param autoDelete true if the server should delete the exchange when it is no longer in use
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client.
     * @param arguments other properties (construction arguments) for the exchange
     * @return a declaration-confirm method to indicate the exchange was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;

exchange : 交换器名称
type : 交换器类型 参考:https://my.oschina.net/LucasZhu/blog/1838105
durable : true if we are declaring a durable exchange (the exchange will survive a server restart) 是否耐用,如果设置为耐用的,当重启服务,交换器还将继续存在,否则会消失
autoDelete :true if the server should delete the exchange when it is no longer in use 当建立一次链接之后,如果在没有其他队列链接此交换器,则交换器会自动删除。同 队列声明中autoDelete
internal : true if the exchange is internal, i.e. can't be directly  published to by a client. 是否是内部交换器,如果是内部交换器,则不能与外部client-客户端进行直接连接,例如死信队列中设置的exchange。
arguments  如下: 

二、channel.queueDeclare 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

1. queue:声明队列名称
2. durable :true if we are declaring a durable queue (the queue will survive a server restart) 如果我们声明为持久化 , 则在重新启动的时候该队列还将存在。
3. exclusive true if we are declaring an exclusive queue (restricted to this connection) 排他队列,限于创建此队列的链接进行操作
4. autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 当建立一次链接之后,如果在没有其他消费者链接此队列,则队列会自动删除。
5.队列如下相关的属性信息:

三、basicPublish 基础发送消息:

channel.basicPublish("", QUEUE_NAME, null, (message+"1").getBytes("UTF-8"));

/**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

/**
 * Publish a message.
 *
 * Publishing to a non-existent exchange will result in a channel-level
 * protocol exception, which closes the channel.
 *
 * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 *
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param mandatory true if the 'mandatory' flag is to be set
 * @param immediate true if the 'immediate' flag is to be
 * set. Note that the RabbitMQ server does not support this flag.
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;

exchange:交换器名称,如果为空字符串"",则直接发送消息到队列,不经过交换器。
routingKey : 为经过交换器中消息的路由规则,如果没有交换器(""),则它为队列的名称。#匹配0个或多个单词,*匹配一个单词  在topic exchange做消息转发用
props : 设置消息的属性信息,如消息头,消息的超时时间等等。
body : 消息体
mandatory : true 如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者。false : 出现上述情形broker会直接将消息扔掉。
immediate : true 如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

四、queueBind 队列和交换器的绑定

channel.queueBind(queueName, EXCHANGE_NAME, severity);

    /**
     * Bind a queue to an exchange, with no extra arguments.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue
     * @param exchange the name of the exchange
     * @param routingKey the routine key to use for the binding
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

queue : 队列名称。
exchange : 交换器名称。
routingKey  : 这里的routingKey也叫bindingKey 是队列和交换器的直接绑定规则。

五、channel.basicConsume

推模式获取消息(消息中间件主动将消息推送给消费者)

推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。好处很明显,消费者总是有一堆在内存中待处理的消息,所以效率高。缺点是缓冲区可能会溢出

Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		String message = new String(body, "UTF-8");
		System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
	}
};
channel.basicConsume(queueName, true, consumer);

/**
 * Start a non-nolocal, non-exclusive consumer, with
 * a server-generated consumerTag.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param callback an interface to the consumer object
 * @return the consumerTag generated by the server
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

round-robin(推-轮询)

推的方式采用的轮询(round-robin)的方式,这里轮询的意思不是消息的轮询,而是推送次数的轮询,推送可能一次推送一条消息也可能一次推送多条消息。实现方式如上。

chanel.basicQos()

/**
 * Request specific "quality of service" settings.
 *
 * These settings impose limits on the amount of data the server
 * will deliver to consumers before requiring acknowledgements.
 * Thus they provide a means of consumer-initiated flow control.
 * @see com.rabbitmq.client.AMQP.Basic.Qos
 * @param prefetchSize maximum amount of content (measured in
 * octets) that the server will deliver, 0 if unlimited
 * @param prefetchCount maximum number of messages that the server
 * will deliver, 0 if unlimited
 * @param global true if the settings should be applied to the
 * entire channel rather than each consumer
 * @throws java.io.IOException if an error is encountered
 */
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

/**
 * Request a specific prefetchCount "quality of service" settings
 * for this channel.
 *
 * @see #basicQos(int, int, boolean)
 * @param prefetchCount maximum number of messages that the server
 * will deliver, 0 if unlimited
 * @param global true if the settings should be applied to the
 * entire channel rather than each consumer
 * @throws java.io.IOException if an error is encountered
 */
void basicQos(int prefetchCount, boolean global) throws IOException;

/**
 * Request a specific prefetchCount "quality of service" settings
 * for this channel.
 *
 * @see #basicQos(int, int, boolean)
 * @param prefetchCount maximum number of messages that the server
 * will deliver, 0 if unlimited
 * @throws java.io.IOException if an error is encountered
 */
void basicQos(int prefetchCount) throws IOException;

prefetchSize:0 
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别
备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究

basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
prefetchSize 与 prefetchCount 可以同时设置,达到任何一个限制,则队列暂停推送消息
global 参数表示前两个参数的作用域,true 表示限制是针对信道的,false 表示限制是针对消费者的(我还没试过一个信道支持多个消费者的例子)
可以对同一个信道同时设置 global 为 true 和 false 的 Qos,表示队列要考虑每个消费者的限制,同时还要考虑整个信道的限制

fair dispatch(推-公平分发)

上面轮询推策略方式在服务器性能不均的情况下,带来消息在某个或某些消费者节点大量堆积的问题。
怎样才能做到按照每个消费者的能力分配消息呢,联合使用Qos和Acknowledge就可以做到。

channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
        try {
            TimeUnit.MILLISECONDS.sleep(2000);
        } catch (InterruptedException e) {
        }
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
channel.basicConsume(QUEUE_NAME, false, consumer);

核心:1.设置通道最多堆积(拥有)一条未提交的消息,当消费完成并且ack之后 channel再进行第二条消息的接收。2.设置ack方式为手动通知。

basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1,消息从队列异步推送给消费者,消费者的ack也是异步发送给队列,从队列的角度去看,总是会有一批消息已推送但尚未获得ack确认。Qos(Quality of Service)的prefetchCount参数就是来限制这批未确认消息的数量。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

六、channel.basicGet 

拉模式获取消息(消费者主动从消息中间件拉取消息)

拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。

while(true) {
	GetResponse response = channel.basicGet(queueName, true);
	if(response == null) {
		System.out.println("Get Nothing!");
		TimeUnit.MILLISECONDS.sleep(1000);
	} else {
		String message = new String(response.getBody(), "UTF-8");
		System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", queueName, message);
		TimeUnit.MILLISECONDS.sleep(500);
        channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
	}
}

/**
 * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
 * @see com.rabbitmq.client.AMQP.Basic.Get
 * @see com.rabbitmq.client.AMQP.Basic.GetOk
 * @see com.rabbitmq.client.AMQP.Basic.GetEmpty
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @return a {@link GetResponse} containing the retrieved message data
 * @throws java.io.IOException if an error is encountered
 */
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;
非阻塞方法,如果队列中没有数据会返回null 

七、channel.basicAck();

/**
 * Acknowledge one or several received
 * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
 * containing the received message being acknowledged.
 * @see com.rabbitmq.client.AMQP.Basic.Ack
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param multiple true to acknowledge all messages up to and
 * including the supplied delivery tag; false to acknowledge just
 * the supplied delivery tag.
 * @throws java.io.IOException if an error is encountered
 */
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag : 该消息的index 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。RabbitMQ保证在每个信道中,每条消息的DeliveryTag 从1开始递增。
multiple : 是否批量 true : 将一次性ack所有小于deliveryTag的消息。false 确认当前消息。

八、channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

/**
 * Reject one or several received messages.
 *
 * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
 * @see com.rabbitmq.client.AMQP.Basic.Nack
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param multiple true to reject all messages up to and including
 * the supplied delivery tag; false to reject just the supplied
 * delivery tag.
 * @param requeue true if the rejected message(s) should be requeued rather
 * than discarded/dead-lettered
 * @throws java.io.IOException if an error is encountered
 */
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

deliveryTag : 该消息的index
multiple : 是否批量. true 将一次性拒绝所有小于deliveryTag的消息。
requeue : 被拒绝的是否重新入队。

九、channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

/**
 * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
 * containing the received message being rejected.
 * @see com.rabbitmq.client.AMQP.Basic.Reject
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
 * @throws java.io.IOException if an error is encountered
 */
void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag : 该消息的index 。
requeue : 被拒绝的是否重新入队。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息

 

 

© 著作权归作者所有

BakerZhu
粉丝 109
博文 517
码字总数 423077
作品 0
通州
程序员
私信 提问
加载中

评论(0)

RabbitMQ入门(1)--介绍

前面声明本文都是RabbitMQ的官方指南翻译过来的,由于本人水平有限难免有翻译不当的地方,如发现不对的地方,请联系下我,好及时改正。好了,正文开始: RabbitMQ 是一个消息代理。这主要的原...

-悟空-
2015/02/24
2.4W
18
RabbitMQ 3.6.5 Milestone 2 发布

RabbitMQ 3.6.5 Milestone 2 发布了,RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继...

凝小紫
2016/08/30
939
5
redhat 安装Rabbitmq

先安装JDK 安装erlang环境: 安装依赖文件:yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto; yum install unixODBC-devel 解压erlang文件:tar -zxvf otpsrc19.2.ta......

ts88
2018/04/23
137
0
RabbitMQ 3.6.1 RC1 发布

RabbitMQ 3.6.1 RC1 发布了,下载地址: https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_1_rc1 该版本完整的改进内容如下: Bug Fixes Purging a lazy queue cou......

淡漠悠然
2016/02/13
976
0
RabbitMQ 3.6.6 Milestone 5 发布

RabbitMQ 3.6.6 Milestone 5 发布了,RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继...

淡漠悠然
2016/10/08
2K
2

没有更多内容

加载失败,请刷新页面

加载更多

什么是Android上的“上下文”? - What is 'Context' on Android?

问题: In Android programming, what exactly is a Context class and what is it used for? 在Android编程中, Context类到底是什么?它的用途是什么? I read about it on the developer......

技术盛宴
45分钟前
18
0
OkHttp配置HTTPS访问+服务器部署

1 概述 OkHttp配置HTTPS访问,核心为以下三个部分: sslSocketFactory() HostnameVerifier X509TrustManager 第一个是ssl套接字工厂,第二个用来验证主机名,第三个是证书信任器管理类.通过OkHtt...

氷泠
今天
20
0
华为P40发布:搭载HMS硬刚谷歌,未涨价抢全球高端机市场

  文连线 Insight,作者向阳,编辑水笙   3 月 26 日晚,华为消费者业务 CEO 余承东登上台,以熟悉的英文口音开启了华为发布会,他说,“这就是我们的 P40 系列。”   以往华为P系列通...

水果黄瓜
今天
28
0
如何从Java中的字符串值获取枚举值? - How to get an enum value from a string value in Java?

问题: Say I have an enum which is just 说我有一个枚举 public enum Blah { A, B, C, D} and I would like to find the enum value of a string, for example "A" which would be B......

javail
今天
9
0
OSChina 周一乱弹 —— 小姐姐,这tm不是犬耳娘吗!你认错了吧

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《Drip Drip Drip》- 音乐治疗 手机党少年们想听歌,请使劲儿戳(这里) @-Eric- ...

小小编辑
今天
46
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部