文档章节

Netty就是这么回事(八)

ACRushVoid
 ACRushVoid
发布于 2017/01/16 19:56
字数 2016
阅读 155
收藏 2

这一章主要介绍如何使用Netty开发自定义通信协议。我们知道有的时候可能我们不想用java的序列化,因为编码效率低,并且我们也不想使用protobuf,因为如果说我们的通信程序都是来自一个平台,那么用protobuf每次都需要重新利用工具生成文件也不是很好,那么就需要开发自己的通信协议。

在开发自定义通信协议之前,我们先来明确这么几个开发目标:首先,我们希望仍然通信在对象和字节之间做转换,对上层来说无感知;其次,我们的自定义协议通信程序也要有半包解码的能力。明确了目的之后,我们如何处理呢?我们需要依次开发自己的编码器和解码器,你可能会觉得以前都是用Netty自带的编解码器,自己开发是不是很困难啊。

当然不是!Netty给我们提供了MessageToByteEncoder和ByteToMessageDecoder这两个编解码器,他们具有半包处理的能力,我们只需要实现协议就可以了。

我们来看一下客户端和服务端的代码。

客户端代码:

package com.dlb.note.client;

import com.dlb.note.constant.ConstantValue;
import com.dlb.note.doj.Request;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 功能:自定义编码器客户端
 * 版本:1.0
 * 日期:2016/12/19 19:55
 * 作者:馟苏
 */
public class MySelfEncoderClient {
    /**
     * 主函数
     */
    public static void main(String []args) {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline().addLast(new MySelfEncoder());
                            channel.pipeline().addLast(new MySelfEncoderHandler());
                        }
                    });

            // 等待客户端链接成功
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            System.out.println("客户端链接成功!");

            // 等待客户端链接关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

/**
 * 请求编码器
 * <pre>
 * 数据包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * | 包头    | 模块号   | 命令号  |  长度   |   数据   |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 * 包头4字节
 * 模块号2字节short
 * 命令号2字节short
 * 长度4字节(描述数据部分字节长度)
 * @author 馟苏
 */
class MySelfEncoder extends MessageToByteEncoder {
    /**
     * 编码
     * @param channelHandlerContext
     * @param rs
     * @param byteBuf
     * @throws Exception
     */
    protected void encode(ChannelHandlerContext channelHandlerContext, Object rs, ByteBuf byteBuf) throws Exception {
        Request request = (Request)(rs);

        //包头
        byteBuf.writeInt(ConstantValue.FLAG);
        //module
        byteBuf.writeShort(request.getModule());
        //cmd
        byteBuf.writeShort(request.getCmd());
        //长度
        byteBuf.writeInt(request.getDataLength());
        //data
        if(request.getData() != null){
            byteBuf.writeBytes(request.getData());
        }
    }
}

/**
 * 自定义处理器
 */
class MySelfEncoderHandler extends ChannelHandlerAdapter {
    // 可读
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    }

    // 连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            Request request = new Request();

            request.setCmd((short) 100);
            request.setData("nihao".getBytes());
            request.setModule((short) 9000);

            ctx.writeAndFlush(request);
        }
    }

    // 关闭
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    // 异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.toString());
        ctx.close();
    }
}

服务端代码:

package com.dlb.note.server;

import com.dlb.note.doj.Request;
import com.dlb.note.server.decode.MySelfDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 功能:自定义解码器服务端
 * 版本:1.0
 * 日期:2016/12/15 12:47
 * 作者:馟苏
 */
public class MySelfDecoderServer {
    /**
     * 主函数
     */
    public static void main(String []args) {
        // 配置服务端的NIO线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 为管道添加处理器
                    .childHandler(new ChannelInitializer() {
                        // 初始化管道
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new MySelfDecoder())
                                    .addLast(new MySelfDecoderHandler());
                        }
                    });

            // 绑定端口,同步等待成功
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            System.out.println("服务器在8888端口监听hello");

            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
            System.out.println("服务器关闭bye");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * 自定义处理器
 */
class MySelfDecoderHandler extends ChannelHandlerAdapter {
    // 可读
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println(request.toString());
    }

    // 连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client come,ip:" + ctx.channel().remoteAddress());
    }

    // 关闭
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    // 异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.toString());
        ctx.close();
    }
}

解码器:

package com.dlb.note.server.decode;

import com.dlb.note.constant.ConstantValue;
import com.dlb.note.doj.Request;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 请求解码器
 * <pre>
 * 数据包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * | 包头    | 模块号   | 命令号  |  长度   |   数据   |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 * 包头4字节
 * 模块号2字节short
 * 命令号2字节short
 * 长度4字节(描述数据部分字节长度)
 */
