文档章节

zk中QuorumPeer

writeademo
 writeademo
发布于 10/16 16:30
字数 671
阅读 12
收藏 0
ZK

内部类

AddressTuple 地址组

QuorumServer

ServerState状态looking f o l

ZabState ZabState当前状态

SyncMode 同步机制

LearnerType 学习类型

属性

Vote currentVote

节点认为当前服务是谁

 

方法

构造函数

public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
    adminServer = AdminServerFactory.createAdminServer();
    x509Util = createX509Util();
    initialize();
}


启动方法
选举状态
epoch持久化

QuorumServer记录服务相关的属性信息等
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
    this.id = id;
    this.addr = addr;
    this.electionAddr = electionAddr;
    this.type = type;
    this.clientAddr = clientAddr;
    setMyAddrs();
}

构造函数
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    if (quorumConfig == null) {
        quorumConfig = new QuorumMaj(quorumPeers);
    }
    setQuorumVerifier(quorumConfig, false);
    adminServer = AdminServerFactory.createAdminServer();
}

获取最大的事务id
public long getLastLoggedZxid() {
    if (!zkDb.isInitialized()) {
        loadDataBase();
    }
    return zkDb.getDataTreeLastProcessedZxid();
}


线程启动
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    //加载数据库
    loadDataBase();
    //启动服务连接工厂
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    //开始选举
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}


private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, "
                                  + ZxidUtils.zxidToString(currentEpoch)
                                  + ", is older than the last zxid, "
                                  + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, "
                                  + ZxidUtils.zxidToString(acceptedEpoch)
                                  + " is less than the current epoch, "
                                  + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}


public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            //正在寻找leader,创建选票
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //创建选举算法
    this.electionAlg = createElectionAlgorithm(electionType);
}


protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}


从文件读取long值
private long readLongFromFile(String name) throws IOException {
    File file = new File(logFactory.getSnapDir(), name);
    BufferedReader br = new BufferedReader(new FileReader(file));
    String line = "";
    try {
        line = br.readLine();
        return Long.parseLong(line);
    } catch (NumberFormatException e) {
        throw new IOException("Found " + line + " in " + file);
    } finally {
        br.close();
    }
}

写入文件long值
private void writeLongToFile(String name, final long value) throws IOException {
    File file = new File(logFactory.getSnapDir(), name);
    new AtomicFileWritingIdiom(file, new WriterStatement() {
        @Override
        public void write(Writer bw) throws IOException {
            bw.write(Long.toString(value));
        }
    });
}


© 著作权归作者所有

writeademo
粉丝 25
博文 700
码字总数 267560
作品 0
东城
私信 提问
zk中选举Leader时的网络IO QuorumCnxManager解析

每台服务启动过程中,会启动一个QuorumCnxManager,负责各台服务器之间底层Leader选举过程中的网络通信 当集群中有服务器服务中断时,zk会重新选举leader 内部类 Message定义消息结构 包含了...

writeademo
10/14
23
0
zookeeper源码分析之集群模式服务端(下)

接上篇文章,本文主要分析一下一个ZK集群从刚启动到对外提供服务这段时间发生了什么 一、执行流程概述 首先在ZK集群中,不管是什么类型的节点,刚刚启动时都是LOOKING状态然后发起选举寻找L...

凌风郎少
2018/11/04
0
0
ZooKeeper-3.3.4集群安装配置

ZooKeeper-3.3.4集群安装配置 ZooKeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(N...

片刻
2016/01/25
103
0
zookeeper 集群安装(单点与分布式成功安装)摘录

http://www.blogjava.net/hello-yun/archive/2012/05/03/377250.html ZooKeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distr...

毛朱
2015/04/29
141
0
zookeeper 集群安装(单点与分布式成功安装)摘录

ZooKeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(G...

蓝狐乐队
2014/04/21
67
0

没有更多内容

加载失败,请刷新页面

加载更多

5 分钟快速学习,缓存一致性优化方案!

缓存操作 读缓存 读缓存可以分为两种情况命中(cache hit)和未命中(cache miss): 缓存命中 首先从缓存中获取数据 将缓存中的数据返回 缓存未命中 首先从缓存中获取数据 此时缓存未命中,...

架构文摘
7分钟前
1
0
【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版

设计 我们依然实现java.util.concurrent.locks.Lock接口。 和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。 当首次获取锁时,会创建一个临时节点,如果...

阿里云官方博客
8分钟前
1
0
inner join 和 union all 做的汇总区别

inner join CREATE OR REPLACE VIEW M_INVENT_BARCODE_DIFF ASSELECT "INV_PART_NO","INV_ONHAND","INV_LOCATION","PART_NO","BAR_ONHAND","BAR_LOCATION"FROM (SELECT m.part_no AS......

donald121
14分钟前
3
0
EMC 设计经验总结

整体布局 1、高速、中速、低速电路要分开; 2、强电流、高电压、强辐射元器件远离弱电流、低电压、敏感元器件; 3、模拟、数字、电源、保护电路要分开; 4 、多层板设计,有单独的电源和地平...

demyar
18分钟前
2
0
支付宝高级Java现场面试35题:页锁+死锁+集群+雪崩+负载等

年底是冲刺大厂的良机,这个时间点大部分人都在观望年终奖与加薪幅度,看情况再伺机而动,人才市场的竞争反而没那么激烈。 获悉到支付宝近期有HC放出,我通过内推渠道,得到了支付宝的面试机...

mikechen优知
20分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部