《今天面试了吗》- 并发编程之AQS同步工具类

2020/09/01 08:30
阅读数 45

点击上方 匠心Java选择 设为星标

优质文章,及时送达


前言

上次面试中问到AQS简直不要太痛苦,全是问的源码。但是源码有时间还是要看看的,毕竟对于提升我们的写代码的能力还是有帮助的。今天的面试紧接上回的AQS,内容是基于AQS实现的四大并发工具类:CyclicBarrier,CountDownLatch,Semaphore和Exchanger,简要分析实现原理,着重讲述如何使用。

面试环节

面试官:上次聊到AQS,你在开发过程中用过AQS的几个工具类吗?比如CyclicBarrier...

我: 用过,CyclicBarrier是一个同步辅助类。它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序里,这些线程必须不时的互相等待,此时CyclicBarrier 很有用。因为CyclicBarrier在释放等待线程后可以重用,因此成为循环的屏障。 下面来看下CyclicBarrier的定义:
private final ReentrantLock lock = new ReentrantLock();
    
private final Condition trip = lock.newCondition();
    
//parties变量表示拦截线程的总数量,count变量表示拦截线程的剩余需要数量
private final int parties;
    
//barrierCommand变量为CyclicBarrier接收的Runnable命令,用于在线程到达屏障时,优先执行barrierCommand,用于处理更加复杂的业务场景。
private final Runnable barrierCommand;
    
//generation变量表示CyclicBarrier的更新换代
private Generation generation = new Generation();
可以看出CyclicBarrier内部是使用重入锁和Condition的。它有两个构造函数:
/**
    创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程执行。
    */

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    /**
    创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier时执行预定义的操作。
    */

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

面试官:那CyclicBarrier是怎么让线程到达屏障后处于等待状态的呢?

我:使用await()方法,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。当所有线程都到达了屏障,结束阻塞,所有线程可继续执行后续逻辑。

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException
{
        return dowait(true, unit.toNanos(timeout));
    }


    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException
{
        //获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //分代
            final Generation g = generation;

            //当前generation已损坏,抛出BrokenBarrierException异常
            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程中断,终止CyclicBarrier
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            //进来一个线程,count-1
            int index = --count;
            //如果count==0表示所有线程均已到达屏障,可以触发barrierCommand任务
            if (index == 0) { // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒所有等待线程,并更新generation
                    nextGeneration();
                    return 0;
                } finally {
                    //如果barrierCommand执行失败,终止CyclicBarrier
                    if (!ranAction)
                        breakBarrier();
                }
            }

        
            for (;;) {
                try {
                    //如果不是超时等待,则调用Condition.await()方法等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        //如果是超时等待,则调用Condition.awaitNanos()等待
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                //generation已经更新,返回Index
                if (g != generation)
                    return index;
                //超时等待并且时间已经到了,终止CyclicBarrier,并抛出超时异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放锁
            lock.unlock();
        }
如果该线程不是到达的最后一个线程,则它会一直处于等待状态,除非发生以下情况:
1、最后一个到达:即index=0
2、超出了等待时间。
3、其他的某个线程中断当前线程。
4、其他某个线程中断另一个等待的线程。
5、其他某个线程在等待barrier超时。
6、其他某个线程在此barrier调用reset方法,用于将该屏障置为初始状态。

面试官: 那CyclicBarrier什么场景下用呢?
我: CyclicBarrier适用于多线程合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。举个例子:
public class CyclicBarrierTest {

    private static CyclicBarrier cyclicBarrier;

    private static final Integer THREAD_COUNT = 10;

    static class CyclicBarrierThread implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"到教室了");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            @Override
            public void run() {
               System.out.println("同学们都到齐了,开始上课吧...");
            }
        });

        for (int i=0; i< THREAD_COUNT; i++) {
            Thread thread = new Thread(new CyclicBarrierThread());
            thread.start();
        }

    }
}
运行结果如下:

面试官:有一个和CyclicBarrier类似的工具类叫CountDownLatch,你能说下吗?