/**
 * 功能:自定义消息解码器
 * 版本:1.0
 * 日期:2016/12/19 19:42
 * 作者:馟苏
 */
public class MySelfDecoder extends ByteToMessageDecoder {
    /**
     * 数据包基本长度
     */
    public static int BASE_LENTH = 4 + 2 + 2 + 4;

    /**
     * 解码器
     * @param channelHandlerContext
     * @param byteBuf
     * @param list
     * @throws Exception
     */
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
            throws Exception {
        // 可读长度必须大于基本长度
        if(byteBuf.readableBytes() >= BASE_LENTH){
            // 防止socket字节流攻击
            if(byteBuf.readableBytes() > 2048){
                byteBuf.skipBytes(byteBuf.readableBytes());
            }

            // 记录包头开始的index
            int beginReader;

            while(true){
                beginReader = byteBuf.readerIndex();
                byteBuf.markReaderIndex();
                if(byteBuf.readInt() == ConstantValue.FLAG){
                    break;
                }

                // 未读到包头,略过一个字节
                byteBuf.resetReaderIndex();
                byteBuf.readByte();

                // 长度又变得不满足
                if(byteBuf.readableBytes() < BASE_LENTH){
                    return;
                }
            }

            // 模块号
            short module = byteBuf.readShort();
            // 命令号
            short cmd = byteBuf.readShort();
            // 长度
            int length = byteBuf.readInt();

            // 判断请求数据包数据是否到齐
            if(byteBuf.readableBytes() < length){
                // 还原读指针
                byteBuf.readerIndex(beginReader);
                return;
            }

            // 读取data数据
            byte[] data = new byte[length];
            byteBuf.readBytes(data);

            Request request = new Request();
            request.setModule(module);
            request.setCmd(cmd);
            request.setData(data);

            // 继续往下传递
            list.add(request);
        }
    }
}

可能大家看完以后会问这么一个问题,Netty究竟是怎么给我们解决了半包读写问题,还有关于编码器的编写时什么意思?这其实就需要我们看一下Netty的源码,拿解码器做例子,下面是byteToMessageDecoder的关键代码。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if(msg instanceof ByteBuf) {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        boolean var12 = false;

        try {
            var12 = true;
            ByteBuf t = (ByteBuf)msg;
            this.first = this.cumulation == null;
            if(this.first) {
                this.cumulation = t;
            } else {
                if(this.cumulation.writerIndex() > this.cumulation.maxCapacity() - t.readableBytes()) {
                    this.expandCumulation(ctx, t.readableBytes());
                }

                this.cumulation.writeBytes(t);
                t.release();
            }

            this.callDecode(ctx, this.cumulation, out);
            var12 = false;
        } catch (DecoderException var13) {
            throw var13;
        } catch (Throwable var14) {
            throw new DecoderException(var14);
        } finally {
            if(var12) {
                if(this.cumulation != null && !this.cumulation.isReadable()) {
                    this.cumulation.release();
                    this.cumulation = null;
                }

                int size = out.size();
                this.decodeWasNull = size == 0;

                for(int i1 = 0; i1 < size; ++i1) {
                    ctx.fireChannelRead(out.get(i1));
                }

                out.recycle();
            }
        }

        if(this.cumulation != null && !this.cumulation.isReadable()) {
            this.cumulation.release();
            this.cumulation = null;
        }

        int var16 = out.size();
        this.decodeWasNull = var16 == 0;

        for(int i = 0; i < var16; ++i) {
            ctx.fireChannelRead(out.get(i));
        }

        out.recycle();
    } else {
        ctx.fireChannelRead(msg);
    }

}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    this.replayable.setCumulation(in);

    try {
        while(in.isReadable()) {
            int cause = this.checkpoint = in.readerIndex();
            int outSize = out.size();
            Object oldState = this.state;
            int oldInputLength = in.readableBytes();

            try {
                this.decode(ctx, this.replayable, out);
                if(ctx.isRemoved()) {
                    break;
                }

                if(outSize == out.size()) {
                    if(oldInputLength == in.readableBytes() && oldState == this.state) {
                        throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything.");
                    }
                    continue;
                }
            } catch (Signal var10) {
                var10.expect(REPLAY);
                if(!ctx.isRemoved()) {
                    int checkpoint = this.checkpoint;
                    if(checkpoint >= 0) {
                        in.readerIndex(checkpoint);
                    }
                }
                break;
            }

            if(cause == in.readerIndex() && oldState == this.state) {
                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something.");
            }

            if(this.isSingleDecode()) {
                break;
            }
        }

    } catch (DecoderException var11) {
        throw var11;
    } catch (Throwable var12) {
        throw new DecoderException(var12);
    }
}

