文档章节

聊聊rocketmq的HAClient

go4it
 go4it
发布于 2019/12/12 23:46
字数 577
阅读 82
收藏 0

本文主要研究一下rocketmq的HAClient

HAClient

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java

    class HAClient extends ServiceThread {
        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
        private final AtomicReference<String> masterAddress = new AtomicReference<>();
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private SocketChannel socketChannel;
        private Selector selector;
        private long lastWriteTimestamp = System.currentTimeMillis();

        private long currentReportedOffset = 0;
        private int dispatchPosition = 0;
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

        public HAClient() throws IOException {
            this.selector = RemotingUtil.openSelector();
        }

        //......

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {

                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        this.selector.select(1000);

                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }

                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }

                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return HAClient.class.getSimpleName();
        }

        //......

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

        //......        
    }
  • HAClient继承了ServiceThread,其run方法以isStopped为false进行while循环,之后通过connectMaster方法判断是否连上masterAddress,连不上则执行waitForRunning(1000 * 5);连上了master之后再判断isTimeToReportOffset,即判断当前时间与lastWriteTimestamp的差值,若该值大于defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(),则返回true;最后执行processReadEvent;processReadEvent在byteBufferRead.hasRemaining()前提下会执行dispatchReadRequest

dispatchReadRequest

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java

    class HAClient extends ServiceThread {

    	//......

        private boolean dispatchReadRequest() {
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                if (diff >= msgHeaderSize) {
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }

                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                        this.byteBufferRead.get(bodyData);

                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

        //......
    }
  • dispatchReadRequest会判断diff >= (msgHeaderSize + bodySize),若成立则执行defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData)

小结

  • HAClient继承了ServiceThread,其run方法以isStopped为false进行while循环,之后通过connectMaster方法判断是否连上masterAddress,连不上则执行waitForRunning(1000 * 5);连上了master之后再判断isTimeToReportOffset,即判断当前时间与lastWriteTimestamp的差值,若该值大于defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(),则返回true;最后执行processReadEvent;processReadEvent在byteBufferRead.hasRemaining()前提下会执行dispatchReadRequest

doc

© 著作权归作者所有

go4it
粉丝 93
博文 1299
码字总数 1212566
作品 0
深圳
私信 提问
加载中

评论(0)

探秘 RocketMQ 消息持久化机制

原文:探秘 RocketMQ 消息持久化机制 我们知道 RocketMQ 是一款高性能、高可靠的分布式消息中间件,高性能和高可靠是很难兼得的。因为要保证高可靠,那么数据就必须持久化到磁盘上,将数据持久...

一条属于你的未来之路
昨天
0
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
44
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
240
0
聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
553
0
消息中间件 RocketMQ源码解析:高可用

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/high-availability/ 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 RocketMQ 4.0.x 正式版 1. 概述 2. Namesrv 高可用 2.1 Broker 注...

芋道源码
2017/05/14
1.4K
0

没有更多内容

加载失败,请刷新页面

加载更多

一篇小短文让你了解Maven

简介 Apache Maven是一个项目管理及构建工具,主要用于Java项目的构建,Maven还可以用于构建和管理以C#,Ruby,Scala和其他语言编写的项目。 Maven解决了构建软件那几方面问题: 编译构建 ...

osc_e3ck15c3
21分钟前
2.1K
0
【转】Aspnet Core为什么支持跨平台

1.框架决定--因为代码运行需要环境,有了能够运行在 Windows和Linux下面的 CLR. 2.netCore有了个 kestrel(具体的解释去查询下).跨平台的适用于ASP.NET Core的WEB服务器。角色类似 IIS,他不是...

osc_8ki1usvn
23分钟前
936
0
小狗 T12 智能无线吸尘器体验:让打扫这件事简单点

摘要 或许你缺这么一件趁手的无线吸尘器。 比起收拾自己的热情,年轻人收拾房间的动力可能几乎为零,心血来潮的大扫除也并不能维持太久。 除了平时工作太忙以外,没有一件趁手的清扫「兵器」...

osc_zqxv5pte
23分钟前
1.6K
0
能听会说、还支持手写,讯飞这款智能笔记本也许能让你爱上学习

摘要 一个集阅读、写作、办公为一体的「效率神器」。 你所期待的笔记本是什么样子?有人重视书写手感,只要有一支笔、一个本子就够了;有人觉得长久保存更重要,所以会选择一些笔记应用;想要...

osc_0qnrwmy3
24分钟前
1.3K
0
WEB缓存系统之varnish缓存项修剪

  前文我们聊了下varnish的状态引擎和不同类型的变量对应该使用在那个状态引擎中,以及每个状态引擎的对应处理事务;回顾请参考https://www.cnblogs.com/qiuhom-1874/p/12643549.html;今天...

osc_sbtpzgv1
25分钟前
2.2K
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部