文档章节

Rocketmq之namesrv启动流程源码详解分析

tantexian
 tantexian
发布于 2016/07/06 19:33
字数 2376
阅读 1999
收藏 6

云笔记版本地址:http://note.youdao.com/yws/public/redirect/share?id=86ccd81cf192ebe8af5bf8742aa60c84&type=false

 

一、namesrv整体类图:

NamesrvStartup:namesrv启动的入口类。
NamesrvConfig:namesrv配置文件类
NettysystemConfig:Netty配置文件类设置监听接口及发送接收信息用于与其他模块的远程通信
NamesrvController:namesrv初始化服务控制对象(核心类)
RouteInfoManager:namesrv管理所有broker的路由及topic配置信息类
DefaultRequestProcessor:对netty的封装(封装通信交互协议),管理和启动netty网络通信框架
 
 
 
 

 

二、namesrvStartup时序图:

 

 

 

三、源码分析详解:

根据时序图来讲解namesrv启动流程(核心方法讲解):

1、执行NamesrvStartup类的启动入口main方法,再调用main0

// 此处为Namesrv启动的入口函数 2016/7/6 Add by tantexixan
public static void main(String[] args) {
main0(args);// 调用main0函数
}

 

2、main0方法中获取NamesrvConfig、NettyServerConfig配置:

// 初始化配置文件
final NamesrvConfig namesrvConfig = new NamesrvConfig();// 加载namesrv的相关配置项
// 加载nettyServerConfig配置项( Netty是一个高性能、异步事件驱动的NIO框架,用来与其他模块通信交互,例如broker模块通信)
// netty具体的启动工作为后续函数controller.start();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置netty监听端口为9876
nettyServerConfig.setListenPort(9876);

 

3、创建NamesrvController对象,并执行initialize方法:

// 初始化服务控制对象(NamesrvController为核心类,保存了大量的namesrv需要的信息)
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// initialize核心函数,其中主要作用为:
// 1、初始化netty相关配置
// 2、定义broker与namesrv通过netty进行通信,的通信协议(即请求中带上code,来代表对应调用哪个方法函数)
// 3、定时每10s扫描broker信息,如果过期则移除
// 4、定时每10s将configTable的信息记录到日志文件中
boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
System.exit(-3);
}

 

3-1、跟进到NamesrvController构造函数:(创建KVConfigManager、RouteInfoManager、BrokerHousekeepingService对象)


public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig; // namesrv相关配置
this.nettyServerConfig = nettyServerConfig; // netty相关配置
this.kvConfigManager = new KVConfigManager(this); // KV配置管理
this.routeInfoManager = new RouteInfoManager(); // 路由信息、topic信息管理
this.brokerHousekeepingService = new BrokerHousekeepingService(this); // broker管理服务
}

其中此处重点应该关注的是RouteInfoManager类:

 

3-1-1、跟进,详解RouteInfoManager的registerBroker方法:

 
/**
* broker向namesrv注册函数
* 主要功能步骤包括:
* 1、将当前请求注册的broker信息保存或者更新到clusterAddrTable、brokerAddrTable中
* 2、将当前请求注册的broker的topic信息,保存或者更新到topicQueueTable中
* -- 其中isBrokerTopicConfigChanged用来判断当前请求broker信息是否为最新版本,如果是则替换,不是则跳过
* -- createAndUpdateQueueData为具体觉得创建还是更新topicQueueTable
* -- 其中topicQueueTable中保存了对应topic的queueDate,queueDate保存了broker的name、write及read的queue数量,及topicSynFlag
* 3、如果当前broker为master节点,则直接按照上述步骤更新,如果为slave节点,则将haServerAddr、masterAddr等信息设置到result返回值中
* @author tantexian
* @since 2016/7/6
* @return 如果是slave,则返回master的ha地址
*/
public RegisterBrokerResult registerBroker(//

final String clusterName,// 1
final String brokerAddr,// 2
final String brokerName,// 3
final long brokerId,// 4
final String haServerAddr,// 5
final TopicConfigSerializeWrapper topicConfigWrapper,// 6
final List<String> filterServerList, // 7
final Channel channel// 8
) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// 加写锁
this.lock.writeLock().lockInterruptibly();

// 更新集群信息(根据集群名字,获取当前集群下面的所有brokerName)
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
// 如果当前集群下面brokerNames为空,则将当前请求broker加入到clusterAddrTable中
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);

boolean registerFirst = false;

// 更新主备信息(在brokerAddrTable中获取所有的brokerDAte)
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// 如果当前不存在brokerDate,即还没有broker向namesrv注册,则直接将当前broker信息put加入
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData();
brokerData.setBrokerName(brokerName);
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerData.setBrokerAddrs(brokerAddrs);

this.brokerAddrTable.put(brokerName, brokerData);
}
// 获取当前注册broker的brokerAddr地址
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);

