[Curator] Double Barrier 的使用与分析
[Curator] Double Barrier 的使用与分析
秋雨霏霏 发表于6个月前
[Curator] Double Barrier 的使用与分析
  • 发表于 6个月前
  • 阅读 54
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: Zookeeper Curator : Double Barrier 栅栏

Double Barrier

双重栅栏能够让客户端在任务的开始和结束阶段更好的同步控制。 当有足够的任务已经进入到栅栏后,一起开始,一旦任务完成则离开栅栏。

1. 关键 API

org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier

2. 机制说明

对于普通栅栏,如果要控制多个任务的开始,结束。需要自己实现控制逻辑。 而双重栅栏,相当于有了开始,结束的边界,在使用时,可以更好的控制。

双重栅栏可以控制着进入栅栏的任务数量,如果数量达到要求,则认为任务可以开始执行。这样就不需要手工控制任务的准备阶段。

结束时,也可以阻塞等待所有任务的退出。

3. 用法

3.1 创建

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)

要注意memberQty就是参与栅栏的任务数量。

3.2 使用

  1. 进入
public void enter();
  1. 离开
public void leave();

使用起来很简单

4. 错误处理

DistributedDoubleBarrier实例会监听链接状态。如果链接丢失则会在enter()leave()方法上抛出异常

5. 源码分析

5.1 类定义

public class DistributedDoubleBarrier {}

DistributedBarrier一样,没有父类和接口

5.2 成员变量

public class DistributedDoubleBarrier
{
    private final CuratorFramework client;
    private final String barrierPath;
    private final int memberQty;
    private final String ourPath;
    private final String readyPath;
    private final AtomicBoolean hasBeenNotified = new AtomicBoolean(false);
    private final AtomicBoolean connectionLost = new AtomicBoolean(false);
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
            notifyFromWatcher();
        }
    };

    private static final String     READY_NODE = "ready";
}

这可就比DistributedBarrier复杂多了。

  • client
  • barrierPath
  • memberQty
    • 参与的任务数量
  • ourPath
    • 当前对应的zk节点path
  • readyPath
    • 准备状态对应的zk节点path
  • hasBeenNotified
    • 是否已通知参与的任务
    • AtomicBoolean原子化包装
  • connectionLost
    • 链接是否已丢失
    • AtomicBoolean原子化包装
  • watcher
    • 链接状态监听器
  • READY_NODE
    • 私用常量
    • 任务节点已准备完成对应的path

5.3 构造器

public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
{
    Preconditions.checkState(memberQty > 0, "memberQty cannot be 0");

    this.client = client;
    this.barrierPath = PathUtils.validatePath(barrierPath);
    this.memberQty = memberQty;
    ourPath = ZKPaths.makePath(barrierPath, UUID.randomUUID().toString());
    readyPath = ZKPaths.makePath(barrierPath, READY_NODE);
}
  1. ourPath在barrierPath后面接一个UUID
  2. readyPath在barrierPath后面接了一个ready

从构造器的处理,大致能够推断出:

  1. 所有的参与的节点在barrierPath下,以UUID为名注册自己
  2. 准备完成后以barrierPath/ready为信号,开始执行

5.4 进入

public void enter() throws Exception
{
    enter(-1, null);
}

public boolean     enter(long maxWait, TimeUnit unit) throws Exception
{
    long            startMs = System.currentTimeMillis();
    boolean         hasMaxWait = (unit != null);
    long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

    boolean         readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);
    client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);

    boolean         result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));
    if ( connectionLost.get() )
    {
        throw new KeeperException.ConnectionLossException();
    }

    return result;
}
  1. 创捷一个临时节点ourPath
    • 注册自己
  2. 判断准备标识节点readyPath)以及internalEnter方法
    • 准备标识节点的存在,表示栅栏已经开始执行任务了,新的任务可以随时进入
      • 判断短路,就不用执行internalEnter处理了
    • 如果true,表示当前任务节点已经进入栅栏
    • 如果false,表示进入失败
private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
    boolean result = true;
    do
    {
        List<String>    children = getChildrenForEntering();
        int             count = (children != null) ? children.size() : 0;
        if ( count >= memberQty )
        {
            try
            {
                client.create().forPath(readyPath);
            }
            catch ( KeeperException.NodeExistsException ignore )
            {
                // ignore
            }
            break;
        }

        if ( hasMaxWait && !hasBeenNotified.get() )
        {
            long        elapsed = System.currentTimeMillis() - startMs;
            long        thisWaitMs = maxWaitMs - elapsed;
            if ( thisWaitMs <= 0 )
            {
                result = false;
            }
            else
            {
                wait(thisWaitMs);
            }

            if ( !hasBeenNotified.get() )
            {
                result = false;
            }
        }
        else
        {
            wait();
        }
    } while ( false );

    return result;
}
  1. 获得barrierPath下的子节点
    • 获得参与者列表
  2. 如果参与者的数量已经大于预设值memberQty,则认为可以开始执行任务了
  3. 如果还不够数,则进行等待
  • do...while(false)循环
    • 可以利用break达到类似的goto的效果

5.5 退出

public synchronized void     leave() throws Exception
{
    leave(-1, null);
}

public synchronized boolean     leave(long maxWait, TimeUnit unit) throws Exception
{
    long            startMs = System.currentTimeMillis();
    boolean         hasMaxWait = (unit != null);
    long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

    return internalLeave(startMs, hasMaxWait, maxWaitMs);
}

逻辑都在internalLeave方法中:

private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
    String          ourPathName = ZKPaths.getNodeFromPath(ourPath);
    boolean         ourNodeShouldExist = true;
    boolean         result = true;
    for(;;)
    {
        if ( connectionLost.get() )
        {
            throw new KeeperException.ConnectionLossException();
        }

        List<String> children;
        try
        {
            children = client.getChildren().forPath(barrierPath);
        }
        catch ( KeeperException.NoNodeException dummy )
        {
            children = Lists.newArrayList();
        }
        children = filterAndSortChildren(children);
        if ( (children == null) || (children.size() == 0) )
        {
            break;
        }

        int                 ourIndex = children.indexOf(ourPathName);
        if ( (ourIndex < 0) && ourNodeShouldExist )
        {
            if ( connectionLost.get() )
            {
                break;  // connection was lost but we've reconnected. However, our ephemeral node is gone
            }
            else
            {
                throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName));
            }
        }

        if ( children.size() == 1 )
        {
            if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) )
            {
                throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName));
            }
            checkDeleteOurPath(ourNodeShouldExist);
            break;
        }

        Stat            stat;
        boolean         IsLowestNode = (ourIndex == 0);
        if ( IsLowestNode )
        {
            String  highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1));
            stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath);
        }
        else
        {
            String  lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0));
            stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath);

            checkDeleteOurPath(ourNodeShouldExist);
            ourNodeShouldExist = false;
        }

        if ( stat != null )
        {
            if ( hasMaxWait )
            {
                long        elapsed = System.currentTimeMillis() - startMs;
                long        thisWaitMs = maxWaitMs - elapsed;
                if ( thisWaitMs <= 0 )
                {
                    result = false;
                }
                else
                {
                    wait(thisWaitMs);
                }
            }
            else
            {
                wait();
            }
        }
    }

    try
    {
        client.delete().forPath(readyPath);
    }
    catch ( KeeperException.NoNodeException ignore )
    {
        // ignore
    }

    return result;
}

整体上就是一个for(;;)不断重试的过程

  1. 检查链接状态
  2. 获取参与者列表(也可能带有准备标识节点readyPath))
  3. 如果没有参与者了,则退出循环,返回true
    • 说明参与者都已退出
  4. 再从列表中获得当前节点的顺位
  5. 如果自己已经不再列别中了
    • 检查链接,如果链接丢失,则也认为退出成功
  6. 如果只有自己在列表中
    • 则删除自己节点,退出成功
    • 说明其他参与者都已退出
  7. 如果有多个参与者,且自己处在第一顺位
    • 则检查最后顺位的状态
  8. 如果不是第一顺位
    • 则检查第一顺位的状态
    • 并删除自身节点
  9. 最后等待参与者逐个退出后,删除readyPath节点

6. 测试

使用DistributedDoubleBarrier来改写DistributedBarrier中赛马游戏

package com.roc.curator.demo.barriers

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

/**
 * Created by roc on 2017/6/2.
 */
class DoubleBarriersTest {

    val PATH: String = "/test/barrier/double/horse/"

    var client: CuratorFramework = CuratorFrameworkFactory.builder()
            .connectString("0.0.0.0:8888")
            .connectionTimeoutMs(5000)
            .retryPolicy(ExponentialBackoffRetry(1000, 10))
            .sessionTimeoutMs(3000)
            .build()

    @Before fun init() {
        client.start()
    }

    @Test fun runTest() {
        val name: String = "HORSE-"
        val count: AtomicInteger = AtomicInteger()

        val threadFactory: ThreadFactory = ThreadFactory { r ->
            val t: Thread = Thread(r, name + count.incrementAndGet())
            t
        }

        var champion: AtomicReference<String> = AtomicReference()
        champion.set("")

        val member: Int = 5

        val executorService: ExecutorService = Executors.newFixedThreadPool(member, threadFactory)

        val isOver: AtomicBoolean = AtomicBoolean(false)
        var i: Int = 0
        val target: Int = 100
        while (i < 5) {
            i++
            executorService.execute(Runnable {
                var round: Int = 0
                var sumSetp: Int = 0
                run {
                    var doubleBarrier: DistributedDoubleBarrier = DistributedDoubleBarrier(client, PATH + round, member)
                    //设置栅栏
                    println("${Thread.currentThread()} 准备完毕 ${Date()}")
                    //等待开赛
                    doubleBarrier.enter()

                    //需要对线程是否中断进行判断
                    try {
                        while (!isOver.get() && sumSetp < target) {
                            //新一轮
                            ++round
                            var roundBarrier: DistributedDoubleBarrier = DistributedDoubleBarrier(client, PATH + round, member)
                            //进入等待区
                            roundBarrier.enter()

                            var setp: Int = (Math.random() * 25).toInt()
                            setp = maxOf(setp, 1)
                            setp = minOf(setp, target - sumSetp)
                            sumSetp += setp
                            println("${Thread.currentThread()} 第 $round 轮,跑了 $setp 步,合计 $sumSetp / $target ")

                            //如果已经到达终点,自行标记冠军
                            //示例而已,如果要严谨一点的话,这个应该交给裁判来判断
                            if (sumSetp >= target) {
                                //使用CAS操作,避免同一轮两个完赛马,出现并发问题
                                if (champion.compareAndSet("", Thread.currentThread().name)) {
                                    println(" 撞线成功!!!")
                                } else {
                                    println(" 撞线失败~~~~")
                                }
                            }

                            //等待下一轮
                            roundBarrier.leave()
                        }
                    } finally {
                        println("${Thread.currentThread()} 结束 ")
                        //比赛结束
                        doubleBarrier.leave()
                    }
                }
            })
        }

        while (champion.get().isBlank()) {
            TimeUnit.MILLISECONDS.sleep(500)
        }
        println("裁判: 冠军是 $champion")
        isOver.set(true)

        TimeUnit.MILLISECONDS.sleep(5000)
    }
}

输出:

Thread[HORSE-4,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-3,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-2,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-1,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-5,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-4,5,main] 第 1 轮,跑了 20 步,合计 20 / 100 
Thread[HORSE-5,5,main] 第 1 轮,跑了 15 步,合计 15 / 100 
Thread[HORSE-3,5,main] 第 1 轮,跑了 15 步,合计 15 / 100 
Thread[HORSE-2,5,main] 第 1 轮,跑了 8 步,合计 8 / 100 
Thread[HORSE-1,5,main] 第 1 轮,跑了 4 步,合计 4 / 100 
Thread[HORSE-2,5,main] 第 2 轮,跑了 10 步,合计 18 / 100 
Thread[HORSE-1,5,main] 第 2 轮,跑了 18 步,合计 22 / 100 
Thread[HORSE-5,5,main] 第 2 轮,跑了 21 步,合计 36 / 100 
Thread[HORSE-4,5,main] 第 2 轮,跑了 12 步,合计 32 / 100 
Thread[HORSE-3,5,main] 第 2 轮,跑了 3 步,合计 18 / 100 
Thread[HORSE-3,5,main] 第 3 轮,跑了 15 步,合计 33 / 100 
Thread[HORSE-5,5,main] 第 3 轮,跑了 17 步,合计 53 / 100 
Thread[HORSE-4,5,main] 第 3 轮,跑了 14 步,合计 46 / 100 
Thread[HORSE-2,5,main] 第 3 轮,跑了 23 步,合计 41 / 100 
Thread[HORSE-1,5,main] 第 3 轮,跑了 5 步,合计 27 / 100 
Thread[HORSE-2,5,main] 第 4 轮,跑了 12 步,合计 53 / 100 
Thread[HORSE-3,5,main] 第 4 轮,跑了 3 步,合计 36 / 100 
Thread[HORSE-1,5,main] 第 4 轮,跑了 21 步,合计 48 / 100 
Thread[HORSE-4,5,main] 第 4 轮,跑了 19 步,合计 65 / 100 
Thread[HORSE-5,5,main] 第 4 轮,跑了 5 步,合计 58 / 100 
Thread[HORSE-2,5,main] 第 5 轮,跑了 15 步,合计 68 / 100 
Thread[HORSE-5,5,main] 第 5 轮,跑了 4 步,合计 62 / 100 
Thread[HORSE-1,5,main] 第 5 轮,跑了 1 步,合计 49 / 100 
Thread[HORSE-3,5,main] 第 5 轮,跑了 8 步,合计 44 / 100 
Thread[HORSE-4,5,main] 第 5 轮,跑了 9 步,合计 74 / 100 
Thread[HORSE-4,5,main] 第 6 轮,跑了 6 步,合计 80 / 100 
Thread[HORSE-2,5,main] 第 6 轮,跑了 6 步,合计 74 / 100 
Thread[HORSE-3,5,main] 第 6 轮,跑了 24 步,合计 68 / 100 
Thread[HORSE-5,5,main] 第 6 轮,跑了 7 步,合计 69 / 100 
Thread[HORSE-1,5,main] 第 6 轮,跑了 16 步,合计 65 / 100 
Thread[HORSE-5,5,main] 第 7 轮,跑了 17 步,合计 86 / 100 
Thread[HORSE-4,5,main] 第 7 轮,跑了 15 步,合计 95 / 100 
Thread[HORSE-2,5,main] 第 7 轮,跑了 1 步,合计 75 / 100 
Thread[HORSE-3,5,main] 第 7 轮,跑了 8 步,合计 76 / 100 
Thread[HORSE-1,5,main] 第 7 轮,跑了 2 步,合计 67 / 100 
Thread[HORSE-2,5,main] 第 8 轮,跑了 2 步,合计 77 / 100 
Thread[HORSE-1,5,main] 第 8 轮,跑了 16 步,合计 83 / 100 
Thread[HORSE-5,5,main] 第 8 轮,跑了 13 步,合计 99 / 100 
Thread[HORSE-3,5,main] 第 8 轮,跑了 8 步,合计 84 / 100 
Thread[HORSE-4,5,main] 第 8 轮,跑了 5 步,合计 100 / 100 
 撞线成功!!!
裁判: 冠军是 HORSE-4
Thread[HORSE-2,5,main] 结束 
Thread[HORSE-1,5,main] 结束 
Thread[HORSE-4,5,main] 结束 
Thread[HORSE-3,5,main] 结束 
Thread[HORSE-5,5,main] 结束 

Process finished with exit code 0

ZK节点:

ls /test/barrier/double/horse/0
[f758a642-e478-495e-8ba1-e15711e56684, 
3dba7601-96d8-4743-8766-0730af37f5ac, 
545cd633-9b71-4dd2-b477-ea4e6dc8dd8c, 
ready, 
136859a1-fe3d-48ef-865a-398fd56e8ede, 
57cac84d-8286-4956-8485-5ee819e1ff82]

7. 小结

通过和DistributedBarrier的示例进行对比,就可以发现:

  1. DistributedDoubleBarrier有明确的开始,结束的边界
  2. DistributedBarrier需要自行实现控制逻辑
共有 人打赏支持
粉丝 114
博文 75
码字总数 138343
×
秋雨霏霏
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: