文档章节

【RocketMQ】Message存储笔记

S
 SaintTinyBoy
发布于 2018/07/22 18:14
字数 1576
阅读 15
收藏 0

概述

消息中间件存储分为三种,一是保存在内存中,速度快但会因为系统宕机等因素造成消息丢失;二是保存在内存中,同时定时将消息写入DB中,好处是持久化消息,如何读写DB是MQ的瓶颈;三是内存+磁盘,定时将消息保存在磁盘中,如何设计好的存储机制决定MQ的高并发、高可用。

通过阅读RocketMQ源码,了解下列问题的解法:

  • RocketMQ如何设计存储机制
  • 采用哪些技术保证存储的高效性

存储机制

消息存储在文件中,需要有一个角色专门来管理对应的文件,MappedFile为此而生。管理这些MappedFile的角色是MappedFileQueue,看做一个文件夹,维护CopyOnWriteArrayList<MappedFile> mappedFiles。

public class MappedFile {

    //记录每次写消息到内存之后的位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    //记录每次提交到FileChannel之后的位置
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    //记录刷新到物理文件之后的位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    //文件大小默认是1G
    protected int fileSize;
    //对应的文件NIO通道
    protected FileChannel fileChannel;
    //对应的文件
    private File file;
    //内存缓冲区,保存暂时写入的消息
    protected ByteBuffer writeBuffer = null;
    protected MappedByteBuffer mappedByteBuffer = null;


    private void init(final String fileName, final int fileSize) throws IOException {
       
        this.fileFromOffset = Long.parseLong(this.file.getName());
        ensureDirOK(this.file.getParent());
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    }
}

MappedFile的名字(file.getName)是00000000000000000000、00000000001073741824、00000000002147483648,fileName[n] = fileName[n - 1] + mappedFileSize。直接用起始偏移量记录每个文件名称,00000000001073741824换算成大小=1G,即每个文件名称是该文件的startOffset。

MappedFile提供三种作用:写消息、提交消息到FileChannel、写磁盘

1、AppendMessageResult appendMessagesInner(MessageExt messageExt, final AppendMessageCallback cb)

2、boolean appendMessage(final byte[] data, final int offset, final int length)

3、int commit(final int commitLeastPages) 

4、int flush(final int flushLeastPages)

先看appendMessage操作

MappedFile#appendMessage

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt);
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
       .......
    }

1.先获取上一次写入位置,从Buffer中取一个分区出来

2.设置buffer即将写入的开始位置,即上一次写入位置之后

3.由回调函数AppendMessageCallback负责消息写入,该函数由CommitLog提供,逻辑是对Message做一些额外处理,如附加消息长度、时间戳等。具体如下:

第几位 字段 说明 数据类型 字节数
1 MsgLen 消息总长度 Int 4
2 MagicCode MESSAGE_MAGIC_CODE Int 4
3 BodyCRC 消息内容CRC Int 4
4 QueueId 消息队列编号 Int 4
5 Flag flag Int 4
6 QueueOffset 消息队列位置 Long 8
7 PhysicalOffset 物理位置。在 CommitLog 的顺序存储位置。 Long 8
8 SysFlag MessageSysFlag Int 4
9 BornTimestamp 生成消息时间戳 Long 8
10 BornHost 生效消息的地址+端口 Long 8
11 StoreTimestamp 存储消息时间戳 Long 8
12 StoreHost 存储消息的地址+端口 Long 8
13 ReconsumeTimes 重新消费消息次数 Int 4
14 PreparedTransationOffset   Long 8
15 BodyLength + Body 内容长度 + 内容 Int + Bytes 4 + bodyLength
16 TopicLength + Topic Topic长度 + Topic Byte + Bytes 1 + topicLength
17 PropertiesLength + Properties 拓展字段长度 + 拓展字段 Short + Bytes 2 + PropertiesLength

封装后转为字节数组写入到Buffer中即可。返回写入长度告诉wrotePosition再偏移WroteBytes长度;由此可见,ByteBuffer针对是消息维度

commit操作

public int commit(final int commitLeastPages) {

        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } 
        }
        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
        return this.committedPosition.get();
}

protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
}
/**
 * 是否能够commit。满足如下条件任意条件:
 * 1. 映射文件已经写满
 * 2. commitLeastPages > 0 && 未commit部分超过commitLeastPages
 * 3. commitLeastPages = 0 && 有新写入部分
 * @param commitLeastPages commit最小分页
 * @return 是否能够写入
 */
 protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();
        if (this.isFull()) {  //this.fileSize == this.wrotePosition.get()
            return true;
        }
        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }
        return write > flush;
}

commit操作主要由上面三个方法构成,isAbleToCommit负责判断能否写入,每次写入超过4KB(OS页大小)。commit0把buffer中的内容(上次提交后的位置——最近一次写入Buffer的位置)写入到FileChannel中,更新committedPosition。commit操作主要针对FileChannel维度。

