文档章节

swoole应答式消息发送重试功能实现

吾爱
 吾爱
发布于 2018/01/28 16:15
字数 690
阅读 129
收藏 1

用swoole做了个长连接TCP服务器,server和多台client之间交互是发送/应答的形式,这里面需要做一个消息重试机制,比如:

Server 发送一条消息 message_1 给 ClientA ,约定5秒重发一次,最多尝试3次,期间server如果收到 message_2 则自动取消重试。

首先想到的是,在发送message_1后,启动一个 swoole_timer_tick 定时器,设定间隔时间为5秒,同时将$timerId 保存到全局变量 array $timers 中例如:

//... 假设 $msg1 是一个封装的Msg对象,包含消息标识,数据等信息
$server->send($fd, $msg1->data);
//创建一个定时器
$timerID = \swoole_timer_tick(5000, function() use ($server,$fd,$msg1) {
    $server->send($fd,$msg1->data);
});
//保存$timerID, key是由fd和消息标识组成,保证每个连接每条消息的定时器不重复
$key = $fd.'_'.$msg1->id; //假设 id=1
$timers[$key] = $timerID;

当接收到消息的时候,清除定时器

$server->on('receive',function($server,$fd){
    global $timers;
     //解析消息过程略
    $key = $fd.'_1';
    if (isset($timers[$key])) {
        $server->clearTimer($key);
        unset($timers[$key]);
    }    
});

看起来没啥问题,但是首先需要知道一个重要的点:

定时器是进程隔离的!在一个woker进程中创建的定时器,哪怕把timerID存放到redis或者其他共享内存中,也无法在其他进程中通过timerID清除,在哪个进程中创建的定时器就必须在哪个进程中销毁。

不过幸运的是,如果你没有特别配置,swoole默认一个fd固定在一个woker上的,这里的代码并不会有什么问题。

但是如果在其他worker中主动向fd发送了一条消息,比如在运维端口下面 (多端口监听)向指定的client发送指定的消息,这时候由于执行发送程序是在运维的TCP链路上,并不一定和fd在同一个worker中,如果此时启动了定时器,但是接收消息的时候是在fd的worker中,clearTimer就会报错,定时器不存在,因为定时器是在运维worker中创建的。

这个问题困扰了我一天,甚至想过换语言,用go重写整个服务,去TMD恶心的进程间通信,最终还是找到了解决方法,用的是 pipeMessage。

大致步骤如下:

$server对象中有一个 woker_id 属性,当client连上来的时候可以把 fd,worker_id保存到 swoole_table或redis中。

然后发送消息封装一下,发送之前先判断一下当前worker_id和指定fd的worker_id是否一致,不一致则通过 $server->sendMessage() 发送给fd绑定的worker处理。

算是解决了这个问题吧。。。

© 著作权归作者所有

共有 人打赏支持
吾爱
粉丝 143
博文 268
码字总数 90617
作品 0
后端工程师
私信 提问
Apache ActiveMQ实战(1)-基本安装配置与消息类型

ActiveMQ简介 ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都...

lifetragedy
2016/07/06
0
0
swoole实现Timer定时器、心跳检测及Task进阶实例:mysql连接池

Table of Contents 1.Timer定时器 2.心跳检测 3.Task进阶:MySQL连接池 环境说明: 系统:Ubuntu14.04 (安装教程包括CentOS6.5) PHP版本:PHP-5.5.10 swoole版本:1.7.7-stable 1.Timer定时...

太阳黑子
2016/10/28
127
0
基于SWOOLE的分布式SOCKET消息服务器架构

消息服务器使用socket,为避免服务器过载,单台只允许500个socket连接,当一台不够的时候,扩充消息服务器是必然,问题来了,如何让链接在不同消息服务器上的用户可以实现消息发送呢? 要实现...

tomener
2016/06/16
339
0
PHP的异步并行扩展Swoole发布1.7版本

Swoole 1.7.0 发布了,该版本主要改进内容包括: reactor线程与writer线程合并 对send优化,加入out_buffer机制 增加AIO异步读写文件的API 增加DNS异步查询函数 swoole_client在php-fpm或apa...

matyhtf
2014/04/17
6.1K
39
RocketMQ源码分析(二)Producer端发送数据

1 系列 - 整体架构图- producer端发送消息- broker端接收消息- broker端消息的存储- consumer消费消息- 分布式事务的实现- 定时消息的实现- 关于顺序消费话题- 关于重复消息话题- 关于高可用...

乒乓狂魔
2016/10/09
469
1

没有更多内容

加载失败,请刷新页面

加载更多

《货币商人》读后感作文选登3800字

《货币商人》读后感作文选登3800字: 领导之法、管理之术的大智慧与小技巧(宝安支行纪委书记葛希) 非常感谢夏书记向我们推荐了这本《货币商人》。这本书我读第一遍时惊现它像一个宝藏,蕴藏...

原创小博客
31分钟前
1
0
面试之ssm粗略简答

说实在的,spring源码对我来说可能就是报错的时候会一个个点进去找错误源头,其他都是为了让自己学习大神们优秀的编程思想和理念(顺便面试的时候吹吹牛皮~) 这次zhjj就直接抛了一个范围很...

无极之岚
32分钟前
2
0
史上最强Dubbo面试25题含答案详解:核心组件+架构设计+服务治理等

1.Dubbo是什么? Dubbo 是一个分布式、高性能、透明化的 RPC 服务框架,提供服务自动注册、自动发现等高效服务治理方案, 可以和 Spring 框架无缝集成。 RPC 指的是远程调用协议,也就是说两...

mikechen优知
56分钟前
2
0
如何正确的选择云数据库?

本文由云+社区发表 作者:数据库 江湖传说在选择和使用云数据库过程中 10个人有9个会遇到以下问题: 数据库正常使用过程中莫名卡顿 经常遭遇主从延迟和主从不一致 不知如何实现无损跨云跨数据...

腾讯云加社区
57分钟前
1
0
虚拟机下centos7.x简易命令大全与试玩体验

OS: liunx version: centos7.x date: 2019-01-18 1. cd / : 进入服务器根目录 2. cd .. : 进入当前目录的上一级 3. ls : 显示当前目录下的所有文件夹或文件(list的缩写) 4. ip addr : 展示服...

皇冠小丑
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部