Rabbitmq Java入门指南

原创
2018/05/24 14:10
阅读数 2.1K

Java客户端API指南

本指南涵盖了RabbitMQ Java客户端及其公共API。它假定使用最近的主要版本的客户端,读者熟悉基础知识

该库的5.x版本系列需要JDK 8,用于编译和运行时。在Android上,这意味着只支持Android 7.0或更高版本。4.x版本系列支持7.0之前的JDK 6和Android版本。

该库是开源的,在GitHub上开发,并在三重许可下

· Apache公共许可证2.0

· Mozilla公共许可证

· GPL 2.0

这意味着用户可以考虑使用上述列表中的任何许可证进行许可。例如,用户可以选择Apache Public License 2.0并将该客户端包含到商业产品中。根据GPLv2许可的代码库可以选择GPLv2等。

还有一些 与Java客户端一起提供的命令行工具

客户端API在AMQP 0-9-1协议模型上进行了严格建模,并提供了更多的抽象以便于使用。

一个API参考(JavaDoc的)是单独提供的。

概观

RabbitMQ Java客户端使用com.rabbitmq.client作为其顶层包。关键类和接口是:

· Channel:表示AMQP 0-9-1通道,并提供大部分操作(协议方法)。

· Connection:代表AMQP 0-9-1连接

· ConnectionFactory:构造连接实例

· Consumer:代表消息消费者

· DefaultConsumer:消费者常用的基类

· BasicProperties:消息属性(元数据)

· BasicProperties.Builder:建设者BasicProperties

协议操作可通过 Channel接口获得。Connection用于打开Channel,注册连接生命周期事件处理程序,并关闭不再需要的连接。 Connection通过ConnectionFactory实例化,这就是您如何配置各种连接设置,如虚拟主机或用户名。

连接和频道

核心API类是Connection 和Channel,分别代表AMQP 0-9-1 connection 和Channel。它们通常在使用前进口:

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

连接到RabbitMQ

以下代码使用给定参数(host name, port number, etc)连接到RabbitMQ节点:

ConnectionFactory factory = new ConnectionFactory();

//  默认账号密码|“guest,限于本地连接 

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

 

Connection conn = factory.newConnection();

所有这些参数都对本地运行的RabbitMQ节点具有合理的默认值。如果在创建连接之前属性保持未分配状态,将使用属性的默认值:

属性

默认值

Username

"guest"

Password

"guest"

Virtual host

"/"

Hostname

"localhost"

port

5672用于常规连接, 5671用于使用TLS的连接

或者,可以使用URI

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");

Connection conn = factory.newConnection();

所有这些参数都对本地运行的RabbitMQ服务器有合理的默认值。

请注意,用户guest只能默认从本地主机连接。这是为了限制生产系统中众所周知的凭证使用。

使用Connection接口创建channel:

Channel channel = conn.createChannel();

现在可以使用channel发送和接收消息,如后面的部分所述。

在服务器节点日志中 可以观察到成功和不成功的客户端连接事件。

断开与RabbitMQ的连接

要断开连接,只需关闭通道和连接:

channel.close();

conn.close();

请注意,关闭频道可能被认为是很好的做法,但在这里并不是必须的 - 当底层连接关闭时,它将自动完成。

客户端断开事件可以在服务器节点日志中观察到

ConnectionChannel周期

Connection意味着周期较长。底层协议针对长时间运行的连接进行设计和优化。这意味着每个操作打开一个新的连接,例如发布的消息是不必要的,并且强烈不鼓励,因为它会引入大量的网络往返和开销。

Channel也意味着周期较长,但由于许多可恢复的协议错误会导致频道关闭,所以频道使用寿命可能会比连接频率短。每次操作关闭和打开新频道通常是不必要的,但可以适当。如有疑问,请考虑重复使用channel第一。

Channel级异常(例如尝试从不存在的队列中消耗)将导致通道关闭。已关闭的频道不能再使用,并且不会再收到来自服务器的更多事件(如消息传递)。Channel级异常将由RabbitMQ记录并启动通道的关闭序列(见下文)。