// 更新Topic信息
if (null != topicConfigWrapper //如果topicConfigWrapper不为空,且当前brokerId == 0,即为当前broker为master
&& MixAll.MASTER_ID == brokerId) {
// 如果Topic配置信息发生变更或者该broker为第一次注册
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
|| registerFirst) {
// 获取所有topic信息
ConcurrentHashMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
// 遍历所有Topic
for (String topic : tcTable.keySet()) {
TopicConfig topicConfig = tcTable.get(topic);
// 根据brokername及topicconfig(read、write queue数量等)新增或者更新到topicQueueTable中
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
}
}

// 更新最后变更时间(将brokerLiveTable中保存的对应的broker的更新时间戳,设置为当前时间)
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, //
new BrokerLiveInfo(//
System.currentTimeMillis(), //
topicConfigWrapper.getDataVersion(),//
channel, //
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
}

// 更新Filter Server列表
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
}
else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}

// 返回值(如果当前broker为slave节点)则将haServerAddr、masterAddr等信息设置到result返回值中
if (MixAll.MASTER_ID != brokerId) {
// 通过brokename的brokedate获取当前slave节点的master节点addr
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
}
finally {// 释放写锁
this.lock.writeLock().unlock();
}
}
catch (Exception e) {
log.error("registerBroker Exception", e);
}

return result;
}

 

3-2、跟进到controller.initialize():

public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();

// 初始化通信层
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

// 初始化线程池(根据getServerWorkerThreads值,启动相应数量线程)
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("RemotingExecutorThread_"));

// 此注册函数主要作用就是,定义RequestCode,用来作为netty的通信协议字段
// 即:如果broker通过netty发送通信请求,其中请求信息中带有code == RequestCode.REGISTER_BROKER,
//那么在namesrv的netty端接收到该通信连接时候,
// 则对应调用namesrv的DefaultRequestProcessor类下面的registerBroker方法,从而完成broker向namesrv注册
// 具体请参考com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor类
// 更多关于netty在gmq中的通信机制及原理,请关注后续博文(博客地址为:http://my.oschina.net/tantexian)
this.registerProcessor();

// 增加定时任务(延时5秒,每间隔10s钟,定时扫描一次)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
        public void run() {
// 定时扫描notActive的broker(若发现broker过期,则清除该broker与namesrv之间的socketChanel通道)
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
    }, 5, 10, TimeUnit.SECONDS);

// 增加定时任务(延时1秒,每间隔10s钟,定时扫描一次)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
        public void run() {
// 定时将configTable相关信息记录到日志文件中
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
    }, 1, 10, TimeUnit.MINUTES);

// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    //
    // @Override
    // public void run() {
    // NamesrvController.this.routeInfoManager.printAllPeriodically();
    // }
    // }, 1, 5, TimeUnit.MINUTES);

return true;
}

 

3-2-1、跟进到this.remotingServer = newNettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

下述函数为对netty的封装,其中new NioEventLoopGroup为启动一定数量的netty线程来用来与broker通信

 

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig
.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
}

this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);


        @Override
        public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
    });

    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);


        @Override
        public Thread newThread(Runnable r) {
return new Thread(r,
String.format("NettyBossSelector_%d", this.threadIndex.incrementAndGet()));
}
    });

    this.eventLoopGroupWorker =
            new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();


                @Override
                public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerSelector_%d_%d", threadTotal,
                        this.threadIndex.incrementAndGet()));
}
            });
}

 

3-2-2、this.registerProcessor();

 

 
// 此注册函数主要作用就是,定义RequestCode,用来作为netty的通信协议字段
// 即:如果broker通过netty发送通信请求,其中请求信息中带有code == RequestCode.REGISTER_BROKER,
//那么在namesrv的netty端接收到该通信连接时候,
// 则对应调用namesrv的DefaultRequestProcessor类下面的registerBroker方法,从而完成broker向namesrv注册
// 具体请参考com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor类
// 更多关于netty在gmq中的通信机制及原理,请关注后续博文(博客地址为:http://my.oschina.net/tantexian)
this.registerProcessor();

 

3-2-3、增加定时任务(延时5秒,每间隔10s钟,定时扫描一次),扫描notActive的broker(若发现broker过期,则清除该broker与namesrv之间的socketChanel通道)

// 增加定时任务(延时5秒,每间隔10s钟,定时扫描一次)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
        public void run() {
// 定时扫描notActive的broker(若发现broker过期,则清除该broker与namesrv之间的socketChanel通道)
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
    }, 5, 10, TimeUnit.SECONDS);

 

3-2-4、增加定时任务,每10s定时,将configTable相关信息记录到日志文件中:

// 增加定时任务(延时1秒,每间隔10s钟,定时扫描一次)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
    public void run() {
// 定时将configTable相关信息记录到日志文件中
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

 

4、设置一个jvm退出勾子函数,即jvm退出时,此处线程调用controller.shutdown(),清理controller相关资源:

 

// 设置一个jvm退出勾子函数,即jvm退出时,此处线程调用controller.shutdown(),清理controller相关资源
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);


    @Override
    public void run() {
synchronized (this) {
log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
            if (!this.hasShutdown) {
this.hasShutdown = true;
                long begineTime = System.currentTimeMillis();
controller.shutdown();
                long consumingTimeTotal = System.currentTimeMillis() - begineTime;
log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
}
        }
    }
}, "ShutdownHook"));

 

