文档章节

Netty实现服务端客户端长连接通讯及心跳检测

robin-yao
 robin-yao
发布于 2015/04/11 21:35
字数 1334
阅读 9257
收藏 264

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

      通过netty实现服务端与客户端的长连接通讯,及心跳检测。

       基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

     设计消息类型:

public enum  MsgType {
    PING,ASK,REPLY,LOGIN
}

    Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg  implements Serializable {
    private static final long serialVersionUID = 1L;
    private MsgType type;
    //必须唯一,否者会出现channel调用混乱
    private String clientId;

    //初始化客户端id
    public BaseMsg() {
        this.clientId = Constants.getClientId();
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public MsgType getType() {
        return type;
    }

    public void setType(MsgType type) {
        this.type = type;
    }
}

常量设置:

public class Constants {
    private static String clientId;
    public static String getClientId() {
        return clientId;
    }
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }
}

登录类型消息:

public class LoginMsg extends BaseMsg {
    private String userName;
    private String password;
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

 心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}

请求类型消息:

public class AskMsg extends BaseMsg {
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }
    private AskParams params;

    public AskParams getParams() {
        return params;
    }

    public void setParams(AskParams params) {
        this.params = params;
    }
}
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;
    private String auth;

    public String getAuth() {
        return auth;
    }

    public void setAuth(String auth) {
        this.auth = auth;
    }
}

响应类型消息:

public class ReplyMsg extends BaseMsg {
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }
    private ReplyBody body;

    public ReplyBody getBody() {
        return body;
    }

    public void setBody(ReplyBody body) {
        this.body = body;
    }
}
//相应类型body对像
public class ReplyBody implements Serializable {
    private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
    private String clientInfo;

    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    public String getClientInfo() {
        return clientInfo;
    }

    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }
}
public class ReplyServerBody extends ReplyBody {
    private String serverInfo;
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }
    public String getServerInfo() {
        return serverInfo;
    }
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }
}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {
    private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
    public static void add(String clientId,SocketChannel socketChannel){
        map.put(clientId,socketChannel);
    }
    public static Channel get(String clientId){
       return map.get(clientId);
    }
    public static void remove(SocketChannel socketChannel){
        for (Map.Entry entry:map.entrySet()){
            if (entry.getValue()==socketChannel){
                map.remove(entry.getKey());
            }
        }
    }

}

Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //channel失效,从Map中移除
        NettyChannelMap.remove((SocketChannel)ctx.channel());
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

        if(MsgType.LOGIN.equals(baseMsg.getType())){
            LoginMsg loginMsg=(LoginMsg)baseMsg;
            if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
                //登录成功,把channel存到服务端的map中
                NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
                System.out.println("client"+loginMsg.getClientId()+" 登录成功");
            }
        }else{
            if(NettyChannelMap.get(baseMsg.getClientId())==null){
                    //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
                    LoginMsg loginMsg=new LoginMsg();
                    channelHandlerContext.channel().writeAndFlush(loginMsg);
            }
        }
        switch (baseMsg.getType()){
            case PING:{
                PingMsg pingMsg=(PingMsg)baseMsg;
                PingMsg replyPing=new PingMsg();
                NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
            }break;
            case ASK:{
                //收到客户端的请求
                AskMsg askMsg=(AskMsg)baseMsg;
                if("authToken".equals(askMsg.getParams().getAuth())){
                    ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
                    ReplyMsg replyMsg=new ReplyMsg();
                    replyMsg.setBody(replyBody);
                    NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
                }
            }break;
            case REPLY:{
                //收到客户端回复
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
                System.out.println("receive client msg: "+clientBody.getClientInfo());
            }break;
            default:break;
        }
        ReferenceCountUtil.release(baseMsg);
    }
}

ServerBootstrap:

public class NettyServerBootstrap {
    private int port;
    private SocketChannel socketChannel;
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    private void bind() throws InterruptedException {
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup worker=new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        //保持长连接状态
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline p = socketChannel.pipeline();
                p.addLast(new ObjectEncoder());
                p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                p.addLast(new NettyServerHandler());
            }
        });
        ChannelFuture f= bootstrap.bind(port).sync();
        if(f.isSuccess()){
            System.out.println("server start---------------");
        }
    }
    public static void main(String []args) throws InterruptedException {
        NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
        while (true){
            SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
            if(channel!=null){
                AskMsg askMsg=new AskMsg();
                channel.writeAndFlush(askMsg);
            }
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
    //利用写空闲发送心跳检测消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    PingMsg pingMsg=new PingMsg();
                    ctx.writeAndFlush(pingMsg);
                    System.out.println("send ping to server----------");
                    break;
                default:
                    break;
            }
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType msgType=baseMsg.getType();
        switch (msgType){
            case LOGIN:{
                //向服务器发起登录
                LoginMsg loginMsg=new LoginMsg();
                loginMsg.setPassword("yao");
                loginMsg.setUserName("robin");
                channelHandlerContext.writeAndFlush(loginMsg);
            }break;
            case PING:{
                System.out.println("receive ping from server----------");
            }break;
            case ASK:{
                ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
                ReplyMsg replyMsg=new ReplyMsg();
                replyMsg.setBody(replyClientBody);
                channelHandlerContext.writeAndFlush(replyMsg);
            }break;
            case REPLY:{
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
                System.out.println("receive client msg: "+replyServerBody.getServerInfo());
            }
            default:break;
        }
        ReferenceCountUtil.release(msgType);
    }
}

bootstrap

public class NettyClientBootstrap {
    private int port;
    private String host;
    private SocketChannel socketChannel;
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);
        bootstrap.remoteAddress(host,port);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
                socketChannel.pipeline().addLast(new ObjectEncoder());
                socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                socketChannel.pipeline().addLast(new NettyClientHandler());
            }
        });
        ChannelFuture future =bootstrap.connect(host,port).sync();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel)future.channel();
            System.out.println("connect server  成功---------");
        }
    }
    public static void main(String[]args) throws InterruptedException {
        Constants.setClientId("001");
        NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");

        LoginMsg loginMsg=new LoginMsg();
        loginMsg.setPassword("yao");
        loginMsg.setUserName("robin");
        bootstrap.socketChannel.writeAndFlush(loginMsg);
        while (true){
            TimeUnit.SECONDS.sleep(3);
            AskMsg askMsg=new AskMsg();
            AskParams askParams=new AskParams();
            askParams.setAuth("authToken");
            askMsg.setParams(askParams);
            bootstrap.socketChannel.writeAndFlush(askMsg);
        }
    }
}

具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient

转发请注明来源:http://my.oschina.net/robinyao/blog/399060

© 著作权归作者所有

robin-yao
粉丝 167
博文 54
码字总数 61436
作品 0
杭州
私信 提问
加载中

评论(21)

robin-yao
robin-yao 博主

引用来自“XiangYu_Lv”的评论

我想知道怎么测试啊
直接把例子下到本地,先启动Server 在启动Client.我这个程序里有bug,而且用的是netty5,你可以直接去github下载netty代码,里面example直接跑,比较适合学习
XiangYu_Lv
XiangYu_Lv
我想知道怎么测试啊
waylau
waylau

引用来自“空北”的评论

netty不是提供了原生的ChannelGroups来做map的事情吗?

引用来自“robin-yao”的评论

ChannelGroups可以做的

引用来自“天生我狂”的评论

ChannelGroups保存channel的代码有么,请发给看看
有,见 http://my.oschina.net/waylau/blog/380957 第一段代码即是
天生我狂

引用来自“空北”的评论

netty不是提供了原生的ChannelGroups来做map的事情吗?

引用来自“robin-yao”的评论

ChannelGroups可以做的
ChannelGroups保存channel的代码有么,请发给看看
一一叶
一一叶

