JUC包学习之CyclicBarrier

原创
2014/05/04 21:54
阅读数 178
package cyclicBarrier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/** 
 * CyclicBarrier类似于CountDownLatch也是个计数器, 
 * 不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, 
 * 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 
 * CyclicBarrier就象它名字的意思一样,可看成是个障碍, 
 * 所有的线程必须到齐后才能一起通过这个障碍,也就是一起开始完成run方法的运行。 
 * 作用在每个卡在每个子线程上
 * CyclicBarrier初始时还可带一个Runnable的参数, 
 * 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。 
 */ 
public class CyclicBarrierTest {
    private CyclicBarrier barrier;
    //启动多个线程,多线程汇总二维数组每行的结果,然后汇总所有结果
    private int[][] data = new int[][]{
            {1,3,5,7,9},{2,4,6,8,10}
    };
    
    public CyclicBarrierTest() throws InterruptedException, BrokenBarrierException{
        //汇总数
        int total = 0;
        List<MyCaculate> threadList = new ArrayList<MyCaculate>();
        //启动二维数组行数个线程
        int threadCount = data.length;
        barrier = new CyclicBarrier(threadCount);
        for(int i=0;i<threadCount;i++){
            MyCaculate caculate = new MyCaculate(data[i],barrier);
            caculate.setName("caculate-thread-"+i);
            caculate.start();
            //System.out.println(caculate.isDone());
            threadList.add(caculate);
        }
        //
        for(MyCaculate c:threadList){
            while(true){
                if(c.isDone()){
                    total+=c.getSum();
                    break;
                }
            }
            //total+=c.getSum();
        }
        System.out.println("--------------------------------------");
        System.out.println("各行汇总结果操作完成,汇总结果:"+total);
        System.out.println("-------------------CyclicBarrier可以重复使用,再来一次--------------");
        total = 0;
        threadList.clear();
        for(int i=0;i<threadCount;i++){
            MyCaculate caculate = new MyCaculate(data[i],barrier);
            caculate.setName("caculate-thread-"+i);
            caculate.start();
            //System.out.println(caculate.isDone());
            threadList.add(caculate);
        }
        //
        for(MyCaculate c:threadList){
            while(true){
                if(c.isDone()){
                    total+=c.getSum();
                    break;
                }
            }
            //total+=c.getSum();
        }
        System.out.println("--------------------------------------");
        System.out.println("各行汇总结果操作完成,汇总结果:"+total);
    }
    

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        CyclicBarrierTest c = new CyclicBarrierTest();
    }
}
class MyCaculate extends Thread
{
    private CyclicBarrier barrier;
    private int[] row;
    private int sum;
    private boolean isDone = false;
    
    public MyCaculate(int[] row,CyclicBarrier barrier){
        this.row = row;
        this.barrier = barrier;
    }
    
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" 开始计算");
        for(int i:row){
            sum+=i;
        }
        isDone = true;
        try {
            //阻塞计算完成标志,直到所有线程都计算完成
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        
    }
    public int getSum(){
        return sum;
    }
    public boolean isDone(){
        return isDone;
    }
}
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部