文档章节

rocketmq 4.0 Broker启动流程分析

Jasun
 Jasun
发布于 2017/06/17 12:37
字数 2028
阅读 235
收藏 1

在一次网络故障中引发的消息重发而导致深度研究源码的瞎想!!!回想当时整个过程是这样的:项目部署的主从,当天下午主网络问题,然后停掉主和从,将从改为主,启动后未发生任何重发消息;我当时建议把以前的主改为从启动起来,运维人员说以前主配置内存大些,然后就停掉了现在的一主,将其又改为从,以前的那个作为主又启动起来,后续问题出现了,结果大量消息重新消费;再由之有些业务并未做去重处理,显然整个数据就紊乱了。

注:消息服务做了补偿机制,即使宕机也不会影响业务端流程正常运行和数据丢失现象。

     我查看了很久,没有看出任何问题,检查了所有日志输出信息也未发现问题,后来的推断应该是consumerOffset.json文件或者其他文件消费索引文件给重置了,然后把消息重新尝试发了一遍,建立消费索引。废话少说,以下我们来分析broker源码启动流程,来寻找真正的原因。

 配置加载和BrokerController创建

    public static BrokerController createBrokerController(String[] args) {
    	// 设置rocketmq.remoting.version变量到系统
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        
        // 检查netty远程通讯缓冲大小配置并初始化
        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 131072;
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 131072;
        }

        try {
        	// 创建命令行参数:如help, mqbroker等命令,设置namesrvAddr-->n, h-->help等后续会用到
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }
            
            // 初始化Broker配置
            final BrokerConfig brokerConfig = new BrokerConfig();
            // 初始化netty客户端和服务器配置
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
            nettyServerConfig.setListenPort(10911);
            // 初始化消息存储位置和rocket所有启动broker的加载的properties配置信息
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            
            // 默认初始化是为异步复制
            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }

            if (commandLine.hasOption('p')) {
                MixAll.printObjectProperties(null, brokerConfig);
                MixAll.printObjectProperties(null, nettyServerConfig);
                MixAll.printObjectProperties(null, nettyClientConfig);
                MixAll.printObjectProperties(null, messageStoreConfig);
                System.exit(0);
            } else if (commandLine.hasOption('m')) {
                MixAll.printObjectProperties(null, brokerConfig, true);
                MixAll.printObjectProperties(null, nettyServerConfig, true);
                MixAll.printObjectProperties(null, nettyClientConfig, true);
                MixAll.printObjectProperties(null, messageStoreConfig, true);
                System.exit(0);
            }
            
            // 启动时设置了 -c /conf/2m-2s-sync/broker-a.properties文件加载目录
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                	// broker-a.properties文件
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    // 加载文件到属性集合中
                    properties = new Properties();
                    properties.load(in);
                    
                    // 设置配置对象值
                    parsePropertie2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }
            
            // 将nameSrv和file路径设置到BrokerConfig对象中
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

            //brokerConfig.setRocketmqHome("H:/open_source_project/incubator-rocketmq");
            // rocketmqHome为空则退出程序
            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
                    + " variable in your environment to match the location of the RocketMQ installation");
                System.exit(-2);
            }

            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);
                    System.exit(-3);
                }
            }
            
            // 设置主从BrokerId参数
            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }
            
            // 设置HA监听端口10912
            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
            // 设置日志工厂对象,主要用于Logger实例化
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
            log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
            
            //
            MixAll.printObjectProperties(log, brokerConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            MixAll.printObjectProperties(log, nettyClientConfig);
            MixAll.printObjectProperties(log, messageStoreConfig);
            
            // 创建并初始化BrokerController对象
            final BrokerController controller = new BrokerController(//
                brokerConfig, //
                nettyServerConfig, //
                nettyClientConfig, //
                messageStoreConfig);
            // 合并当前所有配置信息防止后续丢失
            controller.getConfiguration().registerConfig(properties);
            
            // 初始化加载本地store等系统索引,消费等文件
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long begineTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - begineTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

BrokerController初始化和文件恢复加载等

    public boolean initialize() throws CloneNotSupportedException {
        boolean result = true;
        // 加载config\topics.json文件,若文件为空则加载config\topics.json.bak文件
        result = result && this.topicConfigManager.load();
        // 加载config\consumerOffset.json文件,若文件为空则加载config\consumerOffset.json.bak文件
        result = result && this.consumerOffsetManager.load();
        // 加载config\subscriptionGroup.json文件,若文件为空则加载config\subscriptionGroup.json.bak文件
        result = result && this.subscriptionGroupManager.load();

        if (result) {
            try {
            	// 初始化默认的DefaultMessageStore对象
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //加载插件,此处默认为空
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            } catch (IOException e) {
                result = false;
                e.printStackTrace();
            }
        }
        
//      1.加载延迟delayOffset.json文件
//        --加载config\delayOffset.json文件,若文件为空则加载config\delayOffset.json.bak文件
//        --设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d)
//      2.加载提交commitlog目录日志
//        --commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件
//      3.加载consumequeue目录文件,消费队列
//        --按照topic主题和设置的队列数来加载
//      4.设置checkpoint文件位置信息
//        --index目录下的所有文件
//        --如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件
//        --然后加载到索引文件集合中
//        --恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失
//        --恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251}
        result = result && this.messageStore.load();

        if (result) {
        	// 设置netty远程通讯服务,端口为10909
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            // 发送消息自定义线程池
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
            // 拉取消息自定义线程池
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            // 管理员Broker自定义线程池
            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));
            // 客户端管理自定义线程池
            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));
            // 消息管理自定义线程池
            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));
            // 注册消息处理器
            this.registerProcessor();

            // TODO remove in future
            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Exception e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Exception e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        return result;
    }

初始化过程如下:

1.加载延迟delayOffset.json文件
  --加载config\delayOffset.json文件,若文件为空则加载config\delayOffset.json.bak文件
  --设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d)
2.加载提交commitlog目录日志
  --commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件
3.加载consumequeue目录文件,消费队列
  --按照topic主题和设置的队列数来加载
4.设置checkpoint文件位置信息
  --index目录下的所有文件
  --如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件
  --然后加载到索引文件集合中
  --恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失
  --恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251}

© 著作权归作者所有

Jasun
粉丝 4
博文 9
码字总数 7937
作品 0
武汉
私信 提问
[RocketMQ]消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

morpheusWB
2018/09/29
184
0
消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

癫狂侠
2018/08/12
0
0
消息中间件—RocketMQ消息发送

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢? 大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适...

癫狂侠
2018/08/05
0
0
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
2018/06/30
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
2018/10/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

mysql概览

学习知识,首先要有一个总体的认识。以下为mysql概览 1-架构图 2-Detail csdn |简书 | 头条 | SegmentFault 思否 | 掘金 | 开源中国 |

程序员深夜写bug
今天
8
0
golang微服务框架go-micro 入门笔记2.2 micro工具之微应用利器micro web

micro web micro 功能非常强大,本文将详细阐述micro web 命令行的功能 阅读本文前你可能需要进行如下知识储备 golang分布式微服务框架go-micro 入门笔记1:搭建go-micro环境, golang微服务框架...

非正式解决方案
今天
6
0
前端——使用base64编码在页面嵌入图片

因为页面中插入一个图片都要写明图片的路径——相对路径或者绝对路径。而除了具体的网站图片的图片地址,如果是在自己电脑文件夹里的图片,当我们的HTML文件在别人电脑上打开的时候图片则由于...

被毒打的程序猿
今天
8
0
Flutter 系列之Dart语言概述

Dart语言与其他语言究竟有什么不同呢?在已有的编程语言经验的基础上,我们该如何快速上手呢?本篇文章从编程语言中最重要的组成部分,也就是基础语法与类型变量出发,一起来学习Dart吧 一、...

過愙
今天
5
0
rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部