文档章节

Netty权威指南之TCP粘包/拆包

stefanzhlg
 stefanzhlg
发布于 2015/02/12 16:07
字数 1417
阅读 582
收藏 1
点赞 1
评论 1

TCP粘包/拆包发生的原因:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小。

  2. 进行MSS大小的TCP分段。

  3. 以太网帧的payload大于MTU进行IP分片。


粘包问题的解决策略

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,不空格。

  2. 在包尾增加回车换行符进行分割,例如FTP协议。

  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息总长度。

  4. 更复杂的应用层协议。

未考虑TCP粘包导致功能异常的案例

    

package nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServer {

    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHander());

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    private class ChildChannelHander extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            //arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeServer().bind(port);
    }

}



package nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeClient {
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0)
                                throws Exception {
                            
                            //arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //arg0.pipeline().addLast(new StringDecoder());
                            arg0.pipeline().addLast(new TimeClientHandler());
                        }

                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            port = Integer.valueOf(args[0]);
        }
        new TimeClient().connect(port, "127.0.0.1");
    }
}


package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;
import java.util.Date;

/**
 * @author zhaoliangang 2015-2-12
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length
                - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    public void exceptionCaugh(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}

package nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("Query time order" + System.getProperty("line.separator"))
                .getBytes();
    }

    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;

        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);

    }

    public void exceptionCaught(ChannelHandlerContext ctx) {
        ctx.close();
    }
}

运行结果:

The time server receive order :Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time ord ; the counter is:1
The time server receive order :
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order
Query time order ; the counter is:2

Now is :Bad order
Bad order
;the counter is :1

结果发现服务端只接受到了两条消息,说明发生了TCP粘包。

利用LineBasedFrameDecoder解决TCP粘包问题

我们只需要把TimeServer类中的加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeServerHandler类channelRead方法变成:

public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        String body = (String) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8").substring(0, req.length
        //        - System.getProperty("line.separator").length());

        System.out.println("The time server receive order :" + body
                + " ; the counter is:" + ++counter);

        String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "Bad order";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

TimeClient类中加入

arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

TimeClientHandler类channelRead方法变为:

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws UnsupportedEncodingException {
        //ByteBuf buf = (ByteBuf) msg;
        //byte[] req = new byte[buf.readableBytes()];
        //buf.readBytes(req);
        //String body = new String(req, "UTF-8");
        String body = (String) msg;
        System.out.println("Now is :" + body + ";the counter is :" + ++counter);
    }

运行结果就正常了,由于blog字数限制就不贴出来了。

LineBasedFrameDecoder 和 StringDecoderde 原理分析

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。

StringDecoderde将接收到的对象转成字符串,然后继续调用后面的handler。

LineBasedFrameDecoder+StringDecoderde组合就是按行切换的文本解码器,以支持TCP粘包和拆包。


© 著作权归作者所有

共有 人打赏支持
stefanzhlg

stefanzhlg

粉丝 61
博文 110
码字总数 85559
作品 4
浦东
程序员
加载中

评论(1)

geguol
geguol
:smile:
Netty 之入门应用

说明 系列文章:http://www.jianshu.com/p/594441fb9c9e 本文完全参考自《Netty权威指南(第2版)》,李林峰著。 Netty 环境搭建 例程使用构建工程,在pom文件中,加入Netty的依赖。 服务端程...

被称为L的男人 ⋅ 2017/09/02 ⋅ 0

Netty精粹之TCP粘包拆包问题

粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生这个问题,因此这篇文章只...

Float_Luuu ⋅ 2016/02/27 ⋅ 0

Netty5入门学习笔记003-TCP粘包/拆包问题的解决之道(下)

TCP网络通信时候会发生粘包/拆包的问题,上节使用定长解码器解码,本次使用Netty提供的特殊分隔符解码器 还是用上节中的代码例子,但是只需要修改一下发送的消息和配置一下解码器就可以了 客...

山东小木 ⋅ 2014/12/18 ⋅ 4

Netty解决半包(TCP粘包/拆包导致)读写问题

TCP粘包/拆包 TCP是个"流"协议,所谓流,就是没有界限没有分割的一串数据。TCP会根据缓冲区的实际情况进行包划分,一个完整的包可能会拆分成多个包进行发送,也用可能把多个小包封装成一个大...

天冰 ⋅ 2014/06/19 ⋅ 0

netty入门笔记

尝试一下helloword demo -->官方HelloWord -->简书的入门级netty聊天demo 地址:https://www.jianshu.com/p/216881b0573d 2.netty入门级群聊demo,发现有3个问题 问题1:没有显示用户的名字 (现...

谜男amu ⋅ 01/23 ⋅ 0

Netty5入门学习笔记001

Netty官网:http://netty.io/ 本例程使用最新的netty5.x版本编写 服务器端: TimeServer 时间服务器 服务端接收客户端的连接请求和查询当前时间的指令,判断指令正确后响应返回当前服务器的校...

山东小木 ⋅ 2014/12/17 ⋅ 10

Netty构建游戏服务器(四)--JAVA与unity通过protobuf通信

一,概要 1,服务器端采用Netty,前面的教程已经讲述了netty的基本用法和原理。使用的是java自带的编码解码方式,现在为了和unity通信,选用google备受关注的protobuf来编码解码。protobuf优...

安世博 ⋅ 2016/09/20 ⋅ 0

Netty5入门学习笔记002-TCP粘包/拆包问题的解决之道(上)

TCP网络通信时候会发生粘包/拆包的问题,接下来探讨其解决之道。 什么是粘包/拆包 一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是...

山东小木 ⋅ 2014/12/17 ⋅ 11

Dubbo处理TCP拆包粘包问题

Dubbo处理TCP拆包粘包问题 在TCP网络传输工程中,由于TCP包的缓存大小限制,每次请求数据有可能不在一个TCP包里面,或者也可能多个请求的数据在一个TCP包里面。那么如果合理的decode接受的T...

Bieber ⋅ 2015/08/03 ⋅ 13

TCP的拆包和粘包

在我们TCP传输数据的时候,因为TCP底层并不了解上层业务数据的具体含义,它会根据缓冲区的实际情况进行包的划分,可能会把一个大的包拆成多个包进行发送,也有可能将很多小包封装成大的数据发...

black_lxf_720 ⋅ 2016/03/02 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

BS与CS的联系与区别【简】

C/S是Client/Server的缩写。服务器通常采用高性能的PC、工作站或小型机,并采用大型数据库系统,如Oracle、Sybase、InFORMix或 SQL Server。客户端需要安装专用的客户端软件。 B/S是Brower/...

anlve ⋅ 38分钟前 ⋅ 0

发生了什么?Linus 又发怒了?

在一个 Linux 内核 4.18-rc1 的 Pull Request 中,开发者 Andy Shevchenko 表示其在对设备属性框架进行更新时,移除了 union 别名,这引发了 Linus 的暴怒。 这一次 Linus Torvalds 发怒的原...

问题终结者 ⋅ 58分钟前 ⋅ 0

在树莓派上搭建一个maven仓库

在树莓派上搭建一个maven仓库 20180618 lambo init 项目说明 家里有台树莓派性能太慢。想搭建一个maven私服, 使用nexus或者 jfrog-artifactory 运行的够呛。怎么办呢,手写一个吧.所在这个...

林小宝 ⋅ 今天 ⋅ 0

Spring发展历程总结

转自与 https://www.cnblogs.com/RunForLove/p/4641672.html 目前很多公司的架构,从Struts2迁移到了SpringMVC。你有想过为什么不使用Servlet+JSP来构建Java web项目,而是采用SpringMVC呢?...

onedotdot ⋅ 今天 ⋅ 0

Python模块/包/库安装(6种方法)

Python模块/包/库安装(6种方法) 冰颖机器人 2016-11-29 21:33:26 一、方法1: 单文件模块 直接把文件拷贝到 $python_dir/Lib 二、方法2: 多文件模块,带setup.py 下载模块包(压缩文件zip...

cswangyx ⋅ 今天 ⋅ 0

零基础学习大数据人工智能,学习路线篇!系统规划大数据之路?

大数据处理技术怎么学习呢?首先我们要学习Python语言和Linux操作系统,这两个是学习大数据的基础,学习的顺序不分前后。 Python:Python 的排名从去年开始就借助人工智能持续上升,现在它已经...

董黎明 ⋅ 今天 ⋅ 0

openJdk和sun jdk的区别

使用过LINUX的人都应该知道,在大多数LINUX发行版本里,内置或者通过软件源安装JDK的话,都是安装的OpenJDK, 那么到底什么是OpenJDK,它与SUN JDK有什么关系和区别呢? 历史上的原因是,Ope...

jason_kiss ⋅ 今天 ⋅ 0

梳理

Redux 是 JavaScript 状态容器,提供可预测化的状态管理。 它是JS的状态容器,是一种解决问题的方式,所以即可以用于 react 也可以用于 vue。 需要理解其思想及实现方式。 应用中所有的 stat...

分秒 ⋅ 今天 ⋅ 0

Java 后台判断是否为ajax请求

/** * 是否是Ajax请求 * @param request * @return */public static boolean isAjax(ServletRequest request){return "XMLHttpRequest".equalsIgnoreCase(((HttpServletReques......

JavaSon712 ⋅ 今天 ⋅ 0

Redis 单线程 为何却需要事务处理并发问题

Redis是单线程处理,也就是命令会顺序执行。那么为什么会存在并发问题呢? 个人理解是,虽然redis是单线程,但是可以同时有多个客户端访问,每个客户端会有 一个线程。客户端访问之间存在竞争...

码代码的小司机 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部