文档章节

RockMQ学习总结之二:rocketmq-namesrv

土豆_BIG
 土豆_BIG
发布于 2017/07/20 16:19
字数 2105
阅读 10
收藏 0
点赞 0
评论 0

主要功能:

  • 维护broker列表

    • 接收broker注册请求,并添加到相应的broker列表中
    • broker心跳定期扫描(启动后,定期(默认10s)扫描broker存活状态,删除无效broker及相关配置)
  • 维护Topic及Topic对应的队列地址信息

主要结构:

  • 类结构

    • NamesrvStartup
      • 初始化NamesrvConfig
      • 初始化NettyServerConfig
      • 初始化NamesrvController并启动
      • 添加钩子方法,优雅关闭NamesrvController
    • NamesrvController
      • 初始化KVConfigManager
      • 初始化NettyRemotingServer
      • 创建NettyServer工作线程池
      • 向NettyRemotingServer注册默认处理器(DefaultRequestProcessor)
      • 创建扫描失效的broker调度任务,默认10s扫描一次(broker心跳定期扫描)
      • 创建打印配置信息任务,默认10分钟打印一次
    • RouteInfoManager
      • 获取所有 Cluster
      • 获取所有/根据Cluster获取 Topic
      • 注册/注销 broker
      • 删除/根据brokerName删除 Topic
      • 扫描失效的broker
      • 根据Topic获取TopicRouteData
    • DefaultRequestProcessor
      • 接收Netty消息并根据RequestCode执行具体操作
        • 添加、修改、删除配置信息
        • 注册、注销broker
        • 根据Topic获取路由信息(TopicRouteData)
        • 获取所有Cluster
        • 获取所有Topic
        • 删除指定Topic
        • 根据Cluster获取Topic列表
        • 根据NameSpace获取配置信息列表
  • 数据结构

    • KVConfigManager类
      • 根据NameSpace维护配置信息列表
        private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
                new HashMap<String, HashMap<String, String>>();
    • RouteInfoManager类
      • 维护Topic与相应队列的关系
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        • QueueData实体信息
          private String brokerName;
          private int readQueueNums;
          private int writeQueueNums;
          private int perm;
          private int topicSynFlag;
      • 维护brokerName与详细信息的关系
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable>> clusterAddrTable;
        • BrokerData实体信息
          private String cluster;
          private String brokerName;
          private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
      • 维护clusterName与brokerName列表的关系
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
      • 维护broker地址与broker存活信息的关系
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        • BrokerLiveInfo
          private long lastUpdateTimestamp;
          private DataVersion dataVersion;
          private Channel channel;
          private String haServerAddr;
      • 维护broker地址与Filter列表关系
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

详解数据结构关系:

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

具体关系可以从获取Topic对应的TopicRouteData方法中了解到:

  1. 获取Topic对应的List<QueueData>
  2. 循环List<QueueData>从brokerAddrTable中获取每个QueueData的BrokerData
    1. 循环brokerData中brokerAddrs集合
    2. 根据brokerAddr从filterServerTable中获取对应的filter集合

源码RouteInfoManager类的pickupTopicRouteData方法:

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;


                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData();
                        brokerDataClone.setBrokerName(brokerData.getBrokerName());
                        brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
                                .getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    if (log.isDebugEnabled()) {
        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
    }

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}

 

核心功能:

  • broker心跳定期扫描

    • NamesrvController.initialize()中初始化
public boolean initialize() {

    this.kvConfigManager.load();


    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);


    this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    //此处初始化,创建独立的线程交给调度线程池,每10s执行一次
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    return true;
}
  • 扫描失效的broker,并删除相关联的信息
    • 遍历brokerLiveTable,判断broker是否失效(最后活跃时间+2分钟 < 当前时间)
    • 遍历brokerLiveTable,获取失效broker的brokerAddr
    • 根据brokerAddr从brokerLiveTable中删除无效记录
    • 根据brokerAddr从filterServerTable删除无效记录
    • 删除brokerData中无效的brokerAddr
    • 根据brokerName从clusterAddrTable删除无效记录
    • 根据brokerName从topicQueueTable中删除无效记录
