Java并发编程中级篇(五):更强大的多阶段并发控制Phaser

原创
2016/11/26 16:55
阅读数 737

Java API还提供了一个强大的同步辅助类Pahser,它可以控制多阶段并发辅助任务。当我们有并发任务,并且需要分阶段执行,每阶段都需要等待所有线程执行本阶段执行完毕才能够继续执行,这种机制就非常好用。Phaser类同样需要一个整形作为初始化参数来确定有几个线程参与执行。

下面我们来看一个例子,在这个例子中我们把一个并发任务分为三个阶段,每一阶段都需要所有线程完成后才能继续执行下一阶段的任务。

我们来定义一个带有Phaser机制的线程类PhaserRunnable,在线程开始运行的时候调用arriverAndAwaitAdvance()方法来代表线程已经进入执行状态,其实这个也可以算作一步,就是等待所有任务线程都启动后大家一起执行。然后开始执行第一步任务,每个线程随机休眠一段时间来模拟任务执行耗时,执行完毕后调用arriverAndAwaitAdvance()来表示任务执行完毕,然后等待其他线程,等所有线程都滴啊用arriverAndAwaitAdvance()方法后,所有休眠的线程都被唤醒然后继续执行第二步。也是随机休眠一段时间后,第二部执行完毕,但是这里有一个不同,如果休眠时间是一个双数那么线程将不再执行第三步操作而是直接返回,这里要调用phaser.arriveAndDeregister()方法来表示线程已经结束,以后不再需要等待我了。

public class PhaserRunnable implements Runnable{
    private Phaser phaser;

    public PhaserRunnable(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: start.\n", Thread.currentThread().getName());

        long duration1 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step1 start duration %d seconds.\n", Thread.currentThread().getName(), duration1);
        try {
            TimeUnit.SECONDS.sleep(duration1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: step1 done.\n", Thread.currentThread().getName());
        phaser.arriveAndAwaitAdvance();

        long duration2 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step2 start duration %d seconds.\n", Thread.currentThread().getName(), duration2);
        try {
            TimeUnit.SECONDS.sleep(duration2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (duration2 % 2 == 0) {
            System.out.printf("%s: step2 done and task finished.\n", Thread.currentThread().getName());
            phaser.arriveAndDeregister();
            return;
        } else {
            System.out.printf("%s: step2 done.\n", Thread.currentThread().getName());
            phaser.arriveAndAwaitAdvance();
        }

        long duration3 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step3 start duration %d seconds.\n", Thread.currentThread().getName(), duration3);
        try {
            TimeUnit.SECONDS.sleep(duration3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: step3 done.\n", Thread.currentThread().getName());
        phaser.arriveAndDeregister();
    }
}

主方法类中启动三个任务线程,如果第二步都是休眠单数秒你可以尝试多运行几次,最后打印Phaser状态。

public class Main {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread[] threads = new Thread[3];

        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(new PhaserRunnable(phaser));
            threads[i].start();
        }

        for (int i = 0; i < 3; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.printf("%s: Phaser terminated.\n", Thread.currentThread().getName());
    }
}

查看控制台日志: 

Thread-2: start.
Thread-1: start.
Thread-0: start.
Thread-2: step1 start duration 9 seconds.
Thread-0: step1 start duration 2 seconds.
Thread-1: step1 start duration 6 seconds.
Thread-0: step1 done.
Thread-1: step1 done.
Thread-2: step1 done.
Thread-2: step2 start duration 7 seconds.
Thread-0: step2 start duration 2 seconds.
Thread-1: step2 start duration 1 seconds.
Thread-1: step2 done.
Thread-0: step2 done and task finished.
Thread-2: step2 done.
Thread-2: step3 start duration 3 seconds.
Thread-1: step3 start duration 7 seconds.
Thread-2: step3 done.
Thread-1: step3 done.
main: Phaser terminated.

Phaser类有两种状态:Active和Termination。有任务参与的时候Phaser状态为Active;当所有参与同步的线程都结束后Phaser也就没有参与者了,这时Phaser进入了Termination态。当Phaser处于终止态,同步方法arriveAndAwaitAdvance()会立即返回。

Phaser类的一个重大特性就是不必对它的方法进行异常处理。不像其他通不辅助类,被Phaser类置于休眠状态的线程不会响应中断事件,也不会抛出InterruptedException异常。

Phaser类还提供了一些改变Phaser对象的方法,这些方法如下。

  • arriver():这个方法通知phaser对象一个参与者已经完成了当前阶段,但是他等待其他参与者都完成当前阶段。必须小心使用这个方法,因为他不会与其它线程同步。
  • awaitAdvance(int phase):如果传入的阶段参数与当前阶段一致,那么这个方法会将当前线程置于休眠,直到这个阶段的所有参与者都完成运行。如果传入阶段参数与当前阶段不一致,这个方法会立即返回。
  • awaitAdvanceInterruptibly(int phaser):这个方法跟awaitAdvance(int phase)一样,不同之处在于,如果这个方法中休眠的线程被中断,它将抛出InterruptedException异常。

Phaser类可以动态地改变参与线程的数量:

  • register():这个方法可以将一个新的线程注册到Phaser中,这个新的参与者被当成本阶段的任务来执行。
  • bulkRegister(int Parties):这个方法将指定数目的参与者注册到Phaser中,所有这些新的参与者都将被当成本阶段的任务来执行。

Phaser提供了一个方法forceTermination()方法来强制Phaser进入Termination状态,这个方法不管Phaser中是否还有注册的线程。当一个参与的线程出现错误,强制phaser终止是有意义的。当一个Phaser处于Termination状态的时候,awaitAdvance()和arriveAndAwaitAdvance()方法都立刻返回一个负数,这样就可以验证Phaser状态是不是终止了,可以根据这个状态来终止线程的执行,直接返回或者做一些处理,比如数据回滚什么的。

展开阅读全文
打赏
0
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
0
分享
返回顶部
顶部