文档章节

Swoole入门(3)☞使用Worker、Task模块

ali安东尼
 ali安东尼
发布于 2017/09/12 14:46
字数 1881
阅读 3
收藏 0
点赞 0
评论 0

###task模块的用途

task模块可以用来一些异步的慢速任务,比如广播消息,发送群邮件等等;同时还支持PHP的数据库连接池,异步队列等,功能很强大。

当swoole接收到任务时,worker进程将任务丢给task进程之后,worker进程可以继续处理新的数据请求。任务完成后会异步地通知worker进程告诉它此任务已经完成。

###再次深入了解Reactor、Worker、Task的关系 

Reactor线程

Reactor线程以多线程、异步非阻塞模式接收客户端机器的TCP连接、处理网络IO、收发数据;

Reactor层面的代码全部为C代码,除Start/Shudown事件回调外,不执行任何PHP代码

若连接为TCP连接,Reactor将发来的数据缓冲、拼接、拆分成完整的一个请求数据包。

Worker进程

Worker进程以多进程模式接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据;并生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端。

可以是异步非阻塞模式,也可以是同步阻塞模式

Task进程

Task进程以多进程模式接受由Worker进程通过swoole_server->task/taskwait方法投递的任务,处理任务后,并将结果数据返回给Worker进程。

完全是同步阻塞模式

三者关系:

假设Server就是一个工厂,那reactor就是销售,帮你接项目订单。而worker就是工人,当销售接到订单后,worker去工作生产出客户要的东西。而task_worker可以理解为行政人员,可以帮助worker干些杂事,让worker专心工作。

###示例代码1     <?php          class Test     {         public $index = 0;         public $fd = 0;     }          class Server     {         private $serv;         private $test;              public function __construct() {             $this->serv = new swoole_server("0.0.0.0", 9501);             $this->serv->set(array(                 'worker_num' => 8,                 'daemonize' => false,                 'max_request' => 10000,                 'dispatch_mode' => 2,                 'task_worker_num' => 8             ));             $this->serv->on('Start', array($this, 'onStart'));             $this->serv->on('Connect', array($this, 'onConnect'));             $this->serv->on('Receive', array($this, 'onReceive'));             $this->serv->on('Close', array($this, 'onClose'));             // bind callback             $this->serv->on('Task', array($this, 'onTask'));             $this->serv->on('Finish', array($this, 'onFinish'));             $this->serv->start();         }         public function onStart( $serv ) {             echo "Start\n";         }         public function onConnect( $serv, $fd, $from_id ) {             echo "Client {$fd} connect\n";         }         public function onClose( $serv, $fd, $from_id ) {             echo "Client {$fd} close connection\n";         }              public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {             //此处data是client传送过来的             echo "Get Message From Client {$fd}:{$data}\n";             $this->test = new Test();             $this->test->fd = $fd;             // var_dump($this->test);                          //task方法传递只能传递一个字符串,所以需要用json打包             $serv->task( json_encode($this->test) );         }              public function onTask($serv,$task_id,$from_id, $data) {             //from_id表示worker进程号             echo "This Task {$task_id} from Worker {$from_id}\n";                  var_dump($data);                  $data = json_decode($data, true);                  //发送给客户端             $serv->send($data['fd'], 'Task is over!');             //返回给worker进程表示完成任务             return "Finished";         }         public function onFinish($serv,$task_id, $data) {             echo "Task {$task_id} finish\n";             //此处data就是onTask()中return的数据             echo "Result: {$data}\n";                      }     }     $server = new Server();

####示例一代码结果

server.php

    root:/var/www/html/silence/swoole/course/task# php Server.php      Start     Client 1 connect     Get Message From Client 1:huangbin          This Task 0 from Worker 2     string(18) "{"index":0,"fd":1}"     Task 0 finish     Result: Finished

