文档章节

CyclicBarrier、CountDownLatch以及Semaphore使用及其原理分析

申文波
 申文波
发布于 06/23 22:34
字数 2500
阅读 193
收藏 8

CyclicBarrier、CountDownLatch以及Semaphore是Java并发包中几个常用的并发组件,这几个组件特点是功能相识很容易混淆。首先我们分别介绍这几个组件的功能然后再通过实例分析和源码分析其中设计原理。

CyclicBarrier

主要功能:

CyclicBarrier的主要功能是使1~(N-1)个线程达到某个屏障后阻塞,直到第N个线程到达该屏障后才会被打开,这是所有的线程才能继续执行下去,CyclicBarrier同时支持一个Runable对象,当所有线程到达该屏障时执行该Runable对象。

用例:

package com.github.wenbo2018.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author : wenbo.shen
 * @date : 2018/6/22
 */
public class CyclicBarrierUsing {

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new TestThread().start();
        }
    }

    static class TestThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "达到屏障");
            try {
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "离开屏障");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

}

输出:

Thread-0达到屏障
Thread-2达到屏障
Thread-1达到屏障
Thread-1离开屏障
Thread-2离开屏障
Thread-0离开屏障

可以看到三个线程同时达到屏障后所有线程才开始离开屏障继续运行。下面我们将分析其设计原理。

设计原理

CyclicBarrier调用await()方法是线程等待,await()方法源码如下:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

其内部调用的是doWait()方法,await()还有一个带超时的重载方法,功能类似。doWait()方法代码如下:

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //获取锁,显然每次只有一个线程能获取到对象的锁,下面这段代码每次只能被一个线程执行
        lock.lock();
        try {
            //判断是否处于下一代,默认g.broken=false;
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程被中断调用breakBarrier退出屏障并抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //减少线程达到屏障线程数
            int index = --count;
            //如果所有线程到达屏障,唤醒其它线程继续执行
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //获取需要执行的Runnable对象,如果不为null则执行run方法
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    //设置执行方法完成
                    ranAction = true;
                    //通知其它线程继续执行并重置下一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //如果还有其它线程没有到达屏障将执行下面循环
            for (;;) {
                try {
                    //是否是超时等待,不是超时等待立马调用trip.await(),trip是Condition,调用await将会是线程阻塞,否则调用带有超时时间的awaitNanos(nanos)(超时时间大于0的情况下)
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                //如果设置了超时且过了超时时间,查看当前代是否被破坏,破坏抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //不是当前代返回
                if (g != generation)
                    return index;
                //设置了超时且超时时间小于0,设置当前代被破坏同时唤醒其它线程并抛出超时异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

Generation是一个静态内部类,表明CyclicBarrier的代,表明每个CyclicBarrier执行的实例,如果当前CyclicBarrier正常执行完将会重置代,否则将会破坏代。

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

breakBarrier方法会破坏屏障,可以看到起设置了代为破坏状态同时调用Condition的signalAll方法唤醒所有在等待的线程。

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

nextGeneration主要作用为重置下一代,内部也会唤醒正在等待的线程同时将屏障数量复位方便下一次使用。

CountDownLatch

CountDownLatch的主要功能是实现几个计数器,使N个现场执行完成后当前线程才会继续执行下去。比如我们希望将一个事件分成多个线程去执行,执行完后进行汇总这种情景就可以使用CountDownLatch。

用例

package com.github.wenbo2018.concurrent;

import java.util.concurrent.CountDownLatch;

/**
 * Created by shenwenbo on 2018/6/23.
 */
public class CountDownLatchUsing {

    private static CountDownLatch countDownLatch = new CountDownLatch(5);


    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Testthread().start();
        }
        countDownLatch.await();
        System.out.println("主线程执行");

    }


    static class Testthread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "执行完毕");
            countDownLatch.countDown();
        }
    }

}

输出:

Thread-0执行完毕
Thread-1执行完毕
Thread-2执行完毕
Thread-3执行完毕
Thread-4执行完毕
主线程执行

原理分析

构造方法

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

CountDownLatch构造函数中调用了Sync构造方法,Sync继承了AQS内容如下:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

设置了state,这个东西再熟悉不过了,在可重入锁中表示是否获取到锁的标志位。

我们首先看await方法,

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

实际调用的是AQS的acquireSharedInterruptibly方法,从名字可以看出采用的是共享模式。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程被中断,直接跑出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试共享模式获取节点
        if (tryAcquireShared(arg) < 0)
            //失败则进一步执行获取节点
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared代码在Sync中被实现如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

如果state等于0返回1,否则返回-1;state等于0说明没有对象在同步器中,线程可以继续执行下去,否则进入doAcquireSharedInterruptibly方法中,doAcquireSharedInterruptibly方法如下:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这段代码就是共享模式获取节点,获取不到就进入队列中休眠,这个跟读写锁一样,知道state等于0后被唤醒。

countDown方法如下:

public void countDown() {
     sync.releaseShared(1);
}

调用的是Sync的releaseShared方法,

public final boolean releaseShared(int arg) {
        //尝试释放节点,函数被重写在Sync中
        if (tryReleaseShared(arg)) {
            //释放共享节点
            doReleaseShared();
            return true;
        }
        return false;
    }
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            //循环执行,减少state
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

