文档章节

[Curator] Double Barrier 的使用与分析

秋雨霏霏
 秋雨霏霏
发布于 2017/06/02 15:40
字数 2478
阅读 62
收藏 0

Double Barrier

双重栅栏能够让客户端在任务的开始和结束阶段更好的同步控制。 当有足够的任务已经进入到栅栏后,一起开始,一旦任务完成则离开栅栏。

1. 关键 API

org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier

2. 机制说明

对于普通栅栏,如果要控制多个任务的开始,结束。需要自己实现控制逻辑。 而双重栅栏,相当于有了开始,结束的边界,在使用时,可以更好的控制。

双重栅栏可以控制着进入栅栏的任务数量,如果数量达到要求,则认为任务可以开始执行。这样就不需要手工控制任务的准备阶段。

结束时,也可以阻塞等待所有任务的退出。

3. 用法

3.1 创建

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)

要注意memberQty就是参与栅栏的任务数量。

3.2 使用

  1. 进入
public void enter();
  1. 离开
public void leave();

使用起来很简单

4. 错误处理

DistributedDoubleBarrier实例会监听链接状态。如果链接丢失则会在enter()leave()方法上抛出异常

5. 源码分析

5.1 类定义

public class DistributedDoubleBarrier {}

DistributedBarrier一样,没有父类和接口

5.2 成员变量

public class DistributedDoubleBarrier
{
    private final CuratorFramework client;
    private final String barrierPath;
    private final int memberQty;
    private final String ourPath;
    private final String readyPath;
    private final AtomicBoolean hasBeenNotified = new AtomicBoolean(false);
    private final AtomicBoolean connectionLost = new AtomicBoolean(false);
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
            notifyFromWatcher();
        }
    };

    private static final String     READY_NODE = "ready";
}

这可就比DistributedBarrier复杂多了。

  • client
  • barrierPath
  • memberQty
    • 参与的任务数量
  • ourPath
    • 当前对应的zk节点path
  • readyPath
    • 准备状态对应的zk节点path
  • hasBeenNotified
    • 是否已通知参与的任务
    • AtomicBoolean原子化包装
  • connectionLost
    • 链接是否已丢失
    • AtomicBoolean原子化包装
  • watcher
    • 链接状态监听器
  • READY_NODE
    • 私用常量
    • 任务节点已准备完成对应的path

5.3 构造器

public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
{
    Preconditions.checkState(memberQty > 0, "memberQty cannot be 0");

    this.client = client;
    this.barrierPath = PathUtils.validatePath(barrierPath);
    this.memberQty = memberQty;
    ourPath = ZKPaths.makePath(barrierPath, UUID.randomUUID().toString());
    readyPath = ZKPaths.makePath(barrierPath, READY_NODE);
}
  1. ourPath在barrierPath后面接一个UUID
  2. readyPath在barrierPath后面接了一个ready

从构造器的处理,大致能够推断出:

  1. 所有的参与的节点在barrierPath下,以UUID为名注册自己
  2. 准备完成后以barrierPath/ready为信号,开始执行

5.4 进入

public void enter() throws Exception
{
    enter(-1, null);
}

public boolean     enter(long maxWait, TimeUnit unit) throws Exception
{
    long            startMs = System.currentTimeMillis();
    boolean         hasMaxWait = (unit != null);
    long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

    boolean         readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);
    client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);

    boolean         result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));
    if ( connectionLost.get() )
    {
        throw new KeeperException.ConnectionLossException();
    }

    return result;
}
  1. 创捷一个临时节点ourPath
    • 注册自己
  2. 判断准备标识节点readyPath)以及internalEnter方法
    • 准备标识节点的存在,表示栅栏已经开始执行任务了,新的任务可以随时进入
      • 判断短路,就不用执行internalEnter处理了
    • 如果true,表示当前任务节点已经进入栅栏
    • 如果false,表示进入失败
private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
    boolean result = true;
    do
    {
        List<String>    children = getChildrenForEntering();
        int             count = (children != null) ? children.size() : 0;
        if ( count >= memberQty )
        {
            try
            {
                client.create().forPath(readyPath);
            }
            catch ( KeeperException.NodeExistsException ignore )
            {
                // ignore
            }
            break;
        }

        if ( hasMaxWait && !hasBeenNotified.get() )
        {
            long        elapsed = System.currentTimeMillis() - startMs;
            long        thisWaitMs = maxWaitMs - elapsed;
            if ( thisWaitMs <= 0 )
            {
                result = false;
            }
            else
            {
                wait(thisWaitMs);
            }

            if ( !hasBeenNotified.get() )
            {
                result = false;
            }
        }
        else
        {
            wait();
        }
    } while ( false );

    return result;
}
  1. 获得barrierPath下的子节点
    • 获得参与者列表
  2. 如果参与者的数量已经大于预设值memberQty,则认为可以开始执行任务了
  3. 如果还不够数,则进行等待
  • do...while(false)循环
    • 可以利用break达到类似的goto的效果

5.5 退出

public synchronized void     leave() throws Exception
{
    leave(-1, null);
}

public synchronized boolean     leave(long maxWait, TimeUnit unit) throws Exception
{
    long            startMs = System.currentTimeMillis();
    boolean         hasMaxWait = (unit != null);
    long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

    return internalLeave(startMs, hasMaxWait, maxWaitMs);
}

逻辑都在internalLeave方法中:

private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception
{
    String          ourPathName = ZKPaths.getNodeFromPath(ourPath);
    boolean         ourNodeShouldExist = true;
    boolean         result = true;
    for(;;)
    {
        if ( connectionLost.get() )
        {
            throw new KeeperException.ConnectionLossException();
        }

        List<String> children;
        try
        {
            children = client.getChildren().forPath(barrierPath);
        }
        catch ( KeeperException.NoNodeException dummy )
        {
            children = Lists.newArrayList();
        }
        children = filterAndSortChildren(children);
        if ( (children == null) || (children.size() == 0) )
        {
            break;
        }

        int                 ourIndex = children.indexOf(ourPathName);
        if ( (ourIndex < 0) && ourNodeShouldExist )
        {
            if ( connectionLost.get() )
            {
                break;  // connection was lost but we've reconnected. However, our ephemeral node is gone
            }
            else
            {
                throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName));
            }
        }

        if ( children.size() == 1 )
        {
            if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) )
            {
                throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName));
            }
            checkDeleteOurPath(ourNodeShouldExist);
            break;
        }

        Stat            stat;
        boolean         IsLowestNode = (ourIndex == 0);
        if ( IsLowestNode )
        {
            String  highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1));
            stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath);
        }
        else
        {
            String  lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0));
            stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath);

            checkDeleteOurPath(ourNodeShouldExist);
            ourNodeShouldExist = false;
        }

        if ( stat != null )
        {
            if ( hasMaxWait )
            {
                long        elapsed = System.currentTimeMillis() - startMs;
                long        thisWaitMs = maxWaitMs - elapsed;
                if ( thisWaitMs <= 0 )
                {
                    result = false;
                }
                else
                {
                    wait(thisWaitMs);
                }
            }
            else
            {
                wait();
            }
        }
    }

    try
    {
        client.delete().forPath(readyPath);
    }
    catch ( KeeperException.NoNodeException ignore )
    {
        // ignore
    }

    return result;
}

