smart-socket实战:玩转心跳消息

原创
2020/05/02 23:18
阅读数 2W

一、背景

在通信中设计的心跳消息,通常是为了检查网络链路是否正常。虽然TCP协议提供keep-alive机制,但需要在链路空闲2小时后才触发检测,这显然对业务非常不友好。当存在大量连接异常,而服务端却需要等2个小时后才感知到的时候,有限的系统资源会被逐渐耗尽,最终无法为新连接请求继续提供服务。

二、原理

要解决此类问题,业界的普遍做法是在应用层加入心跳机制。心跳消息可以是单向心跳也可以是双向心跳,所谓单向心跳表示由服务端或者客户端的其中一方主动发送心跳请求消息,而另一方返回响应消息(如下图)。双向心跳表示服务端与客户端相互发送心跳请求和响应。因为无论何种类型,实现方案都是一样的,本文以单向心跳为例给大家做讲解。

三、方案

心跳消息通常是周期性的发送,或者是在链路空闲一定时长后触发。如果经历几个周期后都未收到响应,则可以视为链路异常。此时可以继续尝试发送心跳,也可以执行告警并断开连接。

在 smart-socket 中我们提供了现成的心跳插件 HeartPlugin,可以很方便的实现心跳。本文是假定读者朋友对 smart-socket 已有了初步的了解,所以不会涉及 smart-socket 的基础使用,重点描述如何在服务中集成心跳插件。

3.1 HeartPlugin插件概述

3.1.1 心跳策略

在HeartPlugin中有三种心跳策略可供选择,通过选择不同的构造方案确定。

  1. HeartPlugin(int heartRate, TimeUnit timeUnit)
    heartRate 表示心跳消息的发送频率;timeUnit 表示 heartRate 的数值单位。例如:heartRate=3,timeUnit=TimeUnit.SECONDS,表示每 3秒钟发送一次心跳。heartRate=2000,timeUnit=TimeUnit.MILLISECONDS,表示每 2秒钟发送一次心跳。该策略为周期性发送心跳消息,无论对方是否返回响应。
  2. HeartPlugin(int heartRate, int timeout, TimeUnit unit)
    该构造方法相较前一个多出一个参数:timeout(过期时间),必须大于heartRate。如果在timeout时长内发送的心跳消息都没有收到响应消息,则视为链路异常并且该链路会被关闭,释放资源。
  3. HeartPlugin(int heartRate, int timeout, TimeUnit timeUnit, TimeoutCallback timeoutCallback)
    该构造方法支持指定超时回调策略 timeoutCallback,其实上一个构造方法就是设置了超时断链策略。如果不满足业务所需,用户可按需定义。

3.1.2 心跳的识别与触发

心跳策略确定好后,下一步就是如何去发送心跳消息,以及如何识别收到的消息是否为响应消息。在 HeartPlugin 中已经定义了这两个接口,需要开发人员去实现处理逻辑:

  • sendHeartRequest
    发送心跳。HeartPlugin 在判断某个连接需要触发心跳后,会执行该方法。用户需要在该方法中实现心跳消息的编码并输出数据。
    public void sendHeartRequest(AioSession session) throws IOException{
        WriteBuffer writeBuffer = session.writeBuffer();
        byte[] heartBytes = "heart_req".getBytes();
        writeBuffer.writeInt(heartBytes.length);
        writeBuffer.write(heartBytes);
        writeBuffer.flush();
     }
    
  • isHeartMessage
    请求消息识别。true:表示本次收到的是心跳消息(请求/响应);false:其他业务消息,交由MessageProcessor#processor处理。
    public boolean isHeartMessage(AioSession session, String msg) {
        //心跳请求消息,返回响应
        if("heart_req".equals(msg)){
            try {
                WriteBuffer writeBuffer = session.writeBuffer();
                byte[] heartBytes = "heart_rsp".getBytes();
                writeBuffer.writeInt(heartBytes.length);
                writeBuffer.write(heartBytes);
                writeBuffer.flush();
            }catch (Exception e){
            }
            return true;
        }
        //是否为心跳响应消息
        return "heart_rsp".equals(msg);
    }
    

