现在我们来验证一下channel的生命周期。
我们将EchoServerHandler修改如下,增加全部的监听事件,并打印事件方法名称。
/** * 事件处理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(data); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有连接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有连接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
启动EchoServer,打开telnet连接到端口,我们可以看到
admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfs
sdfs
^]
telnet> quit
Connection closed.
整个过程为连接,发送字符串sdfs,退出连接
服务端日志为
2019-10-01 05:33:36.960 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-10-01 05:33:36.960 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-10-01 05:33:54.439 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : sdfs
2019-10-01 05:33:54.442 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-01 05:34:22.527 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-01 05:34:22.529 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelInactive
2019-10-01 05:34:22.529 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregistered
整个生命周期正如前面写到一样
Channel的生命周期为:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered
ChannelPipeline:
好比厂里的流水线一样,可以在上面添加多个ChannelHanler,也可看成是一串 ChannelHandler 实例,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权。
我们来看一下ChannelPipeline的源码
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> { /** * 在管道的首位置添加一个channelhandler */ ChannelPipeline addFirst(String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); /** * 在管道的最末端添加一个channelhandler */ ChannelPipeline addLast(String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); /** * 在一个管道中已存在的channelhandler之前插入另外一个channelhandler */ ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * 在管道已有多一个channelhandler之后插入另外一个channelhandler */ ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * 在该管道的首位置放入一组channelhandler * */ ChannelPipeline addFirst(ChannelHandler... handlers); /** * 同上,多了一个线程池参数 * */ ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); /** * 在管道的最末端放入一组channelhandler * */ ChannelPipeline addLast(ChannelHandler... handlers); /** * 同上,多了一个线程池参数 * */ ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); /** * 从管道中移除一个channelhandler */ ChannelPipeline remove(ChannelHandler handler); /** * 根据名字在管道中移除一个channelhandler */ ChannelHandler remove(String name); /** * 根据类名在管道中移除一个channelhandler */ <T extends ChannelHandler> T remove(Class<T> handlerType); /** * 移除管道中首个channelhandler */ ChannelHandler removeFirst(); /** * 移除管道中末个channelhandler */ ChannelHandler removeLast(); /** * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字 */ ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); /** * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字 */ ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler); /** * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字 */ <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler); /** * 返回管道中首个channelhandler */ ChannelHandler first(); /** * 获取第一个管道处理器上下文 */ ChannelHandlerContext firstContext(); /** * 获取管道中最后一个channelhandler */ ChannelHandler last(); /** * 获取管道中最后一个管道处理器上下文 */ ChannelHandlerContext lastContext(); /** * 根据名字获取管道中的一个channelhandler */ ChannelHandler get(String name); /** * 根据类获取一个channelhandler */ <T extends ChannelHandler> T get(Class<T> handlerType); /** * 根据channelhandler获取一个管道处理器上下文 */ ChannelHandlerContext context(ChannelHandler handler); /** * 根据名字获取一个管道处理器上下文 */ ChannelHandlerContext context(String name); /** * 根据一个channelhandler的类名获取一个管道处理器上下文 */ ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType); /** * 返回一个管道 */ Channel channel(); /** * 返回管道中的channelhandler的名称列表 */ List<String> names(); /** * Converts this pipeline into an ordered {@link Map} whose keys are * handler names and whose values are handlers. */ Map<String, ChannelHandler> toMap(); @Override ChannelPipeline fireChannelRegistered(); @Override ChannelPipeline fireChannelUnregistered(); @Override ChannelPipeline fireChannelActive(); @Override ChannelPipeline fireChannelInactive(); @Override ChannelPipeline fireExceptionCaught(Throwable cause); @Override ChannelPipeline fireUserEventTriggered(Object event); @Override ChannelPipeline fireChannelRead(Object msg); @Override ChannelPipeline fireChannelReadComplete(); @Override ChannelPipeline fireChannelWritabilityChanged(); @Override ChannelPipeline flush(); }
ChannelHandlerContext模块的作用和分析
我们在ChannelHandler的方法中会看到有一个参数。如
/** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(data); }
这个ChannelHandlerContext就是一个处理器的上下文。
1、ChannelHandlerContext是连接ChannelHandler和ChannelPipeline的桥梁
ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,好比调用write方法,
Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者都会在整个管道流里传播,而ChannelHandlerContext就只会在后续的Handler里面传播
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //第一种,会在整个管道中传播 Channel channel = ctx.channel(); channel.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); //第二种,会在整个管道中传播 ChannelPipeline pipeline = ctx.pipeline(); pipeline.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); //第三种,只会在后续的channelhandler中传播 ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); // ByteBuf data = (ByteBuf) msg; // log.info(data.toString(CharsetUtil.UTF_8)); // ctx.writeAndFlush(data); }
2、AbstractChannelHandlerContext类
双向链表结构,next/prev分别是后继节点,和前驱节点
volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;
3、DefaultChannelHandlerContext 是实现类,但是大部分都是父类那边完成,这个只是简单的实现一些方法
主要就是判断Handler的类型
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
ChannelInboundHandler之间的传递,主要通过调用ctx里面的FireXXX()方法来实现下个handler的调用
我们修改一下EchoServerHandler,再增加一个EchoServerHandler2
/** * 事件处理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // //第一种,会在整个管道中传播 // Channel channel = ctx.channel(); // channel.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); // //第二种,会在整个管道中传播 // ChannelPipeline pipeline = ctx.pipeline(); // pipeline.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); // //第三种,只会在后续的channelhandler中传播 // ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("我是第一个handler", CharsetUtil.UTF_8)); ctx.fireChannelRead(msg); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有连接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有连接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
@Slf4j public class EchoServerHandler2 extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("我是第二个handler", CharsetUtil.UTF_8)); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有连接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有连接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
将EchoServerHandler2添加到管道中
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 socketChannel.pipeline().addLast(new EchoServerHandler()); socketChannel.pipeline().addLast(new EchoServerHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
我们可以看到EchoServerHandler2是添加到EchoServerHandler后面的。
启动EchoServer,打开telnet
admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
kdfaf
我是第一个handler我是第二个handler^]
telnet> quit
Connection closed.
服务端的日志为
2019-09-29 22:42:42.626 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-09-29 22:42:42.627 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-09-29 22:42:48.108 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : kdfaf
2019-09-29 22:42:48.114 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler2 : kdfaf
2019-09-29 22:42:48.114 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-09-29 22:44:09.799 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-09-29 22:44:09.802 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelInactive
2019-09-29 22:44:09.802 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregistered
多个入站出站ChannelHandler的执行顺序
1、一般的项目中,inboundHandler和outboundHandler有多个,在Pipeline中的执行顺序?
InboundHandler顺序执行,OutboundHandler逆序执行
问题:ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler2());
或者:
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
现在我们以实际代码来验证一下
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class InboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); //在此结束调用,不会在管道中继续传递 ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class OutboundHandler1 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("OutBoundHandler1 write : " + data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutBoundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
@Slf4j public class OutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("OutboundHaneler2 write : " + data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
启动EchoServer,打开telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
dsfs
InboundHandler2 InboundHandler1 dsfs
服务端日志
2019-10-02 09:51:44.865 INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:dsfs
2019-10-02 09:51:44.869 INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 dsfs
我们可以看到此时,事件在InboundHandler中就结束了,并没有传递到OutboundHandler中。这是因为OutboundHandler在InboundHandler之后才开始监听,InboundHandler在处理中,并没有监听写出站事件,所以不会执行到OutboundHandler之中的代码。
但是如果把监听事件的顺序调整一下,在入站之前就开始监听出站事件。
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
运行EchoServer,打开telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sfdfg
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
^]
telnet> quit
Connection closed.
从右到左为依次执行顺序,先是InboundHandler1->InboundHandler2->OutboundHandler2->OutboundHandler1
服务端日志为
2019-10-02 10:27:03.683 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
因为没有监听活跃,断开事件,所以不会打印相关日志。这里之所以OutboundHandler会执行是因为在入站之前就开始监听写出事件,而InboundHandler会先执行是因为只有接收到客户端的消息的时候才会进行写出操作,这个时候才会被OutboundHandler监听到,进行相关操作,但是由于OutboundHandler是从后往前执行,所以会先执行OutboundHandler2,再执行OutboundHandler1。而整个数据传输是贯穿管道始终的。
执行顺序是:
InboundHandler1 channelRead
InboundHandler2 channelRead
OutboundHandler2 write
OutboundHandler1 write
现在把InboundHandler1提到最前又是什么情况呢?
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
由分析可知,OutboundHandler出站是不会监听InboudHandler1的写出事件的,但可以监听到InboundHandler2的写出事件。
我们修改一下两个InboundHandler来加以验证。
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class InboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); //在此结束调用,不会在管道中继续传递 ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 alone \n".getBytes())); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
重启EchoServer,打开telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfljioldf
InboundHandler1 sdfljioldf //此处InboundHandler1只是简单的输出了,没有被OutboundHandler监听。
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone //此处InboundHandler2被OutboundHandler监听,所以会追加输出OutBoundHandler1 OutboundHandler2
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf //此处也是InboundHandler2被OutboundHandler监听输出的。
服务端日志
2019-10-02 11:04:09.345 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 alone
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf
跟我们分析的一样。
但如果我们修改一下InboundHandler1的代码如下
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
或者
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
现在无论InboundHandler1的位置放前、放后,是否被OutboundHandler监听,它都会流通整个管道。
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler1 alone ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg
服务端日志为
2019-10-02 11:18:10.761 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:ljlfrg
2019-10-02 11:18:10.764 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler1 alone ljlfrg
2019-10-02 11:18:10.764 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler1 alone ljlfrg
2019-10-02 11:18:10.767 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 ljlfrg
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 alone
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 ljlfrg
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg
结论:
1)InboundHandler顺序执行,OutboundHandler逆序执行
2)InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)
3)InboundHandler通过ctx.write(msg),则会传递到outboundHandler
4) 使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行;
但是使用channel.write(msg)、pipline.write(msg)情况会不一致,都会执行
5) outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先outbound再inbound,服务端则相反
7、Netty异步操作模块ChannelFuture
Netty中的所有I/O操作都是异步的,这意味着任何I/O调用都会立即返回,而ChannelFuture会提供有关的信息I/O操作的结果或状态。
ChannelFuture的使用主要是在EchoServer中
//绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync();
1)ChannelFuture状态:
未完成:当I/O操作开始时,将创建一个新的对象,新的最初是未完成的 - 它既没有成功,也没有被取消,因为I/O操作尚未完成。
已完成:当I/O操作完成,不管是成功、失败还是取消,Future都是标记为已完成的, 失败的时候也有具体的信息,例如原因失败,但请注意,即使失败和取消属于完成状态。
ChannelFuture的四种状态
* +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+
注意:不要在IO线程内调用future对象的sync或者await方法
不能在channelHandler中调用sync或者await方法
2)ChannelPromise:继承于ChannelFuture,进一步拓展用于设置IO操作的结果
其中的继承关系图如上所示,第一个Future是JDK自带的,第二个Future是netty中增加的。关于future一般的使用方法可以参考Fork/Join框架原理和使用探秘
Netty编写的网络数据传输中的编码和解码
前面说的:高性能RPC框架的3个要素:IO模型、数据协议、线程模型
最开始接触的编码码:java序列化/反序列化(就是编解码)、url编码、base64编解码
为啥jdk有编解码,还要netty自己开发编解码?
java自带序列化的缺点
1)无法跨语言
2) 序列化后的码流太大,也就是数据包太大
3) 序列化和反序列化性能比较差
业界里面也有其他编码框架: google的 protobuf(PB)、Facebook的Trift、Jboss的Marshalling、Kyro等,关于Kyro的使用方法可以参考浅析克隆 。
Netty里面的编解码:
解码器:负责处理“入站 InboundHandler”数据
编码器:负责“出站 OutboundHandler” 数据
Netty里面提供默认的编解码器,也支持自定义编解码器
Encoder:编码器
Decoder:解码器
Codec:编解码器
Netty的解码器Decoder和使用场景
Decoder对应的就是ChannelInboundHandler,主要就是字节数组转换为消息对象,在我们之前的样例中,都是处理一些简单的字符串来进行消息传递,但如果我们需要转换的是Java的对象的话,则需要使用到Decoder。
主要是两个方法
decode (常用)
decodeLast (用于最后的几个字节处理,也就是channel关闭的时候,产生的最后一个消息)
抽象解码器
1)ByteToMessageDecoder
用于将字节转为消息,需要检查缓冲区是否有足够的字节,通过源码可知,ByteToMessageDecoder实际上就是一个ChannelInboundHandler。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
2)ReplayingDecoder
继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,不是所有的ByteBuf都支持
选择:项目复杂性高则使用ReplayingDecoder,否则使用 ByteToMessageDecoder
解码器具体的实现,用的比较多的是(更多是为了解决TCP底层的粘包和拆包问题)
DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
LineBasedFrameDecoder: 以换行符为结束标志的解码器
FixedLengthFrameDecoder:固定长度解码器
LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
StringDecoder:文本解码器,将接收到的对象转化为字符串,一般会与上面的进行配合,然后在后面添加业务handle
Netty编码器Encoder
Encoder对应的就是ChannelOutboundHandler,消息对象转换为字节数组
Netty本身未提供和解码一样的编码器,是因为场景不同,两者非对等的
1)MessageToByteEncoder
消息转为字节数组,调用write方法,会先判断当前编码器是否支持需要发送的消息类型,如果不支持,则透传;其本身其实就是一个ChannelOutboundHandler。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
2)MessageToMessageEncoder
用于从一种消息编码为另外一种消息(例如POJO到POJO)
Netty编解码器类Codec
组合解码器和编码器,以此提供对于字节和消息都相同的操作(一般不常用)
优点:成对出现,编解码都是在一个类里面完成
缺点:耦合在一起,拓展性不佳
Codec:组合编解码
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解码
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:编码
1)ByteToMessageEncoder
2)MessageToMessageEncoder
什么是TCP粘包拆包
1)TCP拆包: 一个完整的包可能会被TCP拆分为多个包进行发送
2)TCP粘包: 把多个小的包封装成一个大的数据包发送, client发送的若干数据包 Server接收时粘成一包
发送方和接收方都可能出现这个原因
发送方的原因:TCP默认会使用Nagle算法
接收方的原因: TCP接收到数据放置缓存中,应用程序从缓存中读取
UDP: 是没有粘包和拆包的问题,有边界协议
2、TCP半包读写常见解决方案
简介:讲解TCP半包读写常见的解决办法
发送方:可以关闭Nagle算法
接受方: TCP是无界的数据流,并没有处理粘包现象的机制, 且协议本身无法避免粘包,半包读写的发生需要在应用层进行处理
应用层解决半包读写的办法
1)设置定长消息 (10字符)
xdclass000xdclass000xdclass000xdclass000
2)设置消息的边界 ($$ 切割)
sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$
3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息
Header+Body
3、Netty自带解决TCP半包读写方案
DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
LineBasedFrameDecoder: 以换行符为结束标志的解码器
FixedLengthFrameDecoder:固定长度解码器
LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
现在我们来做一个实验,如果不使用解码器,会产生什么样的效果。
服务端的入站事件处理器
@Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; byte[] bytes = new byte[data.readableBytes()]; //将收到到字节流放入内存字节数组 data.readBytes(bytes); //将字节数组转成字符串,并截取该字符串从0到换行符(不包含换行符)到子串 //System.getProperty("line.separator")为获取操作系统的换行符,每种操作系统可能各不相同 String body = new String(bytes, CharsetUtil.UTF_8) .substring(0,bytes.length - System.getProperty("line.separator").length()); log.info("服务端收到消息内容为:" + body + "收到消息次数:" + ++counter); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端入站事件处理器
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; //定义一段包含换行符的字符串,并转化为字节数组 byte[] req = ("guanjian.net" + System.getProperty("line.separator")).getBytes(); //连续发送10条该字符串到服务端 for (int i = 0;i < 10;i++) { //申请一段内存缓冲区 msg = Unpooled.buffer(req.length); //将字节数组写入缓冲区 msg.writeBytes(req); //发送该字符串到服务端 ctx.writeAndFlush(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
分别启动服务端和客户端
我们可以看到服务端日志为
2019-10-05 08:41:11.733 INFO 614 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net收到消息次数:1
虽然客户端是分了10次发送到数据,但是服务端却是只接收了一次,收到消息次数1,这明显是一次粘包。
现在我们给客户端加上.option(ChannelOption.TCP_NODELAY,true)来看一下是什么情况
@AllArgsConstructor public class EchoClient { private String host; private int port; public void run() throws InterruptedException { //客户端处理线程组(其实就是一个线程池) EventLoopGroup group = new NioEventLoopGroup(); try { //客户端netty启动对象 Bootstrap bootstrap = new Bootstrap(); //将客户端线程组添加到启动对象中 bootstrap.group(group) //给启动对象添加Socket管道 .channel(NioSocketChannel.class) //主动连接到远程服务器IP端口 .remoteAddress(new InetSocketAddress(host,port)) .option(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); //连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞直到客户端通道关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 group.shutdownGracefully(); } } }
服务端日志为
2019-10-05 08:45:52.654 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:1
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:2
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:3
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:4
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:5
2019-10-05 08:45:52.656 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:6
我们可以看到,虽然加了.option(ChannelOption.TCP_NODELAY,true),但并不能保证它不产生粘包,有些包包含了两条字符串,收到的消息也就不是10次了。
现在我们把换行解码器LineBasedFrameDecoder放入
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 //这个1024是在1024个字节内去寻找换行符,如果在1024个字节内没有找到换行符,就会报错 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
再次运行,服务端日志
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:1
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:2
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:3
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:4
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:5
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:6
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:7
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:8
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:9
2019-10-05 09:04:15.694 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:10
LineBasedFrameDecoder是继承于ByteToMessageDecoder,是一个ChannelInboundHandler
public class LineBasedFrameDecoder extends ByteToMessageDecoder
由上面可见,粘包经过LineBasedFrameDecoder处理,再逐条发往下一个ChannelInboundHandler,而不是从客户端逐条发送过来的。
如果我们觉得ServerHandler的channelRead方法太麻烦了,还要由字节数组转成字符串,那我们直接将收到的信息强制转成字符串会怎么样呢?
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; log.info("服务端收到消息内容为:" + body + "收到消息次数:" + ++counter); }
启动后服务端报错如下(部分)
java.lang.ClassCastException: io.netty.buffer.PooledSlicedByteBuf cannot be cast to java.lang.String
at com.guanjian.websocket.netty.packet.ServerHandler.channelRead(ServerHandler.java:29)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
说明此处无法将收到的信息直接强制转换成字符串,这个时候我们可以加入字符串解码器StringDecoder.
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 //这个1024是在1024个字节内去寻找换行符,如果在1024个字节内没有找到换行符,就会报错 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); //字符串解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
启动运行,服务端日志为
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:1
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:2
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:3
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:4
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:5
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:6
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:7
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:8
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:9
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:10
使用解码器LineBasedFrameDecoder解决半包读写问题
1)LineBaseFrameDecoder 以换行符为结束标志的解码器 ,构造函数里面的数字表示最长遍历的帧数
2)StringDecoder解码器将对象转成字符串
现在我们修改一下客户端处理器ClientHandler。
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)"; ByteBuf buf = null; //申请一段内存缓冲区 buf = Unpooled.buffer(message.getBytes().length); //将字节数组写入缓冲区 buf.writeBytes(message.getBytes()); //发送字节数组到服务端 ctx.writeAndFlush(buf); }
我们可以看到message字符串中都带有&_字符,我们将在服务端以该字符为分隔符进行解码。
修改EchoServer到代码如下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 //使用指定消息分隔符解码器进行解码 //1024的意思是在1024个字节内查找&_,如果找不到就会抛出异常 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); //字符串解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
分别启动服务端,客户端。服务端到日志如下
2019-10-06 12:55:36.078 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:Starting EchoclientApplication 收到消息次数:1
2019-10-06 12:55:36.078 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:on admindeMBP.lan with PID 741 收到消息次数:2
2019-10-06 12:55:36.079 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次数:3
对比message
"Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)"
我们会发现最后一句/Users/admin/Downloads/nettyecho)被丢弃了,所以要全部拿到我们需要的消息,我们需要在最后一段字符串中加入&_
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)&_"; ByteBuf buf = null; //申请一段内存缓冲区 buf = Unpooled.buffer(message.getBytes().length); //将字节数组写入缓冲区 buf.writeBytes(message.getBytes()); //发送字节数组到服务端 ctx.writeAndFlush(buf); }
再次启动客户端,服务端日志为
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:Starting EchoclientApplication 收到消息次数:1
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:on admindeMBP.lan with PID 741 收到消息次数:2
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次数:3
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:/Users/admin/Downloads/nettyecho)收到消息次数:4
其实DelimiterBasedFrameDecoder有很多个构造器。我们这里使用的两参构造器其实调用到是一个三参构造器。
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { this(maxFrameLength, true, delimiter); }
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) { this(maxFrameLength, stripDelimiter, true, delimiter); }
而三参构造器其实是调用了一个四参构造器
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf delimiter) { this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] { delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())}); }
我们来说明一下这个四参构造器每个参数到含义。
maxFrameLength:
表示一行最大的长度,如果超过这个长度依然没有检测自定义分隔符,将会抛出TooLongFrameException
stripDelimiter:
解码后的消息是否去除掉分隔符(true去掉分隔符,flase保留分隔符)
failFast:
如果为true,则超出maxLength后立即抛出TooLongFrameException,不进行继续解码
如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常
delimiters:
分隔符,ByteBuf类型
现在我们来看一下第二个参数到含义。
修改EchoServer如下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 //使用指定消息分隔符解码器进行解码,1024为在1024个字节内查找分隔符(可以自己任意定义),如果 //找不到会抛出异常 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,false,true,delimiter)); //字符串解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
重新启动服务端,客户端。服务端到日志如下
2019-10-06 13:33:24.806 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:Starting EchoclientApplication &_收到消息次数:1
2019-10-06 13:33:24.807 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:on admindeMBP.lan with PID 741 &_收到消息次数:2
2019-10-06 13:33:24.807 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_收到消息次数:3
2019-10-06 13:33:24.808 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:/Users/admin/Downloads/nettyecho)&_收到消息次数:4
自定义长度半包读写器LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder也有很多构造器,一般我们使用的是一个五参构造器
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this( maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
当然它调用的是一个六参构造器。
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this( ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); }
这六个参数的含义如下
maxFrameLength 数据包的最大长度
lengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节之后的才是消息体字段
lengthFieldLength 长度字段占的字节数, 帧数据长度的字段本身的长度
lengthAdjustment
一般 Header + Body,添加到长度字段的补偿值,如果为负数,开发人员认为这个 Header的长度字段是整个消息包的长度,则Netty应该减去对应的数字
initialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包之后,忽略前面的指定位数的长度字节,应用解码器拿到的就是不带长度域的数据包
failFast 是否快速失败
六参构造器调用到是一个七参构造器(不讨论)。
现在我们来设置一个存储消息内容到消息类。(服务端和客户端相同)
public class CCMessageHeader { @Getter @Setter private byte[] messageFlag = new byte[2]; //消息长度偏移量 @Getter @Setter private int length; //消息长度 @Getter @Setter private int type; //消息类型 @Getter private String data; //消息内容 public CCMessageHeader() { //170,这两个数没有实际意义,只用来描述消息长度偏移量而填充进去的 messageFlag[0] = (byte) 0xaa; //187 messageFlag[1] = (byte) 0xbb; } public void setData(String data) { this.data = data; //消息体的长度为字符串data的长度加4,4为type整形的长度 this.length = data.length() + 4; } }
服务端入站处理器
@Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf body = (ByteBuf) msg; //如果字节流到可读字节数小于等于0,转给下一个InboundHandler处理 if (body.readableBytes() <= 0) { ctx.fireChannelRead(msg); } //初始化一个接收消息对象 CCMessageHeader recHd = new CCMessageHeader(); //字节流读取到的第一个整形赋给nLength int nLength = body.readInt(); //字节流读取到到第二个整形赋给nType int nType = body.readInt(); //获取字节流的可读字节数,并分配一个该大小的字节数组 int nDataSize = body.readableBytes(); byte[] aa = new byte[nDataSize]; //将字节流读入字节数组 body.readBytes(aa); //将该字节数组转成字符串 String myMsg = new String(aa, CharsetUtil.UTF_8); log.info("收到 " + ++counter + "次消息:[" + myMsg + "],类型为[" + nType + "]"); //初始化一个发送消息对象 CCMessageHeader hd = new CCMessageHeader(); hd.setType(2); hd.setData("server data..."); //申请一段直接缓冲空间 ByteBuf echo = Unpooled.directBuffer(); //将该发送消息对象的各属性写入到缓冲空间中 echo.writeBytes(hd.getMessageFlag()); echo.writeInt(hd.getLength()); echo.writeInt(hd.getType()); echo.writeCharSequence(hd.getData(),CharsetUtil.UTF_8); //将该发送消息对象以字节流到形式发送到客户端 ctx.writeAndFlush(echo); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoServer如下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工作线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(类似于NIO中的Channel或者BIO中的Socket,意思差不多) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法; //如果要减少发送次数,就设置为false,会累积一定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将我们自己编写的事件处理器添加到客户端的连接管道中 //这里为socketChannel(客户端连接管道),有别于NioServerSocketChannel //这里可以添加很多的事件处理器,其实Netty有很多内置的事件处理器可以使用 //pipeline()可以理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //作业的工人,我们可以往一条流水线上投放很多的工人 //65535表示对自定义长度解码器对最大处理长度, // 第二个2表示长度信息从第二个字节后开始获取, //其实这个2是messageFlag的长度,即获取长度字段的偏移量。 //4表示长度字段占4个字节,即private int length的字节数,一个整数占4个字节 //0表示添加到长度字段的补偿值,这里不需要补偿 //最后一个2表示获取我们的消息体,要去掉2个长度字段的字节数,因为除了length字段还有一个 //type的整形字段,所以是2才能拿到data字段,即显示消息体字段,但其实消息体包含了type字段的 socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2)); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
客户端入站处理器
@Slf4j public class ClientHandler2 extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //初始化一个发送消息对象 CCMessageHeader hd = new CCMessageHeader(); //设置类型为1 hd.setType(1); for (int i = 0;i < 10;i++) { String strData = String.format("client data %d...",i + 1); //设置消息体 hd.setData(strData); //申请一段直接缓冲区 ByteBuf echo = Unpooled.directBuffer(); //将消息对象的各个属性写入该缓冲区 echo.writeBytes(hd.getMessageFlag()); echo.writeInt(hd.getLength()); echo.writeInt(hd.getType()); echo.writeCharSequence(hd.getData(), CharsetUtil.UTF_8); //将该缓冲区的字节流发送到服务端 ctx.writeAndFlush(echo); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf body = (ByteBuf) msg; //初始化接收消息对象 CCMessageHeader recHd = new CCMessageHeader(); //获取第一个整形赋给nLength,即消息长度 int nLength = body.readInt(); //获取第二个整形赋给nType int nType = body.readInt(); //获取整个字节流的可读字节数,并以此建立一个字节数组 int nDataSize = body.readableBytes(); byte[] aa = new byte[nDataSize]; //将字节流读入该字节数组 body.readBytes(aa); //将该字节数组转换为字符串 String myMsg = new String(aa,CharsetUtil.UTF_8); log.info("收到" + ++counter + "次消息[" + myMsg + "],类型为:[" + nType + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoClient代码如下
@AllArgsConstructor public class EchoClient { private String host; private int port; public void run() throws InterruptedException { //客户端处理线程组(其实就是一个线程池) EventLoopGroup group = new NioEventLoopGroup(); try { //客户端netty启动对象 Bootstrap bootstrap = new Bootstrap(); //将客户端线程组添加到启动对象中 bootstrap.group(group) //给启动对象添加Socket管道 .channel(NioSocketChannel.class) //主动连接到远程服务器IP端口 .remoteAddress(new InetSocketAddress(host,port)) .option(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必须要实现的抽象方法 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2)); socketChannel.pipeline().addLast(new ClientHandler2()); } }); //连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞直到客户端通道关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 group.shutdownGracefully(); } } }
运行服务端,客户端。服务端日志如下
2019-10-06 15:27:54.601 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 1次消息:[client data 1...],类型为[1]
2019-10-06 15:27:54.604 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 2次消息:[client data 2...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 3次消息:[client data 3...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 4次消息:[client data 4...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 5次消息:[client data 5...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 6次消息:[client data 6...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 7次消息:[client data 7...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 8次消息:[client data 8...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 9次消息:[client data 9...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 10次消息:[client data 10...],类型为[1]
客户端日志如下
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到1次消息[server data...],类型为:[2]
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到2次消息[server data...],类型为:[2]
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到3次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到4次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到5次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到6次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到7次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到8次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到9次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到10次消息[server data...],类型为:[2]
这样就分别完成了自定义长度的半包解码