文档章节

ZooKeeper系列之(十):投票选举(2)

守望者之父
 守望者之父
发布于 09/09 13:13
字数 2152
阅读 12
收藏 0

ZooKeeper的选举过程默认使用FastLeaderElection类,FastLeaderElection启动时启动Messenger收发选举信息。选举完成后选出1个Leader和若干Follower。

首先理解几个概念:   

Epoch:投票周期,用于区分每一个round,每一次建立一个新的leader-follower关系,都会有一个唯一的epoch值去标识。就好像皇帝登基必须得有一个年号,与之前或之后的皇帝进行区分。刚启动是从文件读取保存的currentEpoch和acceptedEpoch值。默认是-1。

zxid:事务ID,表示Zookeeper当前写操作的序列号,确保写操作的按顺序执行。有一种场景是当Follower的最大zxid大于Leader的zxid时,Leader会发送TRUNC包给Follower截断多余的事务,保证和Leader数据一致。zxid会在ZxDataBase的loadDataBase方法中初始化。每次执行一次写事务zxid就会加1,默认为0。

选举规则:先判断双方的Epoch,留取大的Epoch;再看双方的zxid,留取大的zxid一方。最后的结果就是(Epoch<32|zxid)的一方会被推举为Leader。

集群间选举使用Messenger来负责选举信息的交互,Messenger底层使用QuorumCnxManager管理本机和集群其他机器之间的Socket数据传递,通信流程示意图如下所示。

集群中所有LOOKING状态的机器同时启动选举过程。选举结束则产生一个Leader和多个Follower。然后就是各Follower连接到Leader进行事务初始化,实现数据同步。

Observer不参与选举,但会接收Leader的信息同步要求。

QuorumPeer刚启动时会设置初始状态为LOOKING,然后启动FastLeaderElection选举过程,首先将自己作为Vote群发给其他QuorumPeer,群发的数据结构为Notification。同时启动Messenage的收发线程,调用lookForLeader方法发起一轮选举Leader的过程。

在Leader/Follower状态,如果接收到远方LOOKING节点寻找Leader而发送的Notification包,则Messenger会自动回复Leader信息给对方;在LOOKING状态,则Messenger会回复当前Vote节点(临时Leader)信息给对方。

默认选举规则规定当超过半数的QuorumPeer都认同同一个Leader时,选举过程结束,各QuorumPeer将自己设置为Leader、Follower或Observer。当然有兴趣的读者也可以尝试创建自己的选举规则类,只要实现QuorumVerifier即可。

进行Fast leader election的先决条件:

1、 每个QuorumPeer都知道其他QuorumPeer的ip地址,并知道QuorumPeer的总数。

2、 每个QuorumPeer一开始都是发起一个vote,选取自己为leader。向其他所有的QuorumPeer发送vote的notification,并等待回复。

3、 根据QuorumPeer的状态处理vote notification消息。

选举过程中产生的临时Leader成为Vote,当选举结束后最后一轮产生的Vote即成为新的Leader。