使用交换机(Exchanges队列(Queues 

客户端应用程序与协议的高级构建块交换和队列一起工作。这些必须在可以使用之前进行声明。声明任何一种类型的对象只是确保其中一个名称存在,并在必要时创建它。

继续前面的例子,下面的代码声明了一个exchange和一个queue,然后将它们绑定在一起。

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明以下对象,这两个对象都可以通过使用其他参数进行定制。这里他们都没有任何特别的论点。

1. 一个持久的,非自动删除的“direct”类型的交换

2. 一个具有生成名称的非持久,独占,自动删除队列

上面的函数调用然后使用给定的路由密钥(routing key)将队列绑定到交换机。

请注意,当只有一个客户端想要使用它时,这将是一种典型的声明方式:它不需要知名的名称,没有其他客户端可以使用它(独占),并且会自动清除(自动删除)。如果有几个客户想共享一个知名名称的队列,那么这个代码将是合适的:

channel.exchangeDeclare(exchangeName,“direct”,true);

channel.queueDeclare(queueName,true,false,false,null);

channel.queueBind(queueName,exchangeName,routingKey);

声明:

1. 一个持久的,非自动删除的“direct”类型的交换

2. 一个持久的,非独占,自动删除队列

许多Channel API方法被重载。这些便捷的ExchangeDeclare,queueDeclare和queueBind短格式 使用合理的默认值。还有更多的参数更多的表单,可以根据需要覆盖这些默认值,在需要的地方提供完全控制。

这种“简单形式,长形式”模式在客户端API使用中使用。

一些常见操作还有一个“不等待”版本,不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用

channel.queueDeclareNoWait(queueName,true,false,false,null);

“不等待”版本更高效,但提供较低的安全保证,例如,它们更依赖于检测失败操作的心跳机制。如有疑问,请从标准版开始。只有在高拓扑(队列,绑定)流失的场景中才需要“无等待”版本。

队列或交换可以被明确删除:

channel.queueDelete("queue-name")

只有在队列为空时才能删除队列:

channel.queueDelete(“queue-name”,false,true)

或者如果没有使用(没有任何消费者):

channel.queueDelete(“queue-name”,true,false)

可以清除队列(删除所有消息):

channel.queuePurge(“队列名称”)

发布消息

要将消息发布到交易所,请按如下方式使用Channel.basicPublish:

byte[] messageBodyBytes = "Hello, world!".getBytes();

channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了进行良好的控制,您可以使用重载的变体来指定强制标志,或使用预设的消息属性发送消息:

channel.basicPublish(exchangeName, routingKey, mandatory,

                    MessageProperties.PERSISTENT_TEXT_PLAIN,

                     messageBodyBytes);

这将发送带有交付模式2(持久性),优先级1和内容类型“text / plain”的消息。你可以使用一个Builder类来构建你自己的消息属性对象,只要你喜欢就可以提供许多属性,例如:

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .contentType("text/plain")

               .deliveryMode(2)

               .priority(1)

               .userId("bob")

               .build()),

               messageBodyBytes);

本示例使用自定义标题发布消息:

Map<String, Object> headers = new HashMap<String, Object>();

headers.put("latitude",  51.5252949);

headers.put("longitude", -0.0905493);

 

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .headers(headers)

               .build()),

               messageBodyBytes);

本示例发布包含过期(expiration)的消息:

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .expiration("60000")

               .build()),

               messageBodyBytes);

我们没有在这里说明所有的可能性。

请注意,BasicProperties是自动生成的持有AMQP的内部类。

Channel#basicPublish的 调用最终会阻止 资源驱动型警报生效。

通道Channels 和并发注意事项(线程安全)

作为一个经验法则,在线程之间共享Channel实例是需要避免的。应用程序应该更喜欢使用每个线程的通道,而不是在多个线程之间 共享同一个通道

尽管通道上的某些操作可以安全地同时调用,但有些操作不会并且会导致不正确的帧交错,双重确认等。

