Netty就是这么回事(七)

原创
2017/01/16 11:48
阅读数 119

这一章,主要介绍下Netty的心跳处理,心跳处理在通信开发中是最常用的,服务端通过心跳可以监控客户端的链接状态,进行相应的处理。

记得,之前用NIO做了一个客户端和服务端通信的项目,客户端并不是用java写的,而且一个嵌入式的设备,走的lwapp协议栈,有时候嵌入式设备点击复位或者直接掉电后,服务端还没有反应过来,还认为链接是连接状态,资源也就是一直没有得到释放。早在BIO的时候通过检测返回值是否是-1,异常捕获,setSoTimeout(超时时间)来确定客户端是否连接有效。到了nio只能自己实现一个心跳检测,非常的麻烦。好在Netty为我们提供了IdleStateHandler类来完成心跳检测功能,它非常简单,只有三个参数:public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) 读超时时间,写超时时间,读写超时时间,然后实现用户事件触发监听userEventTriggered这个方法,在这个方法里做相应的处理就可以了,是不是非常的方便!

下面来看一下服务端的代码:

package com.dlb.note.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 功能:心跳时间服务器
 * 版本:1.0
 * 日期:2016/12/13 10:51
 * 作者:馟苏
 */
public class IdleTimerServer {
    /**
     * 主函数
     */
    public static void main(String []args) {
        // 配置服务端的NIO线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            // 添加心跳处理器 5s读,5s写,10s读写
                            channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
                            channel.pipeline().addLast(new IdleTimerServerHandler());
                        }
                    });

            // 绑定端口,同步等待成功
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            System.out.println("服务器在8888端口监听hello");

            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
            System.out.println("服务器关闭bye");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class IdleTimerServerHandler extends ChannelHandlerAdapter {
    // 可读
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 读数据
        ByteBuf buf = (ByteBuf) msg;

        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        String body = new String(req, "UTF-8");
        System.out.println("receive:" + body);

        // 写数据
        ByteBuf res = Unpooled.copiedBuffer("hello,client!".getBytes());
        ctx.write(res);
        ctx.flush();
    }

    /**
     * 用户事件触发
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){ // 接受心跳事件
            IdleStateEvent event = (IdleStateEvent)evt;

            if(event.state() == IdleState.ALL_IDLE){ // 读和写状态
                System.out.println("心跳结束");
                //清除超时会话
                ByteBuf res = Unpooled.copiedBuffer("you will close!".getBytes());
                ChannelFuture writeAndFlush = ctx.writeAndFlush(res);
                // 监听结果
                writeAndFlush.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        ctx.channel().close();
                    }
                });
            }
        }
    }

    // 连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client come,ip:" + ctx.channel().remoteAddress());
    }

    // 关闭
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    // 异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.toString());
        ctx.close();
    }
}
展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部