引用来自“robin-yao”的评论

引用来自“夜域诡士”的评论

给你提交个大量bug 换是给你修改??

??啥Bug

还总是报找不到class
一一叶
一一叶

引用来自“robin-yao”的评论

引用来自“夜域诡士”的评论

给你提交个大量bug 换是给你修改??

??啥Bug

时间长了,出现一堆bug
robin-yao
robin-yao 博主

引用来自“夜域诡士”的评论

给你提交个大量bug 换是给你修改??

??啥Bug
一一叶
一一叶
给你提交个大量bug 换是给你修改??
一一叶
一一叶
你自己测试了吗???????????
这个男人来自地球
这个男人来自地球

引用来自“zhustanley”的评论

这个效率太差!!!
public static void remove(SocketChannel socketChannel){ for (Map.Entry entry:map.entrySet()){ if (entry.getValue()==socketChannel){ map.remove(entry.getKey()); } } }
有什么改进的想法?
Netty(一) SpringBoot 整合长连接心跳机制

前言 Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。 最终能达到的效果: 客户端每隔 N 秒检测是否需要发送心跳。 服务端也每隔 N 秒检测是否需要...

crossoverJie
2018/05/28
0
0
Netty + ZooKeeper 实现简单的服务注册与发现

一. 背景 最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。 二. Netty 的使用 在接收到派单任务之后,通过 Netty 推送到指定门...

Tony沈哲
06/18
0
0
Netty干货分享:京东京麦的生产级TCP网关技术实践总结

1、引言 京东的京麦商家后台2014年构建网关,从HTTP网关发展到TCP网关。在2016年重构完成基于Netty4.x+Protobuf3.x实现对接PC和App上下行通信的高可用、高性能、高稳定的TCP长连接网关。 早期...

JackJiang2011
2017/12/01
0
0
【Netty】Netty实例开源项目

版权声明:本文为谙忆原创文章,转载请附上本文链接,谢谢。 https://blog.csdn.net/qq_26525215/article/details/81989644 Netty netty-not-sticky-pack-demo 项目地址 Netty 本篇博客讲解:...

谙忆
2018/08/23
0
0
Netty 那些事儿 ——— 心跳机制

本文是Netty文集中“Netty 那些事儿”系列的文章。主要结合在开发实战中,我们遇到的一些“奇奇怪怪”的问题,以及如何正确且更好的使用Netty框架,并会对Netty中涉及的重要设计理念进行介绍...

tomas家的小拨浪鼓
2017/11/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

在每个GROUP BY组中选择第一行?

顾名思义,我想选择以GROUP BY分组的每组行的第一行。 具体来说,如果我有一个如下的purchases表: SELECT * FROM purchases; 我的输出: id | customer | total---+----------+------ 1...

技术盛宴
37分钟前
5
0
python 安装与使用总结

https://www.jetbrains.com/pycharm/download/#section=mac

T型人才追梦者
40分钟前
5
0
每个开发人员都应该知道的11个Linux命令

本文主要挑选出读者有必要首先学习的 11 个 Linux 命令,如果不熟悉的读者可以在虚拟机或云服务器上实操下,对于开发人员来说,能熟练掌握 Linux 做一些基本的操作是必要的! 事不宜迟,这里...

武培轩
49分钟前
7
0
window.onload与$(document).ready()

JavaScript的window.onload和jQuery的$(document).ready()方法有什么区别? #1楼 关于在Internet Explorer中使用$(document).ready()的警告。 如果在整个文档加载之前 HTTP请求被中断(例如,...

javail
52分钟前
8
0
对比yml配置文件与properties的区别

我们在日常编码中少不了配置文件,说到配置文件就不得不说起yml和properties这两种后缀的配置文件 接下来我带大家简述一下他们具体有什么区别 - yml格式的文件 server: port: 9090 spring: a...

理性思考
55分钟前
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部