文档章节

RabbitMQ+PHP 教程三(Publish/Subscribe)用yii2测试通过

hansonwong
 hansonwong
发布于 11/16 10:40
字数 1975
阅读 3
收藏 0

介绍

在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都交付给一个工作人员处理。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者发送消息。此模式称为“发布/订阅”。

为了说明这个模式,我们将构建一个简单的日志系统。它将由两个程序组成,第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,接收程序的每个运行副本都会收到消息。这样我们就可以运行一个接收器,并将日志引导到磁盘;同时,我们还可以运行另一个接收器,并在屏幕上看到日志。

本质上,已发布的日志消息将被广播到所有接收器。

交换机(Exchanges)

在本教程的前几部分中,我们从队列中发送和接收消息。现在是在Rabbit中引入完整消息传递模型的时候了。

让我们快速浏览一下前面教程中介绍的内容:

  1. 生产者是发送消息的用户应用程序。
  2. 队列是存储消息的缓冲区。
  3. 消费者是接收消息的用户应用程序。

RabbitMQ消息传递模型的核心思想是,生产者不发送任何信息直接到队列。事实上,生产者甚至不知道消息是否会发送到任何队列。

相反,生产商只能向交换机(Exchange)发送消息。交换机做的事情很简单。一方面,它接收来自生产者的信息,另一边则推他们排队。Exchange必须知道如何处理接收到的消息。应该附加到特定队列吗?它应该被添加到多个队列?还是应该被抛弃?。这个规则是由交换类型定义的。

有几种交换类型可用:direct, topic, headers 和 fanout。我们将集中讨论最后一个——fanout。让我们创建这种类型的交换,并称之为日志:

$channel->exchange_declare('logs', 'fanout', false, false, false);

fanout交换非常简单。正如你可能从这个名字猜到的,它只广播它收到的所有消息给它所知道的所有队列。这正是我们需要的记录器。

Listing exchanges

列出服务器上的交换机,你可以运行rabbitmqctl:

sudo rabbitmqctl list_exchanges

在这个列表中会有一些amq. *交流和默认(未命名)交换。默认情况下创建这些>,但目前不太可能使用它们。

默认的交换机

在本教程的前几部分中,我们对交换机一无所知,但仍然能够将消息发送到队列中。这是可能的,因为我们使用的是默认的交换,我们通过空字符串(“”)来标识它们。

回想一下我们之前如何发布消息:

$channel->basic_publish($msg, '', 'hello');

我们在这里使用默认的或无名的交换:消息路由到指定的routing_key名称的队列,如果它存在的话。路由键是第三个参数:basic_publish

现在,我们可以将其发布到我们命名的Exchange中:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

临时队列(Temporary queues)

也许你还记得以前我们使用的队列所指定的名称(记得hellotask_queue?). 能够说出一个队列对我们来说至关重要 -- 我们需要把工人指向同一个队列。当你想在生产者和消费者之间共享一个队列时,给队列一个名字是很重要的。

但我们的记录器不是这样的。我们想了解所有日志消息,而不仅仅是其中的一个子集。我们也只对当前流动的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要两件事。

首先,每当我们与Rabbit连接时,我们需要一个新的空队列。为此,我们可以创建一个带有随机名称的队列,或者更好 - 让服务器为我们选择一个随机队列名。

第二,一旦断开消费者,队列应该自动删除。

在php客户端中,当我们将队列名称作为空字符串提供时,我们创建一个带有生成名称的非持久队列:

list($queue_name, ,) = $channel->queue_declare("");

方法返回时,queue_name变量包含一个随机生成的RabbitMQ队列名称。例如,它可能看起来像amq.gen-jzty20brgko-hjmujj0wlg

当声明它关闭的连接时,队列将被删除,因为它被声明为独占。

绑定(Bindings)

我们已经创建了fanout交换机和队列。现在我们需要告诉Exchange发送消息到我们的队列中。交换和队列之间的关系称为绑定。

$channel->queue_bind($queue_name, 'logs');

从现在开始,日志交换将向队列添加消息。

列出绑定列表(Listing bindings)

您可以使用现有的绑定列表,使用下面命令:

rabbitmqctl list_bindings

让我们把所有整理在一起(Putting it all together)

生成日志消息的生成程序与前面的教程没有多大区别。最重要的变化是,我们现在希望把消息发布到我们的日志交换,而不是无名的。这里给出emit_log.php代码:

<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

namespace app\commands;

use yii\console\Controller;
use yii\console\ExitCode;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * This command echoes the first argument that you have entered.
 *
 * This command is provided as an example for you to learn how to create console commands.
 *
 * @author Qiang Xue <qiang.xue@gmail.com>
 * @since 2.0
 */
class EmitLogController extends Controller
{

    public function actionIndex(array $argv)
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
        $channel = $connection->channel();

        $channel->exchange_declare('logs', 'fanout', false, false, false);

        $data = implode(' ', array_slice($argv, 0));
        if (empty($data)) {
            $data = "info: Hello World!";
        }

        $msg = new AMQPMessage($data);
        $channel->basic_publish($msg, 'logs');

        echo ' [x] Sent ', $data, "\n";


