文档章节

基于ThinkPHP使用Beanstalk的使用 入门篇(一)

胡椒大叔
 胡椒大叔
发布于 06/06 11:45
字数 2478
阅读 148
收藏 3

Beanstalk,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。

高性能离不开异步,异步离不开队列,而其内部都是Producer-Consumer模式的原理。

Beanstalkd设计里面的核心概念:

◆ job

一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。

◆ tube

一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。

◆ producer

Job的生产者,通过put命令来将一个job放到一个tube中。

◆ consumer

Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

Beanstalkd中一个job的生命周期。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。

Beanstalkd基于的源码安装和使用很简单,在此略过。这里重点介绍一下其几个很nice的特性。

◆ 优先级

支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024。

◆ 持久化

可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态。

◆ 分布式容错

分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。

◆ 超时控制

为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。

Beanstalkd不足之处:在使用中发现一个Beanstalkd尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。

 

下面,我们通过php类Pheanstalk来进行简单入门。

1. composer安装

切换到项目目录,使用composer进行加载安装:composer require pda/pheanstalk

(如出现报错,请更新composer在下载:composer update)

2. 连接Beanstalk

--- 生产者 ---

关于如何安装Beanstalk,在这里就不进行说明了,操作很简单,请自行在执行系统安装好,才能使用。

作者是使用 4.0 版本的类库

连接服务

注意的是,在4.0之前的版本,是直接使用 new Pheanstalk($host, $port, ...) 方式连接的,4.0版本使用 create 静态方法进行连接。

连接成功,我们来完成一个简单测试,“像管道demo中,put进值”,如

查看demo管道信息

这样,生产者就完成任务放进管道的操作。

3. 如何处理管道任务

--- 消费者 ---

我们创建一个消费者的方法,通过while来处理管道每次产生的任务(这是模拟,实际生产中,下一篇文章会详细说明),如:

以下的代码是监听管道demo,并将任务取出来处理

当我们在终端执行时,我看到每次处理任务的数据,如:

整个生产者到消费者的流程就是这样,至于具体业务,需要根据自身的情况进行处理。

入门篇(一)完!

 

以下是整理了一些常用方法,仅供参考(来源于网上,感谢网友的提供)

//----------------------------------------维护类----------------------------------

//1.查看目前pheanStalkd状态信息
//print_r($ph->stats()); 

//2.显示目前存在的管道
//print_r($ph->listTubes()); 

//3.查看NewUsers管道的信息
//$ph->useTube('NewUsers')->put('test'); 
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道添加一个up任务
//print_r($ph->statsTube('NewUsers'));//3.查看NewUsers管道的信息

//6.查看指定管道中某一个任务的情况
//$job = $ph->watch('NewUsers')->reserve(); //5.从管道中取出任务(消费)
//print_r($ph->statsJob($job)); //6.查看指定管道中某一个任务的情况

//7.查看任务id为1的任务详情
//$job = $ph->peek(1);7.直接取出任务id为1的任务 [注:beanstalkd中所有任务的id都具有唯一性] 
//print_r($ph->statsJob($job));//查看任务id为1的任务详情


//----------------------------------------生产类--------------------------------------

////第一种 put()

//$tube = $ph->useTube('NewUsers');//连接NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道添加任务four,并返回结果
//注: put()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)

////第二种 putInTube() [注: putInTube()就是对useTube()和put()的封装]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任务three
////注: putInTube()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)
//print_r($res);//返回任务id

//print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的详细情况


//---------------------------------------消费类--------------------------------------

// 1.watch 监听NewUsers管道 [ 注: watch()同样可以监听多个管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//打印已经监听的管道


// 2.watch 监听多个管道
//$tube = $ph->watch('NewUsers')
//           ->watch('default');
//print_r($ph->listTubesWatched());//打印已经监听的管道


// 3.ignore 监听NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
//            ->ignore('default');
//print_r($ph->listTubesWatched());//打印已经监听的管道


// 4.reserve 监听NewUsers管道,并且取出任务
//$job = $ph->watch('NewUsers')
//          ->reserve();
//
////注reserve()有1个参数,阻塞的时间,过了阻塞时间,不管有没有东西,直接返回
//
//var_dump($job);//打印已经取出的任务
//$ph->delete($job);//删除已经取出的任务


// 5.putInTube/put 向NewUsers管道写入任务 [ 注:此为生产者方法,放到此处是为了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.查看NewUsers管道详细信息


// 6.release 将取出的任务放回ready状态,还有2个参数(优先级和延迟)
//$job = $ph->watch('NewUsers')->reserve();//6.监听NewUsers管道,并取出任务

//if (true) {
//    sleep(30);
//    $ph->release($job);//6.将任务取出之后,停留30秒,然后将任务状态重新变为ready
//} else {
//    $ph->delete($job);
//}


// 7.bury (预留) 将任务取出之后,发现后面执行的逻辑不成熟(比如发邮件,突然发现邮件服务器挂掉了),
//或者说还不能执行后面的逻辑,需要把任务先封存起来,等待时机成熟了,再拿出这个任务进行消费