我:CyclicBarrier描述的是“允许一组线程相互等待,直到到达某个公共屏障点,才会进行后续任务”,而CountDownLatch所描述的是“在完成一组正在其他线程中执行的操作之前,它允许 一个或多个线程一直等待”。在API中是这样描述的:用给定的计数初始化CountDownLatch。由于调用了countDown方法,所以在当前计数到达零之前,await方法会一直受阻塞。之后,会释放 所有等待的线程,await的所有后续调用都将立即返回。这种现象只出现一次(计数无法被重置。如果需要重置计数,请考虑使用CyclicBarrier)

CountDownLatch是通过一个计数器来实现的,当我们在new一个CountDownLatch对象的时候,需要传入计数器的值,该值表示线程的数量。每当一个线程完成自己的任务后,计数器的值就会 减一。当计数器的值变为0时,就表示所有线程均已完成任务,然后就可以恢复等待的线程继续执行了。
CountDownLatch和CyclicBarrier还是有一点区别的:
1、CountDownLatch的作用是允许1或多个线程等待其他线程完成执行;而CyclicBarrier则是允许多个线程互相等待。
2、CountDownLatch的计数器无法被重置。CyclicBarrier的计数器可以被重置后使用。

面试官:你能说下CountDownlatch是怎么实现的吗?

我:CountDownlatch内部依赖Sync实现,而Sync继承AQS。如下图:

CountDownlatch仅提供了一个构造方法,如下:

public class CyclicBarrierTest {

    private static CyclicBarrier cyclicBarrier;

    private static final Integer THREAD_COUNT = 10;

    static class CyclicBarrierThread implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"到教室了");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            @Override
            public void run() {
               System.out.println("同学们都到齐了,开始上课吧...");
            }
        });

        for (int i=0; i< THREAD_COUNT; i++) {
            Thread thread = new Thread(new CyclicBarrierThread());
            thread.start();
        }

    }
}

再来看看Sync,是CountDownlatch的一个内部类。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        //获取同步状态
        int getCount() {
            return getState();
        }

        //尝试获取同步状态
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        //尝试释放同步状态
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
CountDownLatch内部通过共享锁实现:
1、在创建CountDownLatch实例时,需要传递一个int型参数:count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。
2、当某个线程调用await()方法,程序首先判断count的值是否为0,如果不为0的话,则会一直等待直到为0为止。
3、当其他线程调用countDown()方法时,则执行释放共享锁状态,使count-1。
4、注意CountDownLatch不能回滚重置。

面试官:那你说下CountDownLatch是怎么用的?

我:
1、CountDownlatch提供了await()方法,来使当前线程在锁存器递减倒数至0以前一直等待,除非线程被中断,当前线程可以是我们的一个主线程。2、CountDownlatch提供了countDown()方法,在子线程执行完后进行操作,递减锁存器的计数,如果计数到达0,则唤醒所有等待的线程(我们的主线程)。说完我拿起笔刷刷的写起来:
public class CountDownLatchTest {

    private static final Integer STUDENT_COUNT = 10;

    private static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT);

    static class TeacherThread implements Runnable {
        @Override
        public void run() {
            System.out.println("老师来了,等"+ STUDENT_COUNT+"位同学都到教室了才开始上课");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(STUDENT_COUNT+"位同学都到齐了,开始上课!");
        }
    }

    static class StudentThread implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"进了教室");
            countDownLatch.countDown();
        }
    }

    public static void main(String [] args) {
        Thread teacher = new Thread(new TeacherThread());
        teacher.start();
        for (int i=0; i<STUDENT_COUNT; i++) {
            Thread student = new Thread(new StudentThread());
            student.start();
        }
    }
}

面试官:很好。懂得活学活用。你了解信号量Semaphore吗?

我: 信号量Semaphore是一个控制访问多个共享资源的计数器,和CountDownLatch一样,其本质上是一个“共享锁”。在API是这么介绍信号量的:一个计数信号量,从概念上讲,信号量维护了一个许可集。
1、如有必要,在许可可用前会阻塞每一个acquire,然后再获取该许可。
2、每个release添加一个许可,从而可能释放一个正在阻塞的获取者。但是不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动。
下面以一个停车场的例子来阐述Semaphore:
1、假设停车场有5个停车位,一开始车位都空着,然后先后来了三辆车,车位够,安排进去停车,然后又来三辆,这个时候由于只有两个车位,所以只能停两辆,有一辆需要在外面候着,直到 停车场有空位。
2、从程序角度讲,停车场就相当于信号量Semaphore,其中许可数为5,车辆相当于线程,当来一辆车,许可数就会减1。当停车场没车位了(许可数==0),其他来的车辆必须等待。如果 有一辆车开车停车场,则许可数+1,然后放进来一辆车。