整体上就是一个for(;;)不断重试的过程

  1. 检查链接状态
  2. 获取参与者列表(也可能带有准备标识节点readyPath))
  3. 如果没有参与者了,则退出循环,返回true
    • 说明参与者都已退出
  4. 再从列表中获得当前节点的顺位
  5. 如果自己已经不再列别中了
    • 检查链接,如果链接丢失,则也认为退出成功
  6. 如果只有自己在列表中
    • 则删除自己节点,退出成功
    • 说明其他参与者都已退出
  7. 如果有多个参与者,且自己处在第一顺位
    • 则检查最后顺位的状态
  8. 如果不是第一顺位
    • 则检查第一顺位的状态
    • 并删除自身节点
  9. 最后等待参与者逐个退出后,删除readyPath节点

6. 测试

使用DistributedDoubleBarrier来改写DistributedBarrier中赛马游戏

package com.roc.curator.demo.barriers

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

/**
 * Created by roc on 2017/6/2.
 */
class DoubleBarriersTest {

    val PATH: String = "/test/barrier/double/horse/"

    var client: CuratorFramework = CuratorFrameworkFactory.builder()
            .connectString("0.0.0.0:8888")
            .connectionTimeoutMs(5000)
            .retryPolicy(ExponentialBackoffRetry(1000, 10))
            .sessionTimeoutMs(3000)
            .build()

    @Before fun init() {
        client.start()
    }

    @Test fun runTest() {
        val name: String = "HORSE-"
        val count: AtomicInteger = AtomicInteger()

        val threadFactory: ThreadFactory = ThreadFactory { r ->
            val t: Thread = Thread(r, name + count.incrementAndGet())
            t
        }

        var champion: AtomicReference<String> = AtomicReference()
        champion.set("")

        val member: Int = 5

        val executorService: ExecutorService = Executors.newFixedThreadPool(member, threadFactory)

        val isOver: AtomicBoolean = AtomicBoolean(false)
        var i: Int = 0
        val target: Int = 100
        while (i < 5) {
            i++
            executorService.execute(Runnable {
                var round: Int = 0
                var sumSetp: Int = 0
                run {
                    var doubleBarrier: DistributedDoubleBarrier = DistributedDoubleBarrier(client, PATH + round, member)
                    //设置栅栏
                    println("${Thread.currentThread()} 准备完毕 ${Date()}")
                    //等待开赛
                    doubleBarrier.enter()

                    //需要对线程是否中断进行判断
                    try {
                        while (!isOver.get() && sumSetp < target) {
                            //新一轮
                            ++round
                            var roundBarrier: DistributedDoubleBarrier = DistributedDoubleBarrier(client, PATH + round, member)
                            //进入等待区
                            roundBarrier.enter()

                            var setp: Int = (Math.random() * 25).toInt()
                            setp = maxOf(setp, 1)
                            setp = minOf(setp, target - sumSetp)
                            sumSetp += setp
                            println("${Thread.currentThread()} 第 $round 轮,跑了 $setp 步,合计 $sumSetp / $target ")

                            //如果已经到达终点,自行标记冠军
                            //示例而已,如果要严谨一点的话,这个应该交给裁判来判断
                            if (sumSetp >= target) {
                                //使用CAS操作,避免同一轮两个完赛马,出现并发问题
                                if (champion.compareAndSet("", Thread.currentThread().name)) {
                                    println(" 撞线成功!!!")
                                } else {
                                    println(" 撞线失败~~~~")
                                }
                            }

                            //等待下一轮
                            roundBarrier.leave()
                        }
                    } finally {
                        println("${Thread.currentThread()} 结束 ")
                        //比赛结束
                        doubleBarrier.leave()
                    }
                }
            })
        }

        while (champion.get().isBlank()) {
            TimeUnit.MILLISECONDS.sleep(500)
        }
        println("裁判: 冠军是 $champion")
        isOver.set(true)

        TimeUnit.MILLISECONDS.sleep(5000)
    }
}

输出:

Thread[HORSE-4,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-3,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-2,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-1,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-5,5,main] 准备完毕 Fri Jun 02 15:40:22 CST 2017
Thread[HORSE-4,5,main] 第 1 轮,跑了 20 步,合计 20 / 100 
Thread[HORSE-5,5,main] 第 1 轮,跑了 15 步,合计 15 / 100 
Thread[HORSE-3,5,main] 第 1 轮,跑了 15 步,合计 15 / 100 
Thread[HORSE-2,5,main] 第 1 轮,跑了 8 步,合计 8 / 100 
Thread[HORSE-1,5,main] 第 1 轮,跑了 4 步,合计 4 / 100 
Thread[HORSE-2,5,main] 第 2 轮,跑了 10 步,合计 18 / 100 
Thread[HORSE-1,5,main] 第 2 轮,跑了 18 步,合计 22 / 100 
Thread[HORSE-5,5,main] 第 2 轮,跑了 21 步,合计 36 / 100 
Thread[HORSE-4,5,main] 第 2 轮,跑了 12 步,合计 32 / 100 
Thread[HORSE-3,5,main] 第 2 轮,跑了 3 步,合计 18 / 100 
Thread[HORSE-3,5,main] 第 3 轮,跑了 15 步,合计 33 / 100 
Thread[HORSE-5,5,main] 第 3 轮,跑了 17 步,合计 53 / 100 
Thread[HORSE-4,5,main] 第 3 轮,跑了 14 步,合计 46 / 100 
Thread[HORSE-2,5,main] 第 3 轮,跑了 23 步,合计 41 / 100 
Thread[HORSE-1,5,main] 第 3 轮,跑了 5 步,合计 27 / 100 
Thread[HORSE-2,5,main] 第 4 轮,跑了 12 步,合计 53 / 100 
Thread[HORSE-3,5,main] 第 4 轮,跑了 3 步,合计 36 / 100 
Thread[HORSE-1,5,main] 第 4 轮,跑了 21 步,合计 48 / 100 
Thread[HORSE-4,5,main] 第 4 轮,跑了 19 步,合计 65 / 100 
Thread[HORSE-5,5,main] 第 4 轮,跑了 5 步,合计 58 / 100 
Thread[HORSE-2,5,main] 第 5 轮,跑了 15 步,合计 68 / 100 
Thread[HORSE-5,5,main] 第 5 轮,跑了 4 步,合计 62 / 100 
Thread[HORSE-1,5,main] 第 5 轮,跑了 1 步,合计 49 / 100 
Thread[HORSE-3,5,main] 第 5 轮,跑了 8 步,合计 44 / 100 
Thread[HORSE-4,5,main] 第 5 轮,跑了 9 步,合计 74 / 100 
Thread[HORSE-4,5,main] 第 6 轮,跑了 6 步,合计 80 / 100 
Thread[HORSE-2,5,main] 第 6 轮,跑了 6 步,合计 74 / 100 
Thread[HORSE-3,5,main] 第 6 轮,跑了 24 步,合计 68 / 100 
Thread[HORSE-5,5,main] 第 6 轮,跑了 7 步,合计 69 / 100 
Thread[HORSE-1,5,main] 第 6 轮,跑了 16 步,合计 65 / 100 
Thread[HORSE-5,5,main] 第 7 轮,跑了 17 步,合计 86 / 100 
Thread[HORSE-4,5,main] 第 7 轮,跑了 15 步,合计 95 / 100 
Thread[HORSE-2,5,main] 第 7 轮,跑了 1 步,合计 75 / 100 
Thread[HORSE-3,5,main] 第 7 轮,跑了 8 步,合计 76 / 100 
Thread[HORSE-1,5,main] 第 7 轮,跑了 2 步,合计 67 / 100 
Thread[HORSE-2,5,main] 第 8 轮,跑了 2 步,合计 77 / 100 
Thread[HORSE-1,5,main] 第 8 轮,跑了 16 步,合计 83 / 100 
Thread[HORSE-5,5,main] 第 8 轮,跑了 13 步,合计 99 / 100 
Thread[HORSE-3,5,main] 第 8 轮,跑了 8 步,合计 84 / 100 
Thread[HORSE-4,5,main] 第 8 轮,跑了 5 步,合计 100 / 100 
 撞线成功!!!
