Java并发编程之CyclicBarrier源码分析

原创
2019/01/14 10:58
阅读数 58

CyclicBarrier介绍

CyclicBarrier是JDK1.5提供允许一组线程等待彼此都达到一个共同的障碍点的同步的工具。CyclicBarrier适用于固定大小线程池,可以设置一个Runnable任务,当各线程达到共同的障碍点时触发这个任务。

例子

	//创建线程池
	private static ExecutorService executorService =  Executors.newFixedThreadPool(10);
	//创建屏障
	static CyclicBarrier cb = new CyclicBarrier(10,new Runnable() {
		public void run() {
			System.out.println("到达屏障");
		}
	});
	public static void main(String[] args)  {
	//提交任务
		for (int i = 0; i < 10; i++) {
			executorService.submit(new Runnable() {
				@Override
				public void run() {
					try {
						cb.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			});
		}
	}

如上例子创建线程为10的固定线程池,创建值为10的屏障,并设置一个Runnable任务。运行main方法提交任务当任务提交到10的时候到达屏障点,会运行Runnable任务并输出"到达屏障"。

特点

  • (1)可重用,正常情况下可重复使用,中断情况下在调用reset()后重复使用
  • (2)可中断,运行过程中可中断执行
  • (3)支持配置Runnable任务,当达到障碍点时可触发

与CountDownLatch比较

  • 相同点:都是同步屏障,都可以中断
  • 不同点: CyclicBarrier到达屏障后唤醒全部线程;而CountDownLatch到达屏障后是一个一个传播唤醒。CyclicBarrier支持配置Runnable任务CountDownLatch不支持;CyclicBarrier可重用CountDownLatch不可重用

CyclicBarrier原理分析

CyclicBarrier是利用ReentrantLock和Condition对扣减屏障值操作进行加锁,加锁后释放锁然后阻塞直到屏障值为0被唤醒。

下面来看下CyclicBarrier的成员变量

 private final ReentrantLock lock = new ReentrantLock();

对扣减屏障值操作进行加锁用。

 private final Condition trip = lock.newCondition();

对扣减屏障值操作后阻塞线程用,

private final int parties;

屏障值最大值,不可修改。

private final Runnable barrierCommand;

各线程到达屏障执行的任务。

private Generation generation = new Generation();

Generation类型对象,此类型里成员只有一个boolean类型的变量,作用是判断屏障是否被打破。

private int count;

屏障值,操作扣减用。

await()源码分析

await()源码如下

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

直接调用dowait(false, 0L)方法,第一个参数表示是否支持等待超时,第二个参数表示超时时长。await()不需要超时这里传了false和0L。因为await()不需要超时TimeoutException这个异常也不可能发生。

下面来看下dowait(false, 0L)的源码

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
		//加锁
        lock.lock();
        try {
            final Generation g = generation;
		//1.如果屏障被打破抛出屏障打破异常
            if (g.broken)
                throw new BrokenBarrierException();
		//2.如果当前线程被中断抛出中断异常
            if (Thread.interrupted()) {
		//3.打破屏障
                breakBarrier();
                throw new InterruptedException();
            }
		//4.屏障值减一
            int index = --count;
		//5.如果减一以后屏障值等于0,就要唤醒所有的阻塞线程
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
		//6.是否配置了任务,如果配置了则执行
                    if (command != null)
                        command.run();
                    ranAction = true;
		//7.如果任务正常运行结束,所有的阻塞线程,并重置屏障值
                    nextGeneration();
                    return 0;
                } finally {
		//8.如果任务运行出现异常,则打破屏障
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
			//9.如果不支持等待超时,则调用await()一直等待
                    if (!timed)
                        trip.await();
			//10.如果支持等待超时,则等待nanos时间
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
				//11.如果线程被中断。如果g == generation不成立说明当前线程已经被唤醒,这里说明还没被唤醒的中断就要打破屏障,否则就标记中断让上层处理。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
		//11.如果已经打破屏障,抛出BrokenBarrierException异常
                if (g.broken)
                    throw new BrokenBarrierException();
		//12.g != generation成立说明已经被激活,这里正常结束
                if (g != generation)
                    return index;
		//13.如果超时,则打破屏障抛出超时异常。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
		//释放锁
            lock.unlock();
        }
    }

dowait(false, 0L)主要逻辑就是将屏障值count减1,然后进入等待,直到count等于0到达屏障点被唤醒。 这里需要注意的是如果一个线程打破屏障,则所有的线程都会被打破抛出BrokenBarrierException异常,并且屏障被打破后如果想继续使用必须调用reset()方法重置。

下面来看下breakBarrier()方法

    private void breakBarrier() {
	//设置打破屏障状态
        generation.broken = true;
	//将count设置成原来的值
        count = parties;
	//唤醒所有其他线程
        trip.signalAll();
    }

breakBarrier()就是设置打破屏障状态为ture,然后唤醒所以其他阻塞线程,其他阻塞唤醒后会抛出BrokenBarrierException异常。

下面来看下nextGeneration()方法

 private void nextGeneration() {
        // 唤醒所有其他线程
        trip.signalAll();
        // 将count设置成原来的值
        count = parties;
		//初始化屏障状态
        generation = new Generation();
    }

nextGeneration()跟breakBarrier()类似,但是nextGeneration()是重新初始化屏障状态的,所以调用这个方法后CyclicBarrier可重用。

reset()源码分析

reset()源码如下

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
		//打破屏障
            breakBarrier();   // break the current generation
		//重置CyclicBarrier状态
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

reset()很简单,先打破屏障,终止各线程等待状态使其他线程抛出BrokenBarrierException异常,然后重置CyclicBarrier状态,使其可重用。这里官方推荐不要重用,重新创建一个CyclicBarrier使用,官方给的原因也比较含糊。

展开阅读全文
打赏
0
0 收藏
分享
加载中
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部