从上面的分析可以看出:信号量Semaphore是一个非负整数(>=1)。当一个线程想要访问某个共享资源时,它必须先获取Semaphore。当Semaphore>0时,获取该资源并使Semaphore-1。如果Semaphore的值==0,则表示全部的共享资源已经被线程全部占用,新来的线程必须等待其他线程释放资源。当线程释放资源时,Semaphore则+1。

面试官:你能用Semaphore实现这个停车的例子吗?

我(又刷刷的写起来)

public class SemaphoreTest {

    static class Parking {

        private Semaphore semaphore;

        Parking(int count) {
            semaphore = new Semaphore(count);
        }

        public void park() {
            try {
                //获取信号量
                semaphore.acquire();
                long time = (long) (Math.random()*10+1);
                System.out.println(Thread.currentThread().getName()+"进入停车场停车,停车时间:"+time+"秒");
                //模拟停车时间
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName()+"开出停车场...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放信号量(跟lock的用法差不多)
                semaphore.release();
            }
        }
    }
    
    static class Car implements Runnable{

        private Parking parking;

        Car(Parking parking) {
            this.parking = parking;
        }

        /**
         * 每辆车相当于一个线程,线程的任务就是停车
         */

        @Override
        public void run()
{
            parking.park();
        }
    }

    public static void main(String [] args) {
        //假设有3个停车位
        Parking parking = new Parking(3);

        //这时候同时来了5辆车,只有3辆车可以进去停车,其余2辆车需要等待有空余车位之后才能进去停车。
        for (int i=0; i<5; i++) {
            Thread thread = new Thread(new Car(parking));
            thread.start();
        }
    }
}

运行结果:

面试官: 很好,那我再问你一个,Exchanger交换器知道不?

我:Exchanger是一个同步器,字面上就可以看出这个类的主要作用是交换数据。Exchanger有点类似CyclicBarrier,前面说到CyclicBarrier是一个栅栏,到达栅栏的 线程需要等待一定数量的线程到达后,才能通过栅栏。Exchanger可以看成是一个双向的栅栏。线程1到达栅栏后,会首先观察有没有其他线程已经到达栅栏,如果没有就会等待。如果已经有其他线程(比如线程2)到达了,就会以成对的方式交换各自携带的信息,因此Exchanger非常适合两个线程之间的数据交换。 如下图:

面试官:那你能跟我举个例子说下Exchanger怎么用吗?

我: 当然可以。
public class ExchangerTest {

    static class ThreadA implements Runnable {
        private Exchanger<String> exchanger;

        ThreadA (Exchanger<String> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                //模拟业务代码
                Long time = (long)(Math.random()*10+1)*10;
                System.out.println("线程A等待了"+time+"秒");
                Thread.sleep(time);
                //线程间数据交换
                System.out.println("在线程A得到线程B的值:"+ exchanger.exchange("我是线程A"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadB implements Runnable {
        private Exchanger<String> exchanger;

        ThreadB(Exchanger<String> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                //模拟业务代码
                Long time = (long)(Math.random()*10+1)*10;
                System.out.println("线程B等待了"+time+"秒");
                Thread.sleep(time);
                //线程间数据交换
                System.out.println("在线程B得到线程A的值:"+ exchanger.exchange("我是线程B"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        //线程A和线程B要使用同一个exchanger才有用
        Thread threadA = new Thread(new ThreadA(exchanger));
        Thread threadB = new Thread(new ThreadB(exchanger));
        threadA.start();
        threadB.start();
    }
}

运行结果:



-END-

如果看到这里,说明你喜欢这篇文章,请 转发、点赞。同时 标星(置顶)本公众号可以第一时间接受到博文推送。

喜欢文章,点个在看 

本文分享自微信公众号 - 匠心Java(code-the-world)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部