文档章节

聊聊rocketmq的TransientStorePool

go4it
 go4it
发布于 2019/12/06 23:45
字数 515
阅读 24
收藏 0

本文主要研究一下rocketmq的TransientStorePool

TransientStorePool

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}
  • TransientStorePool的构造器会根据MessageStoreConfig设置poolSize、fileSize属性;其init方法会创建poolSize个byteBuffer放入到availableBuffers中;其destroy方法会遍历availableBuffers,然后取出其address进行LibC.INSTANCE.munlock
  • borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法会执行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然后offerFirst方法放入availableBuffers
  • availableBufferNums方法在storeConfig.isTransientStorePoolEnable()为true的情况下会返回availableBuffers.size(),否则返回Integer.MAX_VALUE

isTransientStorePoolEnable

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";

    //......
        
    @ImportantField
    private boolean transientStorePoolEnable = false;

    //......

    /**
     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
     * ASYNC_FLUSH
     *
     * @return <tt>true</tt> or <tt>false</tt>
     */
    public boolean isTransientStorePoolEnable() {
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }

    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }

    //......
}
  • MessageStoreConfig定义了transientStorePoolEnable属性,默认为false;其isTransientStorePoolEnable方法在transientStorePoolEnable为true且flushDiskType为FlushDiskType.ASYNC_FLUSH且brokerRole不为BrokerRole.SLAVE的时候返回true

小结

  • TransientStorePool的构造器会根据MessageStoreConfig设置poolSize、fileSize属性;其init方法会创建poolSize个byteBuffer放入到availableBuffers中;其destroy方法会遍历availableBuffers,然后取出其address进行LibC.INSTANCE.munlock
  • borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法会执行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然后offerFirst方法放入availableBuffers
  • availableBufferNums方法在storeConfig.isTransientStorePoolEnable()为true的情况下会返回availableBuffers.size(),否则返回Integer.MAX_VALUE

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1225
码字总数 1142134
作品 0
深圳
私信 提问
聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
396
0
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》一导读

内容介绍 本书由RocketMQ社区早期的布道者和技术专家撰写,Apache RocketMQ创始人/Linux OpenMessaging创始人兼主席/Alibaba Messaging开源技术负责人冯嘉对其高度评价并作序推荐。 源码角度...

小编辑01
2019/01/04
0
0
聊聊rocketmq的NettyEncoder及NettyDecoder

序 本文主要研究一下rocketmq的NettyEncoder及NettyDecoder NettyEncoder org/apache/rocketmq/remoting/netty/NettyEncoder.java 这里继承MessageToByteEncoder,类型是RemotingCommand,先......

go4it
2018/08/07
32
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
38
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
217
0

没有更多内容

加载失败,请刷新页面

加载更多

用Markdown编程之类型

类型就是约定。而现有的类型是单纬度的。用标注法编程好处就是可以多维度。 类型基础分为: 虚 实 在此之上分为: 根 寄存器级 联 内存级 外 网络级 虚:说白了就是指针或索引之类的概念。之...

dwcz
8分钟前
21
0
WPF中的StaticResource和DynamicResource有什么区别?

在WPF中使用画笔,模板和样式等资源时,可以将它们指定为StaticResources <Rectangle Fill="{StaticResource MyBrush}" /> 或者作为DynamicResource <ItemsControl ItemTemplate="{DynamicR......

javail
33分钟前
49
0
Day07继承中的面试题 答案

1. 每一个构造方法的第一条语句默认都是:super() Object类最顶层的父类。 class Zi extends Fu{ public int num = 20; public Zi(){ //super(); System.out.println("zi"); } 2.class Test......

Lao鹰
39分钟前
46
0
每天AC系列(四):四数之和

1 题目 Leetcode第18题,给定一个数组与一个target,找出数组中的四个数之和为target的不重复的所有四个数. 2 暴力 List<List<Integer>> result = new ArrayList<>();if (nums.length == 4 &......

Blueeeeeee
49分钟前
54
0
git clone --mirror和git clone --bare有什么区别

git clone帮助页面上有关于--mirror : 设置远程存储库的镜像。 这意味着--bare 。 但没有详细介绍--mirror克隆与--bare克隆--mirror不同。 #1楼 克隆将从远程服务器复制参考,并将其填充到名...

技术盛宴
今天
72
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部