文档章节

Java并发编程中级篇(四):使用CyclicBarrier实现并发线程在集合点同步

阿拉德大陆的魔法师
 阿拉德大陆的魔法师
发布于 2016/11/26 12:54
字数 1466
阅读 128
收藏 2

上一节讲到了CountDownLatch这个并发辅助类,它能够让一个线程等待其他并发线程执行完一组任务后再继续执行,也可以说是实现了并发线程在集合点同步。但是Java又给出了一个更强大的并发辅助类CyclicBarrier。

CyclicBarrier也使用一个整形参数进行初始化,这个参数是需要再某点同步的线程数。当线程调用await()方法后CyclicBarrier类将把这个线程编为WAITING状态,并等待直到其他线程都到达集合点。当最后一个线程到达集合点后,调用CyclicBarrier类的await()方法时,CyclicBarrieer对象将唤醒所有通过await()方法进入等待的线程。

CyclicBarrier与CountDownLatch不同的地方在于,CountDownLatch通过countDown()方法对计数器减1来标记一个线程已经到达集合点,并且这个线程不会阻塞会继续执行。而CyclicBarrier类则通过await()方法标记线程到达集合点,并且这个到达集合点的线程会被阻塞。另外CyclicBarrier还支持把一个Runnable对象作为一个初始化参数,当所有的线程都到达集合点的时候,这个线程会被启动。这非常类似以一个分治算法的实现,把一个大任务拆分成若干个子任务,并等待所有子任务结束后,输出执行结果。

下面我们用一个实例来演示如何使用CyclicBarrier来模拟使用分治算法在一个矩阵中查找一个数字出现的次数。

首先我们创建一个矩阵类,构造函数接受矩阵的维度以及需要查找的数字,并采用随机数的方式构建这个矩阵然后记录下来构建矩阵过程中这个待查找数字出现的次数。

public class MatrixMock {
    private int[][] data;

    public MatrixMock(int size, int length, int number) {
        int count = 0;
        data = new int[size][length];
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                int temp = random.nextInt(10);
                data[i][j] = temp;
                if (temp == number) {
                    count++;
                }
            }
        }
        System.out.printf("Mock: There are %d occurences of %d in generate\n", count, number);
    }

    public int[] getRow(int row) {
        if(row >= 0 && row < data.length) {
            return data[row];
        }
        return null;
    }
}

查找结果类,定义了每行中待查找数字出现的次数。

public class Results {
    int data[];

    public Results(int size) {
        data = new int[size];
    }

    public void setData(int position, int value) {
        data[position] = value;
    }

    public int[] getData() {
        return data;
    }
}

接下来我们创建查找任务线程,这个类接受几个参数作为构造方法。分别是MatrixMock(待查找矩阵),Results(查找结果),firstRow(此任务分配的开始查找位置),lastRow(此任务分配的结束查找位置),number(待查找数字),CyclicBarrier(并发辅助类)。然后我们从firstRow开始查找,直到lastRow结束查找每行中待查找数字出现的次数并保存到Result中。

public class Searcher implements Runnable{
    private MatrixMock matrixMock;
    private Results results;
    private int firstRow;
    private int lastRow;
    private int number;
    private final CyclicBarrier cyclicBarrier;

    public Searcher(CyclicBarrier cyclicBarrier, MatrixMock matrixMock, Results results, int firstRow, int lastRow, int number) {
        this.cyclicBarrier = cyclicBarrier;
        this.matrixMock = matrixMock;
        this.results = results;
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.number = number;
    }

    @Override
    public void run() {
        System.out.printf("%s: Processing lines from %d to %d.\n",
                Thread.currentThread().getName(), firstRow, lastRow);

        for (int i = firstRow; i < lastRow; i++) {
            int row[] = matrixMock.getRow(i);
            int counter = 0;
            for (int j = 0; j < row.length; j++) {
                if(row[j] == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }

        System.out.printf("%s: Line processed.\n", Thread.currentThread().getName());

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

查找任务合并线程,也就是等待所有所有查找子任务完成后,把所有子任务查找的结果做一个合计,并打印出一共找到多少个待查找数字。

public class Grouper implements Runnable{
    private Results results;

    public Grouper(Results results) {
        this.results = results;
    }

    @Override
    public void run() {
        System.out.printf("Grouper: Processing results...\n");
        int count = 0;
        for (int i = 0; i < results.getData().length; i++) {
            count += results.getData()[i];
        }
        System.out.printf("Grouper: Total result: %d\n", count);
    }
}

在主方法类中创建一个10000*10000的矩阵,并启动5个Searcher线程去查找数字5在每行中出现的次数。然后使用Grouper线程作为CyclicBarrier的初始化参数,等待所有Searcher任务执行完毕后执行Grouper线程来合计每个Searcher线程的查找结果。

public class Main {
    public static void main(String[] args) {
        int size = 10000;
        int length = 10000;
        int search = 5;
        int participants = 5;
        int lines_participants = size / participants;

        MatrixMock matrixMock = new MatrixMock(size, length, search);
        Results results = new Results(size);
        Grouper grouper = new Grouper(results);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(participants, grouper);
        Searcher[] searchers = new Searcher[participants];
        Thread[] searcherThreads = new Thread[participants];
        for (int i = 0; i < participants; i++) {
            int firstRow = i * lines_participants;
            int lastRow = (i + 1) * lines_participants;
            searchers[i] = new Searcher(cyclicBarrier, matrixMock, results, firstRow, lastRow, search);
            searcherThreads[i] = new Thread(searchers[i]);
            searcherThreads[i].start();
        }
    }
}

查看任务执行日志,我们发现每个Searcher任务执行2000行数据的查找工作,最终找到9997834个带查找数字。

Mock: There are 9997834 occurences of 5 in generate
Thread-0: Processing lines from 0 to 2000.
Thread-1: Processing lines from 2000 to 4000.
Thread-2: Processing lines from 4000 to 6000.
Thread-3: Processing lines from 6000 to 8000.
Thread-0: Line processed.
Thread-2: Line processed.
Thread-3: Line processed.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Line processed.
Thread-4: Line processed.
Grouper: Processing results...
Grouper: Total result: 9997834

CyclicBarrier类提供了两个方法用来查看在CyclicBarrier上面等待的线程数和同步的任务数。

System.out.printf("CyclicBarrier: %d, %d\n", cyclicBarrier.getNumberWaiting(), cyclicBarrier.getParties());

CyclicBarrier类还支持重置,通过reset()方法完成操作。当重置发生后,await()方法将接受一个BrokenBarrierException异常,你捕获这个异常后可以用来执行一些复杂的操作,比如回滚数据或者重新执行。

CyclicBarrier类有一个特有的状态为Broken,当多线程并发等待的时候,有一个线程被中断,这个线程抛出InterruptedException异常,其他等待的线程将抛出BrokenBarrierException异常,于是CyclicBarrier对象属于Broken状态。你可以使用isBroken()方法来判断CyclicBarrier对象是否处于损坏状态。

© 著作权归作者所有

阿拉德大陆的魔法师
粉丝 27
博文 91
码字总数 83019
作品 0
西城
程序员
私信 提问
java多线程系列:通过对战游戏学习CyclicBarrier

CyclicBarrier是java.util.concurrent包下面的一个工具类,字面意思是可循环使用(Cyclic)的屏障(Barrier),通过它可以实现让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一...

huangzd
2018/01/06
0
0
计算机科学中抽象的好处与问题—伪共享实例分析

David John Wheeler有一句名言“计算机科学中的任何问题都可以通过加上一层间接层来解决”,一层不够就再加一层。后半句是我加的 (* ̄︶ ̄) ,虽然有点玩笑的意思,但是也的确能说明一些问题...

MageekChiu
2018/01/10
0
0
【Java并发专题】27篇文章详细总结Java并发基础知识

努力的意义,就是,在以后的日子里,放眼望去全是自己喜欢的人和事! github:https://github.com/CL0610/Java-concurrency,欢迎题issue和Pull request。所有的文档都是自己亲自码的,如果觉...

你听___
2018/05/06
0
0
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
2018/07/22
0
0
Java线程面试题 Top 50

不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员的欢迎。大多数待遇丰厚的Java开发职位都要求开发者...

loda0128
2015/05/29
995
1

没有更多内容

加载失败,请刷新页面

加载更多

java通过ServerSocket与Socket实现通信

首先说一下ServerSocket与Socket. 1.ServerSocket ServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态. ServetSocket有三个构造方法: (1) ServerSocket(int port);...

Blueeeeeee
今天
6
0
用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
5
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
16
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
18
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部