A small change to make tio-websocket-showcase cluster-capable

Original post
2018/05/10 11:41

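The tio community edition from Mr. Tan (he won't let us call him "Old Tan" because, he says, he is still young... I'm not sure in what sense, just kidding) originally shipped without clustering. It already covers most use cases, but people still care a lot about a cluster setup. For newcomers it should not be too complicated, so this post uses Redis publish/subscribe to let multiple nodes cooperate.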

To keep things simple, I will not refactor the code or its structure; the changes are made directly on top of Mr. Tan's websocket-showcase.

The principle is fairly easy to understand, roughly as follows.

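Clients reach the server nodes through a proxy server that distributes the connections. Every message a server node receives is published to a designated Redis channel, and every server node subscribes to that channel, so clients receive the messages even when they are connected to different server nodes.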

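For the simplest requirement, the nodes only need to know who is online and who has left, and a sent message has to reach its recipient. These three things are enough for the simplest possible cluster, so that is what this post implements.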
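As a rough illustration of the idea, here is a minimal sketch that replaces Redis with an in-memory bus. The names ClusterSketch, Bus, Node and Message are made up for this illustration and do not appear in the project; the real code further down uses a Redisson topic instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for the Redis channel: every subscriber sees every published message. */
class ClusterSketch {

    /** Hypothetical message type: sender, recipient, and text. */
    static final class Message {
        final String from;
        final String to;
        final String text;

        Message(String from, String to, String text) {
            this.from = from;
            this.to = to;
            this.text = text;
        }
    }

    /** Plays the role of the shared Redis topic. */
    static final class Bus {
        private final List<Node> subscribers = new ArrayList<>();

        void subscribe(Node node) {
            subscribers.add(node);
        }

        void publish(Message message) {
            for (Node node : subscribers) {
                node.onMessage(message);
            }
        }
    }

    /** One server node; it only knows the clients connected to itself. */
    static final class Node {
        final String name;
        private final Bus bus;
        // username -> messages delivered to that locally connected client
        private final Map<String, List<String>> inboxes = new HashMap<>();

        Node(String name, Bus bus) {
            this.name = name;
            this.bus = bus;
            bus.subscribe(this);
        }

        /** "Who is online": remember a client connected to this node. */
        void connect(String username) {
            inboxes.put(username, new ArrayList<>());
        }

        /** "Who has left": forget the client again. */
        void disconnect(String username) {
            inboxes.remove(username);
        }

        /** A client sent something to this node: publish it instead of delivering locally. */
        void receiveFromClient(Message message) {
            bus.publish(message);
        }

        /** Called for every published message; deliver only if the recipient is connected here. */
        void onMessage(Message message) {
            List<String> inbox = inboxes.get(message.to);
            if (inbox != null) {
                inbox.add(message.from + ": " + message.text);
            }
        }

        List<String> inboxOf(String username) {
            return inboxes.getOrDefault(username, new ArrayList<>());
        }
    }

    public static void main(String[] args) {
        Bus bus = new Bus();
        Node nodeA = new Node("A", bus);
        Node nodeB = new Node("B", bus);
        nodeA.connect("alice");
        nodeB.connect("bob");

        // alice is on node A, bob on node B; the message still arrives.
        nodeA.receiveFromClient(new Message("alice", "bob", "hi"));
        System.out.println(nodeB.inboxOf("bob")); // prints [alice: hi]
    }
}
```

The receiving node never delivers directly; it always publishes, and each node decides on its own whether the recipient is one of its local clients.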

Here is a preview of the code changes (I'm a man of few words, so straight to the code):

The pojo package contains the beans that wrap the user and the message body:

import java.io.Serializable;
import java.util.Set;

/**
 * User model
 *
 * @author huanglin
 */
public class User implements Serializable {
    private String username;
    private Set<String> group;
    /**
     * The node this user is connected to; kept for later use, not used yet
     */
    private String node;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public Set<String> getGroup() {
        return group;
    }

    public void setGroup(Set<String> group) {
        this.group = group;
    }

    public String getNode() {
        return node;
    }

    public void setNode(String node) {
        this.node = node;
    }
}

import java.io.Serializable;

/**
 * Message bean.
 *
 * @author : huanglin
 * @version : 1.0
 * @since :2018/5/10 9:36 AM
 */
public class Msg implements Serializable {
    private int action;
    private String msg;
    private String from;
    private String to;
    private String status;

    public int getAction() {
        return action;
    }

    public void setAction(int action) {
        this.action = action;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
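To make the message format concrete: onText in the pub/sub processor parses the incoming text as JSON into Msg, so a client presumably sends a JSON object whose keys match the bean's fields. The sketch below builds such a payload by hand. The class MsgWireSketch, its helpers, and the sample values are assumptions for illustration only; the project itself uses fastjson and hutool for JSON.

```java
/** Builds the JSON text for the Msg fields action/from/to/msg without any JSON library. */
class MsgWireSketch {

    /** Escapes backslashes and double quotes; enough for this illustration only. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Field names follow the Msg bean shown above. */
    static String toJson(int action, String from, String to, String msg) {
        return "{\"action\":" + action
                + ",\"from\":\"" + escape(from) + "\""
                + ",\"to\":\"" + escape(to) + "\""
                + ",\"msg\":\"" + escape(msg) + "\"}";
    }

    public static void main(String[] args) {
        // 3 is the value of Const.Action.P2P_MSG_REQ in this article's Const class.
        System.out.println(toJson(3, "alice", "bob", "hello"));
        // prints {"action":3,"from":"alice","to":"bob","msg":"hello"}
    }
}
```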

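The processor package abstracts what happens after the handshake (onAfterHandshaked), before a connection closes (onBeforeClose), and when a text message arrives (onText); the multi-node setup needs this abstraction. DefaultServerProcessor simply holds Mr. Tan's original code, completely unchanged. ServerProcessorOnPubSub performs the same three actions through Redis publish/subscribe so that messages flow between nodes (which more or less counts as a cluster).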
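The point of the abstraction is that the handler only talks to an interface, so the single-node and the clustered behaviour can be swapped in one place. A stripped-down sketch of that pattern follows; TextProcessor, LocalProcessor, PublishingProcessor and choose are hypothetical names with no t-io types involved, and a plain list stands in for the Redis channel.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorSwitchSketch {

    /** Simplified stand-in for ServerProcessor: only the text callback. */
    interface TextProcessor {
        void onText(String from, String text);
    }

    /** Single-node behaviour: handle the message right where it arrived. */
    static final class LocalProcessor implements TextProcessor {
        final List<String> delivered = new ArrayList<>();

        @Override
        public void onText(String from, String text) {
            delivered.add(from + " says: " + text);
        }
    }

    /** Cluster behaviour: hand the message to a shared channel (here just a list). */
    static final class PublishingProcessor implements TextProcessor {
        final List<String> channel;

        PublishingProcessor(List<String> channel) {
            this.channel = channel;
        }

        @Override
        public void onText(String from, String text) {
            channel.add(from + "|" + text);
        }
    }

    /** Callers only see the interface, so switching modes needs no other change. */
    static TextProcessor choose(boolean clustered, List<String> channel) {
        return clustered ? new PublishingProcessor(channel) : new LocalProcessor();
    }

    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();
        TextProcessor processor = choose(true, channel);
        processor.onText("alice", "hi");
        System.out.println(channel); // prints [alice|hi]
    }
}
```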

The abstract interface ServerProcessor:

import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;

/**
 * Abstraction of the main operations
 *
 * @author huanglin
 */
public interface ServerProcessor {
    /**
     * Notification before a connection is closed
     *
     * @param channelContext
     * @param throwable
     * @param remark
     * @param isRemove
     * @throws Exception
     */
    void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;

    /**
     * Notification after a successful handshake
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @throws Exception
     */
    void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception;

    /**
     * Notification when a text message is received
     *
     * @param wsRequest
     * @param text
     * @param channelContext
     * @return
     * @throws Exception
     */
    Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception;
}

The default implementation, DefaultServerProcessor:

import net.hlin.wss.server.Const;
import net.hlin.wss.server.ShowcaseServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;

import java.util.Objects;

/**
 * The logic from the original showcase, left unchanged
 * @author huanglin
 */
public class DefaultServerProcessor implements ServerProcessor {

    private static Logger log = LoggerFactory.getLogger(DefaultServerProcessor.class);

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        if (log.isInfoEnabled()) {
            log.info("onBeforeClose\r\n{}", channelContext);
        }

        WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute();

        if (wsSessionContext.isHandshaked()) {
            int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size();

            String msg = channelContext.getClientNode().toString() + " 离开了,现在共有【" + count + "】人在线";
            //With tio-websocket, every Packet sent from the server to a client is a WsResponse
            WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
            //Send to the whole group
            Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
        }
    }

    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        //Bind to the group; group messages are sent to it later
        Aio.bindGroup(channelContext, Const.GROUP_ID);
        int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size();
        String msg = channelContext.getClientNode().toString() + " 进来了,现在共有【" + count + "】人在线";
        //With tio-websocket, every Packet sent from the server to a client is a WsResponse
        WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
        //Send to the whole group
        Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
    }

    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute();
        HttpRequest httpRequest = wsSessionContext.getHandshakeRequestPacket();//Get the websocket handshake packet
        if (log.isDebugEnabled()) {
            log.debug("握手包:{}", httpRequest);
        }

        log.info("收到ws消息:{}", text);

        if (Objects.equals("心跳内容", text)) {
            return null;
        }

        String msg = channelContext.getClientNode().toString() + " 说:" + text;
        //With tio-websocket, every Packet sent from the server to a client is a WsResponse
        WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
        //Send to the whole group
        Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);

        //The return value is what gets sent back to the client; usually just return null
        return null;
    }
}

The implementation based on Redis publish/subscribe:

import cn.hutool.core.io.resource.ResourceUtil;
import com.alibaba.fastjson.JSON;
import net.hlin.wss.server.Const;
import net.hlin.wss.server.pojo.Msg;
import net.hlin.wss.server.pojo.User;
import net.hlin.wss.server.util.MsgUtil;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;

import java.io.IOException;

public class ServerProcessorOnPubSub implements ServerProcessor {

    private static Logger log = LoggerFactory.getLogger(ServerProcessorOnPubSub.class);
    private RedissonClient client;
    private RTopic<Msg> topic;

    public ServerProcessorOnPubSub() {
        try {
            Config config = Config.fromJSON(ResourceUtil.getStream("classpath:redisson.json"));
            client = Redisson.create(config);
            topic = client.getTopic(Const.WS_MSG_TOPIC_CHANNEL);
            subcribeMsg();
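        } catch (IOException e) {
            // Fail fast: without the Redisson client, every later callback would hit a null field
            throw new IllegalStateException("Cannot load redisson.json from the classpath", e);
        }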
    }

    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String username = channelContext.getUserid();
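        //TODO e.g. look up the groups the current user belongs to
        //Set<String> groups = userService.getUserGroups(username);
        // then loop: Aio.bindGroup(channelContext, group);
        //Overwrite regardless of any earlier login; a real application would handle this case explicitly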
        User user = new User();
        // user.setGroup(groups);
        user.setUsername(username);
        user.setNode(channelContext.getServerNode().toString());
        RBucket<User> userRBucket = client.getBucket(Const.WS_USER_PREFIX + username);
        userRBucket.set(user);
        log.info("用户{}加入", username);
    }

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        String username = channelContext.getUserid();
        client.getBucket(Const.WS_USER_PREFIX + username).delete();
        log.info("用户{}离开", username);
    }

    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        Msg msg = JSON.parseObject(text, Msg.class);
        topic.publish(msg);
        return null;
    }

    private void subcribeMsg() {
        topic.addListener(new MessageListener<Msg>() {
            @Override
            public void onMessage(String channel, Msg msg) {
                int action = msg.getAction();
                Msg respMsg = new Msg();
                //A response message is simply passed on to the client
                if (action % 11 == 0 && MsgUtil.existsUser(msg.getTo())) {
                    //Re-wrap it before sending
                    respMsg.setMsg(msg.getMsg());
                    respMsg.setAction(msg.getAction());
                    respMsg.setStatus(msg.getStatus());
                    MsgUtil.sendToUser(msg.getTo(), respMsg);
                } else {
                    respMsg.setTo(msg.getFrom());
                    respMsg.setStatus("200");
                    if (action == Const.Action.P2P_MSG_REQ.val()) {
                        respMsg.setAction(Const.Action.P2P_MSG_RESP.val());
                        if (MsgUtil.existsUser(msg.getTo())) {
                            MsgUtil.sendToUser(msg.getTo(), msg);
                            topic.publish(respMsg);
                        }
                    } else if (action == Const.Action.GROUP_MSG_REQ.val()) {
                        MsgUtil.sendToGroup(msg.getTo(), msg);
                        respMsg.setAction(Const.Action.GROUP_MSG_RESP.val());
                        topic.publish(respMsg);
                    }
                }
            }
        });
    }
}

MsgUtil in the util package wraps part of Aio's functionality to keep the code short:

import cn.hutool.json.JSONUtil;
import net.hlin.wss.server.ShowcaseServerConfig;
import net.hlin.wss.server.pojo.Msg;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.utils.lock.SetWithLock;
import org.tio.websocket.common.WsResponse;

import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Chat utility class.
 *
 * @author : huanglin
 * @version : 1.0
 * @since :2018/5/8 11:23 AM
 */
public class MsgUtil {

    public static boolean existsUser(String userId) {
        SetWithLock<ChannelContext> set = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId);
        if(set == null || set.size() < 1) {
            return false;
        }
        return true;
    }

    /**
     * Sends to the given user
     * @param userId
     * @param message
     */
    public static void sendToUser(String userId, Msg message) {
        SetWithLock<ChannelContext> toChannleContexts = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId);
        if(toChannleContexts == null || toChannleContexts.size() < 1) {
            return;
        }
        ReentrantReadWriteLock.ReadLock readLock = toChannleContexts.getLock().readLock();
        readLock.lock();
        try{
            Set<ChannelContext> channels = toChannleContexts.getObj();
            for(ChannelContext channelContext : channels){
                send(channelContext, message);
            }
        }finally{
            readLock.unlock();
        }
    }

    /**
     * Sends to a group (all clients, regardless of protocol)
     * @param group
     * @param msg
     */
    public static void sendToGroup(String group, Msg msg){
        if(msg == null) {
            return;
        }
        SetWithLock<ChannelContext> withLockChannels = Aio.getChannelContextsByGroup(ShowcaseServerConfig.groupContext, group);
        if(withLockChannels == null) {
            return;
        }
        ReentrantReadWriteLock.ReadLock readLock = withLockChannels.getLock().readLock();
        readLock.lock();
        try{
            Set<ChannelContext> channels = withLockChannels.getObj();
            if(channels != null && channels.size() > 0){
                for(ChannelContext channelContext : channels){
                    send(channelContext,msg);
                }
            }
        }finally{
            readLock.unlock();
        }
    }

    /**
     * Sends to the given channel
     * @param channelContext
     * @param msg
     */
    public static void send(ChannelContext channelContext,Msg msg){
        if(channelContext == null) {
            return;
        }
        WsResponse response = WsResponse.fromText(JSONUtil.toJsonStr(msg), ShowcaseServerConfig.CHARSET);
        Aio.sendToId(channelContext.getGroupContext(), channelContext.getId(), response);
    }
}

 

The remaining classes, marked with red arrows in the project screenshot, were modified on top of the originals.

The constants class Const:

/**
 * @author tanyaowu
 * @modify huanglin
 */
public class Const {
	/**
	 * Group id used for group chat
	 */
	public static final String GROUP_ID = "showcase-websocket";

	public static final String WS_MSG_TOPIC_CHANNEL = "WS_MSG_TOPIC_CHANNEL";

	public static final String WS_USER_PREFIX = "WS_USER_PREFIX:";

	/**
	 * Enum of the actions exchanged between client and server
	 */
	public enum Action {
		/**
		 * Point-to-point message request
		 */
		P2P_MSG_REQ(3),
		/**
		 * Point-to-point message response
		 */
		P2P_MSG_RESP(33),
		/**
		 * Group message request
		 */
		GROUP_MSG_REQ(4),
		/**
		 * Group message response
		 */
		GROUP_MSG_RESP(44);

		
		private int action;
		Action(int action) {
			this.action = action;
		}
		public int val(){
			return this.action;
		}
	}
}
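Judging from the values above, each response code is its request code multiplied by 11 (3 and 33, 4 and 44), which is what the `action % 11 == 0` test in the subscriber relies on to recognise responses. The sketch below spells that convention out; ActionCodeSketch and responseFor are hypothetical helpers, not part of the project.

```java
class ActionCodeSketch {
    // Same numeric values as Const.Action in this article.
    static final int P2P_MSG_REQ = 3;
    static final int P2P_MSG_RESP = 33;
    static final int GROUP_MSG_REQ = 4;
    static final int GROUP_MSG_RESP = 44;

    /** Mirrors the subscriber's check: response codes are the multiples of 11. */
    static boolean isResponse(int action) {
        return action % 11 == 0;
    }

    /** Hypothetical helper: derive the response code from a request code. */
    static int responseFor(int requestAction) {
        return requestAction * 11;
    }

    public static void main(String[] args) {
        System.out.println(isResponse(P2P_MSG_RESP));   // prints true
        System.out.println(isResponse(GROUP_MSG_REQ));  // prints false
        System.out.println(responseFor(GROUP_MSG_REQ)); // prints 44
    }
}
```

One thing to keep in mind with this convention: 0 is also a multiple of 11, so a Msg whose action was never set would pass the same check. New request codes should also avoid multiples of 11.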

ShowcaseIpStatListener is unchanged, so it is not listed here.

ShowcaseServerAioListener:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.websocket.server.WsServerAioListener;

/**
 * @author tanyaowu
 * Implement this class according to your own needs
 * @modify huanglin
 */
public class ShowcaseServerAioListener extends WsServerAioListener {
	private static Logger log = LoggerFactory.getLogger(ShowcaseServerAioListener.class);

	public static final ShowcaseServerAioListener me = new ShowcaseServerAioListener();

	private ShowcaseServerAioListener() {

	}

	@Override
	public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
		super.onAfterConnected(channelContext, isConnected, isReconnect);
		if (log.isInfoEnabled()) {
			log.info("onAfterConnected\r\n{}", channelContext);
		}

	}

	@Override
	public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
		super.onAfterSent(channelContext, packet, isSentSuccess);
		if (log.isInfoEnabled()) {
			log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
		}
	}

	@Override
	public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
		super.onBeforeClose(channelContext, throwable, remark, isRemove);
		ShowcaseServerConfig.processor.onBeforeClose(channelContext, throwable, remark, isRemove);
	}

	@Override
	public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
		super.onAfterDecoded(channelContext, packet, packetSize);
		if (log.isInfoEnabled()) {
			log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
		}
	}

	@Override
	public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
		super.onAfterReceivedBytes(channelContext, receivedBytes);
		if (log.isInfoEnabled()) {
			log.info("onAfterReceivedBytes\r\n{}", channelContext);
		}
	}

	@Override
	public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
		super.onAfterHandled(channelContext, packet, cost);
		if (log.isInfoEnabled()) {
			log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
		}
	}

}

ShowcaseServerConfig:

Added parts:

/**
 * DefaultServerProcessor gives a single node; using ServerProcessorOnPubSub enables clustering
 */
public static ServerProcessor processor;

/**
 * Holds the instance so that MsgUtil can call it directly
 */
public static ServerGroupContext groupContext;

Full code:

import net.hlin.wss.server.processor.ServerProcessor;
import org.tio.server.ServerGroupContext;
import org.tio.utils.time.Time;

/**
 * @author tanyaowu
 * @modify Huang lin
 *
 */
public abstract class ShowcaseServerConfig {
	/**
	 * Protocol name (any name will do; it mainly helps developers tell protocols apart)
	 */
	public static final String PROTOCOL_NAME = "showcase";

	public static final String CHARSET = "utf-8";
	/**
	 * IP to listen on; null means listen on all addresses without binding a specific IP
	 */
	public static final String SERVER_IP = null;

	/**
	 * Listening port
	 */
	public static final int SERVER_PORT = 9326;

	/**
	 * Heartbeat timeout, in milliseconds
	 */
	public static final int HEARTBEAT_TIMEOUT = 1000 * 60;

	/**
	 * DefaultServerProcessor gives a single node; using ServerProcessorOnPubSub enables clustering
	 */
	public static ServerProcessor processor;

	/**
	 * Holds the instance so that MsgUtil can call it directly
	 */
	public static ServerGroupContext groupContext;

	/**
	 * Time windows for IP statistics monitoring
	 * @author tanyaowu
	 *
	 */
	public interface IpStatDuration {
		Long DURATION_1 = Time.MINUTE_1 * 5;
		Long[] IP_STAT_DURATIONS = new Long[] { DURATION_1 };
	}

}

ShowcaseWebsocketStarter:

import java.io.IOException;

import net.hlin.wss.server.processor.ServerProcessorOnPubSub;
import org.apache.commons.lang3.StringUtils;
import org.tio.core.ssl.SslConfig;
import org.tio.server.ServerGroupContext;
import org.tio.websocket.server.WsServerStarter;

/**
 * @author tanyaowu
 * June 28, 2017, 5:34:04 PM
 */
public class ShowcaseWebsocketStarter {

	private WsServerStarter wsServerStarter;
	private ServerGroupContext serverGroupContext;

	/**
	 *
	 * @author tanyaowu
	 */
	public ShowcaseWebsocketStarter(int port, ShowcaseWsMsgHandler wsMsgHandler) throws Exception {
		wsServerStarter = new WsServerStarter(port, wsMsgHandler);

		serverGroupContext = wsServerStarter.getServerGroupContext();
		serverGroupContext.setName(ShowcaseServerConfig.PROTOCOL_NAME);
		serverGroupContext.setServerAioListener(ShowcaseServerAioListener.me);

		//Set the time windows for IP statistics
		serverGroupContext.ipStats.addDurations(ShowcaseServerConfig.IpStatDuration.IP_STAT_DURATIONS);
		//Set the IP monitor
		serverGroupContext.setIpStatListener(ShowcaseIpStatListener.me);
		//Set the heartbeat timeout
		serverGroupContext.setHeartbeatTimeout(ShowcaseServerConfig.HEARTBEAT_TIMEOUT);
		//To serve over wss, uncomment the next line; you need a certificate first
		//initSsl(serverGroupContext);
	}
	
	private static void initSsl(ServerGroupContext serverGroupContext) throws Exception {
		String keyStoreFile = "classpath:config/ssl/keystore.jks";
		String trustStoreFile = "classpath:config/ssl/keystore.jks";
		String keyStorePwd = "214323428310224";

		if (StringUtils.isNotBlank(keyStoreFile) && StringUtils.isNotBlank(trustStoreFile)) {
			SslConfig sslConfig = SslConfig.forServer(keyStoreFile, trustStoreFile, keyStorePwd);
			serverGroupContext.setSslConfig(sslConfig);
		}
	}

	/**
	 * @author tanyaowu
	 * @throws IOException
	 */
	public static void start() throws Exception {
		ShowcaseWebsocketStarter appStarter = new ShowcaseWebsocketStarter(ShowcaseServerConfig.SERVER_PORT, ShowcaseWsMsgHandler.me);
		ShowcaseServerConfig.processor = new ServerProcessorOnPubSub();
		ShowcaseServerConfig.groupContext = appStarter.getServerGroupContext();
		appStarter.wsServerStarter.start();
	}

	/**
	 * @return the serverGroupContext
	 */
	public ServerGroupContext getServerGroupContext() {
		return serverGroupContext;
	}

	public WsServerStarter getWsServerStarter() {
		return wsServerStarter;
	}
	
	public static void main(String[] args) throws Exception {
		start();
	}

}
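The starter above always listens on ShowcaseServerConfig.SERVER_PORT. To run two nodes on one machine, as in the test run described in this article, each node needs its own port. The article does not show how that was done; one possible way is to take the port from the command line. NodePortSketch and resolvePort are hypothetical names for this illustration.

```java
class NodePortSketch {
    /** Same value as ShowcaseServerConfig.SERVER_PORT in this article. */
    static final int DEFAULT_PORT = 9326;

    /** Uses the first argument as the port when it is a valid port number, else the default. */
    static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            int port = Integer.parseInt(args[0].trim());
            return (port >= 1 && port <= 65535) ? port : DEFAULT_PORT;
        } catch (NumberFormatException e) {
            return DEFAULT_PORT;
        }
    }

    public static void main(String[] args) {
        // e.g. "java NodePortSketch 9327" for the second node
        System.out.println("listening port: " + resolvePort(args));
    }
}
```

The resolved value could then be passed where start() currently passes ShowcaseServerConfig.SERVER_PORT, since the ShowcaseWebsocketStarter constructor already takes the port as an argument.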

ShowcaseWsMsgHandler:

import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * @author tanyaowu
 * June 28, 2017, 5:32:38 PM
 */
public class ShowcaseWsMsgHandler implements IWsMsgHandler {
	private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class);

	public static ShowcaseWsMsgHandler me = new ShowcaseWsMsgHandler();

	private ShowcaseWsMsgHandler() {

	}

	/**
	 * Called during the handshake; business code can read cookies, request parameters, etc. here
	 */
	@Override
	public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
		String username = request.getParam("username");
		//No validation here; handle it according to your own business rules
		if (StrUtil.isNotEmpty(username)) {
			Aio.bindUser(channelContext, username);
			return httpResponse;
		}
		return null;
	}

	/** 
	 * @param httpRequest
	 * @param httpResponse
	 * @param channelContext
	 * @throws Exception
	 * @author tanyaowu
	 */
	@Override
	public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
		ShowcaseServerConfig.processor.onAfterHandshaked(httpRequest, httpResponse, channelContext);
	}

	/**
	 * Called when a binary message arrives (binaryType = arraybuffer)
	 */
	@Override
	public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
		return null;
	}

	/**
	 * Called when the client sends a close flag
	 */
	@Override
	public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
		Aio.remove(channelContext, "receive close flag");
		return null;
	}

	/**
	 * Called when a text message arrives (binaryType = blob)
	 */
	@Override
	public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
		return ShowcaseServerConfig.processor.onText(wsRequest, text, channelContext);
	}

}

Finally, the result of a run with two server nodes, each started on its own port (shown as a screenshot in the original post).

Source code: https://gitee.com/hlinwork/tio-websocket-showcase

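Comments:

$UNKNOWN:1, isClosed:true, isRemoved:true. How should this be handled? Looking at the source, it is thrown directly.
2020/05/24 23:22

后天雨街 (author):
Added one line to normalize the heartbeat logic.
2018/05/11 11:50

There is no longer a split between a community edition and an in-house edition; everything is open source now.
2018/05/10 20:09

Thumbs up, great work!
2018/05/10 16:22

后天雨街 (author), quoting the comment from "talent-tan":

    That was fast. tio-websocket-showcase has only been out for a few days and your cluster version is already here.

    Suggestion: put the source code on Gitee (@红薯's site) and tag the blog post, otherwise nobody can find it by searching.

@talent-tan I built this after forking your code. Gitee address: https://gitee.com/hlinwork/tio-websocket-showcase
2018/05/10 14:18

Bookmarking this.
2018/05/10 13:24

That was fast. tio-websocket-showcase has only been out for a few days and your cluster version is already here.

Suggestion: put the source code on Gitee (@红薯's site) and tag the blog post, otherwise nobody can find it by searching.
2018/05/10 13:12

后天雨街 (author):
I forgot to mention: the Maven build needs one more dependency, redisson.
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>
2018/05/10 11:46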