文档章节

聊聊rocketmq的sendHeartbeatToAllBrokerWithLock

go4it
 go4it
发布于 2019/12/07 10:43
字数 850
阅读 27
收藏 0

本文主要研究一下rocketmq的sendHeartbeatToAllBrokerWithLock

sendHeartbeatToAllBrokerWithLock

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();

    //......

    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }

    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer");
            return;
        }

        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                }
                                this.brokerVersionTable.get(brokerName).put(addr, version);
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    private void uploadFilterClassSource() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> next = it.next();
            MQConsumerInner consumer = next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
                Set<SubscriptionData> subscriptions = consumer.subscriptions();
                for (SubscriptionData sub : subscriptions) {
                    if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                        final String consumerGroup = consumer.groupName();
                        final String className = sub.getSubString();
                        final String topic = sub.getTopic();
                        final String filterClassSource = sub.getFilterClassSource();
                        try {
                            this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                        } catch (Exception e) {
                            log.error("uploadFilterClassToAllFilterServer Exception", e);
                        }
                    }
                }
            }
        }
    }

    //......
}
  • MQClientInstance的sendHeartbeatToAllBrokerWithLock方法首先会执行lockHeartbeat.tryLock(),然后执行sendHeartbeatToAllBroker()及uploadFilterClassSource(),最后在finally中执行lockHeartbeat.unlock()
  • sendHeartbeatToAllBroker方法会通过prepareHeartbeatData构建heartbeatData,之后遍历brokerAddrTable,执行mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000)
  • uploadFilterClassSource方法会遍历consumerTable,对于consumeType为ConsumeType.CONSUME_PASSIVELY类型的,会遍历其subscriptions,对于sub.isClassFilterMode()且sub.getFilterClassSource()不为null的执行uploadFilterClassToAllFilterServer方法

sendHearbeat

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    }

    //......

    public int sendHearbeat(
        final String addr,
        final HeartbeatData heartbeatData,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    //......
}
  • MQClientAPIImpl的sendHearbeat方法首先构建RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null),之后通过remotingClient.invokeSync(addr, request, timeoutMillis)执行请求,成功的话返回response.getVersion(),否则抛出MQBrokerException(response.getCode(), response.getRemark())

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    //......

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}
  • processRequest方法对于RequestCode.HEART_BEAT会执行heartBeat(ctx, request)方法,该方法会解析body为heartbeatData,之后遍历heartbeatData的consumerData及producerData做不同处理,最后返回处理结果
  • 对于consumerData,会先取出subscriptionGroupConfig,对于config不为null的会执行brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod方法;最后执行brokerController.getConsumerManager().registerConsumer方法
  • 对于producerData,则执行brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo)方法

小结

  • MQClientInstance的sendHeartbeatToAllBrokerWithLock方法首先会执行lockHeartbeat.tryLock(),然后执行sendHeartbeatToAllBroker()及uploadFilterClassSource(),最后在finally中执行lockHeartbeat.unlock()
  • sendHeartbeatToAllBroker方法会通过prepareHeartbeatData构建heartbeatData,之后遍历brokerAddrTable,执行mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000)
  • uploadFilterClassSource方法会遍历consumerTable,对于consumeType为ConsumeType.CONSUME_PASSIVELY类型的,会遍历其subscriptions,对于sub.isClassFilterMode()且sub.getFilterClassSource()不为null的执行uploadFilterClassToAllFilterServer方法

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1250
码字总数 1168102
作品 0
深圳
私信 提问
加载中

评论(0)

聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
428
0
聊聊rocketmq的NettyEncoder及NettyDecoder

序 本文主要研究一下rocketmq的NettyEncoder及NettyDecoder NettyEncoder org/apache/rocketmq/remoting/netty/NettyEncoder.java 这里继承MessageToByteEncoder,类型是RemotingCommand,先......

go4it
2018/08/07
35
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
221
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
39
0
聊聊rocketmq的SequenceProducerImpl

序 本文主要研究一下rocketmq的SequenceProducerImpl SequenceProducerImpl io/openmessaging/rocketmq/producer/SequenceProducerImpl.java 采用的是LinkedBlockingQueue,send方法实际调用......

go4it
2018/07/30
21
0

没有更多内容

加载失败,请刷新页面

加载更多

突破CRUD | 万能树工具类封装

0、学完本文你或许可以收获 感受一个树工具从初始逐步优化完善的过程 树工具封装的设计思考与实现思路 最后收获一款拿来即用的树工具源代码 对于前端树组件有一定了解和使用过的同学可直接跳...

风象南
39分钟前
169
0
精选开源SpringBoot项目:涵盖权限、搜索、秒杀、支付!

发文以来,分享了一些科技资讯,今天首次整理一些开源项目分享。既是对自己的学习督促,也是对有需要人的一种便利。 开源项目一: mall商城 star数:29.8k 项目地址:https://github.com/ma...

Java进阶程序员xx
49分钟前
79
0
Xilinx Zynq-7000 SoC高性能处理器的串口、CAN接口

TLZ7x-EasyEVM是广州创龙基于Xilinx Zynq-7000 SoC设计的高速数据采集处理开发板,采用核心板+底板的设计方式,尺寸为160mm*108mm,它主要帮助开发者快速评估核心板的性能。 核心板采用12层板...

Tronlong创龙
今天
73
0
工作记录-图片高亮和 IE 下载图片错误

去年年底换了新的新项目组,又开始用自己喜欢的 angular 框架开发了,开心[眯眼笑]。这段时间做了几个需求,这里记录一下需要注意的几个点。 图片高亮功能 在原来的DOM层上添加遮罩,添加新的...

IrisHuang
今天
71
0
如何修复MacBook Pro过热:保持MacBook散热的13个技巧

尽管MacBook Pro是一台能处理繁重工作量的高能效机器,但它却无法像市场上其他笔记本电脑一样避免过热。至于什么可以防止发热,那不是一两个技巧就能解决的问题。相反,这是一组技巧可以解决...

mac小叮当
今天
63
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部