文档章节

Java并发编程中级篇(四):使用CyclicBarrier实现并发线程在集合点同步

阿拉德大陆的魔法师
 阿拉德大陆的魔法师
发布于 2016/11/26 12:54
字数 1466
阅读 87
收藏 2

上一节讲到了CountDownLatch这个并发辅助类,它能够让一个线程等待其他并发线程执行完一组任务后再继续执行,也可以说是实现了并发线程在集合点同步。但是Java又给出了一个更强大的并发辅助类CyclicBarrier。

CyclicBarrier也使用一个整形参数进行初始化,这个参数是需要再某点同步的线程数。当线程调用await()方法后CyclicBarrier类将把这个线程编为WAITING状态,并等待直到其他线程都到达集合点。当最后一个线程到达集合点后,调用CyclicBarrier类的await()方法时,CyclicBarrieer对象将唤醒所有通过await()方法进入等待的线程。

CyclicBarrier与CountDownLatch不同的地方在于,CountDownLatch通过countDown()方法对计数器减1来标记一个线程已经到达集合点,并且这个线程不会阻塞会继续执行。而CyclicBarrier类则通过await()方法标记线程到达集合点,并且这个到达集合点的线程会被阻塞。另外CyclicBarrier还支持把一个Runnable对象作为一个初始化参数,当所有的线程都到达集合点的时候,这个线程会被启动。这非常类似以一个分治算法的实现,把一个大任务拆分成若干个子任务,并等待所有子任务结束后,输出执行结果。

下面我们用一个实例来演示如何使用CyclicBarrier来模拟使用分治算法在一个矩阵中查找一个数字出现的次数。

首先我们创建一个矩阵类,构造函数接受矩阵的维度以及需要查找的数字,并采用随机数的方式构建这个矩阵然后记录下来构建矩阵过程中这个待查找数字出现的次数。

public class MatrixMock {
    private int[][] data;

    public MatrixMock(int size, int length, int number) {
        int count = 0;
        data = new int[size][length];
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                int temp = random.nextInt(10);
                data[i][j] = temp;
                if (temp == number) {
                    count++;
                }
            }
        }
        System.out.printf("Mock: There are %d occurences of %d in generate\n", count, number);
    }

    public int[] getRow(int row) {
        if(row >= 0 && row < data.length) {
            return data[row];
        }
        return null;
    }
}

查找结果类,定义了每行中待查找数字出现的次数。

public class Results {
    int data[];

    public Results(int size) {
        data = new int[size];
    }

    public void setData(int position, int value) {
        data[position] = value;
    }

    public int[] getData() {
        return data;
    }
}

接下来我们创建查找任务线程,这个类接受几个参数作为构造方法。分别是MatrixMock(待查找矩阵),Results(查找结果),firstRow(此任务分配的开始查找位置),lastRow(此任务分配的结束查找位置),number(待查找数字),CyclicBarrier(并发辅助类)。然后我们从firstRow开始查找,直到lastRow结束查找每行中待查找数字出现的次数并保存到Result中。

public class Searcher implements Runnable{
    private MatrixMock matrixMock;
    private Results results;
    private int firstRow;
    private int lastRow;
    private int number;
    private final CyclicBarrier cyclicBarrier;

    public Searcher(CyclicBarrier cyclicBarrier, MatrixMock matrixMock, Results results, int firstRow, int lastRow, int number) {
        this.cyclicBarrier = cyclicBarrier;
        this.matrixMock = matrixMock;
        this.results = results;
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.number = number;
    }