QuorumPeer每次发起选举都调用lookForLeader方法实现,首先将自己设置为LOOKING状态。该方法是FastLeaderElection类的主方法,具体的流程如下:

  • 首先更新选举周期logicalclock, 并把自己作为leader作为投票发给所有其他的server。
  • 然后进入本轮投票的循环,直到自己不再是LOOKING状态。
  1. 从recvqueue获取一个网络包(recvqueue的数据来自Messenger),如果没有收到包则检查是否要重连和重发自己的投票。
  2. 收到投票后判断对方投票的状态。
  1. LOOKING:
    • 如果对方投票的周期(Epoch)大于自己的周期(Epoch),那就清空自己的已经收到的投票集合recvset,并将自己作为候选和对方投票的leader做比较,选出大的作为新的投票,然后再发送给所有人。 这里比较大小是通过比较(zxid,sid)这个二元组来的,zxid大的就大,否则sid大的就大。
    • 如果对方的投票周期小于自己,则忽略对方的投票。
    • 如果周期相等,则比较对方的投票和自己认为的候选,选出大的作为新的候选,然后再发送给所有人。
    • 然后判断当前收到的投票是否可以得出谁是leader的结论,这里主要是通过判断当前的候选leader在收到的投票中是否占了多数。
    • 如果候选leader在收到的投票中占了多数,则再等待finalizeWait时钟,看是否有人修改leader的候选,如果修改了则把投票放到recvqueue中再重新循环。
  2. OBSERVING:如果对方是一个观察者,由于它没有投票权,则无视它
  3. FOLLOWING或LEADING:
  • 如果对方和自己再一个时钟周期,说明对方已经完成选举,如果对方说它是leader,那我们就把它作为leader,否则就要比较下对方选举的leader在自己这里是否占有多数,并且选举的leader确认了愿意当leader,如果都通过了,就把这个候选作为自己的leader
  • 如果对方和自己不在一个时钟周期,说明自己挂掉后又恢复起来,这个时候把别人的投票收集到一个单独的集合outofelection(从名字可以看出这个集合不是用在选举判断),如果对方的投票在outofelection中占有大多数,并且leader也确认了自己愿意做leader,这个时候更新自己的选举周期logicalclock,并修改自己的状态为FOLLOWING或LEADING

 

lookForLeader代码较长,我们先看看它的主体结构。

public Vote lookForLeader() throws InterruptedException {
     try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
       int notTimeout = finalizeWait;
        synchronized(this){
             logicalclock.incrementAndGet();
             updateProposal(getInitId(), getInitLastLoggedZxid(), 
getPeerEpoch());
       }
        sendNotifications();
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){           
            Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
            if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
               switch (n.state) {
                  case LOOKING:
                       {A代码}
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:                    
                        {B代码}
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                              + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } 
            }
            return null;
        }   
 }

具体分析如下:

首先通过sendNotification方法告诉集群我在寻找Leader,集群中其他机器会在Messenger进程中接收到Notification,然后都会回复谁应该是当前Leader。

FastLeaderElection收集到足够的Notification消息,来判断到底谁才是合法的Leader。为此它对每条Notification消息进行下列判断:

A. 回复Notification的发送方也是LOOKING状态

如果回复Notification的发送方也是LOOKING状态,说明它还不知道最终的Leader是谁,这时候FastLeaderElection和发送方比较,看看谁的潜在Leader的Epoch和zxid最大,最大的设置为当前的候选Leader,然后将这个候选Leader广播出去,让发送方也能更改自己的后续Leader。同时判断自己的收集的Notification能否达到超过半数的条件从而决定最终的Leader,如果能决定则设置最终Leader,退出选举过程。

case LOOKING:   
   // If notification > current, replace and send messages out
   if (n.electionEpoch > logicalclock.get()) {
       logicalclock.set(n.electionEpoch);
       recvset.clear();
       if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                       getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
       } else {
         updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());
       }
       sendNotifications();
   } else if (n.electionEpoch < logicalclock.get()) {
       break;
   } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {
       updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications();
   }
   recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, 
n.peerEpoch));
   if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                            logicalclock.get(), proposedEpoch))) {
      while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
         if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)){
             recvqueue.put(n);
             break;
         }
      }
      if (n == null) {
           self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
           Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
           leaveInstance(endVote);
        return endVote;
      }
  }
  break;

totalOrderPredicate方法用于判断对方发送过来的Vote是不是更新的Leader候选者,如果是的话则更新本地proposedLeader。主要代码如下:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&  ((newZxid > curZxid) ||
 ((newZxid == curZxid) && (newId > curId)))));
}

termPredicate用于判断是否满足选出Leader条件,QuorumVerifier接口的实现,如果满足则设置Leader,并退出FastLeaderElection过程。主要代码如下:

private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }
        return voteSet.hasAllQuorums();
}

 

B. 回复Notification的发送方也是LeaderFollower