我们可以看到实际上decode方法是在callDecoder中调用的,调用完了以后如果list不为空,那么就会往上传,这样看来要是想让我们的上层handler接受对象,就必须要把我们从字节解析的对象放入list集合中。

比较关键的是

首先呢,查看是不是第一次接受,如果是那么把接受到的字节赋值给cumulation,如果不是那么把接收到的字节拼接到cumulation的后面。这是什么意思,我们可以这么考虑,把cumulation想象成一个缓存,假如你是第一次接受字节,那么你的缓存就是本次接受到的字节;如果说你上次的没有处理完,那么我就把上次没处理完的字节加上本次接受的一起放入缓存,然后再交给本次解码器处理。

还有一个问题是怎么保证在处理的时候准备返回还未处理完的字节呢,比如说我发现这一帧不够,我希望等帧接受足够大了再进行相应处理?其实关键在这个地方,一切都是以读指针来标识的,我们来看一个bytebuf其实有两个指针:一个读指针readIndex和一个写指针writeIndex,这就解决了NIO编程中经常要flip的问题,那么读指针和写指针直接的字节就是还未处理的。而我们每读一个字节,读指针都会自增,只要我们保证在我们的处理程序中返回读指针的正确位置就能保证Netty框架的缓存帮我们缓存字节信息。因此,Netty就帮我们解决了半包的解码问题,是不是很方便!

 

© 著作权归作者所有

ACRushVoid
粉丝 58
博文 18
码字总数 19761
作品 0
威海
私信 提问
加载中

评论(1)

守望之心
守望之心
赞学习!
MQTT---HiveMQ源码详解(八)Netty-WebSocket

源博客地址:http://blog.csdn.net/pipinet123 MQTT交流群:221405150 基于netty实现Webscoket相对来说就是相当简单,所以本讲中就不搞太复杂的了,给大家看一个类图,和一段给pipeline添加han...

pipinet123
2017/03/10
0
0
MQTT---HiveMQ源码详解(八)Netty-WebSocket

基于netty实现Webscoket相对来说就是相当简单,所以本讲中就不搞太复杂的了,给大家看一个类图,和一段给pipeline添加handler,就可以完全了解hivemq如何处理的了,其实跟几乎所有需要使用n...

西安PP
2017/10/22
0
0
Netty 源码深度解析(八) - 解码

就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标实现的很好的候选对象,它可以节省开发人员大量的时间和精力。 当然这也适应于本文的主题:编码和解码,或...

JavaEdge
2018/11/29
0
0
netty 学习 (2)Handler的执行顺序

Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控...

yaokangjun
2014/06/04
26.3K
10
Netty源码阅读入门实战(八)-解码

就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标实现的很好的候选对象,它可以节省开发人员大量的时间和精力。 当然这也适应于本文的主题:编码和解码,或...

芥末无疆sss
2018/10/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

从0搭建自己的webpack开发环境(五)

往期回顾: 从0搭建自己的webpack开发环境(一) 从0搭建自己的webpack开发环境(二) 从0搭建自己的webpack开发环境(三) 从0搭建自己的webpack开发环境(四) 前四篇文章我们已经掌握了w...

前端优选
23分钟前
4
0
docker 构建php-fpm 7.2(swoole) 镜像

mkdir -p ~/mnt/docker/phpmkdir -p ~/mnt/docker/php#下载swoole-2.2.0.tgz安装包到software 下载地址:http://pecl.php.net/package/swoole/2.2.0#创建Dockerfilevim ~/docker/......

Jack088
38分钟前
3
0
简单工厂

定义:由一个工厂对象决定创建出哪一种产品类的实例 类型:创建型,但不属于GOF23种设计模式 工厂类负责创建的对象比较少 客户端(应用层)只知道传入工厂类的参数,对于如何创建对象,不关心...

东风破2019
53分钟前
4
0
SSH安全加强两步走

从 OpenSSH 6.2 开始已经支持 SSH 多因素认证,本文就来讲讲如何在 OpenSSH 下启用该特性。 OpenSSH 6.2 以后的版本多了一个配置项 AuthenticationMethods。该配置项可以让 OpenSSH 同时指定...

Linux就该这么学
54分钟前
7
0
聊聊nacos的TcpSuperSenseProcessor

序 本文主要研究一下nacos的TcpSuperSenseProcessor TcpSuperSenseProcessor nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java @Compon......

go4it
54分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部