tp5下基于Swoole通用队列实现

原创
2017/08/04 16:51
阅读数 1.8K

一【队列用途】

一般队列主要用途就是异步任务,用来缓解:

1)耗时操作,如生成图片等;

2)并行任务,如消息下发通知、批量处理任务等。

理论上不需要实时返回结果的请求都可以放到队列里面执行(当然实际项目中,不可能这么傻的用队列),队列的好处是:

1)它可以快速的返回请求,减少服务端因为等待而产生的压力,同时不影响业务的处理。

2)另外在处理大批量的任务时能很好的分散处理压力。

二【队列扩展说明】

1、geaman队列

公司老的项目一直都是使用geaman作为队列服务,蛮稳定的,比较喜欢它,但也有不能忍受的地方:

1)易学习掌握,使用方便。大家可以先封装了一个geaman通用工具类,包含server和client,后来的使用者就可以很方便的再业务中使用;

2)可以通过命令查看队列的执行情况,这对于查bug很有效;

3)不能确定异步任务的最终执行情况,如,插库任务,丢到队列如果插库失败,调用者并不会知道,这对于需要结果的队列任务来说会让人很抓狂。当然这种情况我们也有很多迂回的解决方案,如,数据表做标识符等。

2、swoole队列

对于swoole,我们很早就接触了解,它跟php的结合无疑是最佳搭档,一度有一段时期我们用了swoole框架来开发,可惜后面由于人为原因都放弃了;但总体开发体验还是很不错的,包括它的扩展性、给开发人员的自由度、本身性能、特别对于比较底层的功能实现让人不能罢手,比如异步队列、异步文件读写、网络通信部分等;另外,swoole不会像纯C写的php框架那么重,如phalcon、鸟哥的yaf,纯c框架,学习曲线高不说,在出现问题需要调试也让人烦躁。

回到队列正题,这里我们只用到swoole的异步队列功能,swoole队列特点(根据我们使用情况的小结):

1)上手快,使用简单方便,简单到直接拷贝官方示例就可以跑起来;但不能轻敌,想要用好swoole的队列及高级点功能还是需要费点功夫,根据自己服务器的配置情况来进行微调。

2)对window开发环境不友好,扩展在linux下安装方便,不过官方也给出了在window下使用方法。如果你是window环境开发的话,个人建议,在虚拟机弄个linux环境。

3)它提供了每个队列执行情况的回调,很好的解决上面geaman的第3点问题

4)不能查看队列情况(即上面geaman的第2点),即你不知道目前队列是否有任务存在

5)每个服务队列需要占用一个监听端口,即当你有多个不同业务需要跑队列任务时,你需要swoole server端监听不同的端口,来为不同的业务队列服务;----通用队列实现就为了缓解此问题。

官方环境依赖说明:https://wiki.swoole.com/wiki/page/7.html

想了解更多swoole内容,大家可以参考官网:http://www.swoole.com/ 

三【swoole的通用队列实现】

1、swoole队列 -- Server端启动代码

$serv = new Swoole\Server("127.0.0.1", 8888);//监听本机8888端口

//swoole配置信息

$serv->set([

// 一般设置为服务器CPU数的1-4倍

'worker_num'      => 12,

// task进程的数量(一般任务都是同步阻塞的,可以设置为单进程单线程)

'task_worker_num' => 20,

'daemonize'       => true,

'open_eof_split'  => true,//打开eof_split检测

'package_eof'     => PHP_EOL,//设置EOF

// 以守护进程执行

//  'task_ipc_mode' => 1,  // 使用unix socket通信,默认模式

'log_file'        => $this->logFile,

// swoole日志

// 数据包分发策略(dispatch_mode=1/3时,底层会屏蔽onConnect/onClose事件,

// 原因是这2种模式下无法保证onConnect/onClose/onReceive的顺序,非请求响应式的服务器程序,请不要使用模式1或3)

//  'dispatch_mode' => 2,        // 固定模式,根据连接的文件描述符分配worker。这样可以保证同一个连接发来的数据只会被同一个worker处理

]);

$serv->on('Receive', function($serv, $fd, $from_id, $data) {

$task_id = $serv->task("Async");

    echo "Dispath AsyncTask: id=$task_id\n";

});

