文档章节

Netty权威指南之TCP粘包/拆包

stefanzhlg
 stefanzhlg
发布于 2015/02/12 16:07
字数 1417
阅读 589
收藏 1

TCP粘包/拆包发生的原因:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小。

  2. 进行MSS大小的TCP分段。

  3. 以太网帧的payload大于MTU进行IP分片。


粘包问题的解决策略

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,不空格。

  2. 在包尾增加回车换行符进行分割,例如FTP协议。

  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息总长度。

  4. 更复杂的应用层协议。

未考虑TCP粘包导致功能异常的案例

    

package nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServer {

    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHander());

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    private class ChildChannelHander extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            //arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeServer().bind(port);
    }

}



package nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeClient {
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0)
                                throws Exception {
                            
                            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //arg0.pipeline().addLast(new StringDecoder());
                            arg0.pipeline().addLast(new TimeClientHandler());
                        }

                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeClient().connect(port, "127.0.0.1");
    }
}


package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;
import java.util.Date;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length
                - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    public void exceptionCaugh(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}

package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("Query time order" + System.getProperty("line.separator"))
                .getBytes();
    }

    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;

        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);

    }

    public void exceptionCaught(ChannelHandlerContext ctx) {
        ctx.close();
    }
}

运行结果:

The time server receive order :Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time ord ; the counter is:1
The time server receive order :
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order ; the counter is:2

Now is :Bad order
Bad order
;the counter is :1

结果发现服务端只接受到了两条消息,说明发生了TCP粘包。

利用LineBasedFrameDecoder解决TCP粘包问题

我们只需要把TimeServer类中的加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeServerHandler类channelRead方法变成:

public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        String body = (String) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8").substring(0, req.length
        //        - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

TimeClient类中加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeClientHandler类channelRead方法变为:

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        //ByteBuf buf = (ByteBuf) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8");
        String body = (String) msg;
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);
    }

运行结果就正常了,由于blog字数限制就不贴出来了。

LineBasedFrameDecoder 和 StringDecoderde 原理分析

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。

StringDecoderde将接收到的对象转成字符串,然后继续调用后面的handler。

LineBasedFrameDecoder+StringDecoderde组合就是按行切换的文本解码器,以支持TCP粘包和拆包。


© 著作权归作者所有

共有 人打赏支持
stefanzhlg

stefanzhlg

粉丝 63
博文 110
码字总数 85559
作品 4
浦东
程序员
加载中

评论(1)

geguol
geguol
:smile:
Netty 粘包/拆包应用案例及解决方案分析

熟悉TCP变成的可以知道,无论是客户端还是服务端,但我们读取或者发送消息的时候,都需要考虑TCP底层粘包/拆包机制,下面我们先看一下TCP 粘包/拆包和基础知识,然后模拟一个没有考虑TCP粘包...

Java/Python
08/21
0
0
Netty 之入门应用

说明 系列文章:http://www.jianshu.com/p/594441fb9c9e 本文完全参考自《Netty权威指南(第2版)》,李林峰著。 Netty 环境搭建 例程使用构建工程,在pom文件中,加入Netty的依赖。 服务端程...

被称为L的男人
2017/09/02
0
0
Netty精粹之TCP粘包拆包问题

粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生这个问题,因此这篇文章只...

Float_Luuu
2016/02/27
8.6K
0
【Netty】Netty实例开源项目

版权声明:本文为谙忆原创文章,转载请附上本文链接,谢谢。 https://blog.csdn.net/qq_26525215/article/details/81989644 Netty netty-not-sticky-pack-demo 项目地址 Netty 本篇博客讲解:...

谙忆
08/23
0
0
Netty5入门学习笔记003-TCP粘包/拆包问题的解决之道(下)

TCP网络通信时候会发生粘包/拆包的问题,上节使用定长解码器解码,本次使用Netty提供的特殊分隔符解码器 还是用上节中的代码例子,但是只需要修改一下发送的消息和配置一下解码器就可以了 客...

山东-小木
2014/12/18
0
4

没有更多内容

加载失败,请刷新页面

加载更多

day96-20180923-英语流利阅读-待学习

英国王子也不看好人工智能,理由却和霍金不同 Daniel 2018-09-23 1.今日导读 2016 年 3 月 9 日至 15 日,世界围棋冠军李世石与谷歌研发的计算机围棋程序 AlphaGo 进行人机大战并以 1 比 4 ...

飞鱼说编程
今天
4
0
今天在码云遇到一个很有意思的人 for Per.js

今天在码云遇到一个很有意思的人,他在我的Per.js项目下面评论了一句,大意为“你试试这句代码,看看速度到底是你快还是Vue快”【当然,这个评论被我手残不小心删掉了...】。 然后我就试了,...

Skyogo
今天
36
0
Java -------- 首字母相关排序总结

Java 字符串数组首字母排序 字符串数组按首字母排序:(区分大小写) String[] strings = new String[]{"ba","aa","CC","Ba","DD","ee","dd"}; Arrays.sort(strings); for (int i ...

切切歆语
今天
4
0
还在用 Git 的 -f 参数强推仓库,你这是在作死!

最近,美国一个程序员因为同事不写注释,代码不规范,最严重的是天天使用 git push -f 参数强行覆盖仓库,该程序员忍无可忍向四名同事开抢,其中一人情况危急!!! 不写注释、代码不规范是一...

红薯
今天
573
0
NPM报错终极大法

所有的错误基本上都跟node的版本相关 直接删除系统中的node 重新安装 sudo rm -rf /usr/local/{bin/{node,npm},lib/node_modules/npm,lib/node,share/man/*/node.*} 重新安装 $ n lts$ npm...

lilugirl
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部