client.php

    root:/var/www/html# telnet 127.0.0.1 9501     Trying 127.0.0.1...     Connected to 127.0.0.1.     Escape character is '^]'.     huangbin

####示例1工作流程

worker进程触发task()方法,将任务投递到task进程,task调用onTask()方法处理任务,处理完成后在该方法return消息,并触发worker进程的onFinsh()方法。

####示例1代码流程

1.构造函数处注册Task事件回调函数为Task、Finish,分别指向onTask()、onFinish()方法

2.onTask()方法

onTask()方法在task_worker进程内被调用。当worker进程向task_worker进程投递新的任务时。当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。

function onTask(swoole_server $serv, int $task_id, int $src_worker_id, mixed $data);

$task_id是任务ID,由swoole扩展内自动生成,用于区分不同的任务。

$src_worker_id来自于哪个worker进程

$data 是任务的内容

【注意】:$task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同

当任务完成后,可在onTask函数中使用return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成,也就是说return的数据是返回给onFinish函数。

【注意】:onTask函数执行时遇到致命错误退出,或者被外部进程强制kill,当前的任务会被丢弃,但不会影响其他正在排队的Task           2.onFinish()方法 当worker进程投递的任务在task_worker中完成时,task进程会调用swoole_server->finish()方法将任务处理的结果发送给worker进程。

void onFinish(swoole_server $serv, int $task_id, string $data)

$task_id是任务的ID

$data是任务处理的结果内容

【注意】

执行onFinish方法的worker进程与下发task任务的worker进程是同一个进程

如果task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish

####示例1注意事项

1.worker进程传递给worker进程的数据,也就是task($data)中的data数据小于8K是通过管道传输,大于8K是通过临时文件的写入进行传递。同时data数据只能是一个字符串,所以需要用json进行打包。

2.若在task进程需要发送数据给client,这时候需要在worker进程中传递client的fd,来能保证传递到正确的client。

3.worker进程和task进程的对象是不会共享的,毕竟两者的内存是不共享的。下面使用一个实例证明一下:

onReceive()和onFinish()方法同属worker进程,test对象共享;而onTask()方法是task进程test对象独立,不和worker进程共享。

    class Test     {         public $index = 0;     }

    class Server     {         public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {             $this->test = new Test();             var_dump($this->test);                        $serv->task( serialize($this->test) );         }              public function onTask($serv,$task_id,$from_id, $data) {                      $data = unserialize($data);             $data->index = 2;                  $this->test = new Test();             $this->test->index = 2;                  return "Finished";         }         public function onFinish($serv,$task_id, $data) {                   var_dump($this->test);         }     }

###mysql连接池的介入

    <?php     class MySQLPool     {         private $serv;         private $pdo;         public function __construct() {             $this->serv = new swoole_server("0.0.0.0", 9501);             $this->serv->set(array(                 'worker_num' => 8,                 'daemonize' => false,                 'max_request' => 10000,                 'dispatch_mode' => 3,                 'debug_mode'=> 1 ,                 'task_worker_num' => 8             ));             $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));             $this->serv->on('Connect', array($this, 'onConnect'));             $this->serv->on('Receive', array($this, 'onReceive'));             $this->serv->on('Close', array($this, 'onClose'));             // bind callback             $this->serv->on('Task', array($this, 'onTask'));             $this->serv->on('Finish', array($this, 'onFinish'));             $this->serv->start();         }         public function onConnect( $serv, $fd, $from_id ) {             echo "Client {$fd} connect\n";         }         public function onClose( $serv, $fd, $from_id ) {             echo "Client {$fd} close connection\n";         }              public function onWorkerStart( $serv , $worker_id) {             //echo "onWorkerStart\n";             if($serv->taskworker){                 $this->pdo = mysqli_connect("127.0.0.1", "root", "123456", "silence");;                 echo "Task Worker\n";             } else {                 echo "Worker Process\n";             }         }              public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {                          $task = [                 'sql' => 'insert into user(username) values (?)',                 'user_name' => 'silence',                 'fd' => $fd             ];             $serv->task(json_encode($task));         }              public function onTask($serv,$task_id,$from_id, $data) {             try{                 $data = json_decode($data,true);                 $statement = $this->pdo->prepare($data['sql']);                 $statement->bind_param('s',  $data['user_name']);                 $statement->execute();                      $serv->send($data['fd'] , "Insert succeed");                 return "true";             } catch( PDOException $e ) {                 var_dump( $e );                 return "false";             }         }         public function onFinish($serv,$task_id, $data) {             var_dump("result: " + $data);         }     }     new MySQLPool();

上面主要展示了如何在worker进程和task进程进行mysql数据交互。

function onWorkerStart(swoole_server $server, int $worker_id);

此事件在worker进程/task进程启动时发生。这里可以创建如mysql对象,可以保证在进程生命周期内正常使用;

swoole_server::$taskworker表示是否为task进程,是为true。

worker_id表示当前worker进程的id

###结语

接下来继续深入了解swoole的毫秒级定时器模块把!

由睿江云人员提供,想了解更多,请登陆www.eflycloud.com

© 著作权归作者所有

共有 人打赏支持
ali安东尼
粉丝 3
博文 192
码字总数 173101
作品 0
广州
Swoole 分布式通讯框架--SwooleDistributed

SwooleDistributed 是swoole分布式系统的实现,他提供了一套基于swoole扩展的分布式通讯框架。 结构图: SwooleDistributed 不仅提供了分布式搭建的必要设施,还提供了4大组件帮助你提高编写...

白_猫 ⋅ 2016/07/25 ⋅ 3

Swoole 1.10.0 发布,增加多项新特性

PHP的异步、并行、高性能网络通信引擎 Swoole 已发布 1.10.0 版本。此版本增加了多项新特性。 自动 DNS 解析 新版本的异步客户端不再需要使用 swooleasyncdnslookup 解析域名了,底层实现了自...

matyhtf ⋅ 01/08 ⋅ 11

swoole项目思维转换 -- 前篇

PHP是最好的语言,Swoole重新定义了最好的语言,这当然是个梗了,不过php做为一个入门低、开发快、执行效率高的一门语言,而在以快速著称的pc互联网时代,无可争议的成为首选,这是php的优势...

杨太化 ⋅ 2015/10/15 ⋅ 0

swoole实现Timer定时器、心跳检测及Task进阶实例:mysql连接池

Table of Contents 1.Timer定时器 2.心跳检测 3.Task进阶:MySQL连接池 环境说明: 系统:Ubuntu14.04 (安装教程包括CentOS6.5) PHP版本:PHP-5.5.10 swoole版本:1.7.7-stable 1.Timer定时...

太阳黑子 ⋅ 2016/10/28 ⋅ 0

php异步高并发扩展 swoole-1.6.11 版发布