    @Override
    public void run() {
        System.out.printf("%s: Processing lines from %d to %d.\n",
                Thread.currentThread().getName(), firstRow, lastRow);

        for (int i = firstRow; i < lastRow; i++) {
            int row[] = matrixMock.getRow(i);
            int counter = 0;
            for (int j = 0; j < row.length; j++) {
                if(row[j] == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }

        System.out.printf("%s: Line processed.\n", Thread.currentThread().getName());

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

查找任务合并线程,也就是等待所有所有查找子任务完成后,把所有子任务查找的结果做一个合计,并打印出一共找到多少个待查找数字。

public class Grouper implements Runnable{
    private Results results;

    public Grouper(Results results) {
        this.results = results;
    }

    @Override
    public void run() {
        System.out.printf("Grouper: Processing results...\n");
        int count = 0;
        for (int i = 0; i < results.getData().length; i++) {
            count += results.getData()[i];
        }
        System.out.printf("Grouper: Total result: %d\n", count);
    }
}

在主方法类中创建一个10000*10000的矩阵,并启动5个Searcher线程去查找数字5在每行中出现的次数。然后使用Grouper线程作为CyclicBarrier的初始化参数,等待所有Searcher任务执行完毕后执行Grouper线程来合计每个Searcher线程的查找结果。

public class Main {
    public static void main(String[] args) {
        int size = 10000;
        int length = 10000;
        int search = 5;
        int participants = 5;
        int lines_participants = size / participants;

        MatrixMock matrixMock = new MatrixMock(size, length, search);
        Results results = new Results(size);
        Grouper grouper = new Grouper(results);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(participants, grouper);
        Searcher[] searchers = new Searcher[participants];
        Thread[] searcherThreads = new Thread[participants];
        for (int i = 0; i < participants; i++) {
            int firstRow = i * lines_participants;
            int lastRow = (i + 1) * lines_participants;
            searchers[i] = new Searcher(cyclicBarrier, matrixMock, results, firstRow, lastRow, search);
            searcherThreads[i] = new Thread(searchers[i]);
            searcherThreads[i].start();
        }
    }
}

查看任务执行日志,我们发现每个Searcher任务执行2000行数据的查找工作,最终找到9997834个带查找数字。

Mock: There are 9997834 occurences of 5 in generate
Thread-0: Processing lines from 0 to 2000.
Thread-1: Processing lines from 2000 to 4000.
Thread-2: Processing lines from 4000 to 6000.
Thread-3: Processing lines from 6000 to 8000.
Thread-0: Line processed.
Thread-2: Line processed.
Thread-3: Line processed.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Line processed.
Thread-4: Line processed.
Grouper: Processing results...
Grouper: Total result: 9997834

CyclicBarrier类提供了两个方法用来查看在CyclicBarrier上面等待的线程数和同步的任务数。

System.out.printf("CyclicBarrier: %d, %d\n", cyclicBarrier.getNumberWaiting(), cyclicBarrier.getParties());

CyclicBarrier类还支持重置,通过reset()方法完成操作。当重置发生后,await()方法将接受一个BrokenBarrierException异常,你捕获这个异常后可以用来执行一些复杂的操作,比如回滚数据或者重新执行。

CyclicBarrier类有一个特有的状态为Broken,当多线程并发等待的时候,有一个线程被中断,这个线程抛出InterruptedException异常,其他等待的线程将抛出BrokenBarrierException异常,于是CyclicBarrier对象属于Broken状态。你可以使用isBroken()方法来判断CyclicBarrier对象是否处于损坏状态。

© 著作权归作者所有

共有 人打赏支持
阿拉德大陆的魔法师
粉丝 25
博文 91
码字总数 83019
作品 0
西城
程序员
私信 提问
java多线程系列:通过对战游戏学习CyclicBarrier

CyclicBarrier是java.util.concurrent包下面的一个工具类,字面意思是可循环使用(Cyclic)的屏障(Barrier),通过它可以实现让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一...

huangzd
01/06
0
0
计算机科学中抽象的好处与问题—伪共享实例分析

David John Wheeler有一句名言“计算机科学中的任何问题都可以通过加上一层间接层来解决”,一层不够就再加一层。后半句是我加的 (* ̄︶ ̄) ,虽然有点玩笑的意思,但是也的确能说明一些问题...

MageekChiu
01/10
0
0
【Java并发专题】27篇文章详细总结Java并发基础知识

努力的意义,就是,在以后的日子里,放眼望去全是自己喜欢的人和事! github:https://github.com/CL0610/Java-concurrency,欢迎题issue和Pull request。所有的文档都是自己亲自码的,如果觉...

你听___
05/06
0
0
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
07/22
0
0
Java线程面试题 Top 50

不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员的欢迎。大多数待遇丰厚的Java开发职位都要求开发者...

loda0128
2015/05/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

dubbo 搭建与使用

官网:http://dubbo.apache.org/en-us/ 一,安装监控中心(可以不安装) admin管理控制台,monitor监控中心 下载 bubbo ops 这个是新版的,需要node.js环境,我没有就用老版的了...

小兵胖胖
7分钟前
0
0
mac 下 mysql 8.0.13 安装并记录遇到的问题 以便以后查看

安装 官网mysql 下载地址 安装过程 省去 安装好之后 下载navicat 错误1 链接 遇到 mysql 2003 - Can't connect to MySQL server 错误, 解决方案 重启mysql 服务 #错误2 ERROR 1045: Acces...

杭州-IT攻城狮
昨天
5
0

中国龙-扬科
昨天
1
0
[Spring4.x]基于spring4.x纯注解的Web工程搭建

在前文中已经说明了如何基于 Spring4.x+ 版本开发纯注解的非web项目,链接如下: https://my.oschina.net/morpheusWB/blog/2985600 本文则主要说明,如何在Web项目中,"基于spring纯注解方式...

morpheusWB
昨天
16
0
基础编程题目集-7-13 日K蜡烛图

股票价格涨跌趋势,常用蜡烛图技术中的K线图来表示,分为按日的日K线、按周的周K线、按月的月K线等。以日K线为例,每天股票价格从开盘到收盘走完一天,对应一根蜡烛小图,要表示四个价格:开...

niithub
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部