文档章节

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

mac_zhao
 mac_zhao
发布于 2014/09/26 19:07
字数 621
阅读 767
收藏 6

行业解决方案、产品招募中!想赚钱就来传!>>>

1、AMQP_EX_TYPE_DIRECT:直连型

直连型又包括: 1对1 和1对N(N对1、 N对N)
\
接收端receive.php代码如下
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<!--?php
 
$connect =newAMQPConnection();
$connect--->connect();
 
$channel =newAMQPChannel($connect);
 
$exchange =newAMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
 
$queue =newAMQPQueue($channel);
$queue->setName('logs');
$queue->declare();
 
$queue->bind('exchange','logs');
 
while(true) {
    $queue->consume('callback');
}
 
$connection->close();
 
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    $queue->nack($envelope->getDeliveryTag());
}

发送端send.php代码如下
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!--?php
 
$connect =newAMQPConnection();
$connect--->connect();
 
$channel =newAMQPChannel($connect);
 
$exchange =newAMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
 
$exchange->publish('direct type test','logs');
var_dump("Send Message OK");
 
$connect->disconnect();

运行结果如图所示
\



\ 创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看 receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<!--?php
 
$connect =newAMQPConnection();
$connect--->connect();
 
$channel =newAMQPChannel($connect);
 
$exchange =newAMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
 
$queue =newAMQPQueue($channel);
$queue->setName('logs');
@$queue->declare();
 
$queue->bind('exchange','logs');
 
while(true) {
    $queue->consume('callback');
}
 
$connection->close();
 
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    $queue->nack($envelope->getDeliveryTag());
}




send.php
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!--?php
 
$connect =newAMQPConnection();
$connect--->connect();
 
$channel =newAMQPChannel($connect);
 
$exchange =newAMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
 
for($index =1; $index <5; $index++) {
    $exchange->publish($index,'logs');
    var_dump("Send:$index");
}
 
$exchange->delete();
$connect->disconnect();

运行结果如下 \

列队会把消息分配给每一个接收端分配处理这里看似完美但是如果想要更好的处理不同的任务就需要 公平调度
比如当1、3处理的都是简单的人 2、4都是处理的复杂的任务 如果任务过多时 receive_one.php是空闲的而receive_two.php是任务繁重的 我们进行如下测试 send.php改成5改成50
?
1
2
3
4
for($index =1; $index <50; $index++) {
    $exchange->publish($index,'logs');
    var_dump("Send:$index");
}

receive_two.php 加上 sleep(3)
?
1
2
3
4
5
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    sleep(3);
    $queue->nack($envelope->getDeliveryTag());
}

我们运行程序结果如下
\

receive_one全部运行完而receive_two才运行一个 之后receive_one一直空闲 我们可以通过 在接收端设置 $channel->setPrefetchCount(1);
任务没人完成前不接收新的消息把消息发送给其他接收端
如下receive_one.php 和 receive_two.php
?
1
$channel =newAMQPChannel($connect);
改成如下
?
1
2
$channel =newAMQPChannel($connect);
$channel->setPrefetchCount(1);
mac_zhao
粉丝 41
博文 249
码字总数 304671
作品 0
普陀
程序员
私信 提问
加载中
请先登录后再评论。
Netty那点事(三)Channel与Pipeline

Channel是理解和使用Netty的核心。Channel的涉及内容较多,这里我使用由浅入深的介绍方法。在这篇文章中,我们主要介绍Channel部分中Pipeline实现机制。为了避免枯燥,借用一下《盗梦空间》的...

黄亿华
2013/11/24
2W
22
opm-server-mirror

代码更新 2009-11-25: 加入反爬虫功能。直接Web访问服务器将跳转到Google。 使用方法 下载index.zip 解压index.zip得到index.php 将index.php传到支持php和cURL的国外服务器上 打开 http:/...

luosheng86
2013/01/29
1K
0
tiny php template--TPT

关于TPT TPT是php实现的用于模板解析小工具,全部实现仅仅60行代码。 配置 DIRCOMPILED和DIRTEMPLATE,分别表示模版编译目录和模版文件目录: define('DIRCOMPILED','/compileddiy');define(......

红猪-侠
2013/03/03
1K
1
PHP web 服务器--YACS

YACS 是一个强大的 PHP 脚本,可以让你维护一个动态的 Web 服务器。 特性: - Runs on your own server, or on a shared web site - Post articles with web forms, by e-mail, or remotely ......

匿名
2013/03/18
857
0
硬实时操作系统--Raw OS

Raw-OS 起飞于2012年,Raw-OS志在制作中国人自己的最优秀硬实时操作系统。 Raw-OS 操作系统特性 内核最大关中断时间无限接近0us, s3c2440系统最大关中断时间实测0.8us。 支持idle任务级别的事...

jorya_txj
2013/03/19
6.2K
1

没有更多内容

加载失败,请刷新页面

加载更多

Azure Application Gateway(一)对后端 Web App 进行负载均衡

一,引言   今天,我们学习一个新的知识点-----Azure Application Gateway,通过Azure 应用程序网关为我么后端的服务提供负载均衡的功能。我们再文章头中大概先了解一下什么是应用程序网关...

osc_lc4icfkt
58分钟前
10
0
WoLai(我来) 注册码 ——国内版 notion 【笔记】

注册码: SQGYG23 ❤ W4T9PKP JLTHNJP KMTXK7P JDHKJEM KRJXX5P 6M7PPAP DEGLMG3 N3BZMRI 87BR22I GSIWGWP GNGBNTI QA8URIM UDUV9VM IHKJA7P MD9ZA3M 3XR67ZI TBUP9JX TI4TYMM 注册完了可以把......

osc_c05lkk3u
59分钟前
21
0
2020hdu多校第二场比赛及补题

这一场我们队只A了一题 1010 Lead of Wisdom 直接爆搜,但是T了好几发,剪了下枝 如果一个物品的a,b,c,d都比不上另外一个同种物品的a,b,c,d,那这个物品就可以直接淘汰掉了 #include<iostrea...

osc_usgpahnw
今天
21
0
为什么Java有瞬态字段? - Why does Java have transient fields?

问题: 为什么Java有瞬态字段? 解决方案: 参考一: https://stackoom.com/question/3opS/为什么Java有瞬态字段 参考二: https://oldbug.net/q/3opS/Why-does-Java-have-transient-fields...

富含淀粉
今天
16
0
轻松搭建CAS 5.x系列(6)-在CAS Server上增加OAuth2.0协议

概述说明 CAS Server默认搭建出来,客户端程序只能按照CAS自身的协议接入。CAS的强大在于,有官方的插件,可以支持其他的协议。本章节就让CAS Server怎么增加OAuth2.0的登录协议。 安装步骤 ...

osc_inj0cicw
今天
24
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部