$serv->on('Task', function ($serv, $task_id, $from_id, $data) {

    echo "New AsyncTask[id=$task_id]".PHP_EOL;

    $serv->finish("$data -> OK");

});

$serv->on('Finish', function ($serv, $task_id, $data) {

    echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;

});

$serv->start();

2、swoole队列 -- client代码

$client = new \swoole_client(SWOOLE_SOCK_TCP);
$client->connect('127.0.0.1', 8888, 1);//与server端对应
$client->send($data.PHP_EOL);

通过以上代码,就可以愉快的使用swoole提供的队列功能;

3、通用队列实现

1)将以上Server和Client封装为各自基类,以后上层的server及client业务均需集成相应的基类,如图:

 

2)建立一个Server.php类

本代码基于thinkphp5实现,如果是其他框架可以类似实现,具体说明见下面代码及注释说明:以异步发邮件做为例子说明

<?php
/**
 * swoole通用服务
 */
namespace app\console\swooletask;
use app\console\common\ServerCommand;
use think\Log;
//继承Server基类
class Server extends ServerCommand {

//实际业务的接口地址前缀
    private $swooleTaskPath = '\\app\console\\swooletask\\task\\';
    protected function configure() {
       //此为tp5特有的cli命名模式 $this->setName('kmads-common-swoole')->setDescription('swoole通用服务');
    }

    public function onReceive($serv, $fd, $from_id, $data) {
        $data = trim($data);
        $datas = json_decode($data, true);

/*注意:核心点都在这里

* 实现思路:client端通过传递不同的type来区分业务队列,然后通

* 过神奇的函数call_user_func_array对接到实际业务logic代码

*/
        switch($datas['type']) {

    //异步发邮件功能
            case 'Email': {
                $taskClass = $this->swooleTaskPath . 'email\\' . $datas['type'] . 'Task';
                call_user_func_array(array(
                    new $taskClass(),
                    'index'
                ), array(
                    $serv,
                    $datas['data']
                ));
                break;
            }
            case Test: {
                $taskClass = $this->swooleTaskPath . 'wechat\\' . $datas['type'] . 'Task';
                call_user_func_array(array(
                    new $taskClass(),
                    'index'
                ), array(
                    $serv,
                    $datas['data']
                ));
                break;
            }

            default:
                return [
                    'type'   => 'undefided',
                    'status' => false
                ];
        }
    }
    public function onTask($serv, $task_id, $src_worker_id, $data) {
        $className = $data['class'];
        $action    = $data['action'];
        return call_user_func_array(array(
            new $className(),
            $action
        ), array($data['datas']));
    }
}

业务的对应实现接口:EmailTask.php

class EmailTask{
    /**
     * swoole task执行触发入口
     * @param $serv
     * @param $data
     */
    public function index($serv,$data) {

//添加swoole队列收到请求数据onReceive的其他逻辑

//执行swoole队列的task任务;task与worker的关系见官网文档
        $serv->task([
            'class'  => '\app\console\swooletask\logic\Email',
            'action' => 'sendEmail',
            'datas'  => $data
        ]);
    }
}

task实际逻辑处理,logic/Email.php -- 这个就是实现实际的发邮件功能,代码就不贴出来了

目录结构:大家可以根据自己的业务结构进行自由组织

 

4、client端调用

$email = [
    'type' => 'Email',//队列业务类型
    'data' => $data,//你需要传递的数据
];
$client = new \ClientCommand();
$client->sendData(json_encode($email));//基类ClientCommand提供的发送数据到server端的方法

四【小结】

1、通过定义type及实现相应type的接口就可以很方便的扩展一个swoole队列需求,而不需要单独监听一个端口

2、Client端调用,引入ClientCommand 基类即可调用

 五【使用过程坑的记录】

【坑1】使用swoole跑队列任务,且队列里涉及数据库操作时,注意进程间数据库连接会被共享,导致数据库执行失败或不执行现象

【解决方案1】加了上面那段代码后,会对每一个队列进程建进程对应的链接,不影响其他进程,如:

附,官方回复:https://group.swoole.com/question/106787

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部