简介: swoole是一个php版本的异步、高并发扩展,是国人被php官方pecl包收录的力作之一。 很高兴的通知大家,1.6.11版本发布了。 内核更新: - Disable by default asyncmysql (默认关闭asy...

半桶水_桶哥 ⋅ 2014/03/04 ⋅ 14

swoole项目开发思维转换 -- 长驻内存

从上篇的执行流程,可以得出第一个需要思维转换的点: Swoole是完全的长驻内存的 这个是和web开发第一个很大的不同,之前我们在做web开发,基本不怎么考虑内存控制的问题,这里从两个方面来进...

杨太化 ⋅ 2015/10/15 ⋅ 0

swoole/swoole-src

Swoole Swoole is an event-driven asynchronous & concurrent networking communication framework with high performance written only in C for PHP. Document: https://github.com/swool......

swoole ⋅ 2013/11/27 ⋅ 0

swoole-1.7.16 版本已发布,BUG 修复版本

PHP的异步并行网络扩展 swoole1.7.16 版本已发布,此版本为BUG修复版本,建议所有用户升级。下载地址: http://pecl.php.net/package/swoole https://github.com/swoole/swoole-src/releases...

matyhtf ⋅ 2015/05/11 ⋅ 28

基于SWOOLE的分布式SOCKET消息服务器架构

消息服务器使用socket,为避免服务器过载,单台只允许500个socket连接,当一台不够的时候,扩充消息服务器是必然,问题来了,如何让链接在不同消息服务器上的用户可以实现消息发送呢? 要实现...

tomener ⋅ 2016/06/16 ⋅ 0

使用Swoole+Yaf 监听MQ

主要用于短信验证码和语音验证码异步发送使用Swoole的定时器读取MQ队列,然后扔给Task异步执行,在onTask中调用Yaf实现发送 worker_num设置为2 1个用于监听短信验证码队列 1个用于监听语音验证...

发粪涂墙online ⋅ 2016/02/09 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

uWSGI + Django @ Ubuntu

创建 Django App Project 创建后, 可以看到路径下有一个wsgi.py的问题 uWSGI运行 直接命令行运行 利用如下命令, 可直接访问 uwsgi --http :8080 --wsgi-file dj/wsgi.py 配置文件 & 运行 [u...

袁祾 ⋅ 30分钟前 ⋅ 0

JVM堆的理解

在JVM中,我们经常提到的就是堆了,堆确实很重要,其实,除了堆之外,还有几个重要的模块,看下图: 大 多数情况下,我们并不需要关心JVM的底层,但是如果了解它的话,对于我们系统调优是非常...

不羁之后 ⋅ 昨天 ⋅ 0

推荐:并发情况下:Java HashMap 形成死循环的原因

在淘宝内网里看到同事发了贴说了一个CPU被100%的线上故障,并且这个事发生了很多次,原因是在Java语言在并发情况下使用HashMap造成Race Condition,从而导致死循环。这个事情我4、5年前也经历...

码代码的小司机 ⋅ 昨天 ⋅ 1

聊聊spring cloud gateway的RetryGatewayFilter

序 本文主要研究一下spring cloud gateway的RetryGatewayFilter GatewayAutoConfiguration spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/G......

go4it ⋅ 昨天 ⋅ 0

创建新用户和授予MySQL中的权限教程

导读 MySQL是一个开源数据库管理软件,可帮助用户存储,组织和以后检索数据。 它有多种选项来授予特定用户在表和数据库中的细微的权限 - 本教程将简要介绍一些选项。 如何创建新用户 在MySQL...

问题终结者 ⋅ 昨天 ⋅ 0

android -------- 颜色的半透明效果配置

最近有朋友问我 Android 背景颜色的半透明效果配置,我网上看资料,总结了一下, 开发中也是常常遇到的,所以来写篇博客 常用的颜色值格式有: RGB ARGB RRGGBB AARRGGBB 这4种 透明度 透明度...

切切歆语 ⋅ 昨天 ⋅ 0

CentOS开机启动subversion

建立自启动脚本: vim /etc/init.d/subversion 输入如下内容: #!/bin/bash## subversion startup script for the server## chkconfig: 2345 90 10# description: start the subve......

随风而飘 ⋅ 昨天 ⋅ 0

版本控制工具

CSV , SVN , GIT ,VSS

颖伙虫 ⋅ 昨天 ⋅ 0

【2018.06.19学习笔记】【linux高级知识 13.1-13.3】

13.1 设置更改root密码 13.2 连接mysql 13.3 mysql常用命令

lgsxp ⋅ 昨天 ⋅ 0

LVM

LVM: 硬盘划分分区成物理卷->物理卷组成卷组->卷组划分逻辑分区。 1.磁盘分区: fdisk /dev/sdb 划分几个主分区 输入t更改每个分区类型为8e(LVM) 使用partprobe生成分区的文件:如/dev/sd...

ZHENG-JY ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部