在共享通道上同时发布可能会导致连线上的帧错误交错,触发连接级别的协议异常并由代理立即关闭连接。因此它需要在应用程序代码中进行明确的同步(Channel#basicPublish必须在关键部分中调用)。在线程之间共享频道也会干扰发布商确认。最好避免在共享通道上同时发布,例如通过使用每个线程的通道。

可以使用通道池来避免在共享通道上同时发布:一旦线程完成一个通道的处理,它就会将其返回到池中,从而使该通道可用于另一个线程。通道池可以被认为是一个特定的同步解决方案。建议使用现有的共享库来代替自行开发的解决方案。例如,Spring AMQP 具有即用型通道池功能。

通道消耗资源,在大多数情况下,应用程序在同一个JVM进程中很少需要超过几百个开放通道。如果我们假设应用程序对每个通道都有一个线程(因为不应该同时使用通道),那么单个JVM的数千个线程已经有相当可观的开销,这可能是可以避免的。此外,一些快速发布商可以轻松地使网络接口和代理节点饱和:发布涉及的工作量少于路由,存储和传递消息的工作量。

要避免的典型反模式是为每个发布的消息打开一个频道。渠道应该是相当长寿的,打开一个新渠道是一个网络往返,这使得这种模式非常低效。

在一个线程中使用并在共享通道上的另一个线程中发布可能是安全的。

服务器推送的交付(请参见下面的部分)与保证每通道排序被保留的保证同时进行分派。调度机制使用java.util.concurrent.ExecutorService,每个连接一个。可以提供一个自定义执行程序,该自定义执行程序将由使用 ConnectionFactory#setSharedExecutor设置程序的单个ConnectionFactory生成的所有连接共享。

使用手动确认时,重要的是要考虑哪些线程进行确认。如果它与接收交付的线程不同(例如Consumer#handleDelivery 委托交付处理到另一个线程),则将multiple参数设置为true进行确认是不安全的,将导致双重确认,并因此导致通道级协议异常关闭频道。一次确认一条消息可能是安全的。

通过订阅接收消息(“推送API”)

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

接收消息的最有效方式是使用Consumer 界面设置订阅。消息将在到达时自动发送,而不是必须明确要求。

在调用与Consumer相关的API方法时 ,个人订阅始终由其消费者标签引用。消费者标签是消费者标识符,可以是客户端或服务器生成的。要让RabbitMQ生成节点范围的唯一标记,请使用Channel#basicConsume覆盖,该覆盖不会接收使用者标记参数,也不会传递消费者标记的空字符串,并使用Channel#basicConsume返回的值。消费者标签用于取消消费者。

不同的消费者实例必须具有不同的消费者标签。强烈建议在连接上重复使用消费者标签,并且可能会导致自动连接恢复问题,并在监控消费者时混淆监控数据。

实现Consumer的最简单方法是为便利类DefaultConsumer创建子类。该子类的一个对象可以通过basicConsume 调用来设置订阅:

boolean autoAck = false;

channel.basicConsume(queueName, autoAck, "myConsumerTag",

     new DefaultConsumer(channel) {

         @Override

         public void handleDelivery(

String consumerTag,

                        Envelope envelope,

                        AMQP.BasicProperties properties,

                        byte[] body)

             throws IOException

         {

             String routingKey = envelope.getRoutingKey();

             String contentType = properties.getContentType();

             long deliveryTag = envelope.getDeliveryTag();

             //(处理消息组件在这里...) 

             channel.basicAck(deliveryTag, false);

         }

     });

在这里,因为我们指定了autoAck = false,确认传递给消费者的消息,最简单的 方法是在handleDelivery方法中完成,如图中所示。

更复杂的消费者将需要覆盖更多的方法。特别是,handleShutdownSignal 当通道和连接关闭被调用,handleConsumeOk传递消费者标签的任何其他回调到之前消费者被调用。

消费者也可以分别实现 handleCancelOk和handleCancel 方法来通知显式和隐式取消。

您可以使用 Channel.basicCancel明确取消特定的消费者:

channel.basicCancel(consumerTag);

通过消费者标签。

就像出版商一样,为消费者考虑并发危害安全也很重要。

对消费者的回调被调度到与实例化其通道的线程分离的线程池中 。这意味着消费者可以安全地调用Connection或Channel上的阻塞方法 ,例如 Channel#queueDeclare或 Channel#basicCancel。

每个通道都有自己的调度线程。对于每个 频道一个消费者最常见的使用情况,这意味着消费者不支持其他消费者。如果每个频道有多个消费者,请注意,长时间运行的消费者可能会阻止向该频道上的 其他消费者发送回调 。

有关并发性和并发性危害安全性的其他主题,请参阅并发注意事项(线程安全性)部分。

检索单个消息(“Pull API”)

要显式检索消息,请使用 Channel.basicGet。返回的值是GetResponse的一个实例,从中可以提取标题信息(属性)和消息正文:

boolean autoAck = false ;

GetResponse response = channel.basicGet(queueName, autoAck);

if (response == null) {

     //没有检索到消息。

} else {

    AMQP.BasicProperties props = response.getProps();

    byte[] body = response.getBody();

    long deliveryTag = response.getEnvelope().getDeliveryTag();

    ...

 

    

并且由于上面的autoAck = false,您还必须调用Channel.basicAck来确认您已成功接收消息:

    ...

    channel.basicAck(method.deliveryTag,false); //确认收到消息 

}

处理不可路由的消息

如果消息发布时设置了“强制(mandatory)”标志,但无法路由,代理会将其返回给发送客户端(通过AMQP.Basic.Return 命令)。

通知这样的回报,客户可以实现ReturnListener 接口并调用Channel.addReturnListener。如果客户端尚未配置特定通道的返回侦听器,则相关的返回消息将被静默放弃。

channel.addReturnListener(new ReturnListener() {

    public void handleReturn(int replyCode,

                                  String replyText,

                                  String exchange,

                                  String routingKey,

                                  AMQP.BasicProperties properties,

                                  byte[] body)

    throws IOException {

        ...

    }

});

例如,如果客户端发布的消息的“mandatory”标志设置为未绑定到队列的“direct”类型的交换,则会调用返回监听器。

关机协议

客户端关机过程概述

AMQP 0-9-1连接和通道共享相同的一般方法来管理网络故障,内部故障和明确的本地关闭。

AMQP 0-9-1连接和通道具有以下生命周期状态:

· open:对象已准备好使用

· closing:对象已明确通知本地关闭,已向任何支持的下层对象发出关闭请求,并且正在等待其关闭过程完成

· closed:对象已收到来自任何底层对象的所有关闭完成通知,因此已关闭

这些对象总是处于关闭状态,无论导致关闭的原因如应用程序请求,内部客户端库故障,远程网络请求还是网络故障。

AMQP连接和通道对象具有以下与关机相关的方法:

· addShutdownListener(ShutdownListener listener)和 removeShutdownListener(ShutdownListener listener)来管理任何侦听器,当对象转换到关闭(closing)状态时将会触发这些侦听器 。请注意,将ShutdownListener添加到已关闭的对象将立即触发侦听器

· getCloseReason(),以允许调查对象关闭的原因

· isOpen(),用于测试对象是否处于打开状态

· close(int closeCode,String closeMessage),以显式通知要关闭的对象

监听的简单用法如下所示:

import com.rabbitmq.client.ShutdownSignalException;import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {

    public void shutdownCompleted(ShutdownSignalException cause)

    {

        ...

    }

});

关于关机情况的信息

可以通过显式调用getCloseReason() 方法或使用ShutdownListener类的服务中的cause参数(ShutdownSignalException cause) 方法来检索 ShutdownSignalException,其中包含关于关闭原因的所有可用信息。

该ShutdownSignalException类提供方法来分析关机的原因。通过调用isHardError()方法,我们可以获得有关连接或通道错误的信息,getReason()以AMQP方法的形式返回有关原因的信息 -AMQP.Channel.Close或 AMQP.Connection.Close(如果原因是库中的某个异常(例如网络通信故障),则返回null,在这种情况下,可以使用getCause()检索异常。

public void shutdownCompleted(ShutdownSignalException cause){

  if (cause.isHardError())

  {

    Connection conn = (Connection)cause.getReference();

    if (!cause.isInitiatedByApplication())

    {

      Method reason = cause.getReason();

      ...

    }

    ...

  } else {

    Channel ch = (Channel)cause.getReference();

    ...

  }

}

  

原子性和使用isOpen()方法

不建议在生产代码中使用通道和连接对象 的isOpen()方法,因为方法返回的值取决于关闭原因的存在。以下代码说明了竞争条件的可能性:

public void brokenMethod(Channel channel){

    if (channel.isOpen())

    {

 //下面的代码依赖于处于打开状态的通道。//但是没有在信道状态变化的可能性ISOPEN()和basicQos(1)呼叫之间// 

       ...

        channel.basicQos(1);

    }

}  

相反,我们通常应该忽略这种检查,并简单地尝试所需的行动。如果在代码的执行过程中连接的通道关闭,则会引发ShutdownSignalException异常,指示对象处于无效状态。我们还应该捕获 由SocketException引起的IOException,当代理意外关闭连接时,或者在代理启动clean close时发生ShutdownSignalException。

public  void  validMethod (Channel channel) {

     try {

        ...

        channel.basicQos( 1);

    } catch(ShutdownSignalException sse){

         //可能检查频道是否被关闭

        //当我们开始操作时,

        //关闭它的原因 

        ...

    } catch(IOException ioe){

         //检查连接关闭的原因 

        ...

    }

}

高级连接选项

消费者线程池

消费者线程(请参阅下面的接收)默认情况下会自动分配到新的ExecutorService线程池中。如果需要更大的控制权,请在newConnection()方法上 提供ExecutorService,以便使用此线程池。下面是一个例子,其中提供了比通常分配的更大的线程池:

 ExecutorService es = Executors.newFixedThreadPool(20);

  Connection conn = factory.newConnection(es);

无论执行人及的ExecutorService类中的java.util.concurrent包。

当连接关闭时,默认的ExecutorService 将被shutdown(),但用户提供的 ExecutorService(如上面的es) 不会被shutdown()。提供定制ExecutorService的客户端必须确保它最终关闭(通过调用其shutdown() 方法),否则池的线程可能会阻止JVM终止。

同一个执行者服务可以在多个连接之间共享,或者在重新连接时被重复使用,但是在关闭后它不能使用()。

如果有证据表明消费者 回调处理中存在严重瓶颈,则应仅考虑使用此功能。如果没有消费者回调执行,或者很少,默认分配绰绰有余。开销最小,并且分配的总线程资源是有界的,即使偶尔会出现一连串的消费者活动。

使用主机列表

可以将Address数组传递给newConnection()。的地址是简单地在一个方便的类com.rabbitmq.client包与主机 和端口组件。例如:

 Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1), new Address(hostname2, portnumber2)};

  Connection conn = factory.newConnection(addrArr);