裁判: 冠军是 HORSE-4
Thread[HORSE-2,5,main] 结束 
Thread[HORSE-1,5,main] 结束 
Thread[HORSE-4,5,main] 结束 
Thread[HORSE-3,5,main] 结束 
Thread[HORSE-5,5,main] 结束 

Process finished with exit code 0

ZK节点:

ls /test/barrier/double/horse/0
[f758a642-e478-495e-8ba1-e15711e56684, 
3dba7601-96d8-4743-8766-0730af37f5ac, 
545cd633-9b71-4dd2-b477-ea4e6dc8dd8c, 
ready, 
136859a1-fe3d-48ef-865a-398fd56e8ede, 
57cac84d-8286-4956-8485-5ee819e1ff82]

7. 小结

通过和DistributedBarrier的示例进行对比,就可以发现:

  1. DistributedDoubleBarrier有明确的开始,结束的边界
  2. DistributedBarrier需要自行实现控制逻辑

© 著作权归作者所有

共有 人打赏支持
秋雨霏霏
粉丝 149
博文 91
码字总数 160620
作品 0
杭州
CTO(技术副总裁)
分布式利器Zookeeper(三)

前言 《分布式利器Zookeeper(一)》 《分布式利器Zookeeper(二):分布式锁》 本篇博客是分布式利器Zookeeper系列的最后一篇,涉及的话题是:Zookeeper分布式锁的代码实现、zkclient的使用、...

zfz_linux_boy
07/01
0
0
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh
04/28
0
0
跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下: Recipes Framework Utilities Client Errors Extensions 前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Clien...

longbadx
2015/02/11
0
0
Apache Curator 2.7.1 发布

Apache Curator 2.7.1 发布了,zookeeper 的客户端调用过于复杂,Curator提供了对zookeeper客户端的封装,Apache Curator 就是为了简化zookeeper客户端调用而生,利用它,可以更好的使用zoo...

凯文加内特
2015/03/02
0
0
使用Zookeeper解决微服务架构下分布式事务问题

准备工作 单机调试zookeeper集群的话,我们需要在虚拟机里虚拟出几台“微服务器“,做这一步操作之前需要在系统中预留出来8G以上磁盘空间,4G以上物理内存。 [if !supportLists]1. [endif]虚...

A尚学堂Nancy老师
09/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

利用责任链模式设计一个拦截器

前言 近期在做 Cicada 的拦截器功能,正好用到了责任链模式。 这个设计模式在日常使用中频率还是挺高的,借此机会来分析分析。 责任链模式 先来看看什么是责任链模式。 引用一段维基百科对其...

编程SHA
18分钟前
0
0
IDE,SATA,SCSI,SAS,FC,SSD说明与区别

DE是俗称的并口,SATA是俗称的串口,这两种硬盘是个人电脑和低端服务器常见的硬盘。SCSI是”小型计算机系统专用接口”的简称,SCSI硬盘就是采用这种接口的硬盘。SAS就是串口的SCSI接口。一般...

mskk
20分钟前
0
0
MySQL面试题集锦

什么是数据库索引?索引有哪几种类型?什么是最左前缀原则?索引算法有哪些?有什么区别? 索引是对数据库表中一列或多列的值进行排序的一种结构。一个非常恰当的比喻就是书的目录页与书的正...

老道士
55分钟前
0
0
使用 LogStash 归集日志

elastic 官网: https://www.elastic.co/ 为了便于集中查看多台主机的业务日志,使用 Filebeat, Redis, Logstash的方式进行收集: (1) Filebeat 监控日志文件的变化, 将新增部分写入redis中, 每...

ouhoo
59分钟前
0
0
java序列化(六) - protostuff序列化

添加依赖 <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.5.9</version> </de......

晨猫
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部