文档章节

CyclicBarrier、CountDownLatch以及Semaphore使用及其原理分析

申文波
 申文波
发布于 06/23 22:34
字数 2500
阅读 185
收藏 8
点赞 1
评论 0

CyclicBarrier、CountDownLatch以及Semaphore是Java并发包中几个常用的并发组件,这几个组件特点是功能相识很容易混淆。首先我们分别介绍这几个组件的功能然后再通过实例分析和源码分析其中设计原理。

CyclicBarrier

主要功能:

CyclicBarrier的主要功能是使1~(N-1)个线程达到某个屏障后阻塞,直到第N个线程到达该屏障后才会被打开,这是所有的线程才能继续执行下去,CyclicBarrier同时支持一个Runable对象,当所有线程到达该屏障时执行该Runable对象。

用例:

package com.github.wenbo2018.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author : wenbo.shen
 * @date : 2018/6/22
 */
public class CyclicBarrierUsing {

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new TestThread().start();
        }
    }

    static class TestThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "达到屏障");
            try {
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "离开屏障");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

}

输出:

Thread-0达到屏障
Thread-2达到屏障
Thread-1达到屏障
Thread-1离开屏障
Thread-2离开屏障
Thread-0离开屏障

可以看到三个线程同时达到屏障后所有线程才开始离开屏障继续运行。下面我们将分析其设计原理。

设计原理

CyclicBarrier调用await()方法是线程等待,await()方法源码如下:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

其内部调用的是doWait()方法,await()还有一个带超时的重载方法,功能类似。doWait()方法代码如下:

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //获取锁,显然每次只有一个线程能获取到对象的锁,下面这段代码每次只能被一个线程执行
        lock.lock();
        try {
            //判断是否处于下一代,默认g.broken=false;
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程被中断调用breakBarrier退出屏障并抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //减少线程达到屏障线程数
            int index = --count;
            //如果所有线程到达屏障,唤醒其它线程继续执行
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //获取需要执行的Runnable对象,如果不为null则执行run方法
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    //设置执行方法完成
                    ranAction = true;
                    //通知其它线程继续执行并重置下一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //如果还有其它线程没有到达屏障将执行下面循环
            for (;;) {
                try {
                    //是否是超时等待,不是超时等待立马调用trip.await(),trip是Condition,调用await将会是线程阻塞,否则调用带有超时时间的awaitNanos(nanos)(超时时间大于0的情况下)
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                //如果设置了超时且过了超时时间,查看当前代是否被破坏,破坏抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //不是当前代返回
                if (g != generation)
                    return index;
                //设置了超时且超时时间小于0,设置当前代被破坏同时唤醒其它线程并抛出超时异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

Generation是一个静态内部类,表明CyclicBarrier的代,表明每个CyclicBarrier执行的实例,如果当前CyclicBarrier正常执行完将会重置代,否则将会破坏代。

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

breakBarrier方法会破坏屏障,可以看到起设置了代为破坏状态同时调用Condition的signalAll方法唤醒所有在等待的线程。

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

nextGeneration主要作用为重置下一代,内部也会唤醒正在等待的线程同时将屏障数量复位方便下一次使用。

CountDownLatch

CountDownLatch的主要功能是实现几个计数器,使N个现场执行完成后当前线程才会继续执行下去。比如我们希望将一个事件分成多个线程去执行,执行完后进行汇总这种情景就可以使用CountDownLatch。

用例

package com.github.wenbo2018.concurrent;

import java.util.concurrent.CountDownLatch;

/**
 * Created by shenwenbo on 2018/6/23.
 */
public class CountDownLatchUsing {

    private static CountDownLatch countDownLatch = new CountDownLatch(5);


    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Testthread().start();
        }
        countDownLatch.await();
        System.out.println("主线程执行");

    }


    static class Testthread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "执行完毕");
            countDownLatch.countDown();
        }
    }

}

输出:

Thread-0执行完毕
Thread-1执行完毕
Thread-2执行完毕
Thread-3执行完毕
Thread-4执行完毕
主线程执行

原理分析

构造方法

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

CountDownLatch构造函数中调用了Sync构造方法,Sync继承了AQS内容如下:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

设置了state,这个东西再熟悉不过了,在可重入锁中表示是否获取到锁的标志位。

我们首先看await方法,

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

实际调用的是AQS的acquireSharedInterruptibly方法,从名字可以看出采用的是共享模式。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程被中断,直接跑出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试共享模式获取节点
        if (tryAcquireShared(arg) < 0)
            //失败则进一步执行获取节点
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared代码在Sync中被实现如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