将尝试连接到hostname1:portnumber1,并且如果无法连接到hostname2:portnumber2。返回的连接是数组中的第一个成功(不抛出 IOException)。这完全等同于重复设置工厂的主机和端口,每次都调用factory.newConnection(),直到其中一个成功。

如果还提供了ExecutorService(使用表单factory.newConnection(es,addrArr)),则线程池将与(第一个)成功连接相关联。

如果您想要更多地控制主机连接到,请参阅 对服务发现的支持

使用AddressResolver接口进行服务发现

从版本3.6.6开始,可以让AddressResolver的实现 在创建连接时选择连接的位置:

  Connection conn = factory.newConnection(addressResolver);

该AddressResolver接口是这样的:

  public interface AddressResolver {

    List<Address> getAddresses() throws IOException;

  }

就像主机列表一样,返回的第一个地址将首先尝试,然后第二个地址返回,如果客户端无法连接到第一个地址,依此类推。

如果还提供了ExecutorService(使用表单factory.newConnection(es,addressResolver)),则线程池将与(第一个)成功连接相关联。

该AddressResolver是实现定制服务发现逻辑,这是一个动态的基础设施特别有用的理想场所。结合自动恢复功能,客户端可以自动连接到第一次启动时尚未达到的节点。亲和性和负载平衡是其中可以使用自定义AddressResolver的其他场景。

