JDK中的常用并发工具类详解 - 疫情不断,学习不断

原创
2020/02/21 21:37
阅读数 1.2K

Java中的并发工具类

疫情不断,学习不断,今日主要学习如下:

JDK并发包下几个常用并发工具类

Java中的并发工具类 1、CountDownLatch:2、CyclicBarrier:3、Semaphore:4、Exchanger:

1、CountDownLatch:

  • 允许一个或多个线程等待其他线程完成操作之后,再继续执行。用来协调多个线程之间的同步(线程间的通信)。

  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

  • CountDownLatch的计数只能使用一次,不会自动重新开始。

  • 源码解读:

    构造器:
    //  count 为计数器
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    方法:
    //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
    public void await() throws InterruptedException { };   
    //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
    //将count值减1
    public void countDown() { };  
  • 使用场景:

    1)、作为一个开关,N个子线程准备就绪,等待主线程的放行,然后子线程一起开始执行。

    class  Task implements Runnable {
        private CountDownLatch countDownLatch;
        private Integer count;
        public Task(CountDownLatch countDownLatch,Integer count) {
            this.countDownLatch = countDownLatch;
            this.count = count;
        }
        @Override
        public void run() {
            try {
                System.out.println("线程:"+count+"等待开始");
                countDownLatch.await();
                System.out.println("线程:"+count+"执行完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class CountDownLatchDemo {
        @Test
        public void countDownLatchAwaitTest() throws InterruptedException {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            new Thread(new Task(countDownLatch,1)).start();
            new Thread(new Task(countDownLatch,2)).start();
            Thread.sleep(1000);
            System.out.println("主线程执行完毕,放行");
            countDownLatch.countDown();
            System.out.println("主线程执行完毕");
        }
    }

    2)、主线程等待N个子线程的处理结果,子线程都完成后,主线程继续执行。

        @Test
        public void countDownLatchTest() {
            final CountDownLatch countDownLatch = new CountDownLatch(2);
            new Thread(()->{
                System.out.println("1");
                countDownLatch.countDown();
                System.out.println("2");
            }).start();
            new Thread(()->{
                System.out.println("3");
                countDownLatch.countDown();
                System.out.println("4");
            }).start();
            try {
                countDownLatch.await();
                System.out.println("5");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

2、CyclicBarrier:

  • 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门(处理并发测试常用)

  • 一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务

  • 源码解读

        构造器
        // barrierAction 最后一个到达线程要做的任务
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
        // parties 线程的个数
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
        方法
        // 线程调用 await() 表示自己已经到达栅栏
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
  • 例子

    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            int threadNum = 5;
            CyclicBarrier barrier = new CyclicBarrier(threadNum, ()->{
                System.out.println(Thread.currentThread().getName() + " 完成最后任务");
            });
            for(int i = 0; i < threadNum; i++) {
               new Thread(()->{
                   try {
                       String name  = Thread.currentThread().getName();
                       Thread.sleep(1000);
                       System.out.println(name + " 到达栅栏 A");
                       barrier.await();
                       System.out.println(name + " 冲破栅栏 A");
                       Thread.sleep(2000);
                       System.out.println(name + " 到达栅栏 B");
                       barrier.await();
                       System.out.println(name + " 冲破栅栏 B");
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }).start();
            }
        }
    
    }
    

CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的

  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

3、Semaphore:

  • 信号量,用来控制同时访问特定资源的线程数量。它通过协调各个线程,以保证合理使用公共资源。

  • 可用于流量控制,限制最大的并发访问数(如数据库连接池)。。

  • 源码解读

    构造器
    // permits 表示许可线程的数量
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    方法 
    // 阻塞并获取许可
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    // 释放许可
    public void release() {
        sync.releaseShared(1);
    }
    ...
  • 例子

    public class SemaphoreDemo {
        public static void main(String[] args) {
            int threadNum = 5;
            Semaphore semaphore = new Semaphore(2);
            for (int i = 0; i < threadNum; i++) {
                new Thread(() -> {
                    String name  = Thread.currentThread().getName();
                    try {
                        semaphore.acquire();
                        System.out.println(name + " acquire");
                        Thread.sleep(1000);
                        semaphore.release();
                        System.out.println(name + " release ");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

4、Exchanger:

  • 交换者,用于线程间的数据交换

  • 使用场景如,A、B岗录入相同的数据,对A、B刚录入的数据进行校验

  • 例子

    public class ExchangerDemo {
        private static final Exchanger<String> exc = new Exchanger<>();
        public static void main(String[] args) {
            new Thread(()->{
                String name  = Thread.currentThread().getName();
                String A = "银行流水A";
                try {
                    String B = exc.exchange(A);
                    System.out.println("A:"+name+":"+ B);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            new Thread(()->{
                String B = "银行流水B";
                try {
                    String name  = Thread.currentThread().getName();
                    String A = exc.exchange(B);
                    System.out.println("B:"+name+":"+ A);
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    结果:
    B:Thread-1:银行流水A
    A:Thread-0:银行流水B
展开阅读全文
JDK
加载中

作者的其它热门文章

打赏
0
1 收藏
分享
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部