5、启动服务(主要就是启动netty监听网络通信请求,即初始化netty启动异步通信server):

// 启动服务(主要就是启动netty监听网络通信请求,即初始化netty启动异步通信server)
controller.start();

 

5-1、跟进到nameController的start方法:

public void start() throws Exception {
this.remotingServer.start();
}

 

5-1-1、跟进到NettyRemotingServer的start方法:

主要功能即,通过封装调用netty启动netty异步通信框架。

@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyServerConfig.getServerWorkerThreads(), //
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);


            @Override
            public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerWorkerThread_" + this.threadIndex.incrementAndGet());
}
        });

ServerBootstrap childHandler = //
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NioServerSocketChannel.class)
//
.option(ChannelOption.SO_BACKLOG, 1024)
//
.option(ChannelOption.SO_REUSEADDR, true)
//
.option(ChannelOption.SO_KEEPALIVE, false)
//
.childOption(ChannelOption.TCP_NODELAY, true)
//
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
//
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
//
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
                    public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//
defaultEventExecutorGroup, //
new NettyEncoder(), //
new NettyDecoder(), //
new IdleStateHandler(0, 0, nettyServerConfig
.getServerChannelMaxIdleTimeSeconds()),//
new NettyConnetManageHandler(), //
new NettyServerHandler());
}
                });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
// 这个选项有可能会占用大量堆外内存,暂时不使用。
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
}
catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}

if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}

// 每隔1秒扫描下异步调用超时情况
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
        public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
}
catch (Exception e) {
log.error("scanResponseTable exception", e);
}
        }
    }, 1000 * 3, 1000);
}

© 著作权归作者所有

tantexian
粉丝 225
博文 527
码字总数 746616
作品 0
成都
架构师
私信 提问
加载中

评论(1)

叫那扬清风
叫那扬清风
源码分析的正确打开方式
RocketMQ部分数据消费不了问题排查

问题现象 今天忽然收到RocketMQ预警信息如下: 提醒有部分数据没有消费,产生堆积情况。 打开RocketMq-Console-Ng查看如下图形式: 备注:第一反应是Consumer Group内订阅了多个topic?(为什...

匠心零度
2018/10/29
0
0
消息中间件 RocketMQ 源码解析 —— 调试环境搭建

摘要: 原创出处 www.iocoder.cn/RocketMQ/bu… 「芋道源码」欢迎转载,保留摘要,谢谢! 0. 友情提示 1. 依赖工具 2. 源码拉取 3. 启动 RocketMQ Namesrv 4. 启动 RocketMQ Broker 5. 启动 ...

芋道源码_以德服人_不服就干
01/31
0
0
rocketmq源码解析之NamesrvController创建

说在前面 本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡、容错的一些处理,4.3以上支持消息事务,有管理控制台、命...

天河2018
04/12
130
0
rocketmq4.x快速入门指南

以下采用的是版本 相关文档如下 快速体验: http://blog.seoui.com/2018/07/24/rocketmqinstall/ rocketmq简单消息发送: http://blog.seoui.com/2018/07/24/rocketmq_simple_message/ rock......

peachyy
2018/08/02
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
2018/10/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

mysql-connector-java升级到8.0后保存时间到数据库出现了时差

在一个新项目中用到了新版的mysql jdbc 驱动 <dependency>     <groupId>mysql</groupId>     <artifactId>mysql-connector-java</artifactId>     <version>8.0.18</version> ......

ValSong
40分钟前
5
0
Spring Boot 如何部署到 Linux 中的服务

打包完成后的 Spring Boot 程序如何部署到 Linux 上的服务? 你可以参考官方的有关部署 Spring Boot 为 Linux 服务的文档。 文档链接如下: https://docs.ossez.com/spring-boot-docs/docs/r...

honeymoose
42分钟前
5
0
Spring Boot 2 实战:使用 Spring Boot Admin 监控你的应用

1. 前言 生产上对 Web 应用 的监控是十分必要的。我们可以近乎实时来对应用的健康、性能等其他指标进行监控来及时应对一些突发情况。避免一些故障的发生。对于 Spring Boot 应用来说我们可以...

码农小胖哥
今天
6
0
ZetCode 教程翻译计划正式启动 | ApacheCN

原文:ZetCode 协议:CC BY-NC-SA 4.0 欢迎任何人参与和完善:一个人可以走的很快,但是一群人却可以走的更远。 ApacheCN 学习资源 贡献指南 本项目需要校对,欢迎大家提交 Pull Request。 ...

ApacheCN_飞龙
今天
4
0
CSS定位

CSS定位 relative相对定位 absolute绝对定位 fixed和sticky及zIndex relative相对定位 position特性:css position属性用于指定一个元素在文档中的定位方式。top、right、bottom、left属性则...

studywin
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部