文档章节

zookeeper实现分析之二:ZAB

谈吐鱼
 谈吐鱼
发布于 2013/12/22 18:25
字数 7003
阅读 3.3K
收藏 10

<p>一年多前学习zookeeper时做的笔记,主要是翻译自“ZooKeeper's atomic broadcast protocol:Theory and practice”,并添加了自己的一些理解,整理一下作为一篇博客贴出来,后续有时间会分析一下在zookeeper源码中,zab是如何实现的,以及zab与paxos的区别。</p> <p>--------------------------------------------------------------------------</p> <h3>1 Consistency Guarantees</h3> <p>Zookeeper不能保证强一致性,客户端能看到older数据。Zookeeper提供顺序一致性。</p> <p>Zookeeper的一致性保证:</p> <p>1、顺序一致性:客户端的更新通知是严格按照顺序进行发送。</p> <p>2、原子性:更新操作要么成功要么失败,没有中间状态。</p> <p>3、Single system image:不管客户端连接哪一个服务器,客户端看到的都是the same view of service。</p> <p>4、Reliability:一旦一个更新成功,那么那就会被持久化,直到客户端用新的更新覆盖这个更新。</p> <p>5、Timeliness:Zookeeper确保客户端在一定时间内(几十秒)完成或看到系统的数据更新。</p> <p>那么zab是如何确保这些一致性相关的特点。</p> <p>Zab的两个重要的要求如下:</p> <p>1、 支持同时处理多个outstanding的客户端写操作。一个outstanding事务的含义是事务已经被提交但没有被commit。</p> <p>2、 有效的从crash状态恢复过来。</p> <p>Zookeeper能处理并发地处理多个客户端的outstanding 写请求,并且以FIFO顺序commit这些写操作。FIFO的特性对于zookeeper能够有效的从crash状态恢复过来也是至关重要的。</p> <p>原始的paxos协议不能同时处理多个outstanding transaction,paxos不要求通信时的FIFO通道特性,paxos可以容忍消息丢失和重新排序。</p> <p>在paxos中,从primary crash中恢复过来并保证事务的序列化的能力不是足够有效,而zab改进了这方面的能力,采用了一个事务ID来实现事务的totally order。</p> <p>Zookeeper的性能要求如下:</p> <p>1、 低延时(low latency)。</p> <p>2、 Good throughput。高吞吐量。</p> <p>3、 Smooth failure handling。容错。</p> <p>在这种情况下,为了能有效地更新一个new primary的应用程序状态,在zab中new primary会被期望拥有最高事务ID的进程,整个集群可以通过从new primary中拷贝事务,从而所有数据副本都可以达到一个一致性。</p> <p>而在paxos,没有采用类似zab的序列号,所以一个新的primary需要执行paxos算法的第一阶段,以便于获取到所有primary没有学习到值。</p> <h5>2 ZAB协议和流程介绍</h5> <p>Zab协议有四个阶段,如下:</p> <p>1、阶段0:Leader election</p> <p>2、阶段1:Discovery(或者epoch establish) </p> <p>3、阶段2:Synchronization(或者sync with followers)</p> <p>4、阶段3:Broadcast</p> <p>在Zab协议的实现时,合并为三个阶段:</p> <p>1、 Fast Leader Election</p> <p>2、 Recovery Phase</p> <p>3、 Broadcast Phase</p> <p>在实现中将discovery和synchronization这两个phase合并成了broadcast phase。</p> <p>ZAB的流程图如下所示:</p> <p><a href="http://static.oschina.net/uploads/img/201312/22194147_iKYR.jpg"><img title="1" style="border-right: 0px; border-top: 0px; display: inline; border-left: 0px; border-bottom: 0px" height="322" alt="1" src="http://static.oschina.net/uploads/img/201312/22194147_K2bP.jpg" width="500" border="0" /></a> </p> <p><font size="1">CEPOCH = Follower sends its last promise to the prospective leader</font></p> <p><font size="1">NEWEPOCH = Leader proposes a new epoch e'</font></p> <p><font size="1">ACK-E = Follower acknowledges the new epoch proposal</font></p> <p><font size="1">NEWLEADER = Prospective leader proposes itself as the new leader of epoch e'</font></p> <p><font size="1">ACK-LD = Follower acknowledges the new leader proposal</font></p> <p><font size="1">COMMIT-LD = Commit new leader proposal</font></p> <p><font size="1">PROPOSE = Leader proposes a new transaction</font></p> <p><font size="1">ACK = Follower acknowledges leader proosal</font></p> <p><font size="1">COMMIT = Leader commits proposal</font></p> <h3>3 Leader election</h3> <h4>3.1 leader election后置条件</h4> <p>Leader election可能有多种方式,但在这里我们只分析一种,fast leader election。</p> <p>Leader election后置条件:</p> <p>1、条件:Leader election这个过程必须保证选举出来的leader能看到所有历史的commited transactions。</p> <p>2、原因:这个后置条件是为了确保在后续recovery phase步骤中zookeeper replicas的一致性。它是防止follower中包含leader中没有的committed transaction,而且在recovery phase中只有leader向follower和observer同步,follower不会向leader同步,如果出现这种情况,那么zookeeper的replicas就出现了不一致的情况。</p> <p>所以为了达到这个后置条件,leader election需要选择出一个拥有highest lastZxid的leader。</p> <p>那么fast leader election是如何选择出一个拥有highest lastZxid的leader?</p> <h4>3.2 Fast leader election介绍</h4> <p>在进行fast leader election过程中,为了选举出一个拥有highest lastZxid的leader(能看到最新的历史committed transaction),处于election状态的peer servers会对其他peer server进行表决。Peer server会交换他们的vote(选举)的通知。同时当peer server发现一个拥有recent history的peer server(peer server拥有higher history Zxid),peer server会更新其自身的vote。当选举出一个leader后,然后进入recovery phase,fast leader election就结束了,假如vote选举出来leader就是peer server自身,那么peer server变成leading状态(fast leader election过程中,peer server本身的状态是following),其他的peer server则进入following状态。如果后续的recovery phase和broadcast phase发生任何失败的情况,那么peer server都会回到election状态,重新启动fast leader election。</p> <h4>3.3 Epoch number</h4> <p>Epoch是用于区分每一个round,每一次建立一个新的leader-follower关系,都会有一个唯一的epoch值去标识。就好像皇帝登基必须得有一个年号,与之前或之后的皇帝进行区分。</p> <p>Epoch在两个过程中用到:1、leader election时。2、recovery过程(新建立一个leader-follower关系)。</p> <p>1、过程1:每一个fast leader election开始时epoch的值都为0,epoch的值会在fast leader election过程中进行更新。</p> <p>个人理解每个zookeeper节点刚启动时没有leader-follower关系视图,那么它就会认为自己是leader,然后发起leader electoin,那么这个leader election的epoch值为0;在leader election过程中,将epoch更新到currepoch值(其他peer server中的最高的epoch)。使用epoch number来区分不同的fast leader election过程。就好像你想当皇帝,定了一个年号发起登基过程,如果当前有其他皇帝存在,且他的年号比你的年号更新,那么你就得更新年号,重新发起登基,谁支持的人多谁就是皇帝;如果没有其他皇帝存在,但有其他人也在登基,那么大家就一起比比,看谁的年号更新,看谁的资格更老(同样的epoch,vote值越大越优先),那么选举谁当皇帝。</p> <p>2、过程2:在一个faster leader election结束后,新产生的leader会获取epoch,其值为lastest history zxid的高32位,然后对epoch自增,然后用新的epoch值作为新zxid的高32,zxid的低32位为0。一旦当上皇帝后,就发布一个新的年号。</p> <p>这里有矛盾的地方:</p> <p>两个过程的epoch是否是同一个?过程1的epoch是不会持久化的。过程2中因为zxid是持久化的,那么相当于epoch也是持久化的。所以不理解。</p> <h4>3.4 选取出highest zxid的leader</h4> <p>为了能选举出highest zxid的leader,那么就需要对vote进行比较。</p> <p>对于peer server集合 PSET = {p1, p2, p3, …., pn},其中{1, 2, 3, …. , n }是peer server的ID.</p> <p>那么Pi的vote可以用pair(Zi, i)表示,Zi是Pi的highest zxid,也是lastest zxid。</p> <p>那么两个vote比较大小的准则是:</p> <p>&#160;&#160; (Zi, i) &gt;= (Zj, j) : Zi &gt; Zj 或者( Zi = Zj &amp;&amp; i &gt;= j )</p> <p>每一个peer server都有一个唯一的ID,且都知道其replicas中保存的latest zxid,那么所有的peer就会以一定顺序进行排序。</p> <h4>3.5 Fast leader election持久化</h4> <p>在fast leader election过程中,不会对任何数据进行持久化,不会把过程中产生的值写入到disk中。包括epoch number和ID但在fast leader election会使用已经持久化的latest zxid。</p> <h4>3.6 Fast leader election过程和伪码</h4> <p>进行Fast leader election的先决条件:</p> <p>1、 每个peer server都知道其他peer的ip地址,并知道peer server的总数。</p> <p>2、 每个peer server一开始都是发起一个vote,选取自己为leader。向其他所有的peer server发送vote的notification,并等待回复。</p> <p>3、 根据peer server的状态处理vote notification或则notifincation的回复.</p> <p>如果peer server处于election状态,那么peer server会收到其他peer server的vote,如果收到的vote值更大,那么peer server会更新其vote。</p> <p>如果peer server不处于election状态,那么peer server会更新其所看到的leader-follower关系。</p> <p>不管哪种情况下,当peer server检测到大部分peers持有相同的vote时,那么它会返回</p> <p><strong>Fast leader election逻辑伪代码</strong></p> <p>主要有两个逻辑分支:</p> <p>1、正常过程,vote的notification的回复的peer server的状态为election</p> <p>2、另外过程,vote的notification的回复的peer server的状态为leading/following</p> <p>执行leader election的情况较为复杂,可能是一个服务器节点新加入到zookeeper集群中。也可能是zookeeper集群刚启动,大家都处于leader election状态。以上两个逻辑分支能处理这些情况。</p> <p><font color="#800000">***初始化vote和peer server状态***</font></p> <p><font color="#008000">1 Peer P:</font></p> <p><font color="#008000">2 timeout &lt;---T0 // use some reasonable timeout value</font></p> <p><font color="#008000">3 ReceivedVotes &lt;--- 0; OutOfElection &lt;--- 0; // key-value mappings where keys are server ids</font></p> <p><font color="#008000">4 P:state &lt;--- election;&#160; P:vote &lt;---(P:lastZxid; P:id);&#160; P:round &lt;--- P:round + 1</font></p> <p><font color="#800000">1-4是初始化过程,设置超时时间,receivedVotes是收到的vote noficaton回复。</font></p> <p><font color="#800000">进入election状态,根据lastZxid和ServerID生成一个vote,vote的epoch自增。</font></p> <p><font color="#800000">ReceivedVotes作为一个结果集合,在收到所有vote后,进行表决。OutOfElection用于保存状态为leading/folling的rspvote,用于表决先存在的leader/follower是否有效。</font></p> <p><font color="#008000">5 Send notification (P:vote, P:id, P:state, P:round) to all peers</font></p> <p><font color="#800000">向所有的peer server发送notification,一个notification包括vote,id,peer state,和vote的epoch number。</font></p> <p><font color="#800000">***开始接收notification回复的循环处理***</font></p> <p><font color="#008000">6 while P:state = election do</font></p> <p><font color="#008000">7&#160;&#160;&#160;&#160; n &lt;---(null if P:queue = 0; for timeout milliseconds, otherwise pop from P:queue)</font></p> <p><font color="#008000">8&#160;&#160;&#160;&#160; if n = null then</font></p> <p><font color="#008000">9&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send notification (P:vote, P:id, P:state, P:round) to all peers</font></p> <p><font color="#008000">10&#160;&#160;&#160;&#160;&#160;&#160;&#160; timeout &lt;---(2* timeout), unless a predefined upper bound has been reached</font></p> <p><font color="#800000">8-10是当notification回复为空时,有两种情况,一种是信令发送出去回复超时,第二种是没有建立于peer server的连接.</font></p> <p><font color="#800000">如果是第一种情况,那么重新发送notification;如果是第二种情况,那么建立与peer server的tcp连接.</font></p> <p><font color="#008000">11&#160;&#160;&#160; else if n:state = election then </font><font color="#800000">//当nofication回复不为空,且peer server的状态也是election时</font></p> <p><font color="#008000">12&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if n:round &gt; P:round then</font></p> <p><font color="#008000">13&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; P:round &lt;--- n:round</font></p> <p><font color="#008000">14&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; ReceivedVotes &lt;---0</font></p> <p><font color="#008000">15&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if n:vote &gt; (P:lastZxid; P:id) then P:voteßn:vote</font></p> <p><font color="#008000">16&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else P:vote &lt;---(P:lastZxid; P:id)</font></p> <p><font color="#008000">17&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send notification (P:vote, P:id, P:state, P:round) to all peers</font></p> <p><font color="#800000">这个逻辑分支是notification回复中resvote的epoch要大于vote</font></p> <p><font color="#800000">的epoch(说明回复中的peer vote的zxid &gt; vote的zxid),那么vote失效了,需要更新vote,比较回复中的两个vote值的大小,选择值大的vote,然后重新发送notification。</font></p> <p><font color="#008000">18&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if n:round = P:round and n:vote &gt; P:vote then</font></p> <p><font color="#008000">19&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; P:vote &lt;--- n:vote</font></p> <p><font color="#008000">20&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send notification (P:vote, P:id, P:state, P:round) to all peers</font></p> <p><font color="#800000">&#160;&#160;&#160;&#160;&#160; 当回复中的rspvote的epoch等于vote的epoch,但rspvote &gt; vote,那么更新vote信息</font></p> <p><font color="#800000">&#160;&#160;&#160;&#160;&#160; 然后重新将vote向所有的peer server发送。</font></p> <p><font color="#008000">21&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if n:round &lt; P:round then goto line 6</font></p> <p>&#160;&#160;&#160;&#160; Resvote的epoch小于vote的epoch,那么这个回复是无效的,</p> <p>&#160;&#160;&#160;&#160;&#160;&#160; 直接忽略,继续下一个循环。</p> <p><font color="#008000">22&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Put(ReceivedVotes(n:id); n:vote; n:round)</font></p> <p>&#160;&#160;&#160; 将rspvote放入到ReceivedVotes中。</p> <p>23&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if&#160; ReceivedVotes = SizeEnsemble then</p> <p>24&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(P.vote.id);&#160; return P.vote</p> <p>&#160;&#160;&#160;&#160; 如果已经收到了所有peer server的vote,如果vote中的leaderID == currentPeer本身,</p> <p>&#160;&#160;&#160;&#160; 那么currPeer为leader,结束并返回此次vote结果。</p> <p>25&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if P.vote has a quorum in ReceivedVotes</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; and there are no new notifications within T0 milliseconds then</p> <p>26&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(P.vote.id);&#160; return P.vote</p> <p>&#160;&#160;&#160;&#160;&#160;&#160; 如果收到超过半数peer server的vote,那么vote中的leaderID == currentPeer本身,</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; 那么currPeer为leader,结束并返回此次vote结果.</p> <p>27&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>&#160;&#160;&#160;&#160; 逻辑分支1总结:</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; 如果rspvote中epoch &gt; vote epoch,更新epoch和vote后重新发起vote</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; 如果rspvote中epoch &lt; vote epoch,无效rspvote</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; 其他,都保存在结果集合中,如果有rspvote&gt;vote,那么将vote更新到rspvote;等待所有rspvote都收到,那么vote的值应该为结果集合中最大值,如果结果集合超过半数,那么此次vote生效,leader为vote中的serverID。如果serverID为本身的serverID,那么currpeer的状态为leader否则为follower</p> <p>28&#160;&#160;&#160; else // state of n is LEADING or FOLLOWING</p> <p>当rspvote的状态为following或leading,说明vote之外已经存在了一个leader,那么此段逻辑主要是分成两部分:一部分是vote的表决;另一部分是vote之外的leader/follower表决.</p> <p>29&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if n:round = P:round then</p> <p>30&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Put(ReceivedVotes(n.id); n:vote; n:round)</p> <p>31&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if n:state = LEADING then</p> <p>32&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(n:vote:id); return n:vote</p> <p>33&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if n:vote:id = P:id and n:vote has a quorum in ReceivedVotes then</p> <p>34&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(n:vote:id); return n:vote</p> <p>35&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if n:vote has a quorum in ReceivedVotes and the voted peer n:vote:id is in</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; state LEADING and n:vote:id 2 OutOfElection then</p> <p>36&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(n:vote:id); return n:vote</p> <p>37&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>38&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>以上部分是vote的表决,以上的逻辑跟代码中不符合,代码中的逻辑是:</p> <p>如果rspvote的epoch==vote的epoch,放入到receivedVots中,如果rspvote的状态是leader</p> <p>且集合中的rspvote超过半数,那么vote的表决的leader就是rspvote的leader。</p> <p>39&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Put(OutOfElection(n:id); n:vote; n:round)</p> <p>40&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if n:vote:id = P:id and n:vote has a quorum in OutOfElection then</p> <p>41&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; P:round &lt;--- n:round</p> <p>42&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(n:vote:id); return n:vote</p> <p>43&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if n:vote has a quorum in OutOfElection and the voted peer n:vote:id is in state</p> <p>&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; LEADING and n:vote:id 2 OutOfElection then</p> <p>44&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; P:round &lt;--- n:round</p> <p>45&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; DeduceLeader(n:vote:id); return n:vote</p> <p>46&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>以上部分是对vote之外的leader/follower进行表决,OutOfElection是用来存放状态为leader/follow的rspstate,如果OutOfElection的rspvote超过半数,那么说明election之外的leader./follow是有效地,</p> <p>47&#160; end</p> <p>&#160;&#160; 逻辑分支2总结:这部分是考虑到可能有部分peer server维持leader/follower的状态,部分peer server处于election状态,如果维持leader/follower状态的peer server数据过半,那么leader/follower就是有效地。或者vote的epoch等于leader的epoch,那么如果有半数以上的rspvote,那么当前的leader/follower也是有效的。</p> <p></p> <h3>4 Discovery and synchronization</h3> <p>在broadcast阶段,zookeeper集群必须有一个leader peer,zookeeper集群是primary/backup模式,那么leader就是primary。Discovery和synchronization这两个阶段的作用就是将全部的zookeeper节点带入到一个最终一致的状态,特别是当从crash中恢复时。这两个阶段组成了zab的recovery部分,对于允许多个独立事务的情况下,保证事务的顺序起着关键作用。</p> <p>不管在discovery、synchronization还是broadcast,一旦发生错误,那么都可以回到leader election过程。</p> <p>用户如果需要使用zookeeper服务,那么必须连接一个zookeeper节点。用户向连接的服务器提交写操作,然后zab协议层会执行一个broadcast;假如用户向follower提交写操作,那么follower会把写操作发送给leader;如果leader收到写操作,leader会执行,然后向所有follower扩散这个写操作对应的数据更新。读操作可以由与用户相连接的zookeeper节点直接完成。用户可以通过发送sync命令保证数据副本的更新。</p> <p>在zab协议中,zxid(transaction identifiers)对于实现顺序一致性十分关键。在zookeeper中事务可以用(v, z)表示,v是新状态(znode),z则是zxid,一个identifier。那么一个zxid也是一个pair(e, c),e是一个primary Pe(可以理解为leader)的epoch number,c是一个整数值,作为计数器使用。Primary每产生一个新的事务,那么计数器c就会+1。</p> <p>当一个新的epoch开始时,一个新的leader会被激活,此时c会被设置为0,e会在前一个epoch的值上+1。</p> <p>在代码实现中e是zxid的高32位,c是zxid的低32位。</p> <p>以下四个变量构成了一个peer的持久化状态:</p> <p>1、History:已经被接受的事务提案(transaction proposal)。</p> <p>2、acceptedEpoch:最近收到的NEWEPOCH信令中的epoch number。</p> <p>3、currentEpoch:最近收到的NEWLEADER信令中的epoch number。</p> <p>4、lastZxid:history中的最近的zxid。</p> <h3>5 Discovery过程</h3> <p>在这个阶段,followers会跟他们的未来预期中的leader进行通信,准leader会收集accepted follower(已经建立连接的)的latest transactions,这个阶段的目的是发现quorum peer server中的highest histroy transaction,然后建立一个新的epoch,这样就可以防止previous leader不会commit 新的proposals(因为previous leader的epoch已经过期了)。</p> <p>在discovery阶段的开始,一个follower peer会建立于准leader的leader-follower connection。</p> <p>Follower同时只是连接一个leader。假如一个peer P不是leading状态,其他peer会考虑p是一个准leader,任何其他leader-follower连接都会被p拒绝;同样leader-follower连接的拒绝或其他的failure能将follower重新带入到leader election状态。</p> <p>1 Follower F:</p> <p>2 Send the message FOLLOWERINFO(F:acceptedEpoch) to L</p> <p>3&#160;&#160; upon receiving NEWEPOCH(e0) from L do</p> <p>4&#160;&#160;&#160;&#160;&#160; if e0 &gt; F:acceptedEpoch then</p> <p>5&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; F:acceptedEpoch &lt;--- e0 // stored to non-volatile memory</p> <p>6&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send ACKEPOCH(F:currentEpoch; F:history; F:lastZxid) to L</p> <p>7&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; goto Phase 2</p> <p>8&#160;&#160;&#160;&#160;&#160; else if e0 &lt; F:acceptedEpoch then</p> <p>9&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; F:state &lt;--- election and goto Phase 0 (leader election)</p> <p>10&#160;&#160;&#160;&#160; end</p> <p>11 end</p> <p>这个过程是follower端,follower向准leader发送FOLLOWERINFO信令,告诉leader自己的信息,最重要的就是把accepted epoch发送给leader。然后接收leader的NEWLEADER信令,NEWLEADER信令中带有new epoch(这个epoch表示这这一轮过程,每一次建立leader-follower关系,都会有一个新的epoch来唯一标识,与previous leader-follower进行区分)。Follower检查这个new epoch是否有效,如果有效,follower更新自身的epoch并回复一个ACKEPOCH,上报当前follower的状态,进入下一个阶段。如果无效,那么follower会重新跳到leader electoin阶段。</p> <p>12 Leader L:</p> <p>13 upon receiving FOLLOWERINFO(e) messages from a quorum Q of connected followers do</p> <p>14&#160;&#160;&#160;&#160;&#160; Make epoch number e0 such that e0 &gt; e for all e received through FOLLOWERINFO(e)</p> <p>15&#160;&#160;&#160;&#160;&#160; Propose NEWEPOCH(e0) to all followers in Q</p> <p>16 end</p> <p>17 upon receiving ACKEPOCH from all followers in Q do</p> <p>18&#160;&#160;&#160;&#160;&#160; Find the follower f in Q such that for all f0 2 Q n ffg:</p> <p>19&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; either f0:currentEpoch &lt; f:currentEpoch</p> <p>20&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; or (f0:currentEpoch = f:currentEpoch) ^ (f0:lastZxid _z f:lastZxid)</p> <p>21&#160;&#160;&#160;&#160;&#160; L:history &lt;--- f:history&#160; // stored to non-volatile memory</p> <p>22&#160;&#160;&#160;&#160;&#160; goto Phase 2</p> <p>23 end</p> <p>这个是leader端的recovery过程,leader会生产一个new epoch,首先接收所有follower的epoch,确定new epoch要大于所有的follower epoch。然后向所有follower发送NEWEPOCH信令,将new epoch下发到所有的follower中。</p> <p>等待follower的ACKEPOCH回复,如果所有的follower的currEpoch和zxid都小于等于leader的currEpoch和zxid,那么进入下一个过程。</p> <h3>6 Synchronization过程</h3> <p>这个过程是将follower的数据副本与准leader的历史数据进行同步,使得zookeeper集群的数据处于一致的状态。同步的方向是准leader向follower同步。同步的过程如下:leader与follower进行通信,发送NEWLEADER信令,带有历史事务的highest zxid;follower收到这些信令后,决定是否更新历史事务,然后响应leader。当leader看到quorum follower的响应后,就会向它们发送commit信令。在这之后leader就建立完成了。</p> <p>1 Leader L:</p> <p>2 Send the message NEWLEADER(e0;L:history) to all followers in Q</p> <p>3 upon receiving ACKNEWLEADER messages from some quorum of followers do</p> <p>4&#160;&#160;&#160;&#160;&#160; Send a COMMIT message to all followers</p> <p>5&#160;&#160;&#160;&#160;&#160; goto Phase 3</p> <p>6 end</p> <p>这是leader端的过程,发送NEWLEADER,然后接受响应,最后发送commit,至此leader建立完毕。</p> <p>7 Follower F:</p> <p>8 upon receiving NEWLEADER(e0;H) from L do</p> <p>9&#160;&#160;&#160;&#160;&#160; if F:acceptedEpoch = e0 then</p> <p>10&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; atomically</p> <p>11&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; F:currentEpoch &lt;--- e0 // stored to non-volatile memory</p> <p>12&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; for each (v; z) in H, in order of zxids, do</p> <p>13&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Accept the proposal (e0; (v; z))</p> <p>14&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>15&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; F:history &lt;---H // stored to non-volatile memory</p> <p>16&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>17&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send an ACKNEWLEADER(e0;H) to L</p> <p>18&#160;&#160;&#160;&#160; else</p> <p>19&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; F:state &lt;--- election and goto Phase 0</p> <p>20&#160;&#160;&#160;&#160; end</p> <p>21 end</p> <p>22 upon receiving COMMIT from L do</p> <p>23&#160;&#160;&#160;&#160;&#160; for each outstanding transaction (v; z) in F:history, in order of zxids, do</p> <p>24&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Deliver (v; z)</p> <p>25&#160;&#160;&#160;&#160;&#160; end</p> <p>26&#160;&#160;&#160;&#160;&#160; goto Phase 3</p> <p>27 end</p> <p>这是follower端的流程,先是收到NEWLEADER信令,然后原子地更新epoch和历史事务,发送ACKNEWLEADER信令响应leader;然后等待commit信令,收到commit信令后进行处理,进入下一个阶段。</p> <h3>7 代码实现的Recovery phase</h3> <p>在实现discovery和synchronization时,没有严格分成两个阶段进行实现,在实现时进行了一些优化,合并成一个阶段实现,那么这个阶段就是recovery phase;recovery阶段就是将所有的zookeeper集群的数据副本进入到最终一致性地状态中,且建立出一个具有最高highest zxid的leader。</p> <p>在实现中,第0阶段的fast leader election与第一阶段discovery紧密结合在一起,faster leader election在实现时做了一个优化,它会选择出一个most up-to-date的history(个人理解就是选择出一个具有最新的commit事务的peer server),那么这样的一个leader被选举出来后,在第一阶段就不需要去与followers通信去发现latest history。</p> <p>那么既然在fast leader election中包括了discovery阶段的责任,那么这个discovery阶段就可以被忽略,所以在实现时就将discovery和synchornization阶段合并成一个recovery阶段。这个阶段是在fast leader election之后,且认为leader拥有lastest history。</p> <p>伪码:</p> <p>1 Leader L:</p> <p>2 L:lastZxid &lt;--- (L:lastZxid:epoch + 1; 0)</p> <p>3 upon receiving FOLLOWERINFO(f:lastZxid) message from a follower f do</p> <p>4&#160;&#160;&#160;&#160;&#160; Send NEWLEADER(L:lastZxid) to f</p> <p>5&#160;&#160;&#160;&#160;&#160; if f:lastZxid&#160; &lt;=&#160; L:history:lastCommittedZxid then</p> <p>6&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if f:lastZxid&#160; &lt;=&#160; L:history:oldThreshold then</p> <p>7&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send a SNAP message with a snapshot of the whole database of L</p> <p>8&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else</p> <p>9&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send a DIFF({committed transaction (v; z) in L:history : f:lastZxid &lt; z})</p> <p>10&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>11&#160;&#160;&#160;&#160; else</p> <p>12&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send a TRUNC(L:history:lastCommittedZxid) message to f</p> <p>13&#160;&#160;&#160;&#160; end</p> <p>14 end</p> <p>15 upon receiving ACKNEWLEADER messages from some quorum of followers do</p> <p>16&#160;&#160;&#160;&#160; goto Phase 3 // Algorithm 3</p> <p>17 end</p> <p>以上是leader端的流程,先生存一个新的zxid和epoch,接收follower的FOLLOWERINFO信令(包含follower的lastzxid),然后向follower发送NEWLEADER(包含leader的zxid)。然后根据FOLLOWERINFO中带有的lastzxid对follower进行更新。分成三种情况…….</p> <p>History.lastCommittedZxid是最新committed的历史事务。History.oldThreshold是太久的历史提案,比leader上一次snapshot的时间还久。见2.6.2关于TRUNC的说明。</p> <p>第一种情况是TRUNC,follower丢弃从leader.latestZxid到follower.lasterZxid之间的提案。</p> <p>第二种情况是DIFF,follower接收新的提案从follower.lasterZxid到leader.lasterZxid之间的新提案。</p> <p>第三种情况是SNAP,follower中的提案太旧,leader将snap更新到follower上。</p> <p>18 Follower F:</p> <p>19 Connect to its prospective leader L</p> <p>20 Send the message FOLLOWERINFO(F:lastZxid) to L</p> <p>21 upon L denies connection do</p> <p>22&#160;&#160;&#160;&#160; F:state &lt;--- election and goto Phase 0</p> <p>23 end</p> <p>24 upon receiving NEWLEADER(newLeaderZxid) from L do</p> <p>25&#160;&#160;&#160;&#160; if newLeaderZxid:epoch &lt; F:lastZxid:epoch then</p> <p>26&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; F:state &lt;--- election and goto Phase 0</p> <p>27&#160;&#160;&#160;&#160; end</p> <p>28&#160;&#160;&#160;&#160; upon receiving a SNAP, DIFF, or TRUNC message do</p> <p>29&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if got TRUNC(lastCommittedZxid) then</p> <p>30&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Abort all proposals from lastCommittedZxid to F:lastZxid</p> <p>31&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if got DIFF(H) then</p> <p>32&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Accept all proposals in H, in order of zxids, then commit all</p> <p>33&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; else if got SNAP then</p> <p>34&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Copy the snapshot received to the database, and commit the changes</p> <p>35&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; end</p> <p>36&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Send ACKNEWLEADER</p> <p>37&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; goto Phase 3 // Algorithm 3</p> <p>38&#160;&#160;&#160; end</p> <p>39 end</p> <p>以上是follower的流程,首先是向leader连接,然后发送FOLLOWERINFO信令,如果leader拒绝连接,那么follower重新回到leader election阶段。接收NEWLEADER信令,如果信令中带有的epoch无效(小于follower的epoch),那么follower重新回到leader election状态。</p> <p>然后接收SNAP/DIFF/TRUNC信令,同步数据副本和zxid,最后回复ACKNEWLEADER信令。进入到下一个阶段。</p> <p>这个同步的目的是让所有数据副本都进入一个最终一致性状态。为了达到这个目的,任何副本中的committed transactions必须以同样一种顺序,甚至已经被提交的transaction但没有被任何一个peer节点committ的事务必须被抛弃。SNAP和DIFF用于保证各个副本中的committed事务的顺序一致性;而TRUNC用于处理已经被提交但没有被committed的事务。</p> <h3>8 Broadcast phase</h3> <p>Zookeeper peer之间的双向通道使用TCP连接实现,TCP通信的FIFO序列化特性对于实现broadcast协议至关重要。</p> <p>假如没有发生崩溃,那么peers会一直停留在broadcast阶段。第三阶段中只能有一个leader。</p> <p>Broadcast的过程是leader与follower之间的一个两阶段的提交过程(two-phase commit)</p> <p>1、 leader与follower的通讯通道(communication channel)是一个FIFO,所有都是是按顺序处理。</p> <p>2、 leader收到一个request后,会生成一个propose。然后执行两阶段提交.</p> <p><a href="http://static.oschina.net/uploads/img/201312/22185922_040N.png"><img title="wps_clip_image-14769" style="border-top-width: 0px; display: inline; border-left-width: 0px; border-bottom-width: 0px; border-right-width: 0px" height="126" alt="wps_clip_image-14769" src="http://static.oschina.net/uploads/img/201312/22185922_KHTe.png" width="244" border="0" /></a></p> <p>Broadcast的伪码和流程</p> <p>1 Leader L:</p> <p>2 upon receiving a write request v do</p> <p>3&#160;&#160;&#160;&#160; Propose (e0; (v; z)) to all followers in Q, where z = (e0; c), such that z succeeds all zxid</p> <p>&#160;&#160;&#160;&#160;&#160;&#160; values previously broadcast in e0 (c is the previous zxid's counter plus an increment of one)</p> <p>4 end</p> <p>5 upon receiving ACK((e0; (v; z))) from a quorum of followers do</p> <p>6&#160;&#160;&#160;&#160; Send COMMIT(e0; (v; z)) to all followers</p> <p>7 end</p> <p>以上是leader处理的两阶段提交的流程:首先leader受到写请求v,然后生成一个提案(e,(v,z)),向所有follower发送此提案的内容,然后等待follower的ack;如果ack超过半数,那么提案成立。向所有follower下发commit提案的命令。</p> <p>8 // Reaction to an incoming new follower:</p> <p>9 upon receiving FOLLOWERINFO(e) from some follower f do</p> <p>10&#160;&#160;&#160;&#160; Send NEWEPOCH(e0) to f</p> <p>11&#160;&#160;&#160;&#160; Send NEWLEADER(e0;L:history) to f</p> <p>12 end</p> <p>13 upon receiving ACKNEWLEADER from follower f do</p> <p>14&#160;&#160;&#160;&#160; Send a COMMIT message to f</p> <p>15&#160;&#160;&#160;&#160; Q &lt;--- Q 并集 {f}</p> <p>16 end</p> <p>以上是一个新follower加入leader的流程:首先leader收到FOLLOWERINFO信令,然后向new follower发送NEWEPOCH信令,再发送NEWLEADER信令给new follower;等待new follower的ACKNEWLEADER,最后发送commit,至此new follower就加入到了集群中。</p> <p>17 Follower F:</p> <p>18 if F is leading then Invokes ready(e0)</p> <p>19 upon receiving proposal (e0; (v; z)) from L do</p> <p>20&#160;&#160;&#160;&#160; Append proposal (e0; (v; z)) to F:history</p> <p>21&#160;&#160;&#160;&#160; Send ACK((e0; (v; z))) to L</p> <p>22 end</p> <p>23 upon receiving COMMIT(e0; (v; z)) from L do</p> <p>24&#160;&#160;&#160;&#160; while there is some outstanding transaction (v0; z0) in F:history such that z0 &lt; z do</p> <p>25&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; Do nothing (wait)</p> <p>26&#160;&#160;&#160;&#160; end</p> <p>27&#160;&#160;&#160;&#160; Commit (deliver) transaction (v; z)</p> <p>28 end</p> <p>这是follower的broadcast流程:接收到leader的提案,然后将提案写入到history中,然后发送响应。等待leader的commit信令,收到后执行commit 提案。</p> <h3>9 Zab所存在的问题</h3> <h4>9.1 acceptedEpoch和currentEpoch的作用</h4> <p>在recovery开始阶段,准leader甚至在与大部分follower成功建立连接之前就增加其epoch(包括在lastZxid内)值。因为在recovery阶段,follower在发现其epoch值要比准leader大时,会返回到leader election阶段。那么当准leader失去leader地位,并成为previous leader(其epoch比准leader要小1)的一个follower,那么准leader会发现previous leader的epoch值比其要小,那么它会返回到leader election阶段。这个现象会导致此peer一直在recovery阶段和leader election阶段之间循环。</p> <p>所以使用lastZxid来存储epoch number,没有对一个tried epoch(个人理解是一个准leader在尝试成为leader时使用的epoch)和一个joined epoch(一个成功的leader所使用的epoch)进行区分。使用acceptedEpoch和currentEpoch的目的就是在于防止此类问题的发生。</p> <h4>9.2 Abandon follower proposal</h4> <p>假设一个集合{p1, p2, p3},所有的peers都处于broadcast阶段,且都已经同步到了最新的committed事务,事务的ID是(e= 1, c= 3),p1为leader;一个新的提案,事务ID为(1, 4)已经被leader p1发出,但在p2和p3收到事务之前,p1就已经发生了崩溃(比如已经放到socket缓存区中),那么{p2, p3}会重新回到leader election,并选举出一个新的leader。当p1恢复正常了,此时p2已经成为了leader;那么根据fast leader election,在recovery阶段p2会将epoch设置为2(p2.latestZxid = (2, 0)),那么在broadcast阶段,已经新的提案已经被quorum接收和commit,它的zxid为(2, 1)。在这个时候leader p2的history.lastCommittedZxid = (2, 1),并且p2的history.OlderThreshold = (1, 1);那么p1重新启动后,p1会执行fast leader election,然后发现其他peer已经建立leader-follower关系,且p2是leader,那么p1会向发送FOLLOWERINFO(p1.latestZxid = (1, 4))。</p> <p>在这种情况下,</p> <p>p1.lastestZxid(1,4) &lt; p2.history.lastCommittedZxid(2, 1) </p> <p>&amp;&amp; p2.history.oldThreshold(1, 1)&lt; p1.lastestZxid (1, 4),那么这种情况下leader p2需要向p1发送TRUNC信令,让follower放弃uncommitted proposal(1, 4)。</p> <p></p> <p></p> <p></p> <p>作者zy,QQ105789990</p>

© 著作权归作者所有

谈吐鱼
粉丝 37
博文 13
码字总数 41855
作品 0
杭州
程序员
私信 提问
加载中

评论(0)

这可能是把ZooKeeper概念讲的最清楚的一篇文章

我本人曾经使用过 ZooKeeper 作为 Dubbo 的注册中心,另外在搭建 Solr 集群的时候,我使用到了 ZooKeeper 作为 Solr 集群的管理工具。 前几天,总结项目经验的时候,我突然问自己 ZooKeeper ...

51CTO技术栈
2018/09/11
0
0
这应该是全网对 ZooKeeper 概念讲的最清楚的一篇文章了

我本人曾经使用过 ZooKeeper 作为 Dubbo 的注册中心,另外在搭建 Solr 集群的时候,我使用到了 ZooKeeper 作为 Solr 集群的管理工具。 前几天,总结项目经验的时候,我突然问自己 ZooKeeper ...

Java干货分享
2018/09/16
150
1
ZAB协议 对分布式一致性的保证

在前面, 我讲了Raft协议在分布式环境下的作用,这篇讲解ZK的ZAB协议算法。 谈到ZAB协议,不得不提的是Paxos算法,可是Paxos就如所有的书和博客里面说的那样,的确让人很难理解,我尝试看了很...

coderlong
04/01
0
0
Zookeeper的一致性协议:Zab

Zookeeper使用了一种称为Zab(Zookeeper Atomic Broadcast)的协议作为其一致性复制的核心,据其作者说这是一种新发算法,其特点是充分考虑了Yahoo的具体情况:高吞吐量、低延迟、健壮、简单...

小报童
2013/01/06
548
0
ZooKeeper学习笔记三 ZooKeeper与Paxos

本文学习内容来自: 《从Paxos到ZooKeeper分布式一致性原理与实践》 电子工业出版社 Apache ZooKeeper是由Apache Hadoop的子项目发展而来,于2010年11月正式成为了Apache的顶级项目。ZooKeep...

xundh
2018/04/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

java关键字 —— new、this、static

  java关键字,也叫保留字(50个),是java有特殊意义的标识符,不能用作参数名、变量名、方法名、类名、包名等。   现在我们讲讲其中三个关键字的使用吧~~~ 一、new关键字 1. 用途:新建...

osc_s2b5kacl
3分钟前
5
0
java 集合框架的工具类Collections

sort(),max(),binarySearch(),fill() public class CollectionsDemo { public static void main(String[] args) { replaceAllDemo(); } public static void replaceAll......

osc_r9yyhhqz
4分钟前
3
0
创龙基于Xilinx Kintex-7系列高性价比FPGA开发板散热风扇接口、SATA接口

处理器 Xilinx Kintex-7系列FPGA处理器,芯片型号为XC7K325T-2FFG676I,兼容XC7K160T/410T-2FFG676I,高达326K逻辑单元,840个DSP Slice,硬件如下图: 散热风扇接口 开发板引出1个散热风扇接...

Tronlong创龙
4分钟前
15
0
【经验分享】学习Java的好书有哪些?Java书籍清单

Java书籍是程序员学习提升技能的重要学习渠道,通过书籍Java程序员可以学习当前流行、重要的相关技能。经典的书经受时间的考验,随着岁月的流逝变得越来越重要,让我们不断的学习和进步。 为...

osc_b1kaj6np
5分钟前
12
0
java Collections的reverseOrder(),SynList()

Collections的reverseOrder(比较器)返回相反的比较器,可以逆转比较器。 SynList()可以让非同步变成同步,底层实现synchronized(){}。 swap交换元素位置。 Collections.shuffle()随机重新排序...

osc_2gkfj43j
6分钟前
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部