如果state等于0返回1,否则返回-1;state等于0说明没有对象在同步器中,线程可以继续执行下去,否则进入doAcquireSharedInterruptibly方法中,doAcquireSharedInterruptibly方法如下:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这段代码就是共享模式获取节点,获取不到就进入队列中休眠,这个跟读写锁一样,知道state等于0后被唤醒。

countDown方法如下:

public void countDown() {
     sync.releaseShared(1);
}

调用的是Sync的releaseShared方法,

public final boolean releaseShared(int arg) {
        //尝试释放节点,函数被重写在Sync中
        if (tryReleaseShared(arg)) {
            //释放共享节点
            doReleaseShared();
            return true;
        }
        return false;
    }
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            //循环执行,减少state
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

countDown方法的主要功能就是通过CAS方法减少state的值,减少成功后唤醒队列中的节点。唤醒主节点成功后doAcquireSharedInterruptibly中方法会继续执行接着判断state是否等于0,不等与继续休眠否则继续执行线程。

Semaphore

Semaphore可以控制访问线程的数量。

用例:

package com.github.wenbo2018.concurrent;

import java.util.concurrent.Semaphore;

/**
 * Created by wenbo.shen on 2018/5/6.
 */
public class SemaphoreUseing {

    private static Semaphore semaphore = new Semaphore(1);

    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(count);

    }

}

原理分析

首先看构造方法,

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

默认构造方法采用非公平模式。

acquire方法如下:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

