smart-socket实战:服务端主动Push消息至客户端

原创
2020/04/25 21:48
阅读数 1.5W

在通信场景中比较常见的模式为客户端发送请求给服务端,服务端再回以响应。还有一种通信模式为服务端主动Push消息给客户端,这种通信通常有两种场景。

场景一
某个客户端发送指令给服务端,触发服务端push消息至其他客户端,例如:IM。

场景二
服务端基于某种业务场景主动Push消息至相连的客户端,例如:APP消息推送。


本文以场景一为例演示如何通过smart-socket实现Push消息下发,首先我们需要定义三个角色:

  • SendClient:消息发送者,该客户端会发送消息至服务端,再由服务端push至其他客户端。
  • ReceiverClient:消息接收者,接收服务端Push过来的消息。
  • PushServer:Push服务端,接收 SendClient 发送的消息,并转发给其他客户端ReceiverClient。

第一步:定义协议

通信编程的首要步骤,则是定义通信协议。出于演示目的,我们采用length+data的协议格式,即采用4个字节长度的int值表示消息头,而该int数值的大小代表着消息体的长度。SendClient与PushServer,PushServer与ReceiverClient皆采用此协议通信。

public class StringProtocol implements Protocol<String> {

    @Override
    public String decode(ByteBuffer readBuffer, AioSession<String> session) {
        int remaining = readBuffer.remaining();
        if (remaining < Integer.BYTES) {
            return null;
        }
        readBuffer.mark();
        int length = readBuffer.getInt();
        if (length > readBuffer.remaining()) {
            readBuffer.reset();
            return null;
        }
        byte[] b = new byte[length];
        readBuffer.get(b);
        readBuffer.mark();
        return new String(b);
    }
}

第二步:Push服务端处理器

PushServer的处理器需要具备以下几方面能力:

  1. 维护所有客户端连接。客户端与服务端建立连接后将 AioSession 存放至 sessionMap 中,断开连接时则从Map中移除掉。
  2. 接收SendClient发送的消息,并Push给其他客户端。
public class PushServerProcessorMessage implements MessageProcessor<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushServerProcessorMessage.class);
    private Map<String, AioSession<String>> sessionMap = new ConcurrentHashMap<>();

    @Override
    public void process(AioSession<String> session, String msg) {
        LOGGER.info("收到SendClient发送的消息:{}", msg);
        byte[] bytes = msg.getBytes();
        sessionMap.values().forEach(onlineSession -> {
            if (session == onlineSession) {
                return;
            }
            WriteBuffer writeBuffer = onlineSession.writeBuffer();
            try {
                LOGGER.info("发送Push至ReceiverClient:{}", onlineSession.getSessionID());
                writeBuffer.writeInt(bytes.length);
                writeBuffer.write(bytes);
                writeBuffer.flush();
            } catch (Exception e) {
                LOGGER.error("Push消息异常", e);
            }
        });
    }

    @Override
    public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
        switch (stateMachineEnum) {
            case NEW_SESSION:
                LOGGER.info("与客户端:{} 建立连接", session.getSessionID());
                sessionMap.put(session.getSessionID(), session);
                break;
            case SESSION_CLOSED:
                LOGGER.info("断开客户端连接: {}", session.getSessionID());
                sessionMap.remove(session.getSessionID());
                break;
            default:
        }
    }
}

第三步:ReceiverClient处理器

本文简化了消息接收者的处理逻辑,只是打印一行日志用于观察。实际应用中需要根据收到的消息执行一些业务逻辑。

public class PushClientProcessorMessage implements MessageProcessor<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushClientProcessorMessage.class);

    @Override
    public void process(AioSession<String> session, String msg) {
        LOGGER.info("ReceiverClient:{} 收到Push消息:{}", session.getSessionID(), msg);
    }

    @Override
    public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {

    }
}

第四步:启动服务

启动服务端:PushServer

public class PushServer {
    public static void main(String[] args) throws IOException {
        AioQuickServer<String> server = new AioQuickServer<>(8080, new StringProtocol(), new PushServerProcessorMessage());
        server.start();
    }
}

启动接收者:ReceiverClient

public class ReceiverClient {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
        StringProtocol protocol = new StringProtocol();
        PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage();
        AioQuickClient<String>[] clients = new AioQuickClient[4];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = new AioQuickClient<>("localhost", 8080, protocol, clientProcessorMessage);
            clients[i].start(channelGroup);
        }
    }
}

启动发送者:SenderClient

public class SenderClient {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        StringProtocol protocol = new StringProtocol();
        PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage();
        AioQuickClient<String> clients = new AioQuickClient("localhost", 8080, protocol, clientProcessorMessage);
        AioSession<String> session = clients.start();
        byte[] msg = "HelloWorld".getBytes();
        while (true) {
            WriteBuffer writeBuffer = session.writeBuffer();
            writeBuffer.writeInt(msg.length);
            writeBuffer.write(msg);
            writeBuffer.flush();
            Thread.sleep(1000);
        }
    }
}

第五步:观察控制台

SenderClient每秒中发送一条:“HelloWorld” 消息至 PushServer。观察 PushServer控制台 可以看到服务端接收到消息之后,会即可转发至 ReceiverClient。

然后再去观察 ReceiverClient控制台,则会打印服务端Push过来的消息。

最后

本文通过一个简单的示例,演示了Push服务的实现原理。实际场景下还包括很多可靠性方面的问题需要考虑,感兴趣的读者可自行研究。

本文涉及到的示例代码可从smart-socket仓库中下载

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
7 收藏
2
分享
返回顶部
顶部