文档章节

探秘RocketMQ消息持久化

Bieber
 Bieber
发布于 2016/08/02 14:46
字数 2259
阅读 4439
收藏 6

#探秘RocketMQ消息持久化 之前对RocketMq的上层接口进行过介绍,但是作为一个可持久化的MQ中间件,那么其核心必然是对消息的持久化这一块。这也是我一直想去了解一下,最近稍微闲下来一点,把RocketMQ这一块代码实现给过了一下,这里对看到得内容和想法进行一次总结。这里分三部分来介绍RocketMQ消息持久化,分别是来自消息提供端的写,来自消费端的读以及从本地磁盘中进行恢复。

在介绍之前先了解一下RocketMQ中对数IO的统一入口,分别是MapedFileQueueMapedFileMapedFileQueue是对某个目录下面文件IO的统一入口,而MapedFile是对MapedFileQueue 目录下某个文件的IO封装(具体是通过MapedByteBuffer来进行IO操作),对于MapedFileQueue MapedFile来说,它们关注的只是往哪个位置些多少数据,以及从哪个位置读取数据,而不管里面存储的内容,写入的内容都将是byte数组。RocketMQ的消息文件以及其他文件的IO都是基于这两个类来做,而不是直接操作IO。介绍完这个之后,那么下面将对RocketMQ如何基于MapedFileQueueMapedFile来做到消息的写和读。

在介绍之前,先看看RocketMQ的broker的整体架构图

输入图片说明

##来自消息提供端的写

消息提供端发起一个send操作,会被brokerSendMessageProcessor所处理,至于SendMessageProcessor这个类中做了哪些事情,这里就不做解释,主要是对SendMessageProcessor如何将消息写入到磁盘进行介绍,SendMessageProcessor 会把写磁盘的操作交给DefaultMessageStore类去处理,而DefaultMessageStore 也不会做具体IO的事情,而是交给CommitLog,在CommitLog之下则是MapedFileQueue,在MapedFileQueue中会写入到最新的MapedFile中(此时的MapedFile默认最大1G,所有存储的配置都在MessageStoreConfig类中获取。),这里从SendMessageProcessor DefaultMessageStore 再到CommitLog ,最后到MapedFileQueue ,这个过程中是所有的topic都操作同一个MapedFileQueue,那就是说所有的Topic的消息都些在一个目录下面(因为一个MapedFileQueue对应一个目录,CommitLog的目录默认是在${user_home}/store/commitlog下),上面由消息提供端每次send都是一个完整的消息体,那就是一个完整的消息,这个消息体将会连续的写到MapedFileQueue的最新MapedFile中,在MapedFileQueue里面维护了commitlog的全局offset,那么只需要告诉MapedFileQueue一个全局offset和消息体的大小,那么就可以从MapedFileQueue中读取一个消息。但是在commitlog中只是负责将消息写入磁盘,而不管你怎么来读取,但是CommitLog通过MapedFileQueue写完之后,那么会得到当前写的位置,以及消息体大小,同时加上topic的元数据信息,通过异步队列的方式写到topic的索引文件,这个文件就是下面介绍消息读取的时候用到。

CommitLogputMessage方法调用MapedFileQueue写完消息之后,那么会调用DefaultMessageStoreputDispatchRequest方法进行将本次写操作广播出去,具体代码如下:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
            ..........
            result = mapedFile.appendMessage(msg, this.appendMessageCallback);
            ..........
            DispatchRequest dispatchRequest = new DispatchRequest(//
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10

            this.defaultMessageStore.putDispatchRequest(dispatchRequest);
            ..........
        return putMessageResult;
  }

如果继续跟进putDispatchRequest代码,就会发现是将dispatchRequest放到一个队列,然后由另一个线程去处理这个数据,这样可以提高消息提供端写入broker的效率,在这个线程中,会触发DefaultMessageStoreputMessagePostionInfo方法,该方法实现如下:

public void putMessagePostionInfo(String topic, int queueId, long offset, int size, long tagsCode,
            long storeTimestamp, long logicOffset) {
        ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
        cq.putMessagePostionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
    }

可以看到通过topicqueueId得到具体的ConsumerQueue,确定这个消息在哪个消费队列里面,同事触发cq.putMessagePostionInfoWrapper 调用,从方法名就知道是记录消息位置的,最后会调用ConsumerQueue putMessagePostionInfo方法,这个方法实现如下:

