Netty权威指南之TCP粘包/拆包
Netty权威指南之TCP粘包/拆包
stefanzhlg 发表于3年前
Netty权威指南之TCP粘包/拆包
  • 发表于 3年前
  • 阅读 527
  • 收藏 1
  • 点赞 1
  • 评论 1

标题:腾讯云 新注册用户域名抢购1元起>>>   

摘要: TCP底层并不了解上层业务数据的具体含义,他会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据报发送,这就是所谓的TCP粘包和拆包的问题。

TCP粘包/拆包发生的原因:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小。

  2. 进行MSS大小的TCP分段。

  3. 以太网帧的payload大于MTU进行IP分片。


粘包问题的解决策略

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,不空格。

  2. 在包尾增加回车换行符进行分割,例如FTP协议。

  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息总长度。

  4. 更复杂的应用层协议。

未考虑TCP粘包导致功能异常的案例

    

package nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServer {

    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHander());

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    private class ChildChannelHander extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            //arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeServer().bind(port);
    }

}



package nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeClient {
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0)
                                throws Exception {
                            
                            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //arg0.pipeline().addLast(new StringDecoder());
                            arg0.pipeline().addLast(new TimeClientHandler());
                        }

                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeClient().connect(port, "127.0.0.1");
    }
}


package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;
import java.util.Date;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length
                - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    public void exceptionCaugh(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}

package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("Query time order" + System.getProperty("line.separator"))
                .getBytes();
    }

    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;

        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);

    }

    public void exceptionCaught(ChannelHandlerContext ctx) {
        ctx.close();
    }
}

运行结果:

The time server receive order :Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time ord ; the counter is:1
The time server receive order :
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order ; the counter is:2

Now is :Bad order
Bad order
;the counter is :1

结果发现服务端只接受到了两条消息,说明发生了TCP粘包。

利用LineBasedFrameDecoder解决TCP粘包问题

我们只需要把TimeServer类中的加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeServerHandler类channelRead方法变成:

public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        String body = (String) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8").substring(0, req.length
        //        - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

TimeClient类中加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeClientHandler类channelRead方法变为:

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        //ByteBuf buf = (ByteBuf) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8");
        String body = (String) msg;
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);
    }

运行结果就正常了,由于blog字数限制就不贴出来了。

LineBasedFrameDecoder 和 StringDecoderde 原理分析

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。

StringDecoderde将接收到的对象转成字符串,然后继续调用后面的handler。

LineBasedFrameDecoder+StringDecoderde组合就是按行切换的文本解码器,以支持TCP粘包和拆包。


标签: Netty TCP 粘包/拆包
共有 人打赏支持
stefanzhlg
粉丝 57
博文 110
码字总数 85559
作品 4
评论 (1)
geguol
:smile:
×
stefanzhlg
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: