文档章节

聊聊rocketmq的NettyEncoder及NettyDecoder

go4it
 go4it
发布于 2018/08/07 10:27
字数 503
阅读 57
收藏 0

本文主要研究一下rocketmq的NettyEncoder及NettyDecoder

NettyEncoder

org/apache/rocketmq/remoting/netty/NettyEncoder.java

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
  • 这里继承MessageToByteEncoder,类型是RemotingCommand,先写入header再写入body

RemotingCommand.encodeHeader

org/apache/rocketmq/remoting/protocol/RemotingCommand.java

    public ByteBuffer encodeHeader() {
        return encodeHeader(this.body != null ? this.body.length : 0);
    }

    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData;
        headerData = this.headerEncode();

        length += headerData.length;

        // 3> body data length
        length += bodyLength;

        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        result.flip();

        return result;
    }
  • 这里先写数据总长度,再写入header长度,最后写入header的数据,在写入body数据
  • 整体结构为:Length | Header length | Header data | Body

NettyDecoder

org/apache/rocketmq/remoting/netty/NettyDecoder.java

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
  • 这里继承LengthFieldBasedFrameDecoder,maxFrameLength为FRAME_MAX_LENGTH,lengthFieldOffset为0,lengthFieldLength为4,lengthAdjustment为0,initialBytesToStrip为4
  • 获取ByteBuffer之后调用RemotingCommand.decode(byteBuffer)

RemotingCommand.decode

org/apache/rocketmq/remoting/protocol/RemotingCommand.java

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

    public static int getHeaderLength(int length) {
        return length & 0xFFFFFF;
    }

    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
        switch (type) {
            case JSON:
                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                resultJson.setSerializeTypeCurrentRPC(type);
                return resultJson;
            case ROCKETMQ:
                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                resultRMQ.setSerializeTypeCurrentRPC(type);
                return resultRMQ;
            default:
                break;
        }

        return null;
    }
  • 这里先获取header长度,获取header的数据,decode出来RemotingCommand,之后获取body

小结

rocketmq的netty编码解码,使用的是RemotingCommand对象,编码继承MessageToByteEncoder,解码采用的是LengthFieldBasedFrameDecoder

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1251
码字总数 1168439
作品 0
深圳
私信 提问
加载中

评论(0)

RocketMQ 源码学习 3 :Remoting 模块

原文出处:谢晞鸣 rocketmq-remoting 模块是 RocketMQ 中负责网络通信的模块,被其他所有需要网络通信的模块依赖。它是基于 Netty 实现的,避免了网络编程很多 tricky 的问题。 首先来看下 ...

谢晞鸣
2018/06/02
0
0
RocketMQ(二):RPC通讯

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
2018/10/29
0
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
221
0
消息中间件—RocketMQ的RPC通信(二)

文章摘要:如何设计RPC通信层模型是任何一款性能强劲的MQ所要重点考虑的问题 在(一)篇中主要介绍了RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)、消息发送/接收以及异步回调...

癫狂侠
2018/07/01
0
0
《RocketMq》一、网络传输篇

你是否想知道一个分布式系统的网络传输解决方案,那你可以学习下RocketMQ的网络传输原理,从RocketMQ的Remoting网络处理部分,可以学习到如何进行高效的网络传输,这些思想可以应用到不同的业...

xiaomin0322
2018/05/13
73
0

没有更多内容

加载失败,请刷新页面

加载更多

入门实战: ELK

ELK

BeanHo
18分钟前
14
0
PHP一致性hash代码

[TOC] PHP实现一致性hash bash命令 因为下面PHP代码的模拟用户用的是随机数,所以统计结果达不到绝对的均衡. php ./hash.php | sort | uniq -c | sort PHP代码 这是之前学的时候留下来的测试...

我爱吃炒鸡
今天
78
0
OSChina 周六乱弹 —— 现在看动弹的人都是什么状态

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《夏日、教室与望着窗外的我》- Candy_Wind 手机党少年们想听歌,请使劲儿戳(这里...

小小编辑
今天
704
8
wamp环境安装redis扩展

1.查看phpinfo信息根据配置信息下载对应的扩展 关键信息:VC14,TS,x86 2.下载php_redis和php_igbinary扩展 php_redis扩展下载地址: https://windows.php.net/downloads/pecl/snaps/redis...

点滴课程
今天
36
0
开源商城开发笔记1-创建MyBatis示例

一、修改pom.xml,引入MyBatis,JUnit,Log4j <dependencies><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.5.4</version>......

土龙
今天
56
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部