文档章节

Zookeeper的选举算法和脑裂问题深度讲解

Owen_Jia
 Owen_Jia
发布于 09/24 11:15
字数 2585
阅读 4921
收藏 104

ZK介绍

ZK = zookeeper

ZK是微服务解决方案中拥有服务注册发现最为核心的环境,是微服务的基石。作为服务注册发现模块,并不是只有ZK一种产品,目前得到行业认可的还有:Eureka、Consul。

这里我们只聊ZK,这个工具本身很小zip包就几兆,安装非常傻瓜,能够支持集群部署。

官网地址:https://zookeeper.apache.org/

背景

在集群环境下ZK的leader&follower的概念,已经节点异常ZK面临的问题以及如何解决。ZK本身是java语言开发,也开源到Github上但官方文档对内部介绍的很少,零散的博客很多,有些写的很不错。

提问:

ZK节点状态角色

ZK集群单节点状态(每个节点有且只有一个状态),ZK的定位一定需要一个leader节点处于lading状态。

  • looking:寻找leader状态,当前集群没有leader,进入leader选举流程。
  • following:跟随者状态,接受leading节点同步和指挥。
  • leading:领导者状态。
  • observing:观察者状态,表名当前服务器是observer。

ZAB协议(原子广播)

Zookeeper专门设计了一种名为原子广播(ZAB)的支持崩溃恢复的一致性协议。ZK实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性,所有的写操作都必须通过Leader完成,Leader写入本地日志后再复制到所有的Follower节点。一旦Leader节点无法工作,ZAB协议能够自动从Follower节点中重新选出一个合适的替代者,即新的Leader,该过程即为领导选举。

ZK集群中事务处理是leader负责,follower会转发到leader来统一处理。简单理解就是ZK的写统一leader来做,读可以follower处理,这也就是CAP理论中ZK更适合读多写少的服务。

过半选举算法

ZK投票处理策略

投票信息包含 :所选举leader的Serverid,Zxid,SelectionEpoch

  • Epoch判断,自身logicEpoch与SelectionEpoch判断:大于、小于、等于。
  • 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
  • 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。

ZK中有三种选举算法,分别是LeaderElection,FastLeaderElection,AuthLeaderElection,FastLeaderElection和AuthLeaderElection是类似的选举算法,唯一区别是后者加入了认证信息, FastLeaderElection比LeaderElection更高效,后续的版本只保留FastLeaderElection。

理解:

在集群环境下多个节点启动,ZK首先需要在多个节点中选出一个节点作为leader并处于Leading状态,这样就面临一个选举问题,同时选举规则是什么样的。“过半选举算法”:投票选举中获得票数过半的节点胜出,即状态从looking变为leading,效率更高。

官网资料描述:Clustered (Multi-Server) Setup,如下图:

As long as a majority of the ensemble are up, the service will be available. Because Zookeeper requires a majority, it is best to use an odd number of machines. For example, with four machines ZooKeeper can only handle the failure of a single machine; if two machines fail, the remaining two machines do not constitute a majority. However, with five machines ZooKeeper can handle the failure of two machines.

以5台服务器讲解思路:

  1. 服务器1启动,此时只有它一台服务器启动了,它发出去的Vote没有任何响应,所以它的选举状态一直是LOOKING状态;
  2. 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.
  3. 服务器3启动,根据前面的理论,分析有三台服务器选举了它,服务器3成为服务器1,2,3中的老大,所以它成为了这次选举的leader.
  4. 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.
  5. 服务器5启动,同4一样,当小弟.

假设5台中挂了2台(3、4),其中leader也挂掉:

leader和follower间有检查心跳,需要同步数据 Leader节点挂了,整个Zookeeper集群将暂停对外服务,进入新一轮Leader选举

1)服务器1、2、5发现与leader失联,状态转为looking,开始新的投票 2)服务器1、2、5分别开始投票并广播投票信息,自身Epoch自增; 3) 服务器1、2、5分别处理投票,判断出leader分别广播 4)根据投票处理逻辑会选出一台(2票过半) 5)各自服务器重新变更为leader、follower状态 6)重新提供服务

源码解析:

URL: FastLeaderElection

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                  + ", proposed leader=" + n.leader
                                  + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                  + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
*  as current zxid, but server id is higher.
*/

return ((newEpoch > curEpoch)
	|| ((newEpoch == curEpoch)
		&& ((newZxid > curZxid)
			|| ((newZxid == curZxid)
				&& (newId > curId)))));

脑裂问题

脑裂问题出现在集群中leader死掉,follower选出了新leader而原leader又复活了的情况下,因为ZK的过半机制是允许损失一定数量的机器而扔能正常提供给服务,当leader死亡判断不一致时就会出现多个leader。

方案:

ZK的过半机制一定程度上也减少了脑裂情况的出现,起码不会出现三个leader同时。ZK中的Epoch机制(时钟)每次选举都是递增+1,当通信时需要判断epoch是否一致,小于自己的则抛弃,大于自己则重置自己,等于则选举;

// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);
    recvset.clear();
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
    } else {
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
            + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
    }
    break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
    updateProposal(n.leader, n.zxid, n.peerEpoch);
    sendNotifications();
}

归纳

在日常的ZK运维时需要注意以上场景在极端情况下出现问题,特别是脑裂的出现,可以采用:

过半选举策略下部署原则:

  1. 服务器群部署要单数,如:3、5、7、...,单数是最容易选出leader的配置量。
  2. ZK允许节点最大损失数,原则就是“保证过半选举正常”,多了就是浪费。

详细的算法逻辑是很复杂要考虑很多情况,其中有个Epoch的概念(自增长),分为:LogicEpoch和ElectionEpoch,每次投票都有判断每个投票周期是否一致等等。

在思考ZK策略时经常遇到这样的问题(上文中两块),梳理了一下思路以便于理解也作为后续回顾,特别感谢下面几篇博文的支持,感谢分享;

作者:Owen Jia

可以关注他的博客:Owen Blog

参考博文资料:

zookeeper3.3.5

理解zookeeper选举机制

Zookeeper选举算法原理

看完这篇文章你就清楚的知道 ZooKeeper的 概念了

脑裂是什么?Zookeeper是如何解决的?

zookeeper脑裂问题

© 著作权归作者所有

Owen_Jia

Owen_Jia

粉丝 28
博文 4
码字总数 6512
作品 0
浦东
架构师
私信 提问
加载中

评论(19)

lcylcy
lcylcy
你微信是多少,我加一下,私信没法发送了
bjwzds
bjwzds
按照作者的说法的话加入我有7台服务器,其中三台和四台分别失联,他们会分别选举出自己的leader?那这个算是脑裂吗?
Owen_Jia
Owen_Jia 博主
另外的3台不能产生leader
bjwzds
bjwzds
可是你在文章中这么写“1)服务器1、2、5发现与leader失联,状态转为looking,开始新的投票 2)服务器1、2、5分别开始投票并广播投票信息,自身Epoch自增; 3) 服务器1、2、5分别处理投票,判断出leader分别广播 4)根据投票处理逻辑会选出一台(2票过半) 5)各自服务器重新变更为leader、follower状态 6)重新提供服务”第四条2票就算过半了,那为什么三台服务器没法选出leader呢
Owen_Jia
Owen_Jia 博主
7台与5台过半都是4台,3台过半是2台
bjwzds
bjwzds
三台过半是两台的话,那我三台不就可以产生leader吗,只要有两台可以通过就行
BaiYang
BaiYang
你的意思是 ZooKeeper 里没有租期的概念?如果有租期的话:那么 Lease 到期前不能开始选举新 Leader,Lease 到期后老 Leader 要进行重新选取才能保持其 Leader 的地位,这样就可以保证没有你说的竞态条件出现。
BaiYang
BaiYang
补充说明下:若有租期机制,则老 Leader“复活”时只有两种可能: 1)老 Leader 在其租期内复活:那么它仍然还是唯一的 Leader(因为 Lease 未到期,因此不会开始下一次选举); 2)老 Leader 在其租期过期后复活:此时它已经自动失去的 Leader 身份,自觉开始一次新选举,并会在选举过程中发现新的 Leader,因此也不会造成双主问题。 至于“正好在租期结束那一刻恢复”这种情况是不存在的。我们实际在写分布式协调算法的时候,会故意在 Leader 和非 Leader 节点进行控制,保留安全的时间窗口。比如管理员设置租期为 3 秒,则 Leader 上的实际租期可能是 2.75s,而其它 Follower 上的实际租期则是 3.25s。因此不会有租期范围有效性判定上的交叠。
fengymi
fengymi
没明白为啥会出现脑裂
文中说: 脑裂问题出现在集群中leader死掉,follower选出了新leader而原leader又复活了的情况下
leader重新复活他会发现自己并不能拥有过半的follower,会变成follower才对
Owen_Jia
Owen_Jia 博主
复活的已经是leader了
老菜鸟0217
老菜鸟0217
我也觉得是, 以往测试的经验是“先杀掉leader, 然后再把它复活, 它就会变成follower” 如果说leader压根没死, 只是部分机器跟它失去了联系, 那整体对外可能出现两个leader.
晒太阳的小猪
晒太阳的小猪
这个玩意和etcd哪个更自由
Owen_Jia
Owen_Jia 博主
zk
小腊肠
小腊肠
很想问一下, 因为优先判断事务id谁大来选举的, 如果leader挂了, 没来得及同步写操作, 然后再选举其它leader的时候, 有没有可能造成数据丢失?
Owen_Jia
Owen_Jia 博主
最大那台id的机器挂了,是会的
小腊肠
小腊肠
如果挂掉的那台服务器, 已经写了log(因为处理顺序好像是先落盘, 然后再记录完成的Request队列), 服务器启动后不会出现这个path, 记录不一致的问题吗?
fengymi
fengymi
事务在提交之前会先prepare,也就是一定存在过半的机器已经prepare了,当主挂了之后,新的leader一定是在已经prepare的机器里面的
老唱片之桥-
老唱片之桥-
集群挂掉,重新选举leader的时候,既然是选举算法zab协议没讲,事务id这块儿没说,不过整体还是通俗易懂的
Owen_Jia
Owen_Jia 博主
谢谢!回头我补充一些
Zookeeper集群节点数量为什么要是奇数个?

无论是公司的生产环境,还是自己搭建的测试环境,Zookeeper集群的节点个数都是奇数个。至于为什么要是奇数个,以前只是模糊的知道是为了满足选举需要,并不知道详细的原因。最近重点学习zoo...

薛定谔的旺
2018/12/27
254
0
Kafka Partition Leader选主机制

Kafka Partition Leader选主机制 更多干货 分布式实战(干货) spring cloud 实战(干货) mybatis 实战(干货) spring boot 实战(干货) React 入门实战(干货) 构建中小型互联网企业架构...

tantexian
2018/11/14
344
0
Zookeeper专题——选主过程,脑裂问题如何解决

目前有5台服务器,每台服务器均没有数据,它们的编号分别是1,2,3,4,5,按编号依次启动,它们的选择举过程如下: 服务器1启动,给自己投票,然后发投票信息,由于其它机器还没有启动所以它收不...

yntmdr
2018/08/07
0
0
大数据教程(3.3):zookeeper简介

一、概念 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功...

em_aaron
2018/07/28
101
0
Hadoop HA 是什么?架构?

Hadoop HA 是什么? Hadoop HA架构详解 1.1 HDFS HA背景 HDFS集群中NameNode 存在单点故障(SPOF)。对于只有一个NameNode的集群,如果NameNode机器出现意外情况,将导致整个集群无法使用,直...

weixin_39915358
2018/05/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Kafka实战(五) - 核心API及适用场景全面解析

1 四个核心API ● Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。 ● Consumer API 允许一个应用程序订阅一个或多个topic ,并且对发布给他们的流式数据进行处...

JavaEdge
21分钟前
5
0
实现线程的第三种方式——Callable & Future

Callable Runnable 封装一个异步运行的任务, 可以把它想象成为一个没有参数和返回值的异步方 法。Callable 与 Runnable 类似, 但是有返回值。Callable 接口是一个参数化的类型, 只有一 个...

ytuan996
今天
8
0
OSChina 周六乱弹 —— 不要摁F了!

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @巴拉迪维 : 朴树写的词曲都给人一种莫名的失落感,不过这首歌他自己却没有唱,换成赵传这种高音阶嘶喊的确很好,低沉但却有力,老男人的呐喊...

小小编辑
今天
10
0
Android Binder机制 - interface_cast和asBinder讲解

研究Android底层代码时,尤其是Binder跨进程通信时,经常会发现interface_cast和asBinder,很容易被这两个函数绕晕,下面来讲解一下: interface_cast 下面根据下述ICameraClient例子进行分析...

天王盖地虎626
昨天
12
0
计算机实现原理专题--存储器的实现(二)

计算机实现原理专题--存储器的实现(一)中描述了一种可以记住输入端变化的装置。现需要对其功能进行扩充,我们将上面的开关定义为置位,下面的开关定义为复位,然后需要增加一个保持位,当保...

FAT_mt
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部