private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        //最后活跃时间
        long last = next.getValue().getLastUpdateTimestamp();
        //最后活跃时间 + 2分钟 < 当前时间,关闭当前broker并删除相关联信息
        if ((last + BrokerChannelExpiredTime) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BrokerChannelExpiredTime);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                   //从broker存活信息表里,找到当前broker地址
                   while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }


    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                //从broker存活信息表中将此broker删除
                this.brokerLiveTable.remove(brokerAddrFound);
                //从filterServerTable删除broker信息
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                //从brokerAddrs中删除失效broker地址
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info(
                                    "remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                            break;
                        }
                    }
                    //如果broker地址为空,则从brokerAddrTable中删除此broker
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                    }
                }
                //从clusterAddrTable中删除无效broker
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info(
                                    "remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                            //如果brokerNames列表为空,则删除此cluster
                            if (brokerNames.isEmpty()) {
                                log.info(
                                        "remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }
                //从topicQueueTable中删除无效broker
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
                        //遍历queueData集合,queueData的brokerName与无效broker相等,删除当前queueData
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info(
                                        "remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                            }
                        }
                        //如果删除QueueData后,queueDataList为空,则删除此Topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info(
                                    "remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
  • broker注册

    • 默认netty消息处理器
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",//
                request.getCode(), //
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
                request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        //注册broker
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            }
            else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        default:
            break;
    }
    return null;
}
  • 注册broker

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    //创建响应协议
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    //获取响应头
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    //获取请求头
    final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

    if (request.getBody() != null) {
        registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
    } else {
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
    }
    //注册broker的具体实现
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//
            requestHeader.getClusterName(), // 1
            requestHeader.getBrokerAddr(), // 2
            requestHeader.getBrokerName(), // 3
            requestHeader.getBrokerId(), // 4
            requestHeader.getHaServerAddr(),// 5
            registerBrokerBody.getTopicConfigSerializeWrapper(), // 6
            registerBrokerBody.getFilterServerList(),//
            ctx.channel()// 7
    );
    //设置响应头
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());


    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

RouteInfoManager类registerBroker方法