//$job = $ph->watch('NewUsers')->reserve();//取出任务
//$ph->bury($job);//取出任务后,将任务放到一边(预留)

// 8.peekBuried() 将处在bury状态的任务读取出来
//$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
//var_dump($ph->statsJob($job));//打印任务状态(此时任务状态应该是bury)

// 9.kickJob() 将处在bury任务状态的任务转化为ready状态
//$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
//$ph->kickJob($job);

// 10.kick()  将处在bury任务状态的任务转化为ready状态,有第二个参数int, 批量将任务id小于此数值的任务转化为ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任务id小于65,并且任务状态处于bury的任务全部转化为ready

// 11.peekReady() 将管道中处于ready状态的任务读出来
//$job = $ph->peekReady('NewUser');//将NewUser管道中处于ready状态的任务读取出来
//var_dump($job);
//$ph->delete($job);

// 12.peekDelay() 将管道中所有处于delay状态的任务读取出来
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);


// 13.pauseTube() 对整个管道进行延迟设置,让管道处于延迟状态
//$ph->pauseTube('NewUser',10);//设置管道NewUser延迟时间为10s
//$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
//var_dump($job);

// 14.resumeTube() 恢复管道,让管道处于不延迟状态,立即被消费
//$ph->resumeTube('NewUser');//取消管道NewUser的延迟状态,变为立即读取
//$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
//var_dump($job);

// 15.touch() 让任务重新计算任务超时重发ttr时间,相当于给任务延长寿命

 

© 著作权归作者所有

胡椒大叔
粉丝 0
博文 9
码字总数 5784
作品 0
广州
私信 提问
新手指导Thinkphp开发指南

小编于昨日参加朋友婚礼,浑身喜气洋洋。今天给用户带来的是关于thinkphp开发指南的纯技术性的文档,本文档目的很简单: 1、 帮助开发人员掌握thinkphp入门 2、 快速利用thinkphp进行项目开发...

汤圆
2012/12/25
332
1
适合PHP新手入门上手的开发框架-thinkphp

ThinkPHP是一个开源的PHP框架,是为了简化企业级应用开发和敏捷WEB应用开发而诞生的。最早诞生于2006年初,原名FCS,2007年元旦正式更名为ThinkPHP,并且遵循Apache2开源协议发布。早期的思想...

big_cat
2013/06/15
1K
0
ThinkPHP 框架出现 Bug,致中文网站遭受了一周的攻击

据 ZDNET 报道,有超过 45000 个中国网站由于使用 ThinkPHP 框架受到了攻击。 这些攻击针对的是使用 ThinkPHP 构建的网站,ThinkPHP 是一个中国的 PHP 框架,在中国 Web 开发领域非常受欢迎。...

程六金
2018/12/26
8.7K
28
ThinkPHP中的三大自动简介

ThinkPHP中的三大自动简介 文章TAG:thinkphp 自动简介 过期已备案域名,注册就能用!终身VIP会员,畅享源码下载织梦精美仿站,火热预定中! 本文较为详细的讲述了ThinkPHP中的三大自动,是非...

thinkyoung
2015/10/10
0
0
thinkphp开发的框架的简单介绍

  说起来现在的互联网时代中用到thinkphp的地方也是很多的,这也是一项技术活,开发框架也是比较有难度的,下面就给大家简单地介绍一下。   ThinkPHP是一个免费开源的,快速、简单的面向...

孙智绘
2012/06/28
56
0

没有更多内容

加载失败,请刷新页面

加载更多

Jenkins admin 密码忘记解决

一、admin密码未更改情况 1.进入\Jenkins\secrets目录,打开initialAdminPassword文件,复制密码; find / -name initialAdminPassword [root@jenkins jenkins]# cat /var/lib/jenkins/secre......

SuShine
31分钟前
5
0
LiveData原理分析

LiveData原理分析 1 LiveData简介 大部分Android应用会从网络或SQLite数据库存取数据,并根据数据更新界面。为了避免ANR,主线程中不能存取数据。而后台线程中无法更新界面。通常的做法是让后...

tommwq
45分钟前
4
0
Java描述设计模式(20):命令模式

本文源码:GitHub·点这里 || GitEE·点这里 一、生活场景 1、场景描述 智能电脑的品牌越来越多,由此诞生了一款电脑控制的APP,万能遥控器,用户在使用遥控器的时候,可以切换为自家电视的品...

知了一笑
46分钟前
3
0
java---网络编程(上)

1.1网络编程 网络编程指的是编写运行在多个设备计算机的程序,这些计算机通过网络连接起来 java.net包中提供了两种常见的网络协议的支持: TCP:TCP是传输控制层协议的缩写,它保障了两个应用...

Firefly-
50分钟前
15
0
城市搜索插件 city-query

  今天,给大家介绍一个比较简单有用的插件city-query,大家可以从coding上面下载下来。 git clone https://gitee.com/jflsy/city-query.git   引用插件时只需要src文件下的内容就可以了...

芳缘
55分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部