thinkphp6配置多个workerman

原创
2023/02/04 11:39
阅读数 829
  • 目录结构
app
  -| worker
     -| command
	     WorkerCommand.php
	     Cron.php
	 Cron.php
	 Server.php
config
  console.php
  worker.php
cron
  • WorkerCommand.php
<?php

namespace app\worker\command;

use Workerman\Worker;
use think\console\Input;
use think\facade\Config;
use think\console\Output;
use think\console\Command;
use think\console\input\Option;
use think\console\input\Argument;
use app\worker\Server as WorkerServer;

/**
 * Worker Server 命令行类
 */
abstract class WorkerCommand extends Command
{
    protected $name = '';
    protected $config = '';

    public function configure()
    {
        $this->setName($this->name)
            ->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start')
            ->addOption('daemon', 'd', Option::VALUE_NONE, 'Run the workerman server in daemon mode.')
            ->setDescription('Workerman Server');
    }

    public function execute(Input $input, Output $output)
    {
        $action = $input->getArgument('action');

        if (DIRECTORY_SEPARATOR !== '\\') {
            if (!in_array($action, ['start', 'stop', 'reload', 'restart', 'status', 'connections'])) {
                $output->writeln("<error>Invalid argument action:{$action}, Expected start|stop|restart|reload|status|connections .</error>");
                return false;
            }

            //例:php think worker:xxx start -d
            global $argv;
            array_shift($argv); //think
            $command = array_shift($argv); //command

            if (!isset($argv[0]) || $argv[0] != $action) {
                array_unshift($argv, $command, $action);
            } else {
                array_unshift($argv, $command);
            }
        } elseif ('start' != $action) {
            $output->writeln("<error>Not Support action:{$action} on Windows.</error>");
            return false;
        }

        $config = Config::get($this->config);

        if ('start' == $action) {
            $output->writeln('Starting Workerman server...');
        }

        // 自定义服务器入口类
        $class = (array) $config['worker_class'];

        foreach ($class as $server) {
            $this->startServer($server);
        }

        // Run worker
        Worker::runAll();
    }

    protected function startServer(string $class)
    {
        if (class_exists($class)) {
            $worker = new $class;
            if (!$worker instanceof WorkerServer) {
                $this->output->writeln("<error>Worker Server Class Must extends \\think\\worker\\Server</error>");
            }
        } else {
            $this->output->writeln("<error>Worker Server Class Not Exists : {$class}</error>");
        }
    }
}
  • Server.php
<?php
namespace app\worker;

use Workerman\Worker;

/**
 * Worker控制器扩展类
 */
abstract class Server
{
    protected const HEARTBEAT_TIME = 55;

    protected $worker;
    protected $socket   = '';
    protected $protocol = 'http';
    protected $host     = '0.0.0.0';
    protected $port     = '2346';
    protected $option   = [];
    protected $context  = [];
    protected $event    = ['onWorkerStart', 'onConnect', 'onMessage', 'onClose', 'onError', 'onBufferFull', 'onBufferDrain', 'onWorkerReload', 'onWebSocketConnect'];

    /**
     * 架构函数
     * @access public
     */
    public function __construct()
    {
        // 实例化 Websocket 服务
        $this->worker = new Worker($this->socket ?: $this->protocol . '://' . $this->host . ':' . $this->port, $this->context);

        // 设置参数
        if (!empty($this->option)) {
            foreach ($this->option as $key => $val) {
                $this->worker->$key = $val;
            }
        }

        // 设置回调
        foreach ($this->event as $event) {
            if (method_exists($this, $event)) {
                $this->worker->$event = [$this, $event];
            }
        }

        // 初始化
        $this->init();
    }

    protected function init()
    {
    }

    public function start()
    {
        Worker::runAll();
    }

    public function __set($name, $value)
    {
        $this->worker->$name = $value;
    }

    public function __call($method, $args)
    {
        call_user_func_array([$this->worker, $method], $args);
    }
}
  • Cron.php
<?php

declare(strict_types=1);

namespace app\worker;

use Workerman\Worker;
use Workerman\Crontab\Crontab;

/**
 * 计划任务
 */
class Cron extends Server
{
    protected $processes = 1;
    protected $worker_name = 'console_cron';

    /**
     * 架构函数
     *
     */
    public function __construct()
    {
        $this->worker = new Worker();
        $this->worker->count = $this->processes;
        $this->worker->name = $this->worker_name;

        // 设置回调
        foreach ($this->event as $event) {
            if (method_exists($this, $event)) {
                $this->worker->$event = [$this, $event];
            }
        }
    }

    /**
     * 设置参数
     *
     * @param array $option 参数
     * @return void
     */
    public function option(array $option)
    {
        // 设置参数
        if (!empty($option)) {
            foreach ($option as $key => $val) {
                $this->worker->$key = $val;
            }
        }
    }

    /**
     * onWorkerStart 事件回调
     *
     * @param \Workerman\Worker $worker
     * @return void
     */
    public function onWorkerStart(Worker $worker)
    {
    }
}
  • command/cron.php
<?php

namespace app\worker\command;

use app\worker\command\WorkerCommand;

/**
 * Worker Cron 命令行类
 */
class Cron extends WorkerCommand
{
    protected $name = 'worker:cron';
    protected $config = 'worker.cron';
}
  • config/console.php
return [
    'commands' => [
        'worker:cron'  => app\worker\command\Cron::class,
    ],
];
  • config/worker.php
<?php
return [
    'cron' => [
        'worker_class'    =>    [
            'app\worker\Cron'
        ],
    ],
];
  • cron
#!/usr/bin/env php
<?php
namespace think;

// 命令行入口文件
// 加载基础文件
require __DIR__ . '/vendor/autoload.php';

// 应用初始化
(new App())->console->run();

每个workerman都需要一个独立的入口

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部