文档章节

Semaphore CountDownLatch CyclicBarrier 源码分析

o
 ovirtKg
发布于 2016/10/19 23:12
字数 1948
阅读 59
收藏 2

java5 中 ,提供了几个并发工具类 ,Semaphore CountDownLatch CyclicBarrier,在并发编程中非常实用。前两者通过  内部类sync 继承AQS,使用共享资源的模式,AQS的实现可参考我的另一篇  AQS 实现分析,前两者根据各自功能需求 , 各自内部实现tryAcquireShared(获取资源)、tryReleaseShared(释放)。来定义什么条件 下来获取与释放。而CyclicBarrier内部通过Reentrantlock与Condition组合的方式实现。

Semaphore

与Reentrantlock类似,也有公平和非公平的机制。这里就不在分析了,默认是非公平的。

通过acquire 获取

Semphore
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);   //非独占模式
}
AQS
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)           //当返回值不小于0时,获得资源。
        doAcquireSharedInterruptibly(arg);  //资源获取失败,加入队列尾部,阻塞
}

从AQS的分析中得到,当tryAcquireShared 返回值小于0,那么认为获取失败。而对于tryAcquiredShared的实现中,让其执行时,其返回值需大于0,对于Semaphore信号量,构造时会设置许可的大小 。表示能够获取的资源,一般大于1。

默认是非公平 的,而后会调用如下方法,会返回

Semaphore NonFairSync
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

减去本次尝试的获取的许可数,当结果小于0,直接返回,而如果不小于0 , cas更新许可,如果更新失败,表明其它线程也在更新,而后 进入下一次循环,直到可用的小于0或者cas成功。当返回值小于0时,阻塞,等待唤醒资源释放。返回值大于 0,获取许可成功,继续执行。

释放

当获取许可的线程执行完时,必须释放占有的许可量,

Semaphore
public void release() {
    sync.releaseShared(1); //释放时需CAS更新,独占模式不需cas
}
AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();  //唤醒head的后继节点
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))   //因为是 共享锁,可能有多个线程释放
            return true;
    }
}

示例:

项目中的示例:在开发的项目中,一个数据中心有很多个集群,集群中有很多个虚拟机实例,同一个集群中有多个虚拟机要开机,但是开机前需要使用调度策略,需要更新集群中的信息,但是只允许一个集群只有一个虚拟机能执行策略。这里就可以通过Semaphore实现。

clusterLockMap.putIfAbsent(cluster.getId(), new Semaphore(1)); //为每个集群初始一个许可

clusterLockMap.get(cluster.getId()).acquire();  //尝试获取许可

//执行逻辑

//释放资源,这里调用drainPermits的原因是,在释放前清空许可,
因为其它 在阻塞的线程如果被中断了,会将许可值扩大,所以在释放前,将其清空。

   finally {
            //保证了在任一时刻,只有一个虚拟机能调用策略
        synchronized (clusterLockMap.get(cluster.getId())) {
            clusterLockMap.get(cluster.getId()).drainPermits();    //清空,返回清空数,为空时直接返回0
            clusterLockMap.get(cluster.getId()).release();
         }
     }

上面的示例中如果不加锁,异常发生时可能导致许可大于一。

CountDownLatch

类似于计数器,比如常用于  一个或几个线程,要等待其它的线程执行完才能继续执行。内部只有一个继承了AQS的sync。只能使用一次。

常用方法 await   ,用来等待执行信号。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);  //状态不为1时,阻塞。等待被唤醒
}
boolean await(long timeout, TimeUnit unit)  超时还没有为0,返回false

 

CountDownLatch sync  
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;  
}

判断当前是否等于0 ,也就是说 countDown 的调用次数是否等于初始化的数量。

countDown  信号的释放

CountDownLatch 
public void countDown() {
    sync.releaseShared(1);
}
CountDownLatch sync 
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

p实例:  

F1  比赛前需要等待发车信号,所有的车才能出发。  CountDownLatch(1)    ,车好比线程,发车前await。

信号发出,countDown。

F1车进入修息区,必须 所有的检测 灯都为绿才能出发。车出发前 await,每一个必检项检测完调用countDown,才能出发(线程才能运行)。

CyclicBarrier

CyclicBarrier 最大的特别 之处就是,在构造时可以指定一个线程,并且可以重复使用。

final Runnable barrierCommand;

该线程的作用在于,当调用await时,且许可为0 时,执行完 barrierCommand的线程  才能继续执行。

实现细节,类的结构图如下:

构造方法时,可以指定barrierCommand,如果不指定,则功能类似于CountDownLatch,也有点计数的意思 。其从成员变量 可以看出,内部有ReentrantLock的属性,内部可能是通过ReentrantLock的调用实现。接下来分析源码。

分析源码前,几个关键变量注意下,

lock 同步锁,

trip = lock.newCondition()

count  计数器

generation   内部实现的类,用来表示是否新的计数的开始

构造时指定count,线程调用await,count减1 阻塞,直到count等于0,且barrierCommand执行完如果有的话。

调用方法有两种:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

