swoole应答式消息发送重试功能实现

原创
2018/01/28 16:15
阅读数 714

用swoole做了个长连接TCP服务器,server和多台client之间交互是发送/应答的形式,这里面需要做一个消息重试机制,比如:

Server 发送一条消息 message_1 给 ClientA ,约定5秒重发一次,最多尝试3次,期间server如果收到 message_2 则自动取消重试。

首先想到的是,在发送message_1后,启动一个 swoole_timer_tick 定时器,设定间隔时间为5秒,同时将$timerId 保存到全局变量 array $timers 中例如:

//... 假设 $msg1 是一个封装的Msg对象,包含消息标识,数据等信息
$server->send($fd, $msg1->data);
//创建一个定时器
$timerID = \swoole_timer_tick(5000, function() use ($server,$fd,$msg1) {
    $server->send($fd,$msg1->data);
});
//保存$timerID, key是由fd和消息标识组成,保证每个连接每条消息的定时器不重复
$key = $fd.'_'.$msg1->id; //假设 id=1
$timers[$key] = $timerID;

当接收到消息的时候,清除定时器

$server->on('receive',function($server,$fd){
    global $timers;
     //解析消息过程略
    $key = $fd.'_1';
    if (isset($timers[$key])) {
        $server->clearTimer($key);
        unset($timers[$key]);
    }    
});

看起来没啥问题,但是首先需要知道一个重要的点:

定时器是进程隔离的!在一个woker进程中创建的定时器,哪怕把timerID存放到redis或者其他共享内存中,也无法在其他进程中通过timerID清除,在哪个进程中创建的定时器就必须在哪个进程中销毁。

不过幸运的是,如果你没有特别配置,swoole默认一个fd固定在一个woker上的,这里的代码并不会有什么问题。

但是如果在其他worker中主动向fd发送了一条消息,比如在运维端口下面 (多端口监听)向指定的client发送指定的消息,这时候由于执行发送程序是在运维的TCP链路上,并不一定和fd在同一个worker中,如果此时启动了定时器,但是接收消息的时候是在fd的worker中,clearTimer就会报错,定时器不存在,因为定时器是在运维worker中创建的。

这个问题困扰了我一天,甚至想过换语言,用go重写整个服务,去TMD恶心的进程间通信,最终还是找到了解决方法,用的是 pipeMessage。

大致步骤如下:

$server对象中有一个 woker_id 属性,当client连上来的时候可以把 fd,worker_id保存到 swoole_table或redis中。

然后发送消息封装一下,发送之前先判断一下当前worker_id和指定fd的worker_id是否一致,不一致则通过 $server->sendMessage() 发送给fd绑定的worker处理。

算是解决了这个问题吧。。。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部