Netty整理(二)

原创
2019/09/28 22:00
阅读数 1.6K

Netty整理

现在我们来验证一下channel的生命周期。

我们将EchoServerHandler修改如下,增加全部的监听事件,并打印事件方法名称。

/**
 * 事件处理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 监听读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 将channel注册到EventLoop的Selector多路复用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未注册到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有连接,变为活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 没有连接,非活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}

启动EchoServer,打开telnet连接到端口,我们可以看到

admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfs
sdfs
^]
telnet> quit
Connection closed.

整个过程为连接,发送字符串sdfs,退出连接

服务端日志为

2019-10-01 05:33:36.960  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-10-01 05:33:36.960  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-10-01 05:33:54.439  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : sdfs

2019-10-01 05:33:54.442  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-01 05:34:22.527  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-01 05:34:22.529  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelInactive
2019-10-01 05:34:22.529  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregistered

整个生命周期正如前面写到一样

Channel的生命周期为:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered

ChannelPipeline:
        好比厂里的流水线一样,可以在上面添加多个ChannelHanler,也可看成是一串 ChannelHandler 实例,拦截穿过 Channel 的输入输出 event,  ChannelPipeline 实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权。

我们来看一下ChannelPipeline的源码

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    /**
     * 在管道的首位置添加一个channelhandler
     */
    ChannelPipeline addFirst(String name, ChannelHandler handler);

    /**
     * 同上,多了一个线程池参数
     */
    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

    /**
     * 在管道的最末端添加一个channelhandler
     */
    ChannelPipeline addLast(String name, ChannelHandler handler);

    /**
     * 同上,多了一个线程池参数
     */
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

    /**
     * 在一个管道中已存在的channelhandler之前插入另外一个channelhandler
     */
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

    /**
     * 同上,多了一个线程池参数
     */
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    /**
     * 在管道已有多一个channelhandler之后插入另外一个channelhandler
     */
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

    /**
     * 同上,多了一个线程池参数
     */
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    /**
     * 在该管道的首位置放入一组channelhandler
     *
     */
    ChannelPipeline addFirst(ChannelHandler... handlers);

    /**
     * 同上,多了一个线程池参数
     *
     */
    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

    /**
     * 在管道的最末端放入一组channelhandler
     *
     */
    ChannelPipeline addLast(ChannelHandler... handlers);

    /**
     * 同上,多了一个线程池参数
     *
     */
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    /**
     * 从管道中移除一个channelhandler
     */
    ChannelPipeline remove(ChannelHandler handler);

    /**
     * 根据名字在管道中移除一个channelhandler
     */
    ChannelHandler remove(String name);

    /**
     * 根据类名在管道中移除一个channelhandler
     */
    <T extends ChannelHandler> T remove(Class<T> handlerType);

    /**
     * 移除管道中首个channelhandler
     */
    ChannelHandler removeFirst();

    /**
     * 移除管道中末个channelhandler
     */
    ChannelHandler removeLast();

    /**
     * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字
     */
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

    /**
     * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字
     */
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    /**
     * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字
     */
    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);

    /**
     * 返回管道中首个channelhandler
     */
    ChannelHandler first();

    /**
     * 获取第一个管道处理器上下文
     */
    ChannelHandlerContext firstContext();

    /**
     * 获取管道中最后一个channelhandler
     */
    ChannelHandler last();

    /**
     * 获取管道中最后一个管道处理器上下文
     */
    ChannelHandlerContext lastContext();

    /**
     * 根据名字获取管道中的一个channelhandler
     */
    ChannelHandler get(String name);

    /**
     * 根据类获取一个channelhandler
     */
    <T extends ChannelHandler> T get(Class<T> handlerType);

    /**
     * 根据channelhandler获取一个管道处理器上下文
     */
    ChannelHandlerContext context(ChannelHandler handler);

    /**
     * 根据名字获取一个管道处理器上下文
     */
    ChannelHandlerContext context(String name);

    /**
     * 根据一个channelhandler的类名获取一个管道处理器上下文
     */
    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);

    /**
     * 返回一个管道
     */
    Channel channel();

    /**
     * 返回管道中的channelhandler的名称列表
     */
    List<String> names();

    /**
     * Converts this pipeline into an ordered {@link Map} whose keys are
     * handler names and whose values are handlers.
     */
    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();

     @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();
}

ChannelHandlerContext模块的作用和分析

我们在ChannelHandler的方法中会看到有一个参数。如

/**
 * 监听读取事件
 * @param ctx
 * @param msg
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf data = (ByteBuf) msg;
    log.info(data.toString(CharsetUtil.UTF_8));
    ctx.writeAndFlush(data);
}

这个ChannelHandlerContext就是一个处理器的上下文。

1、ChannelHandlerContext是连接ChannelHandler和ChannelPipeline的桥梁


            ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,好比调用write方法,
            Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者都会在整个管道流里传播,而ChannelHandlerContext就只会在后续的Handler里面传播

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //第一种,会在整个管道中传播
        Channel channel = ctx.channel();
        channel.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
        //第二种,会在整个管道中传播
        ChannelPipeline pipeline = ctx.pipeline();
        pipeline.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
        //第三种,只会在后续的channelhandler中传播
        ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
//        ByteBuf data = (ByteBuf) msg;
//        log.info(data.toString(CharsetUtil.UTF_8));
//        ctx.writeAndFlush(data);
    }

2、AbstractChannelHandlerContext类
                    双向链表结构,next/prev分别是后继节点,和前驱节点        

volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

            3、DefaultChannelHandlerContext 是实现类,但是大部分都是父类那边完成,这个只是简单的实现一些方法
                主要就是判断Handler的类型

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

                ChannelInboundHandler之间的传递,主要通过调用ctx里面的FireXXX()方法来实现下个handler的调用

我们修改一下EchoServerHandler,再增加一个EchoServerHandler2

/**
 * 事件处理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 监听读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        //第一种,会在整个管道中传播
//        Channel channel = ctx.channel();
//        channel.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
//        //第二种,会在整个管道中传播
//        ChannelPipeline pipeline = ctx.pipeline();
//        pipeline.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
//        //第三种,只会在后续的channelhandler中传播
//        ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(Unpooled.copiedBuffer("我是第一个handler", CharsetUtil.UTF_8));
        ctx.fireChannelRead(msg);
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 将channel注册到EventLoop的Selector多路复用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未注册到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有连接,变为活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 没有连接,非活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}
@Slf4j
public class EchoServerHandler2 extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(Unpooled.copiedBuffer("我是第二个handler", CharsetUtil.UTF_8));
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 将channel注册到EventLoop的Selector多路复用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未注册到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有连接,变为活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 没有连接,非活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}

将EchoServerHandler2添加到管道中

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

我们可以看到EchoServerHandler2是添加到EchoServerHandler后面的。

启动EchoServer,打开telnet

admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
kdfaf
我是第一个handler我是第二个handler^]
telnet> quit
Connection closed.

服务端的日志为

2019-09-29 22:42:42.626  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-09-29 22:42:42.627  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-09-29 22:42:48.108  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : kdfaf

2019-09-29 22:42:48.114  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler2   : kdfaf

2019-09-29 22:42:48.114  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-09-29 22:44:09.799  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-09-29 22:44:09.802  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelInactive
2019-09-29 22:44:09.802  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregistered

多个入站出站ChannelHandler的执行顺序

1、一般的项目中,inboundHandler和outboundHandler有多个,在Pipeline中的执行顺序?
                InboundHandler顺序执行,OutboundHandler逆序执行
                问题:ch.pipeline().addLast(new InboundHandler1());
                     ch.pipeline().addLast(new OutboundHandler1());
                     ch.pipeline().addLast(new OutboundHandler2());
                     ch.pipeline().addLast(new InboundHandler2());
                或者:
                     ch.pipeline().addLast(new OutboundHandler1());
                     ch.pipeline().addLast(new OutboundHandler2());
                     ch.pipeline().addLast(new InboundHandler1());
                     ch.pipeline().addLast(new InboundHandler2());

现在我们以实际代码来验证一下

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
        //会传递到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
@Slf4j
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
        //在此结束调用,不会在管道中继续传递
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
@Slf4j
public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("OutBoundHandler1 write : " + data.toString(CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer("OutBoundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        ctx.flush();
    }
}
@Slf4j
public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("OutboundHaneler2 write : " + data.toString(CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer("OutboundHandler2 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        ctx.flush();
    }
}
@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
//                            socketChannel.pipeline().addLast(new EchoServerHandler());
//                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler1());
                            socketChannel.pipeline().addLast(new InboundHandler2());
                            socketChannel.pipeline().addLast(new OutboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler2());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

启动EchoServer,打开telnet

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
dsfs
InboundHandler2 InboundHandler1 dsfs


服务端日志

2019-10-02 09:51:44.865  INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服务端收到数据:dsfs

2019-10-02 09:51:44.869  INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 dsfs

我们可以看到此时,事件在InboundHandler中就结束了,并没有传递到OutboundHandler中。这是因为OutboundHandler在InboundHandler之后才开始监听,InboundHandler在处理中,并没有监听写出站事件,所以不会执行到OutboundHandler之中的代码。

但是如果把监听事件的顺序调整一下,在入站之前就开始监听出站事件。

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
//                            socketChannel.pipeline().addLast(new EchoServerHandler());
//                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                            socketChannel.pipeline().addLast(new OutboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler1());
                            socketChannel.pipeline().addLast(new InboundHandler2());
                            
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

运行EchoServer,打开telnet

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sfdfg
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
^]
telnet> quit
Connection closed.

从右到左为依次执行顺序,先是InboundHandler1->InboundHandler2->OutboundHandler2->OutboundHandler1

服务端日志为

2019-10-02 10:27:03.683  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服务端收到数据:sfdfg

2019-10-02 10:27:03.686  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 sfdfg

2019-10-02 10:27:03.686  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sfdfg

2019-10-02 10:27:03.686  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg

因为没有监听活跃,断开事件,所以不会打印相关日志。这里之所以OutboundHandler会执行是因为在入站之前就开始监听写出事件,而InboundHandler会先执行是因为只有接收到客户端的消息的时候才会进行写出操作,这个时候才会被OutboundHandler监听到,进行相关操作,但是由于OutboundHandler是从后往前执行,所以会先执行OutboundHandler2,再执行OutboundHandler1。而整个数据传输是贯穿管道始终的。

执行顺序是:
                    InboundHandler1 channelRead
                    InboundHandler2 channelRead
                    OutboundHandler2 write
                    OutboundHandler1 write

 

现在把InboundHandler1提到最前又是什么情况呢?

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
//                            socketChannel.pipeline().addLast(new EchoServerHandler());
//                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler2());

                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

由分析可知,OutboundHandler出站是不会监听InboudHandler1的写出事件的,但可以监听到InboundHandler2的写出事件。

我们修改一下两个InboundHandler来加以验证。

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
        
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        //会传递到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
@Slf4j
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
        //在此结束调用,不会在管道中继续传递
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 alone \n".getBytes()));
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

重启EchoServer,打开telnet

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfljioldf
InboundHandler1 sdfljioldf    //此处InboundHandler1只是简单的输出了,没有被OutboundHandler监听。
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone  //此处InboundHandler2被OutboundHandler监听,所以会追加输出OutBoundHandler1 OutboundHandler2
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf   //此处也是InboundHandler2被OutboundHandler监听输出的。

服务端日志

2019-10-02 11:04:09.345  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服务端收到数据:sdfljioldf

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 sdfljioldf

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 alone

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sdfljioldf

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf

跟我们分析的一样。

但如果我们修改一下InboundHandler1的代码如下

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));

        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        //会传递到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

或者

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));

        ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        //会传递到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

现在无论InboundHandler1的位置放前、放后,是否被OutboundHandler监听,它都会流通整个管道。

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler1 alone ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg

服务端日志为

2019-10-02 11:18:10.761  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服务端收到数据:ljlfrg

2019-10-02 11:18:10.764  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler1 alone ljlfrg

2019-10-02 11:18:10.764  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler1 alone ljlfrg

2019-10-02 11:18:10.767  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 ljlfrg

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 alone

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 InboundHandler1 ljlfrg

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg

 结论:
                1)InboundHandler顺序执行,OutboundHandler逆序执行
                
                2)InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)
                
                3)InboundHandler通过ctx.write(msg),则会传递到outboundHandler
                
                4) 使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行;
                    但是使用channel.write(msg)、pipline.write(msg)情况会不一致,都会执行

                5) outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先outbound再inbound,服务端则相反

7、Netty异步操作模块ChannelFuture

Netty中的所有I/O操作都是异步的,这意味着任何I/O调用都会立即返回,而ChannelFuture会提供有关的信息I/O操作的结果或状态。

ChannelFuture的使用主要是在EchoServer中

//绑定端口,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//阻塞,等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();

  
        1)ChannelFuture状态:
            未完成:当I/O操作开始时,将创建一个新的对象,新的最初是未完成的 - 它既没有成功,也没有被取消,因为I/O操作尚未完成。 

            已完成:当I/O操作完成,不管是成功、失败还是取消,Future都是标记为已完成的, 失败的时候也有具体的信息,例如原因失败,但请注意,即使失败和取消属于完成状态。

        ChannelFuture的四种状态

*                                      +---------------------------+
*                                      | Completed successfully    |
*                                      +---------------------------+
*                                 +---->      isDone() = true      |
* +--------------------------+    |    |   isSuccess() = true      |
* |        Uncompleted       |    |    +===========================+
* +--------------------------+    |    | Completed with failure    |
* |      isDone() = false    |    |    +---------------------------+
* |   isSuccess() = false    |----+---->      isDone() = true      |
* | isCancelled() = false    |    |    |       cause() = non-null  |
* |       cause() = null     |    |    +===========================+
* +--------------------------+    |    | Completed by cancellation |
*                                 |    +---------------------------+
*                                 +---->      isDone() = true      |
*                                      | isCancelled() = true      |
*                                      +---------------------------+

            注意:不要在IO线程内调用future对象的sync或者await方法
                不能在channelHandler中调用sync或者await方法

        2)ChannelPromise:继承于ChannelFuture,进一步拓展用于设置IO操作的结果

其中的继承关系图如上所示,第一个Future是JDK自带的,第二个Future是netty中增加的。关于future一般的使用方法可以参考Fork/Join框架原理和使用探秘

Netty编写的网络数据传输中的编码和解码

前面说的:高性能RPC框架的3个要素:IO模型、数据协议、线程模型

            最开始接触的编码码:java序列化/反序列化(就是编解码)、url编码、base64编解码

            为啥jdk有编解码,还要netty自己开发编解码?
                java自带序列化的缺点
                    1)无法跨语言
                    2) 序列化后的码流太大,也就是数据包太大
                    3) 序列化和反序列化性能比较差
            
            业界里面也有其他编码框架: google的 protobuf(PB)、Facebook的Trift、Jboss的Marshalling、Kyro等,关于Kyro的使用方法可以参考浅析克隆

 Netty里面的编解码:
                解码器:负责处理“入站 InboundHandler”数据
                编码器:负责“出站 OutboundHandler” 数据

                Netty里面提供默认的编解码器,也支持自定义编解码器
                Encoder:编码器
                Decoder:解码器
                Codec:编解码器

Netty的解码器Decoder和使用场景

        Decoder对应的就是ChannelInboundHandler,主要就是字节数组转换为消息对象,在我们之前的样例中,都是处理一些简单的字符串来进行消息传递,但如果我们需要转换的是Java的对象的话,则需要使用到Decoder。

        主要是两个方法
            decode (常用)
            decodeLast (用于最后的几个字节处理,也就是channel关闭的时候,产生的最后一个消息)
        抽象解码器
            1)ByteToMessageDecoder
                用于将字节转为消息,需要检查缓冲区是否有足够的字节,通过源码可知,ByteToMessageDecoder实际上就是一个ChannelInboundHandler。

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter

            2)ReplayingDecoder
                继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,不是所有的ByteBuf都支持


            选择:项目复杂性高则使用ReplayingDecoder,否则使用 ByteToMessageDecoder

解码器具体的实现,用的比较多的是(更多是为了解决TCP底层的粘包和拆包问题)

            DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
            LineBasedFrameDecoder: 以换行符为结束标志的解码器
            FixedLengthFrameDecoder:固定长度解码器
            LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
            StringDecoder:文本解码器,将接收到的对象转化为字符串,一般会与上面的进行配合,然后在后面添加业务handle

Netty编码器Encoder

        Encoder对应的就是ChannelOutboundHandler,消息对象转换为字节数组

        Netty本身未提供和解码一样的编码器,是因为场景不同,两者非对等的


        1)MessageToByteEncoder
            消息转为字节数组,调用write方法,会先判断当前编码器是否支持需要发送的消息类型,如果不支持,则透传;其本身其实就是一个ChannelOutboundHandler。

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter

        2)MessageToMessageEncoder
            用于从一种消息编码为另外一种消息(例如POJO到POJO)

Netty编解码器类Codec

组合解码器和编码器,以此提供对于字节和消息都相同的操作(一般不常用)
        
        优点:成对出现,编解码都是在一个类里面完成

        缺点:耦合在一起,拓展性不佳


        Codec:组合编解码
            1)ByteToMessageCodec

            2)MessageToMessageCodec

        decoder:解码
             1)ByteToMessageDecoder

             2)MessageToMessageDecoder
        
        encoder:编码
             1)ByteToMessageEncoder

            2)MessageToMessageEncoder

什么是TCP粘包拆包

        1)TCP拆包: 一个完整的包可能会被TCP拆分为多个包进行发送
        2)TCP粘包: 把多个小的包封装成一个大的数据包发送, client发送的若干数据包 Server接收时粘成一包

       发送方和接收方都可能出现这个原因
        
        发送方的原因:TCP默认会使用Nagle算法
        
        接收方的原因: TCP接收到数据放置缓存中,应用程序从缓存中读取
       
       UDP: 是没有粘包和拆包的问题,有边界协议

2、TCP半包读写常见解决方案
        简介:讲解TCP半包读写常见的解决办法

             发送方:可以关闭Nagle算法
             接受方: TCP是无界的数据流,并没有处理粘包现象的机制, 且协议本身无法避免粘包,半包读写的发生需要在应用层进行处理
                    应用层解决半包读写的办法
                        1)设置定长消息 (10字符)
                            xdclass000xdclass000xdclass000xdclass000
                        2)设置消息的边界 ($$ 切割)
                            sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$

                        3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息
                                Header+Body

 3、Netty自带解决TCP半包读写方案

            DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
            LineBasedFrameDecoder: 以换行符为结束标志的解码器
            FixedLengthFrameDecoder:固定长度解码器
            LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器

现在我们来做一个实验,如果不使用解码器,会产生什么样的效果。

服务端的入站事件处理器

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        byte[] bytes = new byte[data.readableBytes()];
        //将收到到字节流放入内存字节数组
        data.readBytes(bytes);
        //将字节数组转成字符串,并截取该字符串从0到换行符(不包含换行符)到子串
        //System.getProperty("line.separator")为获取操作系统的换行符,每种操作系统可能各不相同
        String body = new String(bytes, CharsetUtil.UTF_8)
                .substring(0,bytes.length - System.getProperty("line.separator").length());
        log.info("服务端收到消息内容为:" + body + "收到消息次数:" + ++counter);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端入站事件处理器

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        //定义一段包含换行符的字符串,并转化为字节数组
        byte[] req = ("guanjian.net" + System.getProperty("line.separator")).getBytes();
        //连续发送10条该字符串到服务端
        for (int i = 0;i < 10;i++) {
            //申请一段内存缓冲区
            msg = Unpooled.buffer(req.length);
            //将字节数组写入缓冲区
            msg.writeBytes(req);
            //发送该字符串到服务端
            ctx.writeAndFlush(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分别启动服务端和客户端

我们可以看到服务端日志为

2019-10-05 08:41:11.733  INFO 614 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net收到消息次数:1

虽然客户端是分了10次发送到数据,但是服务端却是只接收了一次,收到消息次数1,这明显是一次粘包。

现在我们给客户端加上.option(ChannelOption.TCP_NODELAY,true)来看一下是什么情况

@AllArgsConstructor
public class EchoClient {
    private String host;
    private int port;

    public void run() throws InterruptedException {
        //客户端处理线程组(其实就是一个线程池)
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //客户端netty启动对象
            Bootstrap bootstrap = new Bootstrap();
            //将客户端线程组添加到启动对象中
            bootstrap.group(group)
                    //给启动对象添加Socket管道
                    .channel(NioSocketChannel.class)
                    //主动连接到远程服务器IP端口
                    .remoteAddress(new InetSocketAddress(host,port))
                    .option(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            //连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞直到客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            group.shutdownGracefully();
        }
    }
}

服务端日志为

2019-10-05 08:45:52.654  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:1
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:2
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:3
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:4
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:5
2019-10-05 08:45:52.656  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:6

我们可以看到,虽然加了.option(ChannelOption.TCP_NODELAY,true),但并不能保证它不产生粘包,有些包包含了两条字符串,收到的消息也就不是10次了。

现在我们把换行解码器LineBasedFrameDecoder放入

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
                            //这个1024是在1024个字节内去寻找换行符,如果在1024个字节内没有找到换行符,就会报错
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

再次运行,服务端日志

2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:1
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:2
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:3
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:4
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:5
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:6
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:7
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:8
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:9
2019-10-05 09:04:15.694  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.ne收到消息次数:10

LineBasedFrameDecoder是继承于ByteToMessageDecoder,是一个ChannelInboundHandler

public class LineBasedFrameDecoder extends ByteToMessageDecoder

由上面可见,粘包经过LineBasedFrameDecoder处理,再逐条发往下一个ChannelInboundHandler,而不是从客户端逐条发送过来的。

如果我们觉得ServerHandler的channelRead方法太麻烦了,还要由字节数组转成字符串,那我们直接将收到的信息强制转成字符串会怎么样呢?

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    log.info("服务端收到消息内容为:" + body + "收到消息次数:" + ++counter);
}

启动后服务端报错如下(部分)

java.lang.ClassCastException: io.netty.buffer.PooledSlicedByteBuf cannot be cast to java.lang.String
    at com.guanjian.websocket.netty.packet.ServerHandler.channelRead(ServerHandler.java:29)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)

说明此处无法将收到的信息直接强制转换成字符串,这个时候我们可以加入字符串解码器StringDecoder.

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
                            //这个1024是在1024个字节内去寻找换行符,如果在1024个字节内没有找到换行符,就会报错
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //字符串解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

启动运行,服务端日志为

2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:1
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:2
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:3
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:4
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:5
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:6
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:7
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:8
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:9
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:guanjian.net收到消息次数:10

使用解码器LineBasedFrameDecoder解决半包读写问题
            
            1)LineBaseFrameDecoder 以换行符为结束标志的解码器 ,构造函数里面的数字表示最长遍历的帧数

            2)StringDecoder解码器将对象转成字符串

现在我们修改一下客户端处理器ClientHandler。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String message = "Starting EchoclientApplication &_" +
            "on admindeMBP.lan with PID 741 &_" +
            "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" +
            "/Users/admin/Downloads/nettyecho)";
    ByteBuf buf = null;
    //申请一段内存缓冲区
    buf = Unpooled.buffer(message.getBytes().length);
    //将字节数组写入缓冲区
    buf.writeBytes(message.getBytes());
    //发送字节数组到服务端
    ctx.writeAndFlush(buf);
}

我们可以看到message字符串中都带有&_字符,我们将在服务端以该字符为分隔符进行解码。

修改EchoServer到代码如下

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
                            //使用指定消息分隔符解码器进行解码
                            //1024的意思是在1024个字节内查找&_,如果找不到就会抛出异常
                            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            //字符串解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

分别启动服务端,客户端。服务端到日志如下

2019-10-06 12:55:36.078  INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:Starting EchoclientApplication 收到消息次数:1
2019-10-06 12:55:36.078  INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:on admindeMBP.lan with PID 741 收到消息次数:2
2019-10-06 12:55:36.079  INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次数:3

对比message

"Starting EchoclientApplication &_" +
        "on admindeMBP.lan with PID 741 &_" +
        "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" +
        "/Users/admin/Downloads/nettyecho)"

我们会发现最后一句/Users/admin/Downloads/nettyecho)被丢弃了,所以要全部拿到我们需要的消息,我们需要在最后一段字符串中加入&_

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String message = "Starting EchoclientApplication &_" +
            "on admindeMBP.lan with PID 741 &_" +
            "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" +
            "/Users/admin/Downloads/nettyecho)&_";
    ByteBuf buf = null;
    //申请一段内存缓冲区
    buf = Unpooled.buffer(message.getBytes().length);
    //将字节数组写入缓冲区
    buf.writeBytes(message.getBytes());
    //发送字节数组到服务端
    ctx.writeAndFlush(buf);
}

再次启动客户端,服务端日志为

2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:Starting EchoclientApplication 收到消息次数:1
2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:on admindeMBP.lan with PID 741 收到消息次数:2
2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次数:3
2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:/Users/admin/Downloads/nettyecho)收到消息次数:4

其实DelimiterBasedFrameDecoder有很多个构造器。我们这里使用的两参构造器其实调用到是一个三参构造器。

public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
    this(maxFrameLength, true, delimiter);
}
public DelimiterBasedFrameDecoder(
        int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
    this(maxFrameLength, stripDelimiter, true, delimiter);
}

而三参构造器其实是调用了一个四参构造器

public DelimiterBasedFrameDecoder(
        int maxFrameLength, boolean stripDelimiter, boolean failFast,
        ByteBuf delimiter) {
    this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] {
            delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
}

我们来说明一下这个四参构造器每个参数到含义。

             maxFrameLength:
                表示一行最大的长度,如果超过这个长度依然没有检测自定义分隔符,将会抛出TooLongFrameException

            stripDelimiter:
                解码后的消息是否去除掉分隔符(true去掉分隔符,flase保留分隔符)

            failFast:
                如果为true,则超出maxLength后立即抛出TooLongFrameException,不进行继续解码
                如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常

            delimiters:
                分隔符,ByteBuf类型

现在我们来看一下第二个参数到含义。

修改EchoServer如下

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
                            //使用指定消息分隔符解码器进行解码,1024为在1024个字节内查找分隔符(可以自己任意定义),如果
                            //找不到会抛出异常
                            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,false,true,delimiter));
                            //字符串解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

重新启动服务端,客户端。服务端到日志如下

2019-10-06 13:33:24.806  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:Starting EchoclientApplication &_收到消息次数:1
2019-10-06 13:33:24.807  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:on admindeMBP.lan with PID 741 &_收到消息次数:2
2019-10-06 13:33:24.807  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_收到消息次数:3
2019-10-06 13:33:24.808  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服务端收到消息内容为:/Users/admin/Downloads/nettyecho)&_收到消息次数:4

自定义长度半包读写器LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder也有很多构造器,一般我们使用的是一个五参构造器

public LengthFieldBasedFrameDecoder(
        int maxFrameLength,
        int lengthFieldOffset, int lengthFieldLength,
        int lengthAdjustment, int initialBytesToStrip) {
    this(
            maxFrameLength,
            lengthFieldOffset, lengthFieldLength, lengthAdjustment,
            initialBytesToStrip, true);
}

当然它调用的是一个六参构造器。

public LengthFieldBasedFrameDecoder(
        int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
        int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
    this(
            ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,
            lengthAdjustment, initialBytesToStrip, failFast);
}

这六个参数的含义如下

        maxFrameLength 数据包的最大长度
        lengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节之后的才是消息体字段

        lengthFieldLength 长度字段占的字节数, 帧数据长度的字段本身的长度

        lengthAdjustment
        一般 Header + Body,添加到长度字段的补偿值,如果为负数,开发人员认为这个 Header的长度字段是整个消息包的长度,则Netty应该减去对应的数字
        initialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包之后,忽略前面的指定位数的长度字节,应用解码器拿到的就是不带长度域的数据包
        failFast 是否快速失败

六参构造器调用到是一个七参构造器(不讨论)。

现在我们来设置一个存储消息内容到消息类。(服务端和客户端相同)

public class CCMessageHeader {
    @Getter
    @Setter
    private byte[] messageFlag = new byte[2]; //消息长度偏移量
    @Getter
    @Setter
    private int length; //消息长度
    @Getter
    @Setter
    private int type; //消息类型
    @Getter
    private String data; //消息内容

    public CCMessageHeader() {
        //170,这两个数没有实际意义,只用来描述消息长度偏移量而填充进去的
        messageFlag[0] = (byte) 0xaa;
        //187
        messageFlag[1] = (byte) 0xbb;
    }

    public void setData(String data) {
        this.data = data;
        //消息体的长度为字符串data的长度加4,4为type整形的长度
        this.length = data.length() + 4;
    }
}

服务端入站处理器

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf body = (ByteBuf) msg;
        //如果字节流到可读字节数小于等于0,转给下一个InboundHandler处理
        if (body.readableBytes() <= 0) {
            ctx.fireChannelRead(msg);
        }
        //初始化一个接收消息对象
        CCMessageHeader recHd = new CCMessageHeader();
        //字节流读取到的第一个整形赋给nLength
        int nLength = body.readInt();
        //字节流读取到到第二个整形赋给nType
        int nType = body.readInt();
        //获取字节流的可读字节数,并分配一个该大小的字节数组
        int nDataSize = body.readableBytes();
        byte[] aa = new byte[nDataSize];
        //将字节流读入字节数组
        body.readBytes(aa);
        //将该字节数组转成字符串
        String myMsg = new String(aa, CharsetUtil.UTF_8);
        log.info("收到 " + ++counter + "次消息:[" + myMsg + "],类型为[" + nType + "]");

        //初始化一个发送消息对象
        CCMessageHeader hd = new CCMessageHeader();
        hd.setType(2);
        hd.setData("server data...");
        //申请一段直接缓冲空间
        ByteBuf echo = Unpooled.directBuffer();
        //将该发送消息对象的各属性写入到缓冲空间中
        echo.writeBytes(hd.getMessageFlag());
        echo.writeInt(hd.getLength());
        echo.writeInt(hd.getType());
        echo.writeCharSequence(hd.getData(),CharsetUtil.UTF_8);
        //将该发送消息对象以字节流到形式发送到客户端
        ctx.writeAndFlush(echo);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoServer如下

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
                    //如果要减少发送次数,就设置为false,会累积一定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将我们自己编写的事件处理器添加到客户端的连接管道中
                            //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel
                            //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用
                            //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //作业的工人,我们可以往一条流水线上投放很多的工人
                            //65535表示对自定义长度解码器对最大处理长度,
                            // 第二个2表示长度信息从第二个字节后开始获取,
                            //其实这个2是messageFlag的长度,即获取长度字段的偏移量。
                            //4表示长度字段占4个字节,即private int length的字节数,一个整数占4个字节
                            //0表示添加到长度字段的补偿值,这里不需要补偿
                            //最后一个2表示获取我们的消息体,要去掉2个长度字段的字节数,因为除了length字段还有一个
                            //type的整形字段,所以是2才能拿到data字段,即显示消息体字段,但其实消息体包含了type字段的
                            socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2));
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

客户端入站处理器

@Slf4j
public class ClientHandler2 extends ChannelInboundHandlerAdapter {
    private int counter;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //初始化一个发送消息对象
        CCMessageHeader hd = new CCMessageHeader();
        //设置类型为1
        hd.setType(1);
        for (int i = 0;i < 10;i++) {
            String strData = String.format("client data %d...",i + 1);
            //设置消息体
            hd.setData(strData);
            //申请一段直接缓冲区
            ByteBuf echo = Unpooled.directBuffer();
            //将消息对象的各个属性写入该缓冲区
            echo.writeBytes(hd.getMessageFlag());
            echo.writeInt(hd.getLength());
            echo.writeInt(hd.getType());
            echo.writeCharSequence(hd.getData(), CharsetUtil.UTF_8);
            //将该缓冲区的字节流发送到服务端
            ctx.writeAndFlush(echo);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf body = (ByteBuf) msg;
        //初始化接收消息对象
        CCMessageHeader recHd = new CCMessageHeader();
        //获取第一个整形赋给nLength,即消息长度
        int nLength = body.readInt();
        //获取第二个整形赋给nType
        int nType = body.readInt();
        //获取整个字节流的可读字节数,并以此建立一个字节数组
        int nDataSize = body.readableBytes();
        byte[] aa = new byte[nDataSize];
        //将字节流读入该字节数组
        body.readBytes(aa);
        //将该字节数组转换为字符串
        String myMsg = new String(aa,CharsetUtil.UTF_8);
        log.info("收到" + ++counter + "次消息[" + myMsg + "],类型为:[" + nType + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoClient代码如下

@AllArgsConstructor
public class EchoClient {
    private String host;
    private int port;

    public void run() throws InterruptedException {
        //客户端处理线程组(其实就是一个线程池)
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //客户端netty启动对象
            Bootstrap bootstrap = new Bootstrap();
            //将客户端线程组添加到启动对象中
            bootstrap.group(group)
                    //给启动对象添加Socket管道
                    .channel(NioSocketChannel.class)
                    //主动连接到远程服务器IP端口
                    .remoteAddress(new InetSocketAddress(host,port))
                    .option(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必须要实现的抽象方法
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2));
                            socketChannel.pipeline().addLast(new ClientHandler2());
                        }
                    });
            //连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞直到客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            group.shutdownGracefully();
        }
    }
}

运行服务端,客户端。服务端日志如下

2019-10-06 15:27:54.601  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 1次消息:[client data 1...],类型为[1]
2019-10-06 15:27:54.604  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 2次消息:[client data 2...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 3次消息:[client data 3...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 4次消息:[client data 4...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 5次消息:[client data 5...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 6次消息:[client data 6...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 7次消息:[client data 7...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 8次消息:[client data 8...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 9次消息:[client data 9...],类型为[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 10次消息:[client data 10...],类型为[1]

客户端日志如下

2019-10-06 15:27:54.611  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到1次消息[server data...],类型为:[2]
2019-10-06 15:27:54.611  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到2次消息[server data...],类型为:[2]
2019-10-06 15:27:54.611  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到3次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到4次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到5次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到6次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到7次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到8次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到9次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到10次消息[server data...],类型为:[2]

这样就分别完成了自定义长度的半包解码

 

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部