文档章节

Netty权威指南之TCP粘包/拆包

stefanzhlg
 stefanzhlg
发布于 2015/02/12 16:07
字数 1417
阅读 592
收藏 1

TCP粘包/拆包发生的原因:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小。

  2. 进行MSS大小的TCP分段。

  3. 以太网帧的payload大于MTU进行IP分片。


粘包问题的解决策略

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,不空格。

  2. 在包尾增加回车换行符进行分割,例如FTP协议。

  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息总长度。

  4. 更复杂的应用层协议。

未考虑TCP粘包导致功能异常的案例

    

package nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServer {

    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHander());

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    private class ChildChannelHander extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            //arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeServer().bind(port);
    }

}



package nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeClient {
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0)
                                throws Exception {
                            
                            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //arg0.pipeline().addLast(new StringDecoder());
                            arg0.pipeline().addLast(new TimeClientHandler());
                        }

                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeClient().connect(port, "127.0.0.1");
    }
}


package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;
import java.util.Date;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length
                - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    public void exceptionCaugh(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}

package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("Query time order" + System.getProperty("line.separator"))
                .getBytes();
    }

    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;

        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);

    }

    public void exceptionCaught(ChannelHandlerContext ctx) {
        ctx.close();
    }
}

运行结果:

The time server receive order :Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time ord ; the counter is:1
The time server receive order :
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order ; the counter is:2

Now is :Bad order
Bad order
;the counter is :1

结果发现服务端只接受到了两条消息,说明发生了TCP粘包。

利用LineBasedFrameDecoder解决TCP粘包问题

我们只需要把TimeServer类中的加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeServerHandler类channelRead方法变成:

public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        String body = (String) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8").substring(0, req.length
        //        - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

TimeClient类中加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeClientHandler类channelRead方法变为:

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        //ByteBuf buf = (ByteBuf) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8");
        String body = (String) msg;
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);
    }

运行结果就正常了,由于blog字数限制就不贴出来了。

LineBasedFrameDecoder 和 StringDecoderde 原理分析

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。

StringDecoderde将接收到的对象转成字符串,然后继续调用后面的handler。

LineBasedFrameDecoder+StringDecoderde组合就是按行切换的文本解码器,以支持TCP粘包和拆包。


© 著作权归作者所有

共有 人打赏支持
stefanzhlg

stefanzhlg

粉丝 65
博文 110
码字总数 85559
作品 4
浦东
程序员
私信 提问
加载中

评论(1)

geguol
geguol
:smile:
Netty 之入门应用

说明 系列文章:http://www.jianshu.com/p/594441fb9c9e 本文完全参考自《Netty权威指南(第2版)》,李林峰著。 Netty 环境搭建 例程使用构建工程,在pom文件中,加入Netty的依赖。 服务端程...

被称为L的男人
2017/09/02
0
0
Netty 粘包/拆包应用案例及解决方案分析

熟悉TCP变成的可以知道,无论是客户端还是服务端,但我们读取或者发送消息的时候,都需要考虑TCP底层粘包/拆包机制,下面我们先看一下TCP 粘包/拆包和基础知识,然后模拟一个没有考虑TCP粘包...

Java/Python
08/21
0
0
Netty精粹之TCP粘包拆包问题

粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生这个问题,因此这篇文章只...

Float_Luuu
2016/02/27
8.6K
0
【Netty】Netty实例开源项目

版权声明:本文为谙忆原创文章,转载请附上本文链接,谢谢。 https://blog.csdn.net/qq_26525215/article/details/81989644 Netty netty-not-sticky-pack-demo 项目地址 Netty 本篇博客讲解:...

谙忆
08/23
0
0
DotNetty 版 开源 mqtt 客户端 (MqttFx)

一、DotNetty背景介绍 某天发现 dotnet 是个好东西,就找了个项目来练练手。于是有了本文的 Mqtt 客户端 (github: MqttFx ) DotNetty是微软的Azure团队,使用C#实现的Netty的版本发布。不但使...

linfx
10/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

2135亿背后,你所不知道的那些阿里瞬间

摘要: 01 10日下午五点。 上海梅赛德斯奔驰文化中心,天猫双11前夜的“猫晚”就在离我100米的地方。 台上正彩排,鲜衣怒马,烈焰繁花。但走进导播车,白天和黑夜好像没有区别,但你知道吗,...

阿里云官方博客
40分钟前
1
0
叶曼音频

道德经 道德经01a 道德经01b

亚林瓜子
49分钟前
1
0
Go 使用channel控制并发

前言 channel一般用于协程之间的通信,channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下channel也可轻易实现。 场景示例 总结 ...

恋恋美食
今天
3
0
斐波那契堆的理解,节点mark属性和势函数

斐波那契堆 看了好多博客,都是照搬算法导论的内容,没有自己的理解,比如为什么有mark属性,势函数的作用,以及为什么叫斐波那契堆,下面说说鄙人的理解。 势函数 势函数是根节点个数加上2...

杨喆
今天
7
0
NIO源码详解

阻塞io和无阻塞io: 阻塞io是指jdk1.4之前版本面向流的io,服务端需要对每个请求建立一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程相应,如果没有则会一直等待或者遭到拒 ...

沉稳2018
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部