3.2 代码演示

3.2.1 服务端

public class HeartServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(HeartServer.class);

    public static void main(String[] args) throws IOException {
        //定义消息处理器
        AbstractMessageProcessor<String> processor = new AbstractMessageProcessor<String>() {
            @Override
            public void process0(AioSession<String> session, String msg) {
                LOGGER.info("收到客户端:{}消息:{}", session.getSessionID(), msg);
            }

            @Override
            public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
                switch (stateMachineEnum) {
                    case SESSION_CLOSED:
                        LOGGER.info("客户端:{} 断开连接", session.getSessionID());
                        break;
                }
            }
        };

        //注册心跳插件:每隔1秒发送一次心跳请求,5秒内未收到消息超时关闭连接
        processor.addPlugin(new HeartPlugin<String>(1, 5, TimeUnit.SECONDS) {
            @Override
            public void sendHeartRequest(AioSession session) throws IOException {
                WriteBuffer writeBuffer = session.writeBuffer();
                byte[] heartBytes = "heart_req".getBytes();
                writeBuffer.writeInt(heartBytes.length);
                writeBuffer.write(heartBytes);
                writeBuffer.flush();
            }

            @Override
            public boolean isHeartMessage(AioSession session, String msg) {
                //心跳请求消息,返回响应
                if ("heart_req".equals(msg)) {
                    try {
                        WriteBuffer writeBuffer = session.writeBuffer();
                        byte[] heartBytes = "heart_rsp".getBytes();
                        writeBuffer.writeInt(heartBytes.length);
                        writeBuffer.write(heartBytes);
                        writeBuffer.flush();
                    } catch (Exception e) {
                    }
                    return true;
                }
                //是否为心跳响应消息
                if ("heart_rsp".equals(msg)) {
                    LOGGER.info("收到来自客户端:{} 的心跳响应消息", session.getSessionID());
                    return true;
                }
                return false;
            }
        });

        //启动服务
        AioQuickServer<String> server = new AioQuickServer<>(8888, new StringProtocol(), processor);
        server.start();
    }
}

3.2.2 客户端

  • client_1:接受服务端的心跳消息,不做任何回应
  • client_2:及时响应服务端的心跳消息
public class HeartClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartClient.class);


    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        AbstractMessageProcessor<String> client_1_processor = new AbstractMessageProcessor<String>() {
            @Override
            public void process0(AioSession<String> session, String msg) {
                LOGGER.info("client_1 收到服务端消息:" + msg);
            }

            @Override
            public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
                LOGGER.info("stateMachineEnum:{}", stateMachineEnum);
            }
        };
        AioQuickClient<String> client_1 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_1_processor);
        client_1.start();

        AbstractMessageProcessor<String> client_2_processor = new AbstractMessageProcessor<String>() {
            @Override
            public void process0(AioSession<String> session, String msg) {
                LOGGER.info("client_2 收到服务端消息:" + msg);
                try {
                    if ("heart_req".equals(msg)) {
                        WriteBuffer writeBuffer = session.writeBuffer();
                        byte[] heartBytes = "heart_rsp".getBytes();
                        writeBuffer.writeInt(heartBytes.length);
                        writeBuffer.write(heartBytes);
                        LOGGER.info("client_2 发送心跳响应消息");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
                LOGGER.info("stateMachineEnum:{}", stateMachineEnum);
            }
        };
        AioQuickClient<String> client_2 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_2_processor);
        client_2.start();
    }
}

3.2.3观察控制台

服务端

客户端

总结

本文围绕着心跳原理作了简单的实践分享。现实场景中如果对接的设备数量高达几万,甚至十几万,本文的心跳方案是否依旧适用,欢迎一起交流讨论。

本文涉及到的示例代码可从smart-socket仓库中下载

展开阅读全文
加载中
点击加入讨论🔥(1) 发布并加入讨论🔥
打赏
1 评论
13 收藏
3
分享
返回顶部
顶部