Java客户端随附以下实现(有关详细信息,请参阅javadoc):

1. DnsRecordIpAddressResolver:给定主机的名称,返回其IP地址(针对平台DNS服务器的分辨率)。这对于简单的基于DNS的负载平衡或故障转移很有用。

2. DnsSrvRecordAddressResolver:给定服务的名称,返回主机名/端口对。搜索被实现为DNS SRV请求。当使用像HashiCorp Consul这样的服务注册表时,这可能很有用 。

心跳超时

有关检测信号以及如何在Java客户端中配置它们的更多信息,请参阅Heartbeats指南

自定义线程工厂

诸如Google App Engine(GAE)等环境可以限制直接线程实例化。要在这样的环境中使用RabbitMQ Java客户端,有必要配置一个自定义的ThreadFactory,它使用适当的方法来实例化线程,例如GAE的ThreadManager。以下是Google App Engine的一个示例。

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();

cf.setThreadFactory(ThreadManager.backgroundThreadFactory();

支持Java非阻塞IO

Java客户端4.0版为Java非阻塞IO(又名Java NIO)带来实验性支持。NIO不一定比堵塞IO更快,它只是允许更容易地控制资源(在这种情况下,线程)。

在默认的阻塞IO模式下,每个连接使用一个线程从网络套接字读取。使用NIO模式,您可以控制从网络套接字读写的线程数。

如果Java进程使用许多连接(数十或数百),请使用NIO模式。您应该使用比使用默认阻止模式更少的线程。通过设置适当的线程数量,您不应该尝试降低性能,特别是在连接不太忙时。

NIO必须明确启用:

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.useNio();

NIO模式可以通过NioParams类来配置:

connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO模式使用合理的默认值,但您可能需要根据您自己的工作负载进行更改。其中一些设置是:使用的IO线程总数,缓冲区大小,用于IO循环的服务执行程序,内存写入队列的参数(写请求在网络上发送之前已排队)。请阅读Javadoc了解详情和默认值。

从网络故障中自动恢复

连接恢复

客户端和RabbitMQ节点之间的网络连接可能会失败。RabbitMQ Java客户端支持连接和拓扑(队列,交换,绑定和使用者)的自动恢复。许多应用程序的自动恢复过程遵循以下步骤:

1. 重新连接

2. 还原连接侦听器

3. 重新开放频道

4. 还原通道侦听器

5. 恢复频道basic.qos设置,发行商确认和交易设置

拓扑恢复包括为每个通道执行的以下操作

1. 重新申报交换机(除了预定义的)

2. 重新申报队列

3. 恢复所有绑定

4. 恢复所有消费者

从Java客户端的4.0.0版开始,默认情况下启用自动恢复(因此也是拓扑恢复)。

拓扑恢复依赖于实体(队列,交换,绑定,使用者)的每个连接缓存。当连接声明一个队列时,它将被添加到缓存中。当它被删除或计划删除(例如,因为它被自动删除)它将被删除。这个模型有一些局限在下面。

要禁用或启用自动连接恢复,请使用factory.setAutomaticRecoveryEnabled(boolean) 方法。以下片段显示了如何显式启用自动恢复(例如,对于Java 4.0.0之前的客户端):

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

factory.setAutomaticRecoveryEnabled(true);

//连接会自动恢复 

Connection conn = factory.newConnection();

如果由于异常导致恢复失败(例如,RabbitMQ节点仍然无法访问),它将在固定时间间隔后重试(默认为5秒)。间隔可以配置:

ConnectionFactory factory = new ConnectionFactory();

//每10秒尝试恢复一次 

factory.setNetworkRecoveryInterval(10000);

当提供地址列表时,列表会被混淆,并且所有地址都会在下一个地址之后被尝试:

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};

factory.newConnection(addresses);

何时会触发连接恢复?

自动连接恢复(如果启用)将由以下事件触发:

· I / O异常在连接的I / O循环中抛出

· 套接字读取操作超时

· 检测到错过的服务器心跳(超时)

· 连接的I / O循环中会引发任何其他意外的异常

以先发生者为准。

通道级别的异常不会触发任何形式的恢复,因为它们通常表示应用程序中存在语义问题(例如尝试从不存在的队列中使用)。

恢复监听器

可以在可恢复的连接和通道上注册一个或多个恢复监听器。当启用连接恢复时,由ConnectionFactory#newConnection和Connection#createChannel返回的 连接将 实现com.rabbitmq.client.Recoverable接口,提供两个具有相当描述性名称的方法:

· addRecoveryListener

· removeRecoveryListener

请注意,您目前需要将连接和频道投射到Recoverable 才能使用这些方法。

对发布的影响

连接断开时 使用Channel.basicPublish发布的消息将丢失。在连接恢复后,客户端不会将它们排队等待传递。为了确保发布的消息到达RabbitMQ应用程序需要使用Publisher确认 并考虑连接失败。

拓扑恢复

拓扑恢复涉及恢复交换,队列,绑定和消费者。当启用自动恢复功能时,它默认启用。因此,从Java客户端4.0.0开始,默认启用拓扑恢复。

如果需要,可以显式禁用拓扑恢复:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();

 //启用自动恢复(例如,先前的Java客户端4.0.0) 

factory.setAutomaticRecoveryEnabled(true);

//禁用拓扑恢复 

factory.setTopologyRecoveryEnabled(false);

故障检测和恢复限制

自动连接恢复具有许多应用程序开发人员需要注意的局限性和故意设计决策。

拓扑恢复依赖于实体(队列,交换,绑定,使用者)的每个连接缓存。当连接声明一个队列时,它将被添加到缓存中。当它被删除或计划删除(例如,因为它被自动删除)它将被删除。这使得可以在不出现意外结果的情况下在不同频道上声明和删除实体。这也意味着使用自动连接恢复的连接上的所有通道上的消费者标记(通道专用标识符)必须是唯一的。

当连接中断或丢失时,需要时间来检测。因此,库和应用程序都不知道有效的连接失败。在这个时间段内发布的任何消息都会像往常一样序列化并写入TCP套接字。他们只能通过发布商确认向代理商交付:通过AMQP 0-9-1进行发布完全是异步设计。

当启用了自动恢复功能的连接检测到套接字或I / O操作错误时,缺省情况下会在可配置延迟5秒后进行恢复。这种设计假定即使大量的网络故障是短暂的并且通常很短暂,但它们不会立即消失。连接恢复尝试将以相同的时间间隔继续,直到成功打开新连接。

当连接处于恢复状态时,任何在其频道上尝试发布的内容都将被拒绝,并有异常。客户端当前不执行此类传出消息的任何内部缓冲。应用程序开发者有责任跟踪这些消息并在恢复成功时重新发布它们。 发布商确认是一种协议扩展,应该由发布商不能承受消息丢失的情况下使用。

由于通道级别的异常导致通道关闭时,连接恢复不会启动。这种例外通常表示应用程序级别的问题。目前(library)无法就此情况做出明智的决定。

即使在连接恢复启动后,闭合通道也不会恢复。这包括明确关闭的通道和上面的通道级异常情况。

手动确认和自动恢复

当使用手动确认时,在消息传递和确认之间,到RabbitMQ节点的网络连接可能会失败。连接恢复后,RabbitMQ将重置所有通道上的交付标签。这意味着basic.ackbasic.nackbasic.reject 与旧的交付标签将导致通道异常。为了避免这种情况,RabbitMQ Java客户端跟踪并更新交付标签,使它们在恢复之间单调增长。 Channel.basicAck, Channel.basicNack和 Channel.basicReject然后将调整后的交付标签转换为RabbitMQ使用的标签。带有陈旧交付标签的确认将不会发送。使用手动确认和自动恢复的应用程序必须能够处理重新投递。

渠道Channels生命周期和拓扑恢复

对于应用程序开发人员来说,自动连接恢复应该尽可能透明,这就是为什么Channel实例保持不变,即使多个连接失败并在幕后恢复。从技术上讲,当自动恢复打开时,Channel实例充当代理或装饰器:他们将AMQP业务委托给实际的AMQP通道实现,并在其周围实施一些恢复机制。这就是为什么当它创建了一些资源(队列,交换,绑定)之后不应该关闭通道,或者这些资源的拓扑恢复稍后会失败,因为通道已关闭。相反,应该在应用程序的生命周期中创建通道。

未处理的异常

与连接,通道,恢复和消费者生命周期相关的未处理异常委派给异常处理程序。异常处理程序是实现ExceptionHandler接口的任何对象 。默认情况下,使用DefaultExceptionHandler的一个实例。它将异常详细信息打印到标准输出。

可以使用ConnectionFactory#setExceptionHandler覆盖处理程序 。它将用于工厂创建的所有连接:

ConnectionFactory factory = new ConnectionFactory();

cf.setExceptionHandler(customHandler);

异常处理程序应该用于异常记录。

Metrics 性能监控Metrics and monitoring

从版本4.0.0开始,客户端收集运行时指标(例如已发布消息的数量)。度量标准集合是可选的,并使用setMetricsCollector(metricsCollector)方法在ConnectionFactory级别进行设置 。此方法需要一个MetricsCollector实例,该实例在客户端代码的多个位置中调用。

客户端支持 Micrometer (截至版本4.3)和 Dropwizard Metrics开箱即用。

以下是收集的指标:

· 打开的连接数

· 开放频道的数量

· 已发布消息的数量

· 消费的消息数量

· 已确认消息的数量

· 被拒绝的信息数量

Micrometer和Dropwizard指标都提供计数,但也包括平均速率,最后五分钟速率等与消息相关的指标。他们还支持常见的监控和报告工具(JMX,Graphite,Ganglia,Datadog等)。有关更多详细信息,请参阅下面的专用章

请注意关于metrics collection的以下内容:

· 在使用Micrometer或Dropwizard指标时,不要忘记将适当的依赖关系(以Maven,Gradle或甚至JAR文件的形式)添加到JVM类路径。这些是可选的依赖关系,不会随Java客户端自动拖动。您可能还需要添加其他依赖项,具体取决于所使用的报告后端。

· metrics collection是可扩展的。鼓励为特定需求实施自定义 MetricsCollector。

· 所述MetricsCollector设置在ConnectionFactory,但可以在不同的实例共享。

· metrics collection不支持事务。例如,如果在事务中发送确认并且事务被回滚,则确认在客户metrics中被计数(显然不是broker实体)。请注意,确认实际上发送给代理,然后通过事务回滚取消,因此客户端指标在发送确认方面是正确的。总而言之,不要将客户端指标用于关键业务逻辑,它们不能保证完全准确。它们旨在简化关于正在运行的系统的推理并使操作更高效。

Micrometer 支持

您可以通过以下方式使用Micrometer 启用metrics collection :

ConnectionFactory connectionFactory = new ConnectionFactory();

MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();

connectionFactory.setMetricsCollector(metrics);

...

metrics.getPublishedMessages();//获得Micrometer的Counter对象

Micrometer支持 多种报告后端:Netflix Atlas,Prometheus,Datadog,Influx,JMX等。

您通常会将MeterRegistry的一个实例传递 给MicrometerMetricsCollector。这里是JMX的一个例子:

JmxMeterRegistry registry = new JmxMeterRegistry();

MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setMetricsCollector(metrics);

Dropwizard指标Metrics支持

您可以通过以下方式使用Dropwizard启用metrics collection :

ConnectionFactory connectionFactory = new ConnectionFactory();

StandardMetricsCollector metrics = new StandardMetricsCollector();

connectionFactory.setMetricsCollector(metrics);

...

metrics.getPublishedMessages();//获得Metrics的Meter对象

Dropwizard指标支持 多种报告后端:控制台,JMX,HTTP,Graphite,Ganglia等。

您通常会将MetricsRegistry的实例传递 给StandardMetricsCollector。这里是JMX的一个例子:

MetricRegistry registry = new MetricRegistry();

StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

 

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setMetricsCollector(metrics);

 

JmxReporter reporter = JmxReporter

  .forRegistry(registry)

  .inDomain("com.rabbitmq.client.jmx")

  .build();

reporter.start();

          

Google App Engine上的RabbitMQ Java客户端

在Google App Engine上使用RabbitMQ Java客户端(GAE)需要使用自定义线程工厂,使用GAE的ThreadManager实例化线程(请参阅上文)。此外,有必要设置一个低心跳间隔(4-5秒),以避免运行到低的InputStream上GAE读超时:

ConnectionFactory factory = new ConnectionFactory();

cf.setRequestedHeartbeat(5);

        

警告和限制

为了使拓扑恢复成为可能,RabbitMQ Java客户端维护已声明的队列,交换和绑定的缓存。缓存是按连接的。某些RabbitMQ功能使客户无法观察一些拓扑变化,例如,当由于TTL而删除队列时。RabbitMQ Java客户端尝试在最常见的情况下使缓存条目无效:

· 当队列被删除时。

· 交换被删除时。

· 当绑定被删除。

· 消费者在自动删除的队列上取消时。

· 当队列或交换机从自动删除的交易所解除锁定时。

但是,除了单个连接之外,客户端无法跟踪这些拓扑变化。依赖自动删除队列或交换机以及队列TTL(注意:不是消息TTL!)并使用自动连接恢复的应用程序应显式删除已知未使用或已删除的实体,以清除客户端拓扑高速缓存。这是通过促进通道#queueDelete, 通道#exchangeDelete,通道#queueUnbind和通道#exchangeUnbind 是幂等在RabbitMQ的3.3.x(删除的内容不是有不导致异常)。

RPC(请求/回复)模式:一个例子

为了方便编程,Java客户端API提供了一个类RpcClient,它使用临时答复队列通过AMQP 0-9-1 提供简单的RPC式通信工具。

该类不会对RPC参数和返回值施加任何特定的格式。它只是提供了一种机制,使用特定的路由密钥向给定的交换机发送消息,并等待回复队列上的响应。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(这个类如何使用AMQP 0-9-1的实现细节如下:请求消息是在 basic.correlation_id字段被设置为这个RpcClient实例唯一的值的情况下发送的,并且basic.reply_to被设置为回复队列。)

一旦创建了此类的实例,就可以使用它通过使用以下任何方法发送RPC请求:

byte[] primitiveCall(byte[] message);

String stringCall(String message)

Map mapCall(Map message)

Map mapCall(Object[] keyValuePairs)

该primitiveCall方法传送原始字节数组作为请求和响应机构。方法stringCall是一个简单的primitiveCall简便包装器,它将消息体作为默认字符编码中的String实例处理。

该mapCall变种是有点更复杂的:它们编码java.util.Map包含普通的Java值到AMQP 0-9-1二进制表表示,和解码以同样的方式回应。(请注意,这里可以使用哪些值类型有一些限制 - 请参阅javadoc了解详细信息。)

所有的marshalling/unmarshalling便利方法使用primitiveCall作为传输机制,并在其上提供一个包装层。

TLS支持

可以使用TLS加密客户端与代理之间的通信 。客户端和服务器认证(又名同行认证)也被支持。以下是对Java客户端使用加密的最简单方法:

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

factory.setPort(5671);

factory.useSslProtocol();

      

请注意,客户端并未强制执行上述示例中的任何服务器身份验证(对等证书链验证)作为缺省值,使用TrustManager的 “信任所有证书” 。这对本地开发很方便,但容易发生中间人攻击,因此不推荐用于生产。要了解更多关于RabbitMQ中TLS支持的信息,请参阅TLS指南。如果您只想配置Java客户端(尤其是对等验证和信任管理器部分),请阅读TLS指南的相应部分

 

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部