文档章节

beanstalkd消息队列使用

宋和毅
 宋和毅
发布于 2013/08/27 00:58
字数 1365
阅读 19131
收藏 34

最近在做一个项目,需要用户在提交相关信息后,分析信息内容,然后将分析结果推送到相关的用户的信息模块中,用到了beanstalk这个队列系统。 

beanstalkd介绍:

Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。


Beanstalkd设计里面的核心概念:
◆ job
一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。
◆ tube
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。
◆ producer
Job的生产者,通过put命令来将一个job放到一个tube中。
◆ consumer
Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

Beanstalkd中一个job的生命周期如图所示。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。

特性:

◆ 优先级
支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024。
◆ 持久化
可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态。
◆ 分布式容错
分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。
◆ 超时控制

为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。

 下载:

服务端:http://kr.github.io/beanstalkd/download.html

客户端:https://github.com/kr/beanstalkd/wiki/client-libraries

安装:

ubuntu 

sudo apt-get install beanstalkd

centos 

yum install beanstalkd

源码安装 

tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.9.tar.gz

cd beanstalkd

make install PERFIX=/usr/bin/beanstalkd


后台启动:

beanstalkd -l 地址 -p 端口号 -z 最大的任务大小(byte) -c &

如果是外部客户端连接,ip地址要写外网地址,这样才能连接上

启动选项

 -b DIR   wal directory

 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help


php客户端的使用:我使用的是这个简易的类 https://github.com/davidpersson/beanstalk

发送任务:

<?php
//发送任务

require_once 'src/Socket/Beanstalk.php';

//实例化beanstalk
$beanstalk = new Socket_Beanstalk(array(
    'persistent' => false, //是否长连接
    'host' => 'ip地址',
    'port' => 11600,  //端口号默认11300
    'timeout' => 3    //连接超时时间
));

if (!$beanstalk->connect()) {
    exit(current($beanstalk->errors()));
}
//选择使用的tube
$beanstalk->useTube('test');
//往tube中增加数据
$put = $beanstalk->put(
    23, // 任务的优先级.
    0,  // 不等待直接放到ready队列中.
    60, // 处理任务的时间.
    'hello, beanstalk' // 任务内容
);

if (!$put) {
    exit('commit job fail');
}

$beanstalk->disconnect();

 处理任务:


<?php
require_once 'src/Socket/Beanstalk.php';
//实例化beanstalk
$beanstalk = new Socket_Beanstalk(array(
    'persistent' => false, //是否长连接
    'host' => 'ip地址',
    'port' => 11600,  //端口号默认11300
    'timeout' => 3    //连接超时时间
));

if (!$beanstalk->connect()) {
    exit(current($beanstalk->errors()));
}
//查看beanstalkd状态
//var_dump($beanstalk->stats());

//查看有多少个tube
//var_dump($beanstalk->listTubes());

$beanstalk->useTube('test');

//设置要监听的tube
$beanstalk->watch('test');

//取消对默认tube的监听,可以省略
$beanstalk->ignore('default');

//查看监听的tube列表
//var_dump($beanstalk->listTubesWatched());

//查看test的tube当前的状态
//var_dump($beanstalk->statsTube('test'));


while (true) {
    //获取任务,此为阻塞获取,直到获取有用的任务为止
    $job = $beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk')

    //处理任务
    $result = doJob($job['body']);

    if ($result) {
        //删除任务
        $beanstalk->delete($job['id']);
    } else {
        //休眠任务
        $beanstalk->bury($job['id']);
    }
    //跳出无限循环
    if (file_exists('shutdown')) {
        file_put_contents('shutdown', 'beanstalkd在'.date('Y-m-d H:i:s').'关闭');
        break;
    }
}
$beanstalk->disconnect();




© 著作权归作者所有

宋和毅
粉丝 4
博文 7
码字总数 6252
作品 0
厦门
私信 提问
加载中

评论(3)

superHu
superHu
很赞,通俗易懂
一根烟的寂寞
一根烟的寂寞
下载下来目录不对呢 src/Socket/Beanstalk.ph
inevermore
inevermore
总结的很好,尤其是job的状态转移
【队列源码】消息队列beanstalkd源码详解

1.消息队列简介 计算机软件发展的一个重要目标是降低软件耦合性; 网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每个阶段之...

陈雷_顺风车
2018/08/20
0
0
轻量级消息队列--beanstalkd

Beanstalk 是一个简单、快速的消息队列。Beanstalkd之于RabbitMQ,就好比Nginx之于Apache,Varnish之于Squid。后面在项目中使用Beanstalkd的过程中,更发现其简单、轻量级、高性能、易使用等...

匿名
2011/05/11
26.5K
2
phalcon队列使用Queueing

参考资料: phalcon文档 phalcon queueing使用心得 Phalcon with Beanstalkd Beanstalkd介绍1 Beanstalkd介绍2 beanstalkd消息队列使用 phalcon beanstalk队列的choose和watch方法有什么区别...

爬墙
2016/08/18
140
0
PHP memcache实现消息队列实例

现在memcache在服务器缓存应用比较广泛,下面我来介绍memcache实现消息队列等待的一个例子,有需要了解的朋友可参考。 memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀...

tiger_lee
2014/03/13
566
0
Beanstalkd 1.10 发布,轻量级消息队列

同往常一样,Beanstalked在2.0发布前都不会出现兼容标准方面的改变。以后在Beanstalked1.x上进行的操作将不能在Beanstalked1.0上操作。 新增功能: 修复了suspend和其他入口的崩溃(#220) ...

大胖森
2015/03/18
2.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

小知识:讲述Linux命令别名与资源文件的区别

别名 别名是命令的快捷方式。为那些需要经常执行,但需要很长时间输入的长命令创建快捷方式很有用。语法是: alias ppp='ping www.baidu.com' 它们并不总是用来缩短长命令。重要的是,你将它...

老孟的Linux私房菜
35分钟前
2
0
《JAVA核心知识》学习笔记(6. Spring 原理)-5

它是一个全面的、企业应用开发一站式的解决方案,贯穿表现层、业务层、持久层。但是 Spring 仍然可以和其他的框架无缝整合。 6.1.1. Spring 特点 6.1.1.1. 轻量级 6.1.1.2. 控制反转 6.1.1....

Shingfi
36分钟前
4
0
Excel导入数据库数据+Excel导入网页数据【实时追踪】

1.Excel导入数据库数据:数据选项卡------>导入数据 2.Excel导入网页数据【实时追踪】:

东方墨天
44分钟前
4
1
正则表达式如何匹配一个单词存在一次或零次并且不占捕获组位置

正则表达式如何匹配一个单词存在一次或零次并且不占捕获组位置 今天要用正则表达式实现匹配一个词出现一次或者不出现的情况,但是又不仅仅是这么简单的需求。先详细说下我这种情况吧,也许有...

Airship
50分钟前
6
0
第八讲:asp.net C# web 读取文件

本讲主要讲解如何在asp.net页面上传文件。 首先,前台页面: 其次,后台页面: 结果: 1、前台效果: 2、后台结果:

刘日辉
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部