文档章节

[Curator] Barrier 的使用与分析

秋雨霏霏
 秋雨霏霏
发布于 2017/06/01 20:27
字数 1731
阅读 108
收藏 0
点赞 1
评论 0

Barrier

在分布式系统中,可以使用栅栏,对多个节点上的任务进行阻塞等待;直到满足某个定制的条件,所有的节点才可以继续执行下一步任务。

1. 关键 API

org.apache.curator.framework.recipes.barriers.DistributedBarrier

2. 机制说明

控制多节点上的多任务执行步进。

类似java中的java.util.concurrent.CyclicBarrier分布式实现。

3. 用法

3.1 创建

public DistributedBarrier(CuratorFramework client,
                          String barrierPath)

3.2 使用

3.2.1 等待栅栏

public void waitOnBarrier()

3.2.2 设置/移除栅栏

setBarrier();
removeBarrier();

4. 错误处理

DistributedBarrier实例会监听链接丢失。在waitOnBarrier()时,如果发生丢失时,则会抛出异常。

5. 源码分析

5.1 类定义

public class DistributedBarrier {}

没有继承父类,也没有实现任何接口

5.2 成员变量

public class DistributedBarrier
{
    private final CuratorFramework client;
    private final String barrierPath;
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            notifyFromWatcher();
        }
    };
}
  • client
  • barrierPath
    • 用作栅栏的zk节点path
  • watcher
    • 用作zk链接的监听器

5.3 构造器

public DistributedBarrier(CuratorFramework client, String barrierPath)
{
    this.client = client;
    this.barrierPath = PathUtils.validatePath(barrierPath);
}

简单赋值

5.3 设置栅栏

public synchronized void setBarrier() throws Exception
{
    try
    {
        client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
    }
    catch ( KeeperException.NodeExistsException ignore )
    {
        // ignore
    }
}
  • synchronized同步控制

可见,设置栅栏的过程,就是创建barrierPath节点(普通节点)

5.4 等待

public synchronized void waitOnBarrier() throws Exception
{
    waitOnBarrier(-1, null);
}

public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
{
    long            startMs = System.currentTimeMillis();
    boolean         hasMaxWait = (unit != null);
    long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

    boolean         result;
    for(;;)
    {
        result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
        if ( result )
        {
            break;
        }

        if ( hasMaxWait )
        {
            long        elapsed = System.currentTimeMillis() - startMs;
            long        thisWaitMs = maxWaitMs - elapsed;
            if ( thisWaitMs <= 0 )
            {
                break;
            }
            wait(thisWaitMs);
        }
        else
        {
            wait();
        }
    }
    return result;
}
  • 使用synchronized同步控制
  • 不断检查barrierPath栅栏节点是否存在
    • 如果栅栏不存在了,则栅栏放开了,返回true
    • 如果栅栏还在,则进行等待

5.5 移除栅栏

public synchronized void removeBarrier() throws Exception
{
    try
    {
        client.delete().forPath(barrierPath);
    }
    catch ( KeeperException.NoNodeException ignore )
    {
        // ignore
    }
}

直接删除栅栏节点

6. 测试

对于栅栏的示例,最先想到的就是赛马游戏的场景。所以,这里用DistributedBarrier来实现一个赛马游戏。

package com.roc.curator.demo.barriers

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Test
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

/**
 * Created by roc on 2017/6/1.
 */
class HorseRaceLampTest {

    val PATH: String = "/test/barrier/horse"

    // 由于栅栏是监听链接,所以对于一个链接上的多个栅栏监听,会存在问题
    // 这里为每一个任务创建一个链接
    fun connect(): CuratorFramework {
        val client: CuratorFramework = CuratorFrameworkFactory.builder()
                .connectString("0.0.0.0:8888")
                .connectionTimeoutMs(5000)
                .retryPolicy(ExponentialBackoffRetry(1000, 10))
                .sessionTimeoutMs(3000)
                .build()
        client.start()
        return client
    }

