RocketMQ Study Notes, Part 2: rocketmq-namesrv
Posted by 土豆_BIG 7 months ago

Summary: NameServer is RocketMQ's registry. It handles service governance, maintaining information about topics, brokers, and related metadata.

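Main functions:

  • Maintain the broker list

    • Accept broker registration requests and add the broker to the corresponding broker list
    • Scan broker heartbeats periodically (after startup, check broker liveness every 10 s by default, and remove expired brokers together with their related configuration)
  • Maintain topics and the queue address information for each topic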

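Main structure:

  • Class structure

    • NamesrvStartup
      • Initializes NamesrvConfig
      • Initializes NettyServerConfig
      • Initializes and starts NamesrvController
      • Adds a shutdown hook to close NamesrvController gracefully
    • NamesrvController
      • Initializes KVConfigManager
      • Initializes NettyRemotingServer
      • Creates the NettyServer worker thread pool
      • Registers the default processor (DefaultRequestProcessor) with NettyRemotingServer
      • Schedules the scan for expired brokers, every 10 s by default (periodic broker heartbeat scan)
      • Schedules the task that prints configuration, every 10 minutes by default
    • RouteInfoManager
      • Get all clusters
      • Get all topics, or the topics of one cluster
      • Register / unregister a broker
      • Delete a topic, or delete topics by brokerName
      • Scan for expired brokers
      • Get the TopicRouteData for a topic
    • DefaultRequestProcessor
      • Receives Netty messages and performs the operation selected by RequestCode
        • Add, update, or delete configuration entries
        • Register or unregister a broker
        • Get route information (TopicRouteData) for a topic
        • Get all clusters
        • Get all topics
        • Delete a given topic
        • Get the topic list of a cluster
        • Get the configuration list of a namespace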
  • Data structures

    • KVConfigManager class
      • Maintains a configuration list per namespace
        private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
                new HashMap<String, HashMap<String, String>>();
    • RouteInfoManager class
      • Maps each topic to its queues
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        • QueueData fields
          private String brokerName;
          private int readQueueNums;
          private int writeQueueNums;
          private int perm;
          private int topicSynFlag;
      • Maps each brokerName to its detailed information
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        • BrokerData fields
          private String cluster;
          private String brokerName;
          private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
      • Maps each clusterName to its set of brokerNames
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
      • Maps each broker address to the broker's liveness information
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        • BrokerLiveInfo fields
          private long lastUpdateTimestamp;
          private DataVersion dataVersion;
          private Channel channel;
          private String haServerAddr;
      • Maps each broker address to its filter server list
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
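
To make the nested shape of configTable concrete, here is a minimal sketch of a namespace -> (key -> value) table with put, get, and delete. The class name KvConfigSketch, its method names, and the sample namespace and values are all hypothetical; this is not the KVConfigManager API, and it leaves out whatever locking and persistence the real class performs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a namespace -> (key -> value) table; not the RocketMQ class. */
final class KvConfigSketch {
    private final Map<String, Map<String, String>> configTable = new HashMap<>();

    /** Stores a value, creating the namespace's inner map on first use. */
    void put(String namespace, String key, String value) {
        configTable.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
    }

    /** Returns the value, or null when the namespace or key is unknown. */
    String get(String namespace, String key) {
        Map<String, String> kv = configTable.get(namespace);
        return kv == null ? null : kv.get(key);
    }

    /** Removes a key and returns its previous value, or null if it was absent. */
    String delete(String namespace, String key) {
        Map<String, String> kv = configTable.get(namespace);
        return kv == null ? null : kv.remove(key);
    }

    public static void main(String[] args) {
        KvConfigSketch kv = new KvConfigSketch();
        kv.put("demo-namespace", "TopicA", "sample-value"); // sample data, made up
        System.out.println(kv.get("demo-namespace", "TopicA"));
        System.out.println(kv.get("other-namespace", "TopicA")); // unknown namespace
    }
}
```

Keeping one inner map per namespace means a lookup for all entries of a namespace is a single outer get, which matches the "configuration list per namespace" role described above.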

Data structure relationships in detail:

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

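The relationships can be seen in the method that builds the TopicRouteData for a topic:

  1. Get the List<QueueData> for the topic
  2. Loop over the List<QueueData> and, for each QueueData's brokerName, get the BrokerData from brokerAddrTable
    1. Loop over the brokerAddrs collection of the BrokerData
    2. For each brokerAddr, get the corresponding filter server list from filterServerTable

Source of the pickupTopicRouteData method in RouteInfoManager: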

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;


                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData();
                        brokerDataClone.setBrokerName(brokerData.getBrokerName());
                        brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
                                .getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    if (log.isDebugEnabled()) {
        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
    }

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
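
The chain topic -> brokerName -> broker addresses -> filter servers can also be tried out on plain collections. The following is a simplified sketch, not the RocketMQ implementation: RouteLookupSketch, Route, addQueue, and addBroker are hypothetical names, QueueData is reduced to a broker name, locking is omitted, and the addresses are made-up sample data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified route tables; field names mirror RouteInfoManager but the types are stand-ins. */
final class RouteLookupSketch {
    /** Result holder, a stand-in for TopicRouteData. */
    static final class Route {
        final List<String> brokerAddrs = new ArrayList<>();
        final Map<String, List<String>> filterServers = new HashMap<>();
    }

    // topic -> broker names hosting its queues (stands in for List<QueueData>)
    private final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> broker address)
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // broker address -> filter server addresses (left empty in this sketch)
    private final Map<String, List<String>> filterServerTable = new HashMap<>();

    void addQueue(String topic, String brokerName) {
        topicQueueTable.computeIfAbsent(topic, t -> new ArrayList<>()).add(brokerName);
    }

    void addBroker(String brokerName, long brokerId, String addr) {
        brokerAddrTable.computeIfAbsent(brokerName, n -> new TreeMap<>()).put(brokerId, addr);
    }

    /** Follows topic -> broker names -> addresses -> filter servers; null if nothing is found. */
    Route pickup(String topic) {
        List<String> brokerNames = topicQueueTable.get(topic);
        if (brokerNames == null) {
            return null; // no queue data for this topic
        }
        Route route = new Route();
        boolean foundBroker = false;
        for (String name : new LinkedHashSet<>(brokerNames)) { // de-duplicate broker names
            Map<Long, String> addrs = brokerAddrTable.get(name);
            if (addrs == null) {
                continue; // queue data refers to a broker that is not registered
            }
            foundBroker = true;
            for (String addr : addrs.values()) {
                route.brokerAddrs.add(addr);
                route.filterServers.put(addr, filterServerTable.get(addr));
            }
        }
        return foundBroker ? route : null;
    }

    public static void main(String[] args) {
        RouteLookupSketch tables = new RouteLookupSketch();
        tables.addQueue("TopicA", "broker-a");
        tables.addBroker("broker-a", 0L, "10.0.0.1:10911");
        tables.addBroker("broker-a", 1L, "10.0.0.2:10911");
        System.out.println(tables.pickup("TopicA").brokerAddrs);
    }
}
```

As in the method above, a route is only returned when both the queue data and at least one matching broker entry exist; a topic whose brokers have all disappeared yields null.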

 

Core functions:

  • Periodic broker heartbeat scan

    • Set up in NamesrvController.initialize()
public boolean initialize() {

    this.kvConfigManager.load();


    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);


    this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Set up here: the scan task is handed to the scheduled thread pool; it first runs after 5 s, then every 10 s
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    return true;
}
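
The two scheduled tasks above rely on ScheduledExecutorService.scheduleAtFixedRate(task, initialDelay, period, unit). Below is a small self-contained sketch of that call with millisecond timings so it finishes quickly; PeriodicScanSketch and runPeriodically are hypothetical names, and the printed "scan" line merely stands in for scanNotActiveBroker().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing fixed-rate scheduling with an initial delay. */
final class PeriodicScanSketch {
    /** Runs the task at a fixed rate and returns true once it has run `runs` times. */
    static boolean runPeriodically(Runnable scan, int runs, long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                scan.run();
                done.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            // Generous timeout so a slow machine does not cause a false negative.
            long timeoutMs = initialDelayMs + periodMs * runs + 2000;
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // stop the periodic task
        }
    }

    public static void main(String[] args) {
        boolean ok = runPeriodically(() -> System.out.println("scan"), 3, 50, 100);
        System.out.println("completed: " + ok);
    }
}
```

One property of scheduleAtFixedRate worth keeping in mind: if a run of the task throws, later runs are suppressed, so a periodic task that must keep running should catch its own exceptions.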
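  • Scan for expired brokers and delete the related information
    • Iterate over brokerLiveTable and check whether each broker has expired (last active time + 2 minutes < current time)
    • Take the brokerAddr of each expired broker
    • Remove the expired entry from brokerLiveTable by brokerAddr
    • Remove the expired entry from filterServerTable by brokerAddr
    • Remove the expired brokerAddr from the BrokerData
    • Remove the brokerName from clusterAddrTable (only when no address remains for that brokerName)
    • Remove the brokerName's entries from topicQueueTable (only when no address remains for that brokerName)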
private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        // Last active time
        long last = next.getValue().getLastUpdateTimestamp();
        // If last active time + 2 minutes < current time, close this broker's channel and delete the related information
        if ((last + BrokerChannelExpiredTime) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BrokerChannelExpiredTime);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                // Find this broker's address in the broker liveness table
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }


    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // Remove this broker from the broker liveness table
                this.brokerLiveTable.remove(brokerAddrFound);
                // Remove this broker's entry from filterServerTable
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                // Remove the expired broker address from brokerAddrs
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info(
                                    "remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                            break;
                        }
                    }
                    // If no broker address is left, remove this broker from brokerAddrTable
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                    }
                }
                // Remove the expired broker from clusterAddrTable
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info(
                                    "remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                            // If the brokerNames set is now empty, remove this cluster
                            if (brokerNames.isEmpty()) {
                                log.info(
                                        "remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }
                // Remove the expired broker from topicQueueTable
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
                        // Iterate over the QueueData list and remove every QueueData whose brokerName equals the expired broker
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info(
                                        "remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                            }
                        }
                        // If queueDataList is empty after the removal, remove this topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info(
                                    "remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
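
The expiry rule and the cascading cleanup above can be exercised without Netty or real clocks. This is a simplified sketch under stated assumptions: BrokerExpirySketch and all of its members are hypothetical, the current time is passed in as a parameter, BrokerLiveInfo is reduced to a timestamp, QueueData to a broker name, and clusterAddrTable, filterServerTable, and locking are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the expiry scan and its cascading cleanup. */
final class BrokerExpirySketch {
    static final long EXPIRE_MS = 1000L * 60 * 2; // same 2-minute window as above

    final Map<String, Long> lastUpdate = new HashMap<>();                   // brokerAddr -> last heartbeat
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> id -> addr
    final Map<String, Set<String>> topicBrokers = new HashMap<>();          // topic -> brokerNames

    void register(String brokerName, long brokerId, String addr, long now, String... topics) {
        lastUpdate.put(addr, now);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        for (String topic : topics) {
            topicBrokers.computeIfAbsent(topic, k -> new HashSet<>()).add(brokerName);
        }
    }

    /** Removes brokers with last heartbeat + EXPIRE_MS < now; returns the removed addresses. */
    Set<String> scan(long now) {
        Set<String> expired = new HashSet<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MS < now) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        for (String addr : expired) {
            cleanup(addr);
        }
        return expired;
    }

    /** Drops the address; topic entries go only when the broker name has no address left. */
    private void cleanup(String addr) {
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            if (!entry.getValue().values().remove(addr)) {
                continue; // this broker name does not own the address
            }
            if (entry.getValue().isEmpty()) {
                String brokerName = entry.getKey();
                it.remove();
                for (Set<String> names : topicBrokers.values()) {
                    names.remove(brokerName);
                }
                topicBrokers.values().removeIf(Set::isEmpty); // topics with no broker left
            }
            return;
        }
    }
}
```

For example, with a master whose last heartbeat was at time 0 and a slave at 100000 ms, a scan at 120001 ms removes only the master's address, and the topic stays routable through the slave until the slave expires too.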
  • Broker registration

    • Default Netty request processor
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",//
                request.getCode(), //
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
                request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        // Register a broker
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            }
            else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        default:
            break;
    }
    return null;
}
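
The switch above maps each request code to one handler method and returns null for unknown codes. The same idea can be written table-driven, which is sketched below as an illustration only: this is a different technique from the switch in DefaultRequestProcessor, and DispatchSketch, its PUT and GET codes, and the "key=value" body format are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical table-driven dispatcher; not how DefaultRequestProcessor is written. */
final class DispatchSketch {
    static final int PUT = 1; // made-up request codes for this sketch
    static final int GET = 2;

    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();
    private final Map<String, String> store = new HashMap<>();

    DispatchSketch() {
        handlers.put(PUT, this::put);
        handlers.put(GET, store::get);
    }

    /** Handles a "key=value" body; rejects bodies without a '='. */
    private String put(String body) {
        String[] kv = body.split("=", 2);
        if (kv.length < 2) {
            return "BAD_REQUEST";
        }
        store.put(kv[0], kv[1]);
        return "OK";
    }

    /** Looks up the handler for the code; unknown codes yield null, like the default branch above. */
    String process(int code, String body) {
        Function<String, String> handler = handlers.get(code);
        return handler == null ? null : handler.apply(body);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        System.out.println(dispatcher.process(PUT, "a=1"));
        System.out.println(dispatcher.process(GET, "a"));
        System.out.println(dispatcher.process(99, "ignored"));
    }
}
```

A switch keeps every code visible in one place, while a handler table lets codes be registered from elsewhere; which fits better depends on how often the set of codes changes.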
  • Registering a broker

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    // Create the response command
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    // Get the response header
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    // Decode the request header
    final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

    if (request.getBody() != null) {
        registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
    } else {
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
    }
    // The actual broker registration
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//
            requestHeader.getClusterName(), // 1
            requestHeader.getBrokerAddr(), // 2
            requestHeader.getBrokerName(), // 3
            requestHeader.getBrokerId(), // 4
            requestHeader.getHaServerAddr(),// 5
            registerBrokerBody.getTopicConfigSerializeWrapper(), // 6
            registerBrokerBody.getFilterServerList(),// 7
            ctx.channel()// 8
    );
    // Fill in the response header
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());


    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

The registerBroker method of RouteInfoManager:

public RegisterBrokerResult registerBroker(//
                                           final String clusterName,// 1 cluster name
                                           final String brokerAddr,// 2 broker address
                                           final String brokerName,// 3 broker name
                                           final long brokerId,// 4 brokerId
                                           final String haServerAddr,// 5 address the master listens on for slave replication
                                           final TopicConfigSerializeWrapper topicConfigWrapper,// 6 serialized topic configuration
                                           final List<String> filterServerList, // 7 filter server list
                                           final Channel channel// 8
) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            // Get the set of broker names for this cluster
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // Look up the broker information by brokerName
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            // If the broker does not exist, create it and store it in brokerAddrTable
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData();
                brokerData.setBrokerName(brokerName);
                HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                brokerData.setBrokerAddrs(brokerAddrs);

                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // The broker is the master and topicConfigWrapper is not null
            if (null != topicConfigWrapper //
                    && MixAll.MASTER_ID == brokerId) {
                // Check whether the broker's topic config changed: get the BrokerLiveInfo for brokerAddr from brokerLiveTable, then test
                // brokerLiveInfo != null && !brokerLiveInfo.getDataVersion().equals(topicConfigWrapper.getDataVersion())
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                        || registerFirst) {
                    ConcurrentHashMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for(Map.Entry<String,TopicConfig> entry: tcTable.entrySet()){
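                            // Create and update the QueueData
                            // Get queueDataList by topic name
                            //    1. If queueDataList is null, create it and put it with the new QueueData into topicQueueTable
                            //    2. Otherwise loop over queueDataList
                            //        1. Find entries where brokerName.equals(queueData.getBrokerName())
                            //        2. If such an entry differs from the new QueueData, remove it and add the new QueueData to queueDataList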
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // Create a new BrokerLiveInfo and store it in brokerLiveTable under brokerAddr
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, //
                    new BrokerLiveInfo(//
                            System.currentTimeMillis(), //
                            topicConfigWrapper.getDataVersion(),//
                            channel, //
                            haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // Store the filterServerList for this brokerAddr
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // If the broker is not the master, look up the master address; if it exists,
            // read the master's BrokerLiveInfo and set haServerAddr and masterAddr on the result
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
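
The part of registerBroker that a slave depends on is the last step: it gets back the master's address and HA server address, but only if the master of the same brokerName is already registered. Here is a reduced sketch of just that behaviour. RegisterSketch and its members are hypothetical, MASTER_ID = 0 is assumed to mirror MixAll.MASTER_ID, BrokerLiveInfo is reduced to the HA address, and topic configuration, filter servers, and locking are omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, reduced model of broker registration and the master lookup for slaves. */
final class RegisterSketch {
    static final long MASTER_ID = 0L; // assumption: mirrors MixAll.MASTER_ID

    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> id -> addr
    final Map<String, String> haServerAddrByBroker = new HashMap<>();       // brokerAddr -> haServerAddr

    /**
     * Registers a broker. For a slave whose master is known, returns
     * {masterAddr, masterHaServerAddr}; otherwise returns null.
     */
    String[] register(String cluster, String brokerName, long brokerId,
                      String brokerAddr, String haServerAddr) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
        addrs.put(brokerId, brokerAddr);
        haServerAddrByBroker.put(brokerAddr, haServerAddr);

        if (brokerId == MASTER_ID) {
            return null; // a master has no master to report
        }
        String masterAddr = addrs.get(MASTER_ID);
        if (masterAddr == null) {
            return null; // master not registered yet
        }
        return new String[] {masterAddr, haServerAddrByBroker.get(masterAddr)};
    }

    public static void main(String[] args) {
        RegisterSketch registry = new RegisterSketch();
        registry.register("cluster-1", "broker-a", 0L, "10.0.0.1:10911", "10.0.0.1:10912");
        String[] master = registry.register("cluster-1", "broker-a", 1L, "10.0.0.2:10911", "10.0.0.2:10912");
        System.out.println(master[0] + " / " + master[1]);
    }
}
```

A slave that registers before its master gets nothing back on that call; because registration is repeated with each heartbeat, a later call picks up the master once it has registered.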

 

Tags: RocketMQ