文档章节

PHP memcache实现消息队列实例

tiger_lee
 tiger_lee
发布于 2014/03/13 16:03
字数 1467
阅读 545
收藏 13

现在memcache在服务器缓存应用比较广泛,下面我来介绍memcache实现消息队列等待的一个例子,有需要了解的朋友可参考。

memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志。然后通过定时程序将内容落地到文件或者数据库


php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列。
方便实现队列的轻量级队列服务器是:
starling支持memcache协议的轻量级持久化服务器
https://github.com/starling/starling
Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列
http://kr.github.com/beanstalkd/
php中也可以使用memcache/memcached来实现消息队列

<?php
/**
* Memcache 消息队列类
*/
class QMC {
const PREFIX = 'ASDFASDFFWQKE';
/**
* 初始化mc
* @staticvar string $mc
* @return Memcache
*/
static private function mc_init() {
static $mc = null;
if (is_null($mc)) {
$mc = new Memcache;
$mc->connect('127.0.0.1', 11211);
}
return $mc;
}
/**
* mc 计数器,增加计数并返回新的计数
* @param string $key   计数器
* @param int $offset   计数增量,可为负数.0为不改变计数
* @param int $time     时间
* @return int/false    失败是返回false,成功时返回更新计数器后的计数
*/
static public function set_counter( $key, $offset, $time=0 ){
$mc = self::mc_init();
$val = $mc->get($key);
if( !is_numeric($val) || $val < 0 ){
$ret = $mc->set( $key, 0, $time );
if( !$ret ) return false;
$val = 0;
}
$offset = intval( $offset );
if( $offset > 0 ){
return $mc->increment( $key, $offset );
}elseif( $offset < 0 ){
return $mc->decrement( $key, -$offset );
}
return $val;
}
/**
* 写入队列
* @param string $key
* @param mixed $value
* @return bool
*/
static public function input( $key, $value ){
$mc = self::mc_init();
$w_key = self::PREFIX.$key.'W';
$v_key = self::PREFIX.$key.self::set_counter($w_key, 1);
return $mc->set( $v_key, $value );
}
/**
* 读取队列里的数据
* @param string $key
* @param int $max  最多读取条数
* @return array
*/
static public function output( $key, $max=100 ){
$out = array();
$mc = self::mc_init();
$r_key = self::PREFIX.$key.'R';
$w_key = self::PREFIX.$key.'W';
$r_p   = self::set_counter( $r_key, 0 );//读指针
$w_p   = self::set_counter( $w_key, 0 );//写指针
if( $r_p == 0 ) $r_p = 1;
while( $w_p >= $r_p ){
if( --$max < 0 ) break;
$v_key = self::PREFIX.$key.$r_p;
$r_p = self::set_counter( $r_key, 1 );
$out[] = $mc->get( $v_key );
$mc->delete($v_key);
}
return $out;
}
}
/**
使用方法:
QMC::input($key, $value );//写入队列
$list = QMC::output($key);//读取队列
*/
?>

基于PHP共享内存实现的消息队列:

<?php
/**
* 使用共享内存的PHP循环内存队列实现
* 支持多进程, 支持各种数据类型的存储
* 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区
*
* @author wangbinandi@gmail.com
* @created 2009-12-23
*/
class ShmQueue
{
private $maxQSize = 0; // 队列最大长度
private $front = 0; // 队头指针
private $rear = 0;  // 队尾指针
private $blockSize = 256;  // 块的大小(byte)
private $memSize = 25600;  // 最大共享内存(byte)
private $shmId = 0;
private $filePtr = './shmq.ptr';
private $semId = 0;
public function __construct()
{
$shmkey = ftok(__FILE__, 't');
$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );
$this->maxQSize = $this->memSize / $this->blockSize;
// 申?一个信号量
$this->semId = sem_get($shmkey, 1);
sem_acquire($this->semId); // 申请进入临界区
$this->init();
}
private function init()
{
if ( file_exists($this->filePtr) ){
$contents = file_get_contents($this->filePtr);
$data = explode( '|', $contents );
if ( isset($data[0]) && isset($data[1])){
$this->front = (int)$data[0];
$this->rear  = (int)$data[1];
}
}
}
public function getLength()
{
return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
}
public function enQueue( $value )
{
if ( $this->ptrInc($this->rear) == $this->front ){ // 队满
return false;
}
$data = $this->encode($value);
shmop_write($this->shmId, $data, $this->rear );
$this->rear = $this->ptrInc($this->rear);
return true;
}
public function deQueue()
{
if ( $this->front == $this->rear ){ // 队空
return false;
}
$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
$this->front = $this->ptrInc($this->front);
return $this->decode($value);
}
private function ptrInc( $ptr )
{
return ($ptr + $this->blockSize) % ($this->memSize);
}
private function encode( $value )
{
$data = serialize($value) . "__eof";
echo '';
echo strlen($data);
echo '';
echo $this->blockSize -1;
echo '';
if ( strlen($data) > $this->blockSize -1 ){
throw new Exception(strlen($data)." is overload block size!");
}
return $data;
}
private function decode( $value )
{
$data = explode("__eof", $value);
return unserialize($data[0]);
}
public function __destruct()
{
$data = $this->front . '|' . $this->rear;
file_put_contents($this->filePtr, $data);
sem_release($this->semId); // 出临界区, 释放信号量
}
}
/*
// 进队操作
$shmq = new ShmQueue();
$data = 'test data';
$shmq->enQueue($data);
unset($shmq);
// 出队操作
$shmq = new ShmQueue();
$data = $shmq->deQueue();
unset($shmq);
*/
?>

对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。 
如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码: 
代码如下:

class Memcache_Queue 
{ 
private $memcache; 
private $name; 
private $prefix; 
function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") 
{ 
if ($memcache == null) { 
throw new Exception("memcache object is null, new the object first."); 
} 
$this->memcache = $memcache; 
$this->name = $name; 
$this->prefix = $prefix; 
$this->maxSize = $maxSize; 
$this->front = 0; 
$this->real = 0; 
$this->size = 0; 
} 
function __get($name) 
{ 
return $this->get($name); 
} 
function __set($name, $value) 
{ 
$this->add($name, $value); 
return $this; 
} 
function isEmpty() 
{ 
return $this->size == 0; 
} 
function isFull() 
{ 
return $this->size == $this->maxSize; 
} 
function enQueue($data) 
{ 
if ($this->isFull()) { 
throw new Exception("Queue is Full"); 
} 
$this->increment("size"); 
$this->set($this->real, $data); 
$this->set("real", ($this->real + 1) % $this->maxSize); 
return $this; 
} 
function deQueue() 
{ 
if ($this->isEmpty()) { 
throw new Exception("Queue is Empty"); 
} 
$this->decrement("size"); 
$this->delete($this->front); 
$this->set("front", ($this->front + 1) % $this->maxSize); 
return $this; 
} 
function getTop() 
{ 
return $this->get($this->front); 
} 
function getAll() 
{ 
return $this->getPage(); 
} 
function getPage($offset = 0, $limit = 0) 
{ 
if ($this->isEmpty() || $this->size < $offset) { 
return null; 
} 
$keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); 
$num = 1; 
for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) 
{ 
$keys[] = $this->getKeyByPos($pos); 
$num++; 
if ($limit > 0 && $limit == $num) { 
break; 
} 
} 
return array_values($this->memcache->get($keys)); 
} 
function makeEmpty() 
{ 
$keys = $this->getAllKeys(); 
foreach ($keys as $value) { 
$this->delete($value); 
} 
$this->delete("real"); 
$this->delete("front"); 
$this->delete("size"); 
$this->delete("maxSize"); 
} 
private function getAllKeys() 
{ 
if ($this->isEmpty()) 
{ 
return array(); 
} 
$keys[] = $this->getKeyByPos($this->front); 
for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) 
{ 
$keys[] = $this->getKeyByPos($pos); 
} 
return $keys; 
} 
private function add($pos, $data) 
{ 
$this->memcache->add($this->getKeyByPos($pos), $data); 
return $this; 
} 
private function increment($pos) 
{ 
return $this->memcache->increment($this->getKeyByPos($pos)); 
} 
private function decrement($pos) 
{ 
$this->memcache->decrement($this->getKeyByPos($pos)); 
} 
private function set($pos, $data) 
{ 
$this->memcache->set($this->getKeyByPos($pos), $data); 
return $this; 
} 
private function get($pos) 
{ 
return $this->memcache->get($this->getKeyByPos($pos)); 
} 
private function delete($pos) 
{ 
return $this->memcache->delete($this->getKeyByPos($pos)); 
} 
private function getKeyByPos($pos) 
{ 
return $this->prefix . $this->name . $pos; 
} 
}



本文转载自:http://www.php100.com/html/php/lei/2013/0905/5388.html

tiger_lee
粉丝 4
博文 10
码字总数 463
作品 0
闵行
程序员
私信 提问
Linux下常用轻量级队列服务比较

Linux IPC: IPC进程间通信(Inter-Process Communication)就是指多个进程之间相互通信,交换信息的方法。 系统消息队列功能是这些方法中的其中一种。使用此队列不需要额外安装服务,是系统内...

苗雨顺
2014/03/24
0
1
轻量级持久化服务器--Starling

twitter最近将ruby实现的消息队列服务器starling开源了,这是一个支持memcache协议的轻量级持久化服务器,因此使用php/perl/ruby/java等多种客户端都没问题,可以将较慢的处理逻辑通过消息队...

匿名
2010/11/25
2.1K
0
【转】 starling试用手记

twitter最近将ruby实现的消息队列服务器starling开源了,这是一个支持memcache协议的轻量级持久化服务器,因此使用php/perl/ruby/java等多种客户端都没问题,可以将较慢的处理逻辑通过消息队...

鉴客
2010/11/25
468
0
队列 activeMQ

在web开发过程中,我们会使用到队列。先进先出的特点。在最开始接触的是张宴的HTTPSQS,后来在使用Redis的时候,使用过reids作为队列,同时memcache也可以作为队列。今天主要是说一下Apache出...

bieru
2014/06/15
0
0
学习python-12,memcached和redis

memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度...

segastar660
2018/06/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

用Python帮你上马,哪里无码打哪里

目录 0 引言 1 环境 2 需求分析 3 代码实现 4 代码全景展示 5 后记 0 引言 所谓的像素图,就是对图像做一个颗粒化的效果,使其产生一种妙不可言的朦胧感。费话不多说,先来看一张效果图。 <c...

上海小胖
14分钟前
1
0
python from import与import as 的含义

from os import makedirs, unlink, sep #从os包中引入 makedirs.unlink,sep类 from os.path import dirname, exists, isdir, splitext 从 os包中的path类中引入 dirmame exists 等方法 impo......

dillonxiao
14分钟前
1
0
【转】URL最大长度问题

今天在测试Email Ticket的时候发现在进行Mark as Read/Unread操作时,请求是通过GET方式进行的。URL中列出了所有参与该操作的Ticket Id。于是,我想起GET请求是有最大长度限制的。遂输入超长...

ZhangLG
16分钟前
0
0
Segment段

CurrentHashMap和HashMap相比支持并发操作,整个CurrentHashMap是由一个个的Segment组成的,也是就是常说的分段锁 Segment继承了重入锁ReentrantLock来进行加锁, 可以简单的把CurrentHashMa...

周慕云
18分钟前
0
0
JS Date 自定义格式化方法

JS Date 自定义格式化方法 Date 时间对象 快速 自定义格式化 定义方法 // 自定义格式化方法Date.prototype.format = function(fmt) { var o = { "M+" : this.getMonth()+1, ......

DrChenXX
22分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部