    @Test fun runTest() {
        val name: String = "HORSE-"
        val count: AtomicInteger = AtomicInteger()

        val threadFactory: ThreadFactory = ThreadFactory { r ->
            val t: Thread = Thread(r, name + count.incrementAndGet())
            t
        }

        var champion: AtomicReference<String> = AtomicReference()
        champion.set("")

        val executorService: ExecutorService = Executors.newFixedThreadPool(5, threadFactory)
        //使用一个计步器来判定每一匹马是否完成当轮比赛
        val countStep: AtomicInteger = AtomicInteger()
        var round: Int = 0
        var i: Int = 0
        val target: Int = 100
        while (i < 5) {
            i++
            executorService.execute(Runnable {
                var sumSetp: Int = 0
                run {
                    val client: CuratorFramework = connect()
                    val barrier: DistributedBarrier = DistributedBarrier(client, PATH)
                    //设置栅栏
                    barrier.setBarrier()
                    println("${Thread.currentThread()} 准备完毕 ${Date()}")
                    //通知准备完成
                    countStep.incrementAndGet()
                    //等待开赛
                    barrier.waitOnBarrier()

                    //需要对线程是否中断进行判断
                    while (!Thread.interrupted() || sumSetp < target) {
                        var setp: Int = (Math.random() * 20).toInt()
                        setp = maxOf(setp, 1)
                        setp = minOf(setp, target - sumSetp)
                        sumSetp += setp
                        println("${Thread.currentThread()} 第 $round 轮,跑了 $setp 步,合计 $sumSetp / $target ")

                        //如果已经到达终点,自行标记冠军
                        //示例而已,如果要严谨一点的话,这个应该交给裁判来判断
                        if (sumSetp >= target) {
                            champion.set(Thread.currentThread().name)
                        }
                        //设立栅栏
                        barrier.setBarrier()
                        //通知完成此轮比赛
                        countStep.incrementAndGet()
                        //等待下一轮
                        barrier.waitOnBarrier()
                    }
                }
            })
        }

        val client: CuratorFramework = connect()
        val barrier: DistributedBarrier = DistributedBarrier(client, PATH)
        var pStep: Int = countStep.get()
        while (champion.get().isBlank()) {
            if (countStep.get() > pStep && countStep.get() % 5 == 0) {
                round++
                println("------------------------------------------------")
                println("裁判: 开始第 $round 轮 ${Date()}")
                println("------------------------------------------------")
                pStep = countStep.get()
                barrier.removeBarrier()
            }

            TimeUnit.MILLISECONDS.sleep(500)
        }
        println("裁判: 冠军是 $champion")
        executorService.shutdown()
        barrier.removeBarrier()

    }
}

输出:

Thread[HORSE-2,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-1,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-3,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-5,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-4,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
------------------------------------------------
裁判: 开始第 1 轮 Thu Jun 01 20:18:20 CST 2017
------------------------------------------------
Thread[HORSE-4,5,main] 第 1 轮,跑了 17 步,合计 17 / 100 
Thread[HORSE-3,5,main] 第 1 轮,跑了 13 步,合计 13 / 100 
Thread[HORSE-2,5,main] 第 1 轮,跑了 8 步,合计 8 / 100 
Thread[HORSE-5,5,main] 第 1 轮,跑了 14 步,合计 14 / 100 
Thread[HORSE-1,5,main] 第 1 轮,跑了 18 步,合计 18 / 100 
------------------------------------------------
裁判: 开始第 2 轮 Thu Jun 01 20:18:21 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 2 轮,跑了 11 步,合计 24 / 100 
Thread[HORSE-4,5,main] 第 2 轮,跑了 1 步,合计 18 / 100 
Thread[HORSE-5,5,main] 第 2 轮,跑了 14 步,合计 28 / 100 
Thread[HORSE-2,5,main] 第 2 轮,跑了 1 步,合计 9 / 100 
Thread[HORSE-1,5,main] 第 2 轮,跑了 19 步,合计 37 / 100 
------------------------------------------------
裁判: 开始第 3 轮 Thu Jun 01 20:18:21 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 3 轮,跑了 10 步,合计 34 / 100 
Thread[HORSE-4,5,main] 第 3 轮,跑了 13 步,合计 31 / 100 
Thread[HORSE-1,5,main] 第 3 轮,跑了 10 步,合计 47 / 100 
Thread[HORSE-2,5,main] 第 3 轮,跑了 8 步,合计 17 / 100 
Thread[HORSE-5,5,main] 第 3 轮,跑了 18 步,合计 46 / 100 
------------------------------------------------
裁判: 开始第 4 轮 Thu Jun 01 20:18:22 CST 2017
------------------------------------------------
Thread[HORSE-2,5,main] 第 4 轮,跑了 3 步,合计 20 / 100 
Thread[HORSE-4,5,main] 第 4 轮,跑了 9 步,合计 40 / 100 
Thread[HORSE-3,5,main] 第 4 轮,跑了 16 步,合计 50 / 100 
Thread[HORSE-1,5,main] 第 4 轮,跑了 14 步,合计 61 / 100 
Thread[HORSE-5,5,main] 第 4 轮,跑了 12 步,合计 58 / 100 
------------------------------------------------
裁判: 开始第 5 轮 Thu Jun 01 20:18:22 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 5 轮,跑了 6 步,合计 56 / 100 
Thread[HORSE-1,5,main] 第 5 轮,跑了 17 步,合计 78 / 100 
Thread[HORSE-2,5,main] 第 5 轮,跑了 12 步,合计 32 / 100 
Thread[HORSE-4,5,main] 第 5 轮,跑了 10 步,合计 50 / 100 
Thread[HORSE-5,5,main] 第 5 轮,跑了 3 步,合计 61 / 100 
------------------------------------------------
裁判: 开始第 6 轮 Thu Jun 01 20:18:23 CST 2017
------------------------------------------------
Thread[HORSE-4,5,main] 第 6 轮,跑了 6 步,合计 56 / 100 
Thread[HORSE-2,5,main] 第 6 轮,跑了 3 步,合计 35 / 100 
Thread[HORSE-1,5,main] 第 6 轮,跑了 5 步,合计 83 / 100 
Thread[HORSE-5,5,main] 第 6 轮,跑了 7 步,合计 68 / 100 
Thread[HORSE-3,5,main] 第 6 轮,跑了 1 步,合计 57 / 100 
------------------------------------------------
裁判: 开始第 7 轮 Thu Jun 01 20:18:23 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 7 轮,跑了 14 步,合计 71 / 100 
Thread[HORSE-2,5,main] 第 7 轮,跑了 3 步,合计 38 / 100 
Thread[HORSE-1,5,main] 第 7 轮,跑了 14 步,合计 97 / 100 
Thread[HORSE-4,5,main] 第 7 轮,跑了 1 步,合计 57 / 100 
Thread[HORSE-5,5,main] 第 7 轮,跑了 11 步,合计 79 / 100 
------------------------------------------------
裁判: 开始第 8 轮 Thu Jun 01 20:18:24 CST 2017
------------------------------------------------
Thread[HORSE-4,5,main] 第 8 轮,跑了 15 步,合计 72 / 100 
Thread[HORSE-2,5,main] 第 8 轮,跑了 8 步,合计 46 / 100 
Thread[HORSE-3,5,main] 第 8 轮,跑了 10 步,合计 81 / 100 
Thread[HORSE-5,5,main] 第 8 轮,跑了 9 步,合计 88 / 100 
Thread[HORSE-1,5,main] 第 8 轮,跑了 3 步,合计 100 / 100 
裁判: 冠军是 HORSE-1

zk节点:

get /test/barrier/horse
192.168.60.165
cZxid = 0x1e991
ctime = Thu Jun 01 20:18:23 CST 2017
mZxid = 0x1e991
mtime = Thu Jun 01 20:18:23 CST 2017
pZxid = 0x1e991
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0

© 著作权归作者所有

共有 人打赏支持
秋雨霏霏
粉丝 143
博文 91
码字总数 160569
作品 0
杭州
CTO(技术副总裁)
分布式利器Zookeeper(三)

前言 《分布式利器Zookeeper(一)》 《分布式利器Zookeeper(二):分布式锁》 本篇博客是分布式利器Zookeeper系列的最后一篇,涉及的话题是:Zookeeper分布式锁的代码实现、zkclient的使用、...

zfz_linux_boy
07/01
0
0
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh
04/28
0
0
跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下: Recipes Framework Utilities Client Errors Extensions 前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Clien...

longbadx
2015/02/11
0
0
Apache Curator 2.7.1 发布

Apache Curator 2.7.1 发布了,zookeeper 的客户端调用过于复杂,Curator提供了对zookeeper客户端的封装,Apache Curator 就是为了简化zookeeper客户端调用而生,利用它,可以更好的使用zoo...

凯文加内特
2015/03/02
0
0
Maven构建ZooKeeper项目

执行mvn命令: mvn archetype:create -DgroupId=com.astute.strong -DartifactId=learn_zookeeper -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false 修改pom,添加......

智深
2012/11/24
0
5
Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,...

ZeroOne01
04/29
0
0
Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,...

ZeroOne01
04/29
0
0
使用Apache Curator监控Zookeeper的Node和Path的状态

 1.Zookeeper经常被我们用来做配置管理,配置的管理在分布式应用环境中很常见,例如同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相...

孟飞阳
07/06
0
0
ZooKeeper典型使用场景

先需要下载 curator 的依赖包 curator-framework curator-recipes Guava-14.0.1 分布式锁 在分布式环境中,为了保证数据的一致性,经常在程序的某个运行点需要进行同步控制。 类是一个同步计...

兔之
2015/10/29
0
0
zookeeper实现分布式锁

背景:这是一个老生长谈的问题了,可以用redis、数据库、zookeeper。这里今天就用zookeeper来实现下。 这个版本也是参考CSDN公众号来实现的,原理很简单:只要利用apache curator封装后的cli...

Germmy
2017/11/04
0
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

聊聊ribbon的超时时间设置

序 本文主要研究一下ribbon的超时时间设置 配置 实例 ribbon: ReadTimeout: 10000 ConnectTimeout: 10000 MaxAutoRetries: 0 MaxAutoRetriesNextServer: 1 eureka: enabled: ......

go4it
5分钟前
0
0
一行代码结果叹为观止,能做到这么极致的也只有python了

Python 这门语言非常的有趣,不仅可以做高大上的人工智能、大数据、机器学习。还可以用来做 Web、爬虫。还有其它很多的应用。今天我就给大家展示下一行 Python 代码都可以做些什么。 一行打印...

猫咪编程
8分钟前
0
0
KingShard使用

对于kingshard的功能,在git中可以看到明确的功能说明,目前使用的企业较少,但公司这边由于有go的架构师,即使踩坑,我们依然可以通过修改源码进行维护,但是mycat之类的中间件,研发这边未必有很...

mickelfeng
10分钟前
0
0
Linux 下 查找某个字符串

如果你想在当前项目下 查找 "test" 这个字符串,可以这样: grep -rn "test" * * : 表示当前目录所有文件,也可以是某个文件名-r 是递归查找-n 是显示行号-R ...

nsns
10分钟前
0
0
数据结构 之 B树与红黑树

https://blog.csdn.net/v_july_v/article/details/6530142 http://www.cnblogs.com/CarpenterLee/p/5503882.html...

晨猫
10分钟前
0
0
Linux查看服务器总内存和总硬盘大小

一、linux CPU大小; 其实应该通过Physical Processor ID来区分单核和双核。而Physical Processor ID可以从cpuinfo或者dmesg中找到. flags 如果有 ht 说明支持超线程技术 判断物理CPU的个数可...

浮躁的码农
11分钟前
0
0
Postfix命令行说明

Postfix tips and Troubleshooting Commands Here's a list of stuff I user everyday and other email admins will also be using, Let me know if I missed anything List/Print current m......

mingle
17分钟前
0
0
是时候使用Helm了:Helm, Kubernetes的包管理工具

目前我们的一个产品共有4套环境:dev环境、test环境、staging环境、production环境。 其中dev, test, staging环境在一个Kubernetes集群上以不同namespace部署,production环境部署在另一个Kub...

xiaomin0322
24分钟前
0
0
常见的redis的序列化方式

概括 一般redis的序列化方式主要有:字符串序列化、json序列化、xml序列化、jdk序列化,具体可查阅org.springframework.data.redis.serializer.RedisSerializer 的实现类,其中对于json序列化...

菜蚜
39分钟前
1
0
Linux bash入门

一、Linux Bash介绍 Bash是UNIX系统下的一个命令解析器,全称为Bourne-Again Shell是一个为GNU开源项目编写的Unix shell。bash功能强大,尤其是在处理自动循环或者耗时大的任务方面可以节省大...

老韭菜
42分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部