private boolean putMessagePostionInfo(final long offset, final int size, final long tagsCode,
            final long cqOffset) {
        // 在数据恢复时会走到这个流程
        if (offset <= this.maxPhysicOffset) {
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQStoreUnitSize);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQStoreUnitSize;

        MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
        if (mapedFile != null) {
            // 纠正MapedFile逻辑队列索引顺序
            if (mapedFile.isFirstCreateInQueue() && cqOffset != 0 && mapedFile.getWrotePostion() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.fillPreBlank(mapedFile, expectLogicOffset);
                log.info("fill pre blank space " + mapedFile.getFileName() + " " + expectLogicOffset + " "
                        + mapedFile.getWrotePostion());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mapedFile.getWrotePostion() + mapedFile.getFileFromOffset();
                if (expectLogicOffset != currentLogicOffset) {
                    // XXX: warn and notify me
                    logError
                        .warn(
                            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",//
                            expectLogicOffset, //
                            currentLogicOffset,//
                            this.topic,//
                            this.queueId,//
                            expectLogicOffset - currentLogicOffset//
                        );
                }
            }

            // 记录物理队列最大offset
            this.maxPhysicOffset = offset;
            return mapedFile.appendMessage(this.byteBufferIndex.array());
        }

        return false;
    }

下面是要写入磁盘的内容

this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQStoreUnitSize);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

可以看到是写入了commitlog得全局offset和消息体的大小,以及tags信息。

MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);

上面看到,ConsumerQueue(默认情况下ConsumerQueue是在${user_home}/store/consumerqueue/${queueId})也是通过MapedFileQueue来进行磁盘IO的,素以MapedFileQueue可以理解为RocketMQ的磁盘访问入口。

到这里基本上完成了一次消息提供端发起send操作所做的哪些事情,主要是通过CommitLog来进行消息内容的持久化,以及通过ConsumerQueue来确定消息被哪个队列消费,以及消息的索引持久化。

这里再介绍一下上面的queueId是怎么来的,因为消息提供端send某个topic的消息并不知道queueId,这个queueId是在broker端生成的,生成代码在SendMessageProcessor的方法consumerSendMsgBack中,代码段如下:

 int queueIdInt =
                Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

##来自消息消费端的读

broker端处理来自消费端的读请求,是交给PullMessageProcessor类来处理,在方法processRequest经过一系列处理之后,会交给DefaultMessageStoregetMessage方法,我这里贴出该方法主要代码段

 public GetMessageResult getMessage(final String group, final String topic, final int queueId,
           final long offset, final int maxMsgNums, final SubscriptionData subscriptionData) {
       ........
       GetMessageResult getResult = new GetMessageResult();

       // 有个读写锁,所以只访问一次,避免锁开销影响性能
       final long maxOffsetPy = this.commitLog.getMaxOffset();

       ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
       if (consumeQueue != null) {
           minOffset = consumeQueue.getMinOffsetInQuque();
           maxOffset = consumeQueue.getMaxOffsetInQuque();
           .........//逻辑校验
           else {
               //这里的offset是只从第几个消息开始消费,该方法返回的时从offset之后的消息体索引的io
               SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
               if (bufferConsumeQueue != null) {
                   try {
                       status = GetMessageStatus.NO_MATCHED_MESSAGE;

                       long nextPhyFileStartOffset = Long.MIN_VALUE;
                       long maxPhyOffsetPulling = 0;

                       int i = 0;
                       final int MaxFilterMessageCount = 16000;
                       boolean diskFallRecorded = false;
                       //循环读出多个消息内容
                       for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i +=
                               ConsumeQueue.CQStoreUnitSize) {
                           //得到了一个消息体索引信息
                           long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();//commitlog的全局偏移量
                           int sizePy = bufferConsumeQueue.getByteBuffer().getInt();//消息大小
                           long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();//tag信息

                           maxPhyOffsetPulling = offsetPy;

                           //参数校验

                           // 消息过滤
                           if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                               //从commitlog中读取消息
                               SelectMapedBufferResult selectResult =
                                       this.commitLog.getMessage(offsetPy, sizePy);
                               .........
                       }
       ......

       getResult.setStatus(status);
       getResult.setNextBeginOffset(nextBeginOffset);
       getResult.setMaxOffset(maxOffset);
       getResult.setMinOffset(minOffset);
       return getResult;
   }

上面是读取消息的部分逻辑,可以看到是先从ConsumerQueue中获取消息索引,然后再从commitlog中读取消息内容。这些内容也是在存储消息的时候写入的。因为broker端并不是一直运行的,而里面的commitlog的offset是有状态的,不能说你的broker挂掉了,导致commitlog的offset丢失,可能导致消息被覆盖。所以下面再用上一小段来介绍RocketMQ如何做到commitlog的offset重启后不丢失。

##从本地磁盘恢复

MapedFileQueue中有一个load方法,这个方法是将MapedFileQueue所管理目录中得文件加载到MapedFile中,如果你追踪这个load方法的被调用链路,会发现是在BrokerControllerinitialize触发了整个调用,那就是说在broker启动的时候,会触发CommitLog去将本地磁盘的数据关系加载到系统里面来,上面说了CommitLog有一个全局offset,这个offset在broker启动的时候怎么被查找的呢?如果你们熟悉的话,应该猜得到,是将CommitLogMapedFileQueue中文件进行计算,得到当前CommitLog的全局offset,下面我贴出具体找得代码:

 /**
     * recover时调用,不需要加锁
     */
    public void truncateDirtyFiles(long offset) {
        List<MapedFile> willRemoveFiles = new ArrayList<MapedFile>();

        for (MapedFile file : this.mapedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mapedFileSize;
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    file.setWrotePostion((int) (offset % this.mapedFileSize));
                    file.setCommittedPosition((int) (offset % this.mapedFileSize));
                }
                else {
                    // 将文件删除掉
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

上面代码是在MapedFileQueue中得,是被CommitLogrecoverAbnormally方法调用,而recoverAbnormally最上层触发也是在BrokerControllerinitialize方法中。上面说的是CommitLog的恢复过程,而ConsumerQueue的恢复恢复过程也是类似,感兴趣可以自己去看看。

© 著作权归作者所有

Bieber

Bieber

粉丝 209
博文 36
码字总数 83312
作品 1
杭州
高级程序员
私信 提问
加载中

评论(3)

正是那朵玫瑰

引用来自“业余编程人士”的评论

我想问一下 像RocketMQ这种中间件的开发 是不是只有放到服务器上才可以测试 在本地Eclipse里面可以测试吗?

@业余编程人士 可以的
Bieber
Bieber 博主

引用来自“业余编程人士”的评论

我想问一下 像RocketMQ这种中间件的开发 是不是只有放到服务器上才可以测试 在本地Eclipse里面可以测试吗?
可以,可以看看各个的start类
业余编程人士
业余编程人士
我想问一下 像RocketMQ这种中间件的开发 是不是只有放到服务器上才可以测试 在本地Eclipse里面可以测试吗?
[RocketMQ]消息中间件—RocketMQ消息存储(一)

文章摘要:MQ分布式消息队列大致流程在于消息的一发一收一存,本篇将为大家主要介绍下RocketMQ存储部分的架构 消息存储是MQ消息队列中最为复杂和最为重要的一部分,所以小编也就放在RocketM...

morpheusWB
2018/09/29
137
0
Apache Pulsar简介

Apache Pulsar What is Pulsar "Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API." Pulsar是pub-sub模式的分布式消......

杭州.Mark
2018/07/16
0
0
消息中间件—RocketMQ消息发送

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢? 大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适...

癫狂侠
2018/08/05
0
0
消息中间件系列第2讲:如何进行消息队列选型?

要做技术选型,那么必须对现今的各个消息中间件有个深入的理解才能做技术选型。否则别人问你,你为什么要用这个消息中间件,你说不出个所以然来,怎么做架构师呢? 截止到目前为止,现在业界...

陈树义
01/07
0
0
爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场
2018/04/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

使用原生css+js+html实现打印A4纸张的功能页面

有时候我们需要使用html+css实现打印A4纸张的功能页面,以下代码实现 <!DOCTYPE html><html lang="zh-CN"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatibl......

b0cloud
18分钟前
2
0
读组件化之MGJRouter源码第二次的收获与思考

上一次我们写好了一个自定义的 路由类 ,然后我们来制作自己的 库 ,可以用来被 pod 引入 : 库的制作参考:https://www.jianshu.com/p/928d2ab053be 以下是我创建的: 利用上篇提到的 ,组件...

T型人才追梦者
19分钟前
1
0
spring cache、ehcache的使用及集成

项目中需要加缓存,故学习了 1、spring cache、ehcache的使用及集成 2、缓存的命中率等统计数据 一、spring cache 1、概述 Spring 3.1 引入了基于注解(annotation)的缓存(cache)技术 2、...

qkKing
21分钟前
2
0
Windows 10上源码编译Poco并编写httpserver和tcpserver | compile and install poco cpp library on windows

本文首发于个人博客https://kezunlin.me/post/9587bb47/,欢迎阅读! compile and install poco cpp library on windows Series guide to compile and install poco cpp library on windows g......

kezunlin
21分钟前
2
0
if-else-if-else与switch的区别

if-else-if-else: 适合分支较少 判断条件类型不单一 支持取 boolean 类型的所有运算 满足条件即停止对后续分支语句的执行 switch: 适合分支较多 判断条件类型单一,JDK 1.7 之前仅支持 in...

ConstXiong
22分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部