countDown方法的主要功能就是通过CAS方法减少state的值,减少成功后唤醒队列中的节点。唤醒主节点成功后doAcquireSharedInterruptibly中方法会继续执行接着判断state是否等于0,不等与继续休眠否则继续执行线程。

Semaphore

Semaphore可以控制访问线程的数量。

用例:

package com.github.wenbo2018.concurrent;

import java.util.concurrent.Semaphore;

/**
 * Created by wenbo.shen on 2018/5/6.
 */
public class SemaphoreUseing {

    private static Semaphore semaphore = new Semaphore(1);

    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(count);

    }

}

原理分析

首先看构造方法,

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

默认构造方法采用非公平模式。

acquire方法如下:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

很熟悉默认采用的是共享模式获取节点信息,跟读锁类似。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取共享获取节点,返回结果小于0说明有超过线程正在访问,需要对线程进行休眠。
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //获取当前状态
                int available = getState();
                //设置剩下的值,如果剩余值小于0或者case设置成功返回剩余值
                int remaining = available - acquires;
                if (remaining < 0 ||
                    //cas设置state
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

 

release方法如下:

public void release() {
    sync.releaseShared(1);
}

acquireSharedInterruptibly方法如下:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared方法如下:

    public final boolean releaseShared(int arg) {
        //尝试释放节点,在子类中被重写,释放成功后唤醒正在休眠的线程
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

 

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取当前state值
                int current = getState();
                //恢复当前值
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

 

总结

CyclicBarrier主要用于N个线程之间互斥,当且仅当N个线程都执行到屏障处所有线程才能继续执行下去,CyclicBarrier可以被重复使用,CyclicBarrier通过可冲入锁+AQS+Condition实现,CyclicBarrier调用await方法获取可重入锁同时减少state的值,state==0时唤醒所有正在等待的线程,否则线程处于等待状态,线程间的通信主要通过Condition机制来实现。

CountDownLatch主要用于某个线程等待N个线程执行完后等待的线程接着继续执行下去,不能够重复执行,CountDownLatch通过设施AQS state值来实现,每次调用counDown方法后都去唤醒正在等待的线程,等待的线程判断state是否等于0,等于0就继续执行。

Semaphore用于控制访问线程的数量,Semaphore通过设置AQS state值来实现,调用require方法后cas减少state的值,如果state值为负数说明有更多线程正在访问代码块,这是后需要把这些线程休眠,调用release方法后重新增加state值,重新增加state值后去唤醒正在等待的线程。

© 著作权归作者所有

共有 人打赏支持
申文波
粉丝 15
博文 43
码字总数 46529
作品 0
长宁
程序员
[Java并发系列] 5.Java并发工具类

在J.U.C包中,提供了几个非常有用的并发工具类,通过使用这些工具类,可以有效提高并发编程中,并发流程的控制,以提升效率和代码质量,如下: CountDownLatch CyclicBarrier Semaphore 1. ...

Xiangdong_She
2017/10/27
0
0
并发十二:CountDownLatch、CyclicBarrier、Semaphore实现分析

J.U.C中提供了三个同步工具CountDownLatch、CyclicBarrier、Semaphore,都是共享锁的特殊应用,用来进行线程的任务协调。 CountDownLatch 一个小栗子: 输出:"Thread-0:二级表生成、Thread-...

wangjie2016
04/14
0
0
Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore   在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们...

明舞
2015/09/01
0
0
Semaphore CountDownLatch CyclicBarrier 源码分析

java5 中 ,提供了几个并发工具类 ,Semaphore CountDownLatch CyclicBarrier,在并发编程中非常实用。前两者通过 内部类sync 继承AQS,使用共享资源的模式,AQS的实现可参考我的另一篇 AQS ...

ovirtKg
2016/10/19
18
0
学习笔记六:线程间的协作与通信之并发工具类

继续 上一篇 《 学习笔记五:线程间的协作与通信》 在jdk的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Excha...

刘祖鹏
07/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

AIX 系统 (资源)

AIX--zabbix agent的编译安装以及遇到的“坑” 荐 : http://blog.51cto.com/lythjq/2128354 AIX--操作系统安装(AIX 6.1) : http://blog.51cto.com/lythjq/2128119...

寰宇01
32分钟前
0
0
Spring 转换 model 为 json 时增加属性

缘起 目前的项目中有个需求是在附件对象转换成 json 时增加个 url 属性,以前的方式是在返回附件对象或列表时候做一次统一处理,这次想看看 spring 或者 jackson fasterxml 是否自带类似功能...

郁也风
32分钟前
0
0
10大PHP比特币开源项目

如果你是一个Phper,如果你希望学习区块链,那么本文列出的 10个开源的Php比特币项目,将有助于你了解在自己的应用中 如何加入对比特币的支持。 如果你希望快速掌握使用Php对接比特币钱包的方...

汇智网教程
今天
1
0
springclould feign客户端添加全局参数

用springclould feign作为调用服务的客户端,一般来说参数可以写在feignclient的方法参数里 有时需要所有feign请求都统一添加一些参数,例如token用于鉴权等,可以这样做: 添加一个配置类,...

canneljls
今天
1
0
win32截屏并rgb24转yuv420

//最终f的内存布局为BGRA格式,需要保证buf长度足够(>w*h*4)void ScreenCap(void* buf, int w, int h){ HWND hDesk = GetDesktopWindow(); HDC hScreen = GetDC(hDesk); ......

styleman
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部