文档章节

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

 彤哥读源码
发布于 06/28 21:41
字数 1834
阅读 125
收藏 6

问题

(1)CyclicBarrier是什么?

(2)CyclicBarrier具有什么特性?

(3)CyclicBarrier与CountDownLatch的对比?

简介

CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行。它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走。

使用方法

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                System.out.println("before");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after");
            }).start();
        }
    }
}    

这段方法很简单,使用一个CyclicBarrier使得三个线程保持同步,当三个线程同时到达cyclicBarrier.await();处大家再一起往下运行。

源码分析

主要内部类

private static class Generation {
    boolean broken = false;
}

Generation,中文翻译为代,一代人的代,用于控制CyclicBarrier的循环使用。

比如,上面示例中的三个线程完成后进入下一代,继续等待三个线程达到栅栏处再一起执行,而CountDownLatch则做不到这一点,CountDownLatch是一次性的,无法重置其次数。

主要属性

// 重入锁
private final ReentrantLock lock = new ReentrantLock();
// 条件锁,名称为trip,绊倒的意思,可能是指线程来了先绊倒,等达到一定数量了再唤醒
private final Condition trip = lock.newCondition();
// 需要等待的线程数量
private final int parties;
// 当唤醒的时候执行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 当前这一代还需要等待的线程数
private int count;

通过属性可以看到,CyclicBarrier内部是通过重入锁的条件锁来实现的,那么你可以脑补一下这个场景吗?

彤哥来脑补一下:假如初始时count = parties = 3,当第一个线程到达栅栏处,count减1,然后把它加入到Condition的队列中,第二个线程到达栅栏处也是如此,第三个线程到达栅栏处,count减为0,调用Condition的signalAll()通知另外两个线程,然后把它们加入到AQS的队列中,等待当前线程运行完毕,调用lock.unlock()的时候依次从AQS的队列中唤醒一个线程继续运行,也就是说实际上三个线程先依次(排队)到达栅栏处,再依次往下运行。

以上纯属彤哥脑补的内容,真实情况是不是如此呢,且往后看。

构造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
    // 初始化count等于parties
    this.count = parties;
    // 初始化都到达栅栏处执行的命令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

构造方法需要传入一个parties变量,也就是需要等待的线程数。

await()方法

每个需要在栅栏处等待的线程都需要显式地调用await()方法等待其它线程的到来。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 调用dowait方法,不需要超时
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        
        // 检查
        if (g.broken)
            throw new BrokenBarrierException();

        // 中断检查
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        
        // count的值减1
        int index = --count;
        // 如果数量减到0了,走这段逻辑(最后一个线程走这里)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果初始化的时候传了命令,这里执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 调用下一代方法
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 这个循环只有非最后一个线程可以走到
        for (;;) {
            try {
                if (!timed)
                    // 调用condition的await()方法
                    trip.await();
                else if (nanos > 0L)
                    // 超时等待方法
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            
            // 检查
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常来说这里肯定不相等
            // 因为上面打破栅栏的时候调用nextGeneration()方法时generation的引用已经变化了
            if (g != generation)
                return index;
            
            // 超时检查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
private void nextGeneration() {
    // 调用condition的signalAll()将其队列中的等待者全部转移到AQS的队列中
    trip.signalAll();
    // 重置count
    count = parties;
    // 进入下一代
    generation = new Generation();
}

dowait()方法里的整个逻辑分成两部分:

(1)最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration()方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。

(2)非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await()方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换“代”了,这时候返回。

图解

CyclicBarrier

学习过前面的章节,看这个图很简单了,看不懂的同学还需要把推荐的内容好好看看哦^^

总结

(1)CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走;

(2)CyclicBarrier不是直接使用AQS实现的一个同步器;

(3)CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑;

彩蛋

CyclicBarrier与CountDownLatch的异同?

(1)两者都能实现阻塞一组线程等待被唤醒;

(2)前者是最后一个线程到达时自动唤醒;

(3)后者是通过显式地调用countDown()实现的;

(4)前者是通过重入锁及其条件锁实现的,后者是直接基于AQS实现的;

(5)前者具有“代”的概念,可以重复使用,后者只能使用一次;

(6)前者只能实现多个线程到达栅栏处一起运行;

(7)后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见CountDownLatch那章使用案例);

推荐阅读

1、死磕 java同步系列之开篇

2、死磕 java魔法类之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己动手写一个锁Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

9、死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock源码解析

12、死磕 java同步系列之Semaphore源码解析

13、死磕 java同步系列之CountDownLatch源码解析

14、死磕 java同步系列之AQS终篇

15、死磕 java同步系列之StampedLock源码解析


欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

qrcode

© 著作权归作者所有

彤哥读源码

粉丝 268
博文 66
码字总数 163958
作品 0
深圳
私信 提问
死磕 java同步系列之Phaser源码解析

问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶...

彤哥读源码
10/01
30
0
死磕 java同步系列之CountDownLatch源码解析

问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以...

彤哥读源码
06/16
157
0
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
2018/07/22
0
0
死磕 java同步系列之ReentrantLock VS synchronized——结果可能跟你想的不一样

问题 (1)ReentrantLock有哪些优点? (2)ReentrantLock有哪些缺点? (3)ReentrantLock是否可以完全替代synchronized? 简介 synchronized是Java原生提供的用于在多线程环境中保证同步的...

彤哥读源码
06/11
2.1K
6
【死磕Sharding-jdbc】— 死磕 Sharding-jdbc 精品合集

死磕 Sharding-jdbc 是 【阿飞哥】的精心力作,花费 4 个月,总共输出 22 篇文章,全部都是关于 Sharding-jdbc 的原理解析和源码分析,通俗易懂。 下图是 【阿飞哥】 的公众号,欢迎各位关注...

飞哥-Javaer
2018/08/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

idea修改新的git地址

我们在项目变动中通常会遇到更换git地址情况,这里介绍一个在idea项目中简单更换git地址操作: 1、点击VCS; 2、点击Git; 3、点击Remotes; 具体步骤如图 4、点击框中链接即可在右边看到一个...

west_coast
28分钟前
6
0
将规则集传递给mixin

允许包装在mixin中定义的css块。 分离的规则集是一组CSS属性、嵌套规则集、媒体声明或者是存储在变量中的任何其他内容,我们可以将它包含在规则集中或其他结构中,并且所有属性都将复制到那里...

凌兮洛
30分钟前
4
0
玩转阿里云 Terraform(一):Terraform 是什么

从本文起,我将陆续推出一系列有关 Terraform 的文章,从概念,特点,工作机制,用法以及最佳实践等多个方面由浅入深的向大家介绍如何在阿里云上玩转 Terraform。同时也希望借此机会,与感兴...

阿里云官方博客
30分钟前
4
0
科研大数据面临的挑战

近几十年硬件的发展非常迅猛,第一台Macintosh苹果电脑的内存是128KB(0.13MB),现在很多笔记本配的是8GB的内存,硬盘1TB(1024GB),2TB的很常见。大型的数据服务器上还会有更大的储容量,...

英论阁学术院
31分钟前
5
0
python学习10.09:Python列表和元组的底层实现

有关列表(list)和元组(tuple)的底层实现,本节分别从它们的源码来进行分析。 首先来分析 list 列表,它的具体结构如下所示: typedef struct { PyObject_VAR_HEAD /* Vector o...

太空堡垒185
32分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部