public RegisterBrokerResult registerBroker(//
                                           final String clusterName,// 1 集群名称
                                           final String brokerAddr,// 2 broker地址
                                           final String brokerName,// 3 broker名称
                                           final long brokerId,// 4 brokerId
                                           final String haServerAddr,// 5 master用来监听slave复制数据的端口
                                           final TopicConfigSerializeWrapper topicConfigWrapper,// 6 //Topic配置序列化类
                                           final List<String> filterServerList, // 7 //filter列表
                                           final Channel channel// 8
) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            //根据集群名称获取broker名称列表
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            //根据brokerName获取broker信息
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            //broker不存在,则创建broker并维护至brokerAddrTable中
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData();
                brokerData.setBrokerName(brokerName);
                HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                brokerData.setBrokerAddrs(brokerAddrs);

                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            //broker为Master且topicConfigWrapper不为空
            if (null != topicConfigWrapper //
                    && MixAll.MASTER_ID == brokerId) {
                //判断brokerTopicConfig是否改变,根据brokerAddr获取brokerLiveTable的brokerLiveInfo,
                //brokerLiveInfo != null && !brokerLiveInfo.getDataVersion().equals(topicConfigWrapper.getDataVersion())
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                        || registerFirst) {
                    ConcurrentHashMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for(Map.Entry<String,TopicConfig> entry: tcTable.entrySet()){
                            //创建并更新QueueData
                            //根据TopicName获取queueList
                            //    1、null == queueList,则创建queueList并将queueData添加至topicQueueTable
                            //    2、不为空,循环queueList
                            //        1、brokerName.equals(queueData.getBrokerName())
                            //        2、不相等则删除当前brokerData,并将新的BrokerData添加至queueDataList
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            //创建新的BrokerLiveInfo与brokerAddr关系存入brokerLiveTable中
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, //
                    new BrokerLiveInfo(//
                            System.currentTimeMillis(), //
                            topicConfigWrapper.getDataVersion(),//
                            channel, //
                            haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            //添加brokerAddr对应的filterServerList
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            //如果broker不是Master,则获取Master地址,不为空获取Master的brokerLiveInfo并设置haServerAddr和MasterAddr
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

 

© 著作权归作者所有

共有 人打赏支持
土豆_BIG
粉丝 0
博文 5
码字总数 7234
作品 0
朝阳
rocketmq学习过程中踩过的坑总结

1.连接异常 前几天在虚拟机上部署了rocketmq,一切正常,今天把rocketmq搬到了我的云主机上,一直报错 Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: conn...

qq_23603437
04/21
0
0
RocketMQ(六):namesrv再探

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
04/11
0
0
RocketMQ部分数据消费不了问题排查

问题现象 今天忽然收到RocketMQ预警信息如下: 提醒有部分数据没有消费,产生堆积情况。 打开RocketMq-Console-Ng查看如下图形式: 备注:第一反应是Consumer Group内订阅了多个topic?(为什...

匠心零度
05/14
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
06/19
0
0
让你rocketmq用得比预期要好的 1 种方法

匠心零度 转载请注明原创出处,谢谢! 方法 让你rocketmq用得比预期要好的 1 种方法:就是认真思考下面的几个问题: 使用rocketmq能解决那些问题?那些问题是不能解决的? 我们什么时候该添加...

匠心零度
04/19
0
0
RocketMQ源码之Producer获取topicPublishInfo

从RocketMQ架构解析中我们了解到了RocketMQ的架构设计原理,接下来我们根据架构图来解析各个步骤的源码,探索RocketMQ是怎么实现相关功能的,从Producer发送消息开始。 下面是Producer发送一...

激情的狼王
04/25
0
0
Mac和Linux中Apache RocketMQ的安装和使用(亲测有效,不服来战)

一、项目需要用到Apache RocketMQ Apache RocketMQ™ is an open source distributed messaging and streaming data platform. 这是阿里开源的一个消息中间件框架。 官网:官网 二、下面来快...

王木东
04/09
0
0
linux下RocketMQ的安装

下载和构建 从 https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip 下载 4.2.0 的源码版本,执行以下命令来解压4.2.0源码版本并构建二进制文...

yushiwh
05/21
0
0
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
06/30
0
0
RocketMQ 消息发送与消费源码分析

MQ在我们日常开发过程中有着不可替代的作用,不仅可以帮助我们做到信息在系统间的传递,还能进行系统间的解耦合,也就是说消息的发送端与接收端不会有强依赖关系(例如接口调用)。市场上MQ的...

数齐
07/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

【面试题】盲人坐飞机

有100位乘客乘坐飞机,其中有一位是盲人,每位乘客都按自己的座位号就坐。由于盲人看不见自己的座位号,所以他可能会坐错位置,而自己的座位被占的乘客会随便找个座位就坐。问所有乘客都坐对...

garkey
48分钟前
0
0
谈谈神秘的ES6——(二)ES6的变量

谈谈神秘的ES6——(二)ES6的变量 我们在《零基础入门JavaScript》的时候就说过,在ES5里,变量是有弊端的,我们先来回顾一下。 首先,在ES5中,我们所有的变量都是通过关键字var来定义的。...

JandenMa
今天
1
0
arts-week1

Algorithm 594. Longest Harmonious Subsequence - LeetCode 274. H-Index - LeetCode 219. Contains Duplicate II - LeetCode 217. Contains Duplicate - LeetCode 438. Find All Anagrams ......

yysue
今天
0
0
NNS拍卖合约

前言 关于NNS的介绍,这里就不多做描述,相关的信息可以查看NNS的白皮书http://doc.neons.name/zh_CN/latest/nns_background.html。 首先nns中使用的竞价货币是sgas,关于sgas介绍可以戳htt...

红烧飞鱼
今天
1
0
Java IO类库之管道流PipeInputStream与PipeOutputStream

一、java管道流介绍 在java多线程通信中管道通信是一种重要的通信方式,在java中我们通过配套使用管道输出流PipedOutputStream和管道输入流PipedInputStream完成线程间通信。多线程管道通信的...

老韭菜
今天
0
0
用Python绘制红楼梦词云图,竟然发现了这个!

Python在数据分析中越来越受欢迎,已经达到了统计学家对R的喜爱程度,Python的拥护者们当然不会落后于R,开发了一个个好玩的数据分析工具,下面我们来看看如何使用Python,来读红楼梦,绘制小...

猫咪编程
今天
1
0
Java中 发出请求获取别人的数据(阿里云 查询IP归属地)

1.效果 调用阿里云的接口 去定位IP地址 2. 代码 /** * 1. Java中远程调用方法 * http://localhost:8080/mavenssm20180519/invokingUrl.action * @Title: invokingUrl * @Description: * @ret......

Lucky_Me
今天
1
0
protobuf学习笔记

相关文档 Protocol buffers(protobuf)入门简介及性能分析 Protobuf学习 - 入门

OSC_fly
昨天
0
0
Mybaties入门介绍

Mybaties和Hibernate是我们在Java开发中应用的比较多的两个ORM框架。当然,目前Mybaties正在慢慢取代Hibernate,这是因为相比较Hibernate而言Mybaties性能更好,响应更快,更加灵活。我们在开...

王子城
昨天
2
0
编程学习笔记之python深入之装饰器案例及说明文档[图]

编程学习笔记之python深入之装饰器案例及说明文档[图] 装饰器即在不对一个函数体进行任何修改,以及不改变整体的原本意思的情况下,增加函数功能的新函数,因为这个新函数对旧函数进行了装饰...

原创小博客
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部