如果回复Notification的发送方是Leader或者Follower,则流程比较简单,将Notification消息保存到recvset中,并调用termPredicate方法判断是否能确定Leader并结束选举过程。

代码片段:

case FOLLOWING:
case LEADING:
     if(n.electionEpoch == logicalclock.get()){
         recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, 
n.peerEpoch));
         if(termPredicate(recvset, new Vote(n.leader, n.zxid, n.electionEpoch,
n.peerEpoch, n.state))  && checkLeader(outofelection, n.leader, 
n.electionEpoch)) {
            self.setPeerState((n.leader == self.getId()) ?
                                  ServerState.LEADING: learningState());
            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
         }
    }

 

© 著作权归作者所有

守望者之父
粉丝 12
博文 131
码字总数 125387
作品 0
南京
私信 提问
Hadoop(6)--zookeeper

Zookeeper 每一个专业的技术总可以在生活中找到相应的实例,就比如说zookeeper,攘其外必先安其内就很好的解释了zookeeper,Hadoop集群的组件中的很多在学习的时候都会觉得每一个都不稳定,都...

spark009
2018/08/15
0
0
分布式锁服务ZooKeeper

ZooKeeper的安装和配置: 下载ZooKeeper 解压:tar -xzvf zookeeper-3.4.3.tar.gz 在conf目录下创建一个配置文件zoo.cfg,tickTime=2000 dataDir=/opt/zookeeper/data dataLogDir=/opt/zooke......

Stefan555
2014/01/24
157
1
Dubbo负载均衡、容错、高可用

原文链接:https://juejin.im/post/5d391104f265da1bab29f23e

JAVA高级架构v
07/28
0
0
Java之品优购部署_day01(3)

2.2 搭建 Zookeeper 集群 2.2.1 搭建要求 真实的集群是需要部署在不同的服务器上的,但是在我们测试时同时启动十几个虚拟机 内存会吃不消,所以我们通常会搭建伪集群,也就是把所有的服务都搭...

我是小谷粒
2018/07/06
0
0
ZooKeeper教程资源收集(简介/原理/示例/解决方案)

菩提树下的杨过: ZooKeeper 笔记(1) 安装部署及hello world ZooKeeper 笔记(2) 监听数据变化 ZooKeeper 笔记(3) 实战应用之【统一配置管理】 ZooKeeper 笔记(4) 实战应用之【消除单点故障】...

easonjim
2017/09/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Kotlin基础语法学习

安装好安卓studio,以及插件支持Kotlin 就可以在创建项目的时候选择 Kotlin语言了。 https://www.jianshu.com/p/4ab13691d681 参考手册: https://www.runoob.com/kotlin/otlin-android-setu...

T型人才追梦者
21分钟前
4
0
java实现简单计算器

1.概述 之前作者写过一篇文章,也是关于计算器的,用的是C++与Qt,链接在这里 这次用java的swing写的(这差距好像有点大,好吧是qt太强了). 先上图: 2.UI 总体布局使用流布局. (1)文本框 文本框就...

Blueeeeeee
22分钟前
4
0
x004-python中循环结构

循环结构 Python中构造循环结构有两种做法,一种是for-in循环,一种是while循环 for in range(101)可以产生一个0到100的整数序列range(1, 100)可以产生一个1到99的整数序列range(1, 100, ...

伟大源于勇敢的开始
27分钟前
5
0
纯CSS实现DIV悬浮(固定位置)

纯CSS实现的DIV悬浮效果(固定位置),兼容常用的浏览器:IE8、360、FireFox、Chrome、Safari、Opera、傲游、搜狗、世界之窗等。效果如下: 实现代码: <!DOCTYPE html> <html> <head> <meta ...

独钓渔
今天
5
0
OSChina 周二乱弹 —— 给我来个女菩萨

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @这次装个文艺青年吧 :#今日歌曲推荐#分享XXXTENTACION/Travis Barker的单曲《Pain = BESTFRIEND》: 《Pain = BESTFRIEND》- XXXTENTACION/...

小小编辑
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部