Java并发编程中级篇(四):使用CyclicBarrier实现并发线程在集合点同步

原创
2016/11/26 12:54
阅读数 423

上一节讲到了CountDownLatch这个并发辅助类,它能够让一个线程等待其他并发线程执行完一组任务后再继续执行,也可以说是实现了并发线程在集合点同步。但是Java又给出了一个更强大的并发辅助类CyclicBarrier。

CyclicBarrier也使用一个整形参数进行初始化,这个参数是需要再某点同步的线程数。当线程调用await()方法后CyclicBarrier类将把这个线程编为WAITING状态,并等待直到其他线程都到达集合点。当最后一个线程到达集合点后,调用CyclicBarrier类的await()方法时,CyclicBarrieer对象将唤醒所有通过await()方法进入等待的线程。

CyclicBarrier与CountDownLatch不同的地方在于,CountDownLatch通过countDown()方法对计数器减1来标记一个线程已经到达集合点,并且这个线程不会阻塞会继续执行。而CyclicBarrier类则通过await()方法标记线程到达集合点,并且这个到达集合点的线程会被阻塞。另外CyclicBarrier还支持把一个Runnable对象作为一个初始化参数,当所有的线程都到达集合点的时候,这个线程会被启动。这非常类似以一个分治算法的实现,把一个大任务拆分成若干个子任务,并等待所有子任务结束后,输出执行结果。

下面我们用一个实例来演示如何使用CyclicBarrier来模拟使用分治算法在一个矩阵中查找一个数字出现的次数。

首先我们创建一个矩阵类,构造函数接受矩阵的维度以及需要查找的数字,并采用随机数的方式构建这个矩阵然后记录下来构建矩阵过程中这个待查找数字出现的次数。

public class MatrixMock {
    private int[][] data;

    public MatrixMock(int size, int length, int number) {
        int count = 0;
        data = new int[size][length];
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                int temp = random.nextInt(10);
                data[i][j] = temp;
                if (temp == number) {
                    count++;
                }
            }
        }
        System.out.printf("Mock: There are %d occurences of %d in generate\n", count, number);
    }

    public int[] getRow(int row) {
        if(row >= 0 && row < data.length) {
            return data[row];
        }
        return null;
    }
}

查找结果类,定义了每行中待查找数字出现的次数。

public class Results {
    int data[];

    public Results(int size) {
        data = new int[size];
    }

    public void setData(int position, int value) {
        data[position] = value;
    }

    public int[] getData() {
        return data;
    }
}

接下来我们创建查找任务线程,这个类接受几个参数作为构造方法。分别是MatrixMock(待查找矩阵),Results(查找结果),firstRow(此任务分配的开始查找位置),lastRow(此任务分配的结束查找位置),number(待查找数字),CyclicBarrier(并发辅助类)。然后我们从firstRow开始查找,直到lastRow结束查找每行中待查找数字出现的次数并保存到Result中。

public class Searcher implements Runnable{
    private MatrixMock matrixMock;
    private Results results;
    private int firstRow;
    private int lastRow;
    private int number;
    private final CyclicBarrier cyclicBarrier;

    public Searcher(CyclicBarrier cyclicBarrier, MatrixMock matrixMock, Results results, int firstRow, int lastRow, int number) {
        this.cyclicBarrier = cyclicBarrier;
        this.matrixMock = matrixMock;
        this.results = results;
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.number = number;
    }

    @Override
    public void run() {
        System.out.printf("%s: Processing lines from %d to %d.\n",
                Thread.currentThread().getName(), firstRow, lastRow);

        for (int i = firstRow; i < lastRow; i++) {
            int row[] = matrixMock.getRow(i);
            int counter = 0;
            for (int j = 0; j < row.length; j++) {
                if(row[j] == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }

        System.out.printf("%s: Line processed.\n", Thread.currentThread().getName());

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

查找任务合并线程,也就是等待所有所有查找子任务完成后,把所有子任务查找的结果做一个合计,并打印出一共找到多少个待查找数字。

public class Grouper implements Runnable{
    private Results results;

    public Grouper(Results results) {
        this.results = results;
    }

    @Override
    public void run() {
        System.out.printf("Grouper: Processing results...\n");
        int count = 0;
        for (int i = 0; i < results.getData().length; i++) {
            count += results.getData()[i];
        }
        System.out.printf("Grouper: Total result: %d\n", count);
    }
}

在主方法类中创建一个10000*10000的矩阵,并启动5个Searcher线程去查找数字5在每行中出现的次数。然后使用Grouper线程作为CyclicBarrier的初始化参数,等待所有Searcher任务执行完毕后执行Grouper线程来合计每个Searcher线程的查找结果。

public class Main {
    public static void main(String[] args) {
        int size = 10000;
        int length = 10000;
        int search = 5;
        int participants = 5;
        int lines_participants = size / participants;

        MatrixMock matrixMock = new MatrixMock(size, length, search);
        Results results = new Results(size);
        Grouper grouper = new Grouper(results);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(participants, grouper);
        Searcher[] searchers = new Searcher[participants];
        Thread[] searcherThreads = new Thread[participants];
        for (int i = 0; i < participants; i++) {
            int firstRow = i * lines_participants;
            int lastRow = (i + 1) * lines_participants;
            searchers[i] = new Searcher(cyclicBarrier, matrixMock, results, firstRow, lastRow, search);
            searcherThreads[i] = new Thread(searchers[i]);
            searcherThreads[i].start();
        }
    }
}

查看任务执行日志,我们发现每个Searcher任务执行2000行数据的查找工作,最终找到9997834个带查找数字。

Mock: There are 9997834 occurences of 5 in generate
Thread-0: Processing lines from 0 to 2000.
Thread-1: Processing lines from 2000 to 4000.
Thread-2: Processing lines from 4000 to 6000.
Thread-3: Processing lines from 6000 to 8000.
Thread-0: Line processed.
Thread-2: Line processed.
Thread-3: Line processed.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Line processed.
Thread-4: Line processed.
Grouper: Processing results...
Grouper: Total result: 9997834

CyclicBarrier类提供了两个方法用来查看在CyclicBarrier上面等待的线程数和同步的任务数。

System.out.printf("CyclicBarrier: %d, %d\n", cyclicBarrier.getNumberWaiting(), cyclicBarrier.getParties());

CyclicBarrier类还支持重置,通过reset()方法完成操作。当重置发生后,await()方法将接受一个BrokenBarrierException异常,你捕获这个异常后可以用来执行一些复杂的操作,比如回滚数据或者重新执行。

CyclicBarrier类有一个特有的状态为Broken,当多线程并发等待的时候,有一个线程被中断,这个线程抛出InterruptedException异常,其他等待的线程将抛出BrokenBarrierException异常,于是CyclicBarrier对象属于Broken状态。你可以使用isBroken()方法来判断CyclicBarrier对象是否处于损坏状态。

展开阅读全文
打赏
0
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部