flush操作

public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
              
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
 }

刷新时isAbleToFlush思路和isAbletoCommit一样,保证超过4KB。刷新到磁盘后更新flushedPosition,记录物理文件的最后写入位置。flush操作针对物理文件级别。

下面再来看下CommitLog如何操作commit && flush的

FlushCommitLogService类图

FlushCommitLogService继承了ServiceThread-->Thread,因此异步执行。

线程服务 场景 插入消息性能
CommitRealTimeService 异步刷盘 && 开启内存字节缓冲区 第一
FlushRealTimeService 异步刷盘 && 关闭内存字节缓冲区 第二
GroupCommitService 同步刷盘 第三

CommitRealTimeService定时调用mappedFileQueue.commit(commitDataLeastPages)执行提交。提交之后唤醒flushCommitLogService执行落盘。

【MappedFileQueue】
 public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = findMappedFileByOffset(committedWhere,committedWhere == 0);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);
            // 更新之后的位置,即下一次提交开始位置
            long where = mappedFile.getFileFromOffset() + offset;
            //如果不相等,说明有写入,否则上一步操作offset是零,相加之后才可能依然等于committedWhere
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
 }

首先findMappedFileByOffset找到要提交的文件,公式是 index (文件在集合中的下标)= (committedWhere-startOffset)/fileSize,committedWhere即要提交的位置,例如committedWhere = 4000,startOffset = 0,fileSize = 1024,那么index = 3,从Queue中获取第4个MappedFile,由它负责把自身的buffer提交到FileChannel。

FlushRealTimeService也是定时刷新内容到物理文件中,刷新成功后更新flushedWhere,主要步骤和commit相似。

 public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

 

© 著作权归作者所有

共有 人打赏支持
S
粉丝 0
博文 25
码字总数 28106
作品 0
成都
私信 提问
rocketmq4.x快速入门指南

以下采用的是版本 相关文档如下 快速体验: http://blog.seoui.com/2018/07/24/rocketmqinstall/ rocketmq简单消息发送: http://blog.seoui.com/2018/07/24/rocketmq_simple_message/ rock......

peachyy
2018/08/02
0
0
一文讲透Apache RocketMQ技术精华

嘉宾介绍:刘振东 阿里巴巴中间件技术专家,2016年中间件性能挑战赛亚军,具有丰富的分布式系统设计和优化经验,目前负责Apache RocketMQ新航道探索和创新。 分享文章内容包括RocketMQ的起源...

架构师技术联盟
2018/11/07
0
0
读懂这篇文章,你的阿里技术面就可以过关了

在美国的大学课程中,101是所有课程中的第一门,是新生入学后的必修课程。阿里巴巴中间件技术专家刘振东在上周的Apache RocketMQ开发者沙龙北京站的活动上,进行了主题为《ApacheRocketMQ 10...

技术小能手
2018/11/06
0
0
聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
0
0
RocketMQ与Kafka对比

RocketMQ与Kafka对比(18项差异) 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步...

莫问viva
2015/05/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

mariadb 内存占用优化

本文由云+社区发表 作者:工程师小熊 摘要:我们在使用mariadb的时候发现有时候不能启动起来,在使用过程中mariadb占用的内存很大,在这里学习下mariadb与内存相关的配置项,对mariadb进行调...

腾讯云加社区
53分钟前
2
0
spring security 自定义登录认证

spring security 自定义认证登录 1.概要 1.1.简介 spring security是一种基于 Spring AOP 和 Servlet 过滤器的安全框架,以此来管理权限认证等。 1.2.spring security 自定义认证流程 1)认证...

EasyProgramming
53分钟前
1
0
PAI通过流式机器学习算法解决实时热点新闻挖掘案例

(机器学习PAI Online Learning模块上线邀测,目前只支持华北2(北京)区域使用,本实验会用到流式机器学习算法) PAI地址:https://data.aliyun.com/product/learn 邀测申请地址:https://dat...

阿里云官方博客
57分钟前
1
0
Win下Jenkins-2.138源码编译及填坑笔记

源码编译篇 1、 安装JDK1.8-181,操作系统添加JDK环境变量。Java -version验证一下。 注:Jenkins2.138版本,JDK必须jkd1.8.0-101以上,不支持Java9,Maven必须3.5.3以上。 2、 解压Maven3....

编程SHA
59分钟前
2
0
Oracle数据库常用函数 转换函数 日期函数 字符型函数 数值函数

在讲解函数的功能和用法之前,先了解一下dual这个表。 dual这个表是一张只有一个字段,一行记录的表。它是一个虚拟表,用来构成select的语法规则。所以我们接下来会用到这个表来讲解常用函数。...

Sakura20
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部