很熟悉默认采用的是共享模式获取节点信息,跟读锁类似。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取共享获取节点,返回结果小于0说明有超过线程正在访问,需要对线程进行休眠。
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //获取当前状态
                int available = getState();
                //设置剩下的值,如果剩余值小于0或者case设置成功返回剩余值
                int remaining = available - acquires;
                if (remaining < 0 ||
                    //cas设置state
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

 

release方法如下:

public void release() {
    sync.releaseShared(1);
}

acquireSharedInterruptibly方法如下:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared方法如下:

    public final boolean releaseShared(int arg) {
        //尝试释放节点,在子类中被重写,释放成功后唤醒正在休眠的线程
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

 

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取当前state值
                int current = getState();
                //恢复当前值
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

 

总结

CyclicBarrier主要用于N个线程之间互斥,当且仅当N个线程都执行到屏障处所有线程才能继续执行下去,CyclicBarrier可以被重复使用,CyclicBarrier通过可冲入锁+AQS+Condition实现,CyclicBarrier调用await方法获取可重入锁同时减少state的值,state==0时唤醒所有正在等待的线程,否则线程处于等待状态,线程间的通信主要通过Condition机制来实现。

CountDownLatch主要用于某个线程等待N个线程执行完后等待的线程接着继续执行下去,不能够重复执行,CountDownLatch通过设施AQS state值来实现,每次调用counDown方法后都去唤醒正在等待的线程,等待的线程判断state是否等于0,等于0就继续执行。

Semaphore用于控制访问线程的数量,Semaphore通过设置AQS state值来实现,调用require方法后cas减少state的值,如果state值为负数说明有更多线程正在访问代码块,这是后需要把这些线程休眠,调用release方法后重新增加state值,重新增加state值后去唤醒正在等待的线程。

© 著作权归作者所有

共有 人打赏支持
申文波
粉丝 11
博文 40
码字总数 43979
作品 0
长宁
程序员
[Java并发系列] 5.Java并发工具类

在J.U.C包中,提供了几个非常有用的并发工具类,通过使用这些工具类,可以有效提高并发编程中,并发流程的控制,以提升效率和代码质量,如下: CountDownLatch CyclicBarrier Semaphore 1. ...

Xiangdong_She
2017/10/27
0
0
并发十二:CountDownLatch、CyclicBarrier、Semaphore实现分析

J.U.C中提供了三个同步工具CountDownLatch、CyclicBarrier、Semaphore,都是共享锁的特殊应用,用来进行线程的任务协调。 CountDownLatch 一个小栗子: 输出:"Thread-0:二级表生成、Thread-...

wangjie2016
04/14
0
0
Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore   在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们...

明舞
2015/09/01
0
0
Semaphore CountDownLatch CyclicBarrier 源码分析

java5 中 ,提供了几个并发工具类 ,Semaphore CountDownLatch CyclicBarrier,在并发编程中非常实用。前两者通过 内部类sync 继承AQS,使用共享资源的模式,AQS的实现可参考我的另一篇 AQS ...

ovirtKg
2016/10/19
18
0
Java中的并发工具

在JDK的并发包里面提供了几个非常有用的并发工具,CountDownLatch、CyclicBarrier、Semaphore工具类提供了一种并发控制流程的手段。 一、CountDownLatch CountDownLatch是一个同步辅助类,在...

Dreyer
2016/05/15
153
0
CountDownLatch、CyclicBarrier、Semaphore、Exchanger

CountDownLatch: 允许N个线程等待其他线程完成执行。无法进行重复使用,只能用一次。 比如有2个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功...

jephon
2016/08/14
0
0
JUC整体架构图

JUC相关整体框架图 整体架构.png JUC相关UML图 reentrantlock uml图 reentrantreadwritelock uml图 countdownlatch uml图 semaphore uml图 cyclicbarrier uml图...

小鱼嘻嘻
01/18
0
0
java的concurrent包工具类

java的concurrent包工具类 concurrent的工具包类主要用来协调不同线程的运行状态(完成状态、完成步调)、对同步资源的访问限制。 1、CountDownLatch countDownLatch是通过一个计数器来实现线...

GITTODO
2016/03/28
65
0
concurrent包的同步器

concurrent包的同步器:CountDownLatch、CyclicBarrier、Semaphore 同步器简介 名称 功能 构成 主要方法 CountDownLatch(闭锁) 一个线程等待其它线程完成各自工作后在执行 继承aqs await()...

GITTODO
04/25
0
0
AbstractQueuedSynchronizer在工具类Semaphore、CountDownLatch、ReentrantLock中的应用和CyclicBarrier

在上篇文章本人粗略地整理了AbstractQueuedSynchronizer和ReentrantLock的源码要点。其实,在java.util.concurrent包中,AbstractQueuedSynchronizer的应用非常广泛,而不局限于在Reentrant...

pczhangtl
2013/11/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

SpringBoot | 第十章:Swagger2的集成和使用

前言 前一章节介绍了mybatisPlus的集成和简单使用,本章节开始接着上一章节的用户表,进行Swagger2的集成。现在都奉行前后端分离开发和微服务大行其道,分微服务及前后端分离后,前后端开发的...

oKong
今天
2
0
Python 最小二乘法 拟合 二次曲线

Python 二次拟合 随机生成数据,并且加上噪声干扰 构造需要拟合的函数形式,使用最小二乘法进行拟合 输出拟合后的参数 将拟合后的函数与原始数据绘图后进行对比 import numpy as npimport...

阿豪boy
今天
1
0
云拿 无人便利店

附近(上海市-航南路)开了家无人便利店.特意进去体验了一下.下面把自己看到的跟大家分享下. 经得现场工作人员同意后拍了几张照片.从外面看是这样.店门口的指导里强调:不要一次扫码多个人进入....

周翔
昨天
1
0
Java设计模式学习之工厂模式

在Java(或者叫做面向对象语言)的世界中,工厂模式被广泛应用于项目中,也许你并没有听说过,不过也许你已经在使用了。 简单来说,工厂模式的出现源于增加程序序的可扩展性,降低耦合度。之...

路小磊
昨天
161
1
npm profile 新功能介绍

转载地址 npm profile 新功能介绍 npm新版本新推来一个功能,npm profile,这个可以更改自己简介信息的命令,以后可以不用去登录网站来修改自己的简介了 具体的这个功能的支持大概是在6这个版...

durban
昨天
1
0
Serial2Ethernet Bi-redirection

Serial Tool Serial Tool is a utility for developing serial communications, custom protocols or device testing. You can set up bytes to send accordingly to your protocol and save......

zungyiu
昨天
1
0
python里求解物理学上的双弹簧质能系统

物理的模型如下: 在这个系统里有两个物体,它们的质量分别是m1和m2,被两个弹簧连接在一起,伸缩系统为k1和k2,左端固定。假定没有外力时,两个弹簧的长度为L1和L2。 由于两物体有重力,那么...

wangxuwei
昨天
0
0
apolloxlua 介绍

##项目介绍 apolloxlua 目前支持javascript到lua的翻译。可以在openresty和luajit里使用。这个工具分为两种模式, 一种是web模式,可以通过网页使用。另外一种是tool模式, 通常作为大规模翻...

钟元OSS
昨天
2
0
Mybatis入门

简介: 定义:Mybatis是一个支持普通SQL查询、存储过程和高级映射的持久层框架。 途径:MyBatis通过XML文件或者注解的形式配置映射,实现数据库查询。 特性:动态SQL语句。 文件结构:Mybat...

霍淇滨
昨天
2
0
开发技术瓶颈期,如何突破

前言 读书、学习的那些事情,以前我也陆续叨叨了不少,但总觉得 “学习方法” 就是一个永远在路上的话题。个人的能力、经验积累与习惯方法不尽相同,而且一篇文章甚至一本书都很难将学习方法...

_小迷糊
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部