RabbitMQ-Hello World(一)

原创
2017/04/12 14:33
阅读数 104

介绍

先决条件

本教程假定RabbitMQ已在标准端口(5672)上的localhost上安装并运行。如果使用不同的主机,端口或凭据,连接设置将需要调整。 

在哪里得到帮助

如果您无法通过本教程,您可以 通过邮件列表与我们联系。

RabbitMQ是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确信Postman先生最终会将邮件发送给收件人。在这个比喻中,RabbitMQ是一个邮箱,邮局和邮递员。

RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块的消息

RabbitMQ和消息传递一般使用一些术语。

  • 生产 即消息发送。发送消息的程序是一个生产者

  • 队列 是存在于RabbitMQ中的邮箱的名称。虽然消息流过RabbitMQ和您的应用程序,但它们只能存储在队列中队列 仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者都可以发送消息到一个队列,许多消费者都可以尝试从一个队列消费数据。这是我们如何表示一个队列:

  • 消费 具有与接收相似的含义。一个消费者是一个程序,主要是等待接收信息:

请注意,生产者,消费者和消息服务器不必驻留在同一个主机上; 确实在大多数应用程序中,它们不是。

“Hello,World”

在本教程的这一部分,我们将用Java编写两个程序; 发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将介绍Java API中的一些细节,专注于这个非常简单的事情,只需要开始。这是一个“Hello World”的消息传递。

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是队列 - 消费者的消息缓冲区。

(P) - > [|||]  - >(C)

Java客户端库

RabbitMQ支持多种协议。本教程使用AMQP 0-9-1,它是一种开放的通用协议,用于消息传递。有许多不同语言的 RabbitMQ客户端。我们将使用RabbitMQ提供的Java客户端。

下载客户端库 及其依赖项(SLF4J API和 SLF4J Simple)。沿着教程Java文件将这些文件复制到工作目录中。

请注意SLF4J Simple就够用了,但你应该在生产环境使用一个全面的日志库比如logback

(RabbitMQ Java客户端也位于Maven中央仓库中,其中包含groupId为com.rabbitmq和artifactId为amqp-client。)

现在我们有Java客户端及其依赖关系,我们可以编写一些代码。

发送

(P) - > [|||]

我们会调用我们的消息发布者(发件人)Send和我们的消息消费者(接收者) Recv。发布者将连接到RabbitMQ,发送单个消息,然后退出。

在 Send.java中,我们需要一些导入的类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置类并命名队列:

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

然后我们创建一个到服务器的连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

连接为套接字连接,并对我们提供协议版本协商和认证等。下面我们连接本地服务器-本地主机localhost。如果我们想连接到不同机器上的服务器,我们可以在此处指定其名称或IP地址。

接下来,我们创建一个通道,这是大部分用于完成任务的API所在的地方。

要发送,我们必须声明一个队列供我们发送消息; 那么我们可以将消息发布到队列中:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

声明队列是幂等的 - 只有当它不存在时才会被创建。消息内容是一个字节数组,所以你可以编码你喜欢的任何东西。

最后,我们关闭通道和连接;

channel.close();
connection.close();

这是整个Send.java类

发送不起作用

如果这是您第一次使用RabbitMQ,并且没有看到“已发送”消息,那么您可能会摸不着头脑到底哪里是错误的。也许服务器没有足够的可用磁盘空间启动(默认情况下至少需要200 MB),因此拒绝接受消息。检查服务器日志文件以确认并降低这个限制(如有必要)。该配置文件文档会告诉你如何设置disk_free_limit。

接收

对应发布者,RabbitMQ推送消息到我们的消费者,所以不同于发布单个消息的发布者,我们将持续监听消息并打印出来。

[|||]  - >(C)

代码(在Recv.java中)具有与发送几乎相同的包导入:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

额外的DefaultConsumer是一个Consumer 接口的实现类,用于缓冲由服务器推送给我们的消息。

设置与发布商相同; 我们打开一个连接和一个通道,并声明我们要消费的队列。注意必须匹配发送到发送者的队列。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

请注意,我们也在这里声明队列。因为我们可能会在发布者之前启动消费者,所以我们要确保队列存在,然后再尝试从中消费消息。

我们即将告诉服务器将队列中的消息传递给我们。由于它会异步地推送我们的消息,所以我们提供一个对象形式的回调,该对象将缓冲消息,直到我们准备好使用它们。这是一个DefaultConsumer子类。

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

这是整个Recv.java类

把它们放在一起

您可以在类路径上只使用RabbitMQ java客户端来编译这两个:

javac -cp amqp-client-4.0.2.jar Send.java Recv.java

要运行它们,您需要使用rabbitmq-client.jar及其对类路径的依赖。在终端中,运行消费者(接收者):

java -cp。:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv

然后运行发布者(发件人):

java -cp。:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send

在Windows上,使用分号而不是冒号分隔类路径中的项目。

消费者将通过RabbitMQ打印从发布商获得的消息。消费者将继续运行,等待消息(使用Ctrl-C停止它),所以尝试从另一个终端运行发布者。

列出队列

您可能希望看到RabbitMQ有什么队列和其中有多少个消息。您可以使用rabbitmqctl工具(作为特权用户)

 

sudo rabbitmqctl list_queues

 

在Windows上,省略sudo:

rabbitmqctl.bat list_queues

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部