探秘RocketMQ消息持久化

原创
2016/08/02 14:46
阅读数 1.3W

#探秘RocketMQ消息持久化 之前对RocketMq的上层接口进行过介绍,但是作为一个可持久化的MQ中间件,那么其核心必然是对消息的持久化这一块。这也是我一直想去了解一下,最近稍微闲下来一点,把RocketMQ这一块代码实现给过了一下,这里对看到得内容和想法进行一次总结。这里分三部分来介绍RocketMQ消息持久化,分别是来自消息提供端的写,来自消费端的读以及从本地磁盘中进行恢复。

在介绍之前先了解一下RocketMQ中对数IO的统一入口,分别是MapedFileQueueMapedFileMapedFileQueue是对某个目录下面文件IO的统一入口,而MapedFile是对MapedFileQueue 目录下某个文件的IO封装(具体是通过MapedByteBuffer来进行IO操作),对于MapedFileQueue MapedFile来说,它们关注的只是往哪个位置些多少数据,以及从哪个位置读取数据,而不管里面存储的内容,写入的内容都将是byte数组。RocketMQ的消息文件以及其他文件的IO都是基于这两个类来做,而不是直接操作IO。介绍完这个之后,那么下面将对RocketMQ如何基于MapedFileQueueMapedFile来做到消息的写和读。

在介绍之前,先看看RocketMQ的broker的整体架构图

输入图片说明

##来自消息提供端的写

消息提供端发起一个send操作,会被brokerSendMessageProcessor所处理,至于SendMessageProcessor这个类中做了哪些事情,这里就不做解释,主要是对SendMessageProcessor如何将消息写入到磁盘进行介绍,SendMessageProcessor 会把写磁盘的操作交给DefaultMessageStore类去处理,而DefaultMessageStore 也不会做具体IO的事情,而是交给CommitLog,在CommitLog之下则是MapedFileQueue,在MapedFileQueue中会写入到最新的MapedFile中(此时的MapedFile默认最大1G,所有存储的配置都在MessageStoreConfig类中获取。),这里从SendMessageProcessor DefaultMessageStore 再到CommitLog ,最后到MapedFileQueue ,这个过程中是所有的topic都操作同一个MapedFileQueue,那就是说所有的Topic的消息都些在一个目录下面(因为一个MapedFileQueue对应一个目录,CommitLog的目录默认是在${user_home}/store/commitlog下),上面由消息提供端每次send都是一个完整的消息体,那就是一个完整的消息,这个消息体将会连续的写到MapedFileQueue的最新MapedFile中,在MapedFileQueue里面维护了commitlog的全局offset,那么只需要告诉MapedFileQueue一个全局offset和消息体的大小,那么就可以从MapedFileQueue中读取一个消息。但是在commitlog中只是负责将消息写入磁盘,而不管你怎么来读取,但是CommitLog通过MapedFileQueue写完之后,那么会得到当前写的位置,以及消息体大小,同时加上topic的元数据信息,通过异步队列的方式写到topic的索引文件,这个文件就是下面介绍消息读取的时候用到。

CommitLogputMessage方法调用MapedFileQueue写完消息之后,那么会调用DefaultMessageStoreputDispatchRequest方法进行将本次写操作广播出去,具体代码如下:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
            ..........
            result = mapedFile.appendMessage(msg, this.appendMessageCallback);
            ..........
            DispatchRequest dispatchRequest = new DispatchRequest(//
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10

            this.defaultMessageStore.putDispatchRequest(dispatchRequest);
            ..........
        return putMessageResult;
  }

如果继续跟进putDispatchRequest代码,就会发现是将dispatchRequest放到一个队列,然后由另一个线程去处理这个数据,这样可以提高消息提供端写入broker的效率,在这个线程中,会触发DefaultMessageStoreputMessagePostionInfo方法,该方法实现如下:

public void putMessagePostionInfo(String topic, int queueId, long offset, int size, long tagsCode,
            long storeTimestamp, long logicOffset) {
        ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
        cq.putMessagePostionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
    }

可以看到通过topicqueueId得到具体的ConsumerQueue,确定这个消息在哪个消费队列里面,同事触发cq.putMessagePostionInfoWrapper 调用,从方法名就知道是记录消息位置的,最后会调用ConsumerQueue putMessagePostionInfo方法,这个方法实现如下:

private boolean putMessagePostionInfo(final long offset, final int size, final long tagsCode,
            final long cqOffset) {
        // 在数据恢复时会走到这个流程
        if (offset <= this.maxPhysicOffset) {
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQStoreUnitSize);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQStoreUnitSize;

        MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
        if (mapedFile != null) {
            // 纠正MapedFile逻辑队列索引顺序
            if (mapedFile.isFirstCreateInQueue() && cqOffset != 0 && mapedFile.getWrotePostion() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.fillPreBlank(mapedFile, expectLogicOffset);
                log.info("fill pre blank space " + mapedFile.getFileName() + " " + expectLogicOffset + " "
                        + mapedFile.getWrotePostion());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mapedFile.getWrotePostion() + mapedFile.getFileFromOffset();
                if (expectLogicOffset != currentLogicOffset) {
                    // XXX: warn and notify me
                    logError
                        .warn(
                            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",//
                            expectLogicOffset, //
                            currentLogicOffset,//
                            this.topic,//
                            this.queueId,//
                            expectLogicOffset - currentLogicOffset//
                        );
                }
            }

            // 记录物理队列最大offset
            this.maxPhysicOffset = offset;
            return mapedFile.appendMessage(this.byteBufferIndex.array());
        }

        return false;
    }

下面是要写入磁盘的内容

this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQStoreUnitSize);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

