文档章节

多线程-AQS-CyclicBarrier

重城重楼
 重城重楼
发布于 05/08 13:14
字数 1055
阅读 23
收藏 0

1、CyclicBarrier和CountDownLatch的区别
    CountDownLatch是闭锁,只能使用一次,而CyclicBarrier的计数器会重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

    CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
            PS:有一个线程broken了,整组broken;
                CyclicBarrier是基于独占锁和阻塞队列实现的,所以并发性能在基因上就有缺陷,应对高并发场景时应谨慎考虑是否使用

    CountDownLatch允许一个或多个线程等待一组事件完成而继续,而CyclicBarrier允许一个事件等待一个或多个线程完成而继续。
    --------------------- 

2、CountDownLatch是使用AQS框架共享锁实现的同步,队列采用的sync同步FIFO队列
     CyclicBarrier是使用AQS框架独占锁实现的同步,队列采用了condition阻塞block队列--详见AQS-condition阻塞队列
     基于AQS框架的解读,本次正好将AQS的阻塞队列的模式补上。始于栅栏,始于源码
3、源码:

package com.ysma.test;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.*;

/**
 * 在最后一个线程抵达并且其他线程也都抵达或者broken了的时候,整个阻塞就盘活了,不在阻塞
 * @since 1.5
 * @see CountDownLatch
 * @author Doug Lea  又是这哥们写的,保留这个注释
 */
public class CyclicBarrier {

    /**一个栅栏就是一代,Generation变化一次就代表栅栏完成了一次*/
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // 唤醒通知上一代已经完成
        trip.signalAll();
        // 重置计数器开启新时代
        count = parties;
        generation = new Generation();
    }

    /**
     * 设置当前代中断,唤醒所有
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;//因故中断,标识一下
        count = parties;//重置计数器,唤醒所有阻塞线程;
        trip.signalAll();//PS:并没有开启新时代!
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//获取或等待获取资源,ysma-1
        try {
            final Generation g = generation;

            if (g.broken)//任一broken则break所有
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {//获取资源后发现自己被中断了
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;//获取资源,计数器减一
           if (index == 0) {  // 达到临界点,执行barrierCommand,nextGeneration,结束=>放行所有线程
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//不设置超时,wait,释放cpu
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//wait指定时间
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {//发生异常,判断自己为第一个发起中断者
                        breakBarrier();
                        throw ie;
                    } else {//发生异常,自己非第一个中断者
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)//被唤醒后,检测中断标志broken
                    throw new BrokenBarrierException();

                if (g != generation)//如果栅栏已经开启了下一代,结束,放行
                    return index;

                if (timed && nanos <= 0L) {//被唤醒后,发现超时了,broken中断
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//解锁,释放资源
        }
    }

    /**构造器,略*/
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**构造器,略*/
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**获取资源/线程数*/
    public int getParties() {
        return parties;
    }

    /**不限制等待*/
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    /**限时等待*/
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * 查询栅栏是否已经broken了
     * PS:重入锁方式进入查看
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**重置
     * 重入锁方式进入,break栅栏,开启新时代
     * */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**获取还有多少资源没有就绪*/
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

 

© 著作权归作者所有

重城重楼
粉丝 4
博文 55
码字总数 40339
作品 0
南京
程序员
私信 提问
AbstractQueuedSynchronizer在工具类Semaphore、CountDownLatch、ReentrantLock中的应用和CyclicBarrier

在上篇文章本人粗略地整理了AbstractQueuedSynchronizer和ReentrantLock的源码要点。其实,在java.util.concurrent包中,AbstractQueuedSynchronizer的应用非常广泛,而不局限于在Reentrant...

pczhangtl
2013/11/18
78
0
多线程-AQS-CountDownLatch

介绍: CountDownLatch--发令枪 Java1.5之后引入的Java并发工具类,谈到CountDownLatch需要先介绍一个概念:闭锁/门栓[latch] latch:一种同步方法,可以延迟线程的进度直到线程到达某个终点...

重城重楼
04/15
7
0
死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个...

彤哥读源码
06/28
86
0
111 多线程JUC包下代码分析

Java多线程系列目录(共43篇) AtomicLongFieldUpdater:通过反射+CAS实现对传入对象的指定long字段实现类似AtomicLong的操作 http://www.cnblogs.com/skywang12345/p/javathreadscategory.ht...

素雷
2017/10/31
34
0
Semaphore CountDownLatch CyclicBarrier 源码分析

java5 中 ,提供了几个并发工具类 ,Semaphore CountDownLatch CyclicBarrier,在并发编程中非常实用。前两者通过 内部类sync 继承AQS,使用共享资源的模式,AQS的实现可参考我的另一篇 AQS ...

ovirtKg
2016/10/19
58
0

没有更多内容

加载失败,请刷新页面

加载更多

lopatkin俄大神Windows精简版系统 安装教程 简单版

1.制作U盘启动盘 或 安装pe到电脑 下载微pe工具箱.(为什么用这个呢,因为这个无毒,无广告,无后门.其它pe在安装完系统会安装一堆木马,垃圾软件,后门什么的) pe制作工具下载http://www.wepe.com...

xiaogg
28分钟前
4
0
【0917】Linux shell基础知识2

【0917】Linux shell基础知识2 8.7/8.8 shell变量 8.9 环境变量配置文件 8.10 shell特殊符号cut命令 8.11 sort_wc_uniq命令 8.12 tee_tr_split命令 8.13 shell特殊符号 一、shell变量 1、使用...

飞翔的竹蜻蜓
30分钟前
3
0
管理角色认知-新晋管理常常犯的错

背景 管理是一门实践科学,从知道到做到,需要长时间的刻意练习,提前知道那些坑,可以提前规避。 坑1:被动执行 现象: 不主动找活干,等上级派活; 上级有了安排,指望上级替他决定实现方案...

春天spring
32分钟前
4
0
MongoDB4.0.2集群搭建

MongoDB4.0.2集群搭建 2019.02.01 01:02 619浏览 MongoDB4.0.2集群搭建 根据对象存储平台Django+MongoDB+Ceph的需求,现搭建部署一个十节点的MongoDB集群,主要以下关键点: 根据最新版本Mon...

linjin200
35分钟前
5
0
面试官问你B树和B+树,就把这篇文章丢给他

原文链接:面试官问你B树和B+树,就把这篇文章丢给他 1 B树 在介绍B+树之前, 先简单的介绍一下B树,这两种数据结构既有相似之处,也有他们的区别,最后,我们也会对比一下这两种数据结构的区...

欧阳思海
39分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部