文档章节

zk中ZooKeeperServer解析

writeademo
 writeademo
发布于 10/17 15:55
字数 559
阅读 13
收藏 0
ZK

内部类

 

ChangeRecord 处理PrepRP和FinalRP之间的信息

static class ChangeRecord {

    ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
        this.zxid = zxid;
        this.path = path;
        this.stat = stat;
        this.childCount = childCount;
        this.acl = acl;
    }

    long zxid;

    String path;

    StatPersisted stat; /* Make sure to create a new object when changing */

    int childCount;

    List<ACL> acl; /* Make sure to create a new object when changing */

    ChangeRecord duplicate(long zxid) {
        StatPersisted stat = new StatPersisted();
        if (this.stat != null) {
            DataTree.copyStatPersisted(this.stat, stat);
        }
        return new ChangeRecord(zxid, path, stat, childCount, acl == null ? new ArrayList<>() : new ArrayList<>(acl));
    }

}


protected enum State {
    INITIAL,
    RUNNING,
    SHUTDOWN,
    ERROR
}


初始化函数
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
    serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    this.txnLogFactory.setServerStats(this.serverStats);
    this.zkDb = zkDb;
    this.tickTime = tickTime;
    setMinSessionTimeout(minSessionTimeout);
    setMaxSessionTimeout(maxSessionTimeout);
    this.listenBacklog = clientPortListenBacklog;

    listener = new ZooKeeperServerListenerImpl(this);

    readResponseCache = new ResponseCache();

    connThrottle = new BlueThrottle();

    this.initialConfig = initialConfig;

    this.requestPathMetricsCollector = new RequestPathMetricsCollector();

    this.initLargeRequestThrottlingSettings();

    LOG.info("Created server with tickTime " + tickTime
             + " minSessionTimeout " + getMinSessionTimeout()
             + " maxSessionTimeout " + getMaxSessionTimeout()
             + " clientPortListenBacklog " + getClientPortListenBacklog()
             + " datadir " + txnLogFactory.getDataDir()
             + " snapdir " + txnLogFactory.getSnapDir());

}

通过参数构造一个数据管理Log FileTxnSnapLog

public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
    this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
}

集群和单机中加载数据

集群调用顺序

Leader#lead

ZooKeeperServer#loadData

单机调用顺序

ServerCnxFactory#startUp

ZooKeeperServer#startdata

ZooKeeperServer#loadData

 

 

 

单机版的startData方法

 

public void startdata() throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);//实例化zkdatabase
    }
    if (!zkDb.isInitialized()) {
        loadData();//没有初始化就重新初始化
    }
}
//加载数据
public void loadData() throws IOException, InterruptedException {
    /*
     * When a new leader starts executing Leader#lead, it
     * invokes this method. The database, however, has been
     * initialized before running leader election so that
     * the server could pick its zxid for its initial vote.
     * It does it by invoking QuorumPeer#getLastLoggedZxid.
     * Consequently, we don't need to initialize it once more
     * and avoid the penalty of loading it a second time. Not
     * reloading it is particularly important for applications
     * that host a large database.
     *
     * The following if block checks whether the database has
     * been initialized or not. Note that this method is
     * invoked by at least one other method:
     * ZooKeeperServer#startdata
     */
     //加载信息
    if (zkDb.isInitialized()) {
        setZxid(zkDb.getDataTreeLastProcessedZxid());
    } else {
        setZxid(zkDb.loadDataBase());
    }

    // Clean up dead sessions
    //获取超时deadSessions
    List<Long> deadSessions = new ArrayList<>();
    for (Long session : zkDb.getSessions()) {
        if (zkDb.getSessionWithTimeOuts().get(session) == null) {
            deadSessions.add(session);
        }
    }
    //杀掉session
    for (long session : deadSessions) {
        // TODO: Is lastProcessedZxid really the best thing to use?
        killSession(session, zkDb.getDataTreeLastProcessedZxid());
    }

    // Make a clean snapshot 创建快照
    takeSnapshot();
}



删除会话
protected void killSession(long sessionId, long zxid) {
    zkDb.killSession(sessionId, zxid);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId));
    }
    if (sessionTracker != null) {
        sessionTracker.removeSession(sessionId);
    }
}




public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    //责任链处理
    startSessionTracker();
    //设置请求处理器
    setupRequestProcessors();

    startRequestThrottler();
    //注册jmx
    registerJMX();

    startJvmPauseMonitor();

    registerMetrics();
    //设置状态
    setState(State.RUNNING);

    requestPathMetricsCollector.start();

    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
    notifyAll();
}

 

 

 

© 著作权归作者所有

writeademo
粉丝 25
博文 695
码字总数 265412
作品 0
东城
私信 提问
zk请求处理类PrepRequestProcessor

PrepRequestProcessor作为leader第一个请求处理器,可以识别当前客户端请求是否是事务请求,如果是,会进行一系列预处理,创建请求事务头,事务体,会话检测,ACL检查和版本检测 属性 构造方...

writeademo
10/21
11
0
ZooKeeper系列之(十一):客户端实现机制

笔者为什么要讲ZooKeeper的源码,对于程序员来说,光知道用是成为不了优秀的行家的,还要知道所以然,只有知道了内部实现机制,才能开拓眼界提高自我。而笔者认为ZooKeeper是最好的入门分布式...

守望者之父
09/09
10
0
zk中ServerCnxnFactory连接管理工厂

作为ServerCnxn的工厂抽象类 属性 ZOOKEEPERSERVERCNXN_FACTORY zookeeper.serverCnxnFactory secure 在ServerCnxnFactory中SSL是否启用 sessionMap session管理配置中信息(sessionId,Serve......

writeademo
10/17
12
0
ZooKeeper系列之(十二):服务端实现机制

服务端有3种运行方式:leader,follower,observer。leader是领导者,一个ZooKeeper集群同一时刻最多只能有一个leader。follower是跟随者,可以有多个跟随者。Observer是观察者,也可以有多个...

守望者之父
09/09
17
0
ZooKeeper源码研究系列(3)单机版服务器介绍

1 系列目录 - ZooKeeper源码研究系列(1)源码环境搭建- ZooKeeper源码研究系列(2)客户端创建连接过程分析- ZooKeeper源码研究系列(3)单机版服务器介绍- ZooKeeper源码研究系列(4)集群...

乒乓狂魔
2015/08/13
2.7K
0

没有更多内容

加载失败,请刷新页面

加载更多

交换两数(函数)

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> void Exchange(int* x, int* y){ int tmp = 0; tmp = *x; *x = *y; *y = tmp; } int main(){ int a; int b; scanf......

Lxxxxx256
11分钟前
2
0
给 K8s API “做减法”:阿里巴巴云原生应用管理的挑战和实践

作者 | 孙健波(天元) 阿里巴巴技术专家 本文整理自 11 月 21 日社群分享,每月 2 场高质量分享,点击加入社群。 早在 2011 年,阿里巴巴内部便开始了应用容器化,当时最开始是基于 LXC 技术...

阿里巴巴云原生
今天
6
0
数据平面

3.1数据平面的任务 解析数据包头 转发数据包到某些端口 通过查询由控制平面所生成的转发表 传统网络数据平面 数据包--输入端口---拆封和解析,转发策略匹配,转发调度---输出端口(协议相关,...

Firefly-
昨天
6
0
如何高效的阅读uni-app框架?(建议收藏)

作者 | Jeskson 来源 | 达达前端小酒馆 uni-app的框架,配置:page.json,manifest.json,package.json,vue.config.js。脚本,应用程序,main.js。日志打印,定时器,生命周期,页面,页面通...

达达前端小酒馆
昨天
7
0
实现原理专题--存储器的实现(三)

计算机实现原理专题--存储器的实现(二)中描述了一种电平触发器,但是某些应用需要在保持位从0到1变化的过程中对数据端进行保存。这种触发器叫边沿触发器。 一开始Q为0,时钟信号为0。当数据...

FAT_mt
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部