可以看到是写入了commitlog得全局offset和消息体的大小,以及tags信息。

MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);

上面看到,ConsumerQueue(默认情况下ConsumerQueue是在${user_home}/store/consumerqueue/${queueId})也是通过MapedFileQueue来进行磁盘IO的,素以MapedFileQueue可以理解为RocketMQ的磁盘访问入口。

到这里基本上完成了一次消息提供端发起send操作所做的哪些事情,主要是通过CommitLog来进行消息内容的持久化,以及通过ConsumerQueue来确定消息被哪个队列消费,以及消息的索引持久化。

这里再介绍一下上面的queueId是怎么来的,因为消息提供端send某个topic的消息并不知道queueId,这个queueId是在broker端生成的,生成代码在SendMessageProcessor的方法consumerSendMsgBack中,代码段如下:

 int queueIdInt =
                Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

##来自消息消费端的读

broker端处理来自消费端的读请求,是交给PullMessageProcessor类来处理,在方法processRequest经过一系列处理之后,会交给DefaultMessageStoregetMessage方法,我这里贴出该方法主要代码段

 public GetMessageResult getMessage(final String group, final String topic, final int queueId,
           final long offset, final int maxMsgNums, final SubscriptionData subscriptionData) {
       ........
       GetMessageResult getResult = new GetMessageResult();

       // 有个读写锁,所以只访问一次,避免锁开销影响性能
       final long maxOffsetPy = this.commitLog.getMaxOffset();

       ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
       if (consumeQueue != null) {
           minOffset = consumeQueue.getMinOffsetInQuque();
           maxOffset = consumeQueue.getMaxOffsetInQuque();
           .........//逻辑校验
           else {
               //这里的offset是只从第几个消息开始消费,该方法返回的时从offset之后的消息体索引的io
               SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
               if (bufferConsumeQueue != null) {
                   try {
                       status = GetMessageStatus.NO_MATCHED_MESSAGE;

                       long nextPhyFileStartOffset = Long.MIN_VALUE;
                       long maxPhyOffsetPulling = 0;

                       int i = 0;
                       final int MaxFilterMessageCount = 16000;
                       boolean diskFallRecorded = false;
                       //循环读出多个消息内容
                       for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i +=
                               ConsumeQueue.CQStoreUnitSize) {
                           //得到了一个消息体索引信息
                           long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();//commitlog的全局偏移量
                           int sizePy = bufferConsumeQueue.getByteBuffer().getInt();//消息大小
                           long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();//tag信息

                           maxPhyOffsetPulling = offsetPy;

                           //参数校验

                           // 消息过滤
                           if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                               //从commitlog中读取消息
                               SelectMapedBufferResult selectResult =
                                       this.commitLog.getMessage(offsetPy, sizePy);
                               .........
                       }
       ......

       getResult.setStatus(status);
       getResult.setNextBeginOffset(nextBeginOffset);
       getResult.setMaxOffset(maxOffset);
       getResult.setMinOffset(minOffset);
       return getResult;
   }

上面是读取消息的部分逻辑,可以看到是先从ConsumerQueue中获取消息索引,然后再从commitlog中读取消息内容。这些内容也是在存储消息的时候写入的。因为broker端并不是一直运行的,而里面的commitlog的offset是有状态的,不能说你的broker挂掉了,导致commitlog的offset丢失,可能导致消息被覆盖。所以下面再用上一小段来介绍RocketMQ如何做到commitlog的offset重启后不丢失。

##从本地磁盘恢复

MapedFileQueue中有一个load方法,这个方法是将MapedFileQueue所管理目录中得文件加载到MapedFile中,如果你追踪这个load方法的被调用链路,会发现是在BrokerControllerinitialize触发了整个调用,那就是说在broker启动的时候,会触发CommitLog去将本地磁盘的数据关系加载到系统里面来,上面说了CommitLog有一个全局offset,这个offset在broker启动的时候怎么被查找的呢?如果你们熟悉的话,应该猜得到,是将CommitLogMapedFileQueue中文件进行计算,得到当前CommitLog的全局offset,下面我贴出具体找得代码:

 /**
     * recover时调用,不需要加锁
     */
    public void truncateDirtyFiles(long offset) {
        List<MapedFile> willRemoveFiles = new ArrayList<MapedFile>();

        for (MapedFile file : this.mapedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mapedFileSize;
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    file.setWrotePostion((int) (offset % this.mapedFileSize));
                    file.setCommittedPosition((int) (offset % this.mapedFileSize));
                }
                else {
                    // 将文件删除掉
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

上面代码是在MapedFileQueue中得,是被CommitLogrecoverAbnormally方法调用,而recoverAbnormally最上层触发也是在BrokerControllerinitialize方法中。上面说的是CommitLog的恢复过程,而ConsumerQueue的恢复恢复过程也是类似,感兴趣可以自己去看看。

展开阅读全文
打赏
2
6 收藏
分享
加载中

引用来自“业余编程人士”的评论

我想问一下 像RocketMQ这种中间件的开发 是不是只有放到服务器上才可以测试 在本地Eclipse里面可以测试吗?

@业余编程人士 可以的
2018/03/03 11:46
回复
举报
Bieber博主

引用来自“业余编程人士”的评论

我想问一下 像RocketMQ这种中间件的开发 是不是只有放到服务器上才可以测试 在本地Eclipse里面可以测试吗?
可以,可以看看各个的start类
2017/06/01 09:37
回复
举报
我想问一下 像RocketMQ这种中间件的开发 是不是只有放到服务器上才可以测试 在本地Eclipse里面可以测试吗?
2017/05/31 15:53
回复
举报
更多评论
打赏
3 评论
6 收藏
2
分享
返回顶部
顶部