文档章节

RocketMQ源码:Producer启动分析

Jacktanger
 Jacktanger
发布于 09/15 00:05
字数 1002
阅读 51
收藏 1

    本文主要分析RocketMQ中Producer的启动过程。

    RocketMQ的版本为:4.2.0 release。

一.时序图

    根据源码,把Producer启动过程的时序图画了一遍:

 

二.源码分析

    1 start() :DefaultMQProducer启动。

    DefaultMQProducer主要功能都是在DefaultMQProducerImpl中实现的。类似的,DefaultMQPushConsumer的大部分功能也在DefaultMQPushConsumerImpl中实现:

    //DefaultMQProducer#start
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }

 

    1.1 checkConfig:检查producerGroup是否合法。

    // DefaultMQProducerImpl#checkConfig
    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
        if (null == this.defaultMQProducer.getProducerGroup()) {
            throw new MQClientException("producerGroup is null", null);
        }
        if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {// 不能等于"DEFAULT_PRODUCER"
            throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                null);
        }
    }
    // Validators#checkGroup
    public static void checkGroup(String group) throws MQClientException {
        if (UtilAll.isBlank(group)) {// 不能为空
            throw new MQClientException("the specified group is blank", null);
        }
        if (!regularExpressionMatcher(group, PATTERN)) {
            throw new MQClientException(String.format(
                "the specified group[%s] contains illegal characters, allowing only %s", group,
                VALID_PATTERN_STR), null);
        }
        if (group.length() > CHARACTER_MAX_LENGTH) {// 长度不能大于255
            throw new MQClientException("the specified group is longer than group max length 255.", null);
        }
    }

 

    1.2 getAndCreateMQClientInstance:获取MQClientInstance。

    //MQClientManager#getAndCreateMQClientInstance
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();// 构建该Producer的ClientID,等于IP地址@instanceName
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {// 如果当前客户端不在mq客户端实例集合中,则创建一个实例并加入
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {// 说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }

 

    1.3 registerProducer:注册Producer。

    // MQClientInstance#registerProducer
    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);// 如果没有添加过,就往producerTable中加入当前的Producer
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }
        return true;
    }

 

    1.4 MQClientInstance#start 启动mQClientFactory。

    // DefaultMQProducerImpl#start(true)
    if (startFactory) {
        mQClientFactory.start();
    }
    // MQClientInstance#start
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();// 获取nameService地址
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();// 对象负责底层消息通信,获取nameService地址
                    // Start various schedule tasks
                    this.startScheduledTask();// 启动各种定时任务
                    // Start pull service
                    this.pullMessageService.start();// 启动拉取消息服务
                    // Start rebalance service
                    this.rebalanceService.start();// 启动消费端负载均衡服务
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);// 再次调用DefaultMQProducerImpl.start(),注意传参为false。此时ServiceState还是 START_FAILED 只调用了一次心跳服务 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;// 改变serviceState状态为 RUNNING 启动状态
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

 

    1.4.1 MQClientAPIImpl#start 负责底层消息通信,启动客户端对象。

    // MQClientAPIImpl#start
    public void start() {
        this.remotingClient.start();// RemotingClient是RocketMQ封装了Netty网络通信的客户端
    }

 

    1.4.2 MQClientInstance#startScheduledTask 启动各种定时任务。

    // MQClientInstance#startScheduledTask
    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();// 更新NameServer地址
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();// 从nameService更新Topic路由信息
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();// 清理挂掉的broker
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();// 向broker发送心跳信息
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();// 持久化consumerOffset,保存消费者的Offset
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();// 调整消费线程池
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

 

    1.5 MQClientInstance#sendHeartbeatToAllBrokerWithLock 向所有的Broker发送心跳信息。

    // MQClientInstance#sendHeartbeatToAllBrokerWithLock
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();// 向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息
                this.uploadFilterClassSource();// 向Filter过滤服务器发送REGISTER_MESSAGE_FILTER_CLASS请求码,更新过滤服务器中的Filterclass文件
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }

 

    上面就是RocketMQ中Producer的启动过程,上面分析了主要的几处地方,如果想了解启动过程中的详细代码,可以从Github上面clone代码到本地,试着调试和分析。附上地址:4.2.0 release

© 著作权归作者所有

共有 人打赏支持
Jacktanger
粉丝 19
博文 106
码字总数 63146
作品 0
浦东
程序员
私信 提问
RocketMQ源码:有序消息分析

本文主要分析RocketMQ中如何保证消息有序的。 RocketMQ的版本为:4.2.0 release。 一.时序图 还是老规矩,先把分析过程的时序图摆出来: 1.Producer发送顺序消息 2.Consumer接收顺序消息(一)...

Jacktanger
09/16
0
0
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
06/30
0
0
rocketmq4.x快速入门指南

以下采用的是版本 相关文档如下 快速体验: http://blog.seoui.com/2018/07/24/rocketmqinstall/ rocketmq简单消息发送: http://blog.seoui.com/2018/07/24/rocketmq_simple_message/ rock......

peachyy
08/02
0
0
消息中间件—RocketMQ消息发送

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢? 大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适...

癫狂侠
08/05
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
10/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Windows 10 设置 Java 环境变量

首先你需要在我的电脑中打开,找到环境变量属性。 找到环境变量属性 找到环境变量属性后单击将会看到下面的设置界面。 在这个界面中设置高级系统设置。 环境变量 在弹出的界面中选择设置环境...

honeymose
31分钟前
1
0
用any-loader封装jQuery的XHR —— 随便写着玩系列

哎,都说没人用JQuery啦,叫你别写这个。 其实我也是好高骛远使用过npm上某个和某个很出名的XHR库,嗯,认识我的人都知道我喜欢喷JQ,以前天天喷,见面第一句,你还用JQ,赶紧丢了吧。但我也...

曾建凯
今天
5
0
聊聊storm的AggregateProcessor的execute及finishBatch方法

序 本文主要研究一下storm的AggregateProcessor的execute及finishBatch方法 实例 TridentTopology topology = new TridentTopology(); topology.newStream("spout1", spout......

go4it
今天
4
0
大数据教程(7.5)hadoop中内置rpc框架的使用教程

博主上一篇博客分享了hadoop客户端java API的使用,本章节带领小伙伴们一起来体验下hadoop的内置rpc框架。首先,由于hadoop的内置rpc框架的设计目的是为了内部的组件提供rpc访问的功能,并不...

em_aaron
今天
5
0
CentOS7+git+github创建Python开发环境

1.准备CentOS7 (1)下载VMware Workstation https://pan.baidu.com/s/1miFU8mk (2)下载CentOS7镜像 https://mirrors.aliyun.com/centos/ (3)安装CentOS7系统 http://blog.51cto.com/fengyuns......

枫叶云
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部