        $channel->close();
        $connection->close();
    }
}

emit_log.php源码

如您所见,在建立连接之后,我们声明交换。这一步是必要的,因为发布到一个不存在的交换机是禁止的。

如果没有队列绑定到Exchange,消息将丢失,但这对我们来说是好的;如果没有用户正在监听,我们可以安全地丢弃消息。

receive_logs.php代码:

<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

namespace app\commands;

use yii\console\Controller;
use yii\console\ExitCode;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * This command echoes the first argument that you have entered.
 *
 * This command is provided as an example for you to learn how to create console commands.
 *
 * @author Qiang Xue <qiang.xue@gmail.com>
 * @since 2.0
 */
class ReceiveLogsController extends Controller
{

    public function actionIndex()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
        $channel = $connection->channel();

        $channel->exchange_declare('logs', 'fanout', false, false, false);
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

        $channel->queue_bind($queue_name, 'logs');

        echo " [*] Waiting for logs. To exit press CTRL+C\n";

        $callback = function ($msg) {
            echo ' [x] ', $msg->body, "\n";
        };

        $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

receive_logs.php

如果要将日志保存到文件中,只需打开控制台并键入:

php receive_logs.php > logs_from_rabbit.log

如果您希望看到屏幕上的日志,生成一个新的终端并运行:

php receive_logs.php

当然,然后触发日志类型:

php emit_log.php

使用rabbitmqctl list_bindings可以验证代码实际上是创建绑定和队列是我们想要的。两receive_logs.php程序运行你应该看到:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:来自Exchange日志的数据使用服务器分配的名称到两个队列中。这正是我们想要的。

 

翻译来自 RabbitMQ - RabbitMQ tutorial - Publish/Subscribe

本文转载自:https://segmentfault.com/a/1190000013285229

共有 人打赏支持
hansonwong
粉丝 4
博文 119
码字总数 15349
作品 0
广州
私信 提问
immusen/yii2-swoole-mqtt

MQTT For Yii2 Base On Swoole 4 MQTT server for Yii2 base on swoole 4, Resolve topic as a route reflect into controller/action/param, And support redis pub/sub to trigger async t......

immusen
12/07
0
0
RabbitMQ+PHP 教程四(Routing)用yii2测试通过

开始 在本教程中,我们将为它添加一个特性——我们将只可能订阅消息的一个子集。例如,我们只能够将关键错误消息直接指向日志文件(以节省磁盘空间),同时仍然能够打印控制台上的所有日志消...

hansonwong
11/16
0
0
Yii 引入JS,css問題。 (轉發)

在布局中引用通用到js,或者css: <?php Yii::app()->clientScript->registerCoreScript('jquery');?> //注意这个将会插到<title></title>标签上..所以title标签要放在head文档顶部防止.jque......

resory
2012/10/11
0
0
yii2 window composer 安装

最近在学习PHP,着手找一个能快速上手的框架来学习。一开始看兄弟连视频时候讲师推荐ThinkPHP。于是我选择了ThinkPHP来尝试,这个框架的上手难度系数不大,能快速开发一款应用。适合小型的企...

Gjanuary
2017/06/06
0
0
这些最热门的 PHP 框架,哪一款是你的菜?

原文出处:opensource 译文出处:开源中国—两味真火 PHP 是世界上最流行的编程语言之一,广泛用于主要的项目中。例如,Facebook 就是利用 PHP 来创建和维护他们的内部系统;WordPress 内部基...

opensource
2016/12/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

错误: 找不到或无法加载主类

在IDEA的使用过程中,经常断掉服务或者重启服务,最近断掉服务重启时突然遇到了一个启动报错: 错误:找不到或无法加载主类 猜测:1,未能成功编译; 尝试:菜单---》Build---》Rebuild Pro...

安小乐
17分钟前
1
0
vue路由传参,刷新页面,引发的bug

最近遇到一个bug 通过vue路由跳转到页面, 然后接参控制(v-if ),成功显示 而刷新页面,显示失败。 苦苦地找了半天原因,打印参数发现正确,再打印下类型......,路由跳过来会保持传参时的...

hanbb
18分钟前
0
0
【58沈剑 架构师之路】InnoDB,select为啥会阻塞insert?

MySQL的InnoDB的细粒度行锁,是它最吸引人的特性之一。 但是,如《InnoDB,5项最佳实践》所述,如果查询没有命中索引,也将退化为表锁。 InnoDB的细粒度锁,是实现在索引记录上的。 一,Inn...

张锦飞
21分钟前
0
0
冒泡,选择和插入排序比较

/** * 冒泡排序,两层嵌套循环,内层局部比较后,找出最大或者最小数据并交换数据,使其局部有序,外层用于比较剩余元素,相较于选择排序,选择排序相当于是冒泡的一个优化版本(减少了交换...

strict_nerd
22分钟前
0
0
html内联(行内)元素、块级(块状)元素和行内块元素分类

HTML可以将元素分类方式分为内联(行内)元素、块级(块状)元素和行内块元素三种。 注:HTML是标签语言,那么既然是标签,就可以自己定义一些自己元素(如<wode>自定义的元素</wode>等),自...

NB-One
28分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部