内部实现

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier(); //被中断,本次计数无效,并将标志置为broken
            throw new InterruptedException();
        }
        int index = --count; // 计数减一 
        if (index == 0) {  // tripped    
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();  //计数为0时,执行run方法
                ranAction = true;
                nextGeneration(); //本次计数结束,trip.singnalAll(),计数重置。
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();  //出现异常,本次计数无效。唤醒其它await线程,
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();  // 阻塞
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos); // 限时阻塞,调用await(long timeout, TimeUnit unit),
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)  //  是否还是同一个计数周期内
                return index; // 顺利执行,返回

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();  //只有调用超时await方法,且超时,抛出。
            }
        }
    } finally {
        lock.unlock();
    }
}

generation  的操作   

顺利执行后,当计数为0,且周期重置。

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

执行时出现异常或者等待超时,唤醒。计数无效,使唤醒的线程检测到generation的broken 标识为true,抛出BrokenBarrierException 。

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

 

总结:
 

从这上面的分析可以得出以下情况

大部分的并发工具类,是通过构造内部类继承AQS,并根据工具类设计的功能,实现对应的获取与释放资源的方法。来使得 调用线程何时阻塞,何时释放。如CountDownLatch 就不同于Semaphore的设计思路,Semaphore是先尝试获取资源,获取到才能释放,获取不到,阻塞。虽然实现方式(继承AQS)一样。而CountDownLatch  释放资源 前没有通过相应的方法获取,而是直接将当前的许可数减一,实现方法 tryReleaseShared ,而后根据是否计数为0决定是否唤醒线程。而对应的获取资源方法 tryAcquireShared,判断计数器是否已经到达0来判断是否执行。或者阻塞。获取后不需要释放。

而CyclicBarrier 并没有直接通过内部sync继承AQS的方式,而是通过现有的工具类Reentantlock,与Condition组合来实现功能,且还能重用。

© 著作权归作者所有

o
粉丝 3
博文 57
码字总数 48798
作品 0
景德镇
私信 提问
【并发编程】--CountDownLatch、CyclicBarrier、Semaphore

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 https://blog.csdn.net/aiTCR/article/details/97965620 JUC中提供常用的并发工具类,Count...

TTcccCarrie
07/31
0
0
[Java并发系列] 5.Java并发工具类

在J.U.C包中,提供了几个非常有用的并发工具类,通过使用这些工具类,可以有效提高并发编程中,并发流程的控制,以提升效率和代码质量,如下: CountDownLatch CyclicBarrier Semaphore 1. ...

Xiangdong_She
2017/10/27
0
0
并发十二:CountDownLatch、CyclicBarrier、Semaphore实现分析

J.U.C中提供了三个同步工具CountDownLatch、CyclicBarrier、Semaphore,都是共享锁的特殊应用,用来进行线程的任务协调。 CountDownLatch 一个小栗子: 输出:"Thread-0:二级表生成、Thread-...

wangjie2016
2018/04/14
0
0
Java中的并发工具类(CountDownLatch、CyclicBarrie、Exchanger)

在JDK的并发包里提供了很多有意思的并发工具类。CountDownLatch、CyclicBarrier和Semaphore 工具类提供了一种并发流程控制的手段,Exchanger 工具类则提供了在线程间交换数据的一种手段。 1....

孟飞阳
02/22
178
0
Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore   在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们...

明舞
2015/09/01
154
0

没有更多内容

加载失败,请刷新页面

加载更多

vue vue-router beforeRouteEnter

本文转载于:专业的前端网站➬vue vue-router beforeRouteEnter beforeRouteEnter (to, from, next) { // 在渲染该组件的对应路由被 confirm 前调用 // 不!能!获取组件实例 `thi...

前端老手
48分钟前
4
0
Spring Boot 2 实战:结合 Docker 容器化

1. 前言 Docker 是啥?好处是啥? 什么现在如果作为一名开发你还不知道甚至没有用过我都感觉你落伍了。Docker 是一种虚拟化的容器技术,目的为了打造持续集成、版本控制、可移植性、隔离性和...

码农小胖哥
53分钟前
4
0
Linux 常用查询句柄的命令(Too many open files)

摘要 查询命令 进程占用的句柄总数 lsof -n | wc -l 查看哪一个进程占用的句柄最多 lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr|more 第一列 是 占用句柄数; 第二列 是 进程号 PID;...

liangxiao
53分钟前
5
0
JavaScript权威指南笔记5

第五章、语句 0、概述 JS程序是一系列可执行语句的集合。 通过控制语句来改变语句的默认执行顺序。 控制语句:条件、循环和跳转(如return、break和throw) 2、复合语句 当多条语句被当做一条语...

_Somuns
59分钟前
5
0
vmware安装ubuntu18.04总是 panic -not syncing:corrupted stack end detected inside schedule

Vmware 安装ubuntu 总是卡着不动, 提示panic -not synciong 网上提示很多办法,都试了效果不佳, 找到了完美解决办法 你的兼容模式重新选一下为6.0,兼容性对硬件有要求的 即可实现 成功安装...

dragon_tech
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部