java7 concurrency cookbook(第三章)

原创
2014/12/07 20:58
阅读数 50

本书的第一章非常的基础。第二章开始通过一些实例讲并发关键字使用。包括了condition、ReentrantLock等。

第三章讲解线程并发工具。

3.1 控制并发访问资源

3.2控制并发访问一个资源的多个副本。

3.3在一个点上控制并发

3.4控制并发阶段任务的阶段改变。

3.5在并发任务中改变数据的值。

首先和大家区别个概念:

发和并行的区别

并发是在单个处理器中某段时间内存在多个任务等待执行。

并行是在多个处理器中同一个任务在多线程环境下同时执行。

这样定义不准确,但是至少让大家从一个侧面区别两者。

在基础的并发机制中,最为关键的:

1、synchronized关键字

2、lock接口和它的实现:ReentrantLock、ReentrantReadWriteLock、ReadLock、ReentrantReadWriteLock、WriteLock。

本章讲解高水平的多线程并发机制。包括:

Semaphore(信号量)

CountDownLatch()

CyclicBarrier

Phaser(java7新特性)多个线程必须都完成一个阶段的所以任务才能够去完成下一阶段的任务。

Exchanger:两个线程完成数据交换的点。


二进制信号量:

就是个Sempphore的变量。它就是一个0、1.如果有线程使用了信号量,它的值就变成0.其它同时请求资源的线程等待其它线程释放信号量。它有两个实用的方法:acquireUninterruptible()、tryAquire()两个方法。acquireUninterruptible()方法使得那些等待信号量的线程不能够被打断。正常情况下,等待信号量的线程如果被打断就会抛出异常。

tryAquire()方法首先尝试获取信号量。如果未获取到信号量,返回false。线程可以根据这个布尔值选择在未获得信号量的时候如何做相应的操作。

信号量公平性策略,是在释放信号量的时候选择等待时间最久的线程来获取信号量。

同时执行资源副本。

如果有三个打印机同时执行打印功能,那么如何并发使用三个打印机呢?

首先打印功能不能够三个机器同时执行,否则一页纸的东西就会打印到三个打印机上去了,岂不乱套的了?这样打印函数就会需要一个锁。如何轮流使用打印机呢?就需要旗帜标示那个打印机可以使用,以便线程去寻找可以使用的打印机。

等待同时执行的事件:

如果有20个客人等着过来,那么我们可以设置变量CountDownLatch=20,如果来一个客人(线程),那么就会有-1.当20个线程同时执行完毕,那么就会解除await()。开始下一步(节目开始)。

如果有个一客人不来怎么办?CountDownLatch有一个机制await(long time,TimeUnit timeunit);在一定范围内不来,那么自动终断,开始下一步(下面的节目)

在一个点上并行任务(CylicBarrier)

在需要多个线程来执行某个任务的时候 ,如果有一个线程完成了,那么就自动调用await()方法,等待其它线程的到来,如果所有的线程都来了,那么所有的线程将被唤醒去做下面的任务。比如:我们要计算10个数组各行的数值,那么第一步我们就是要开10个线程去计算每个数组每行的数组,等十个线程都计算完毕了,那么就继续下一个阶段,算出所有行的数据和。

与CountDownLatch不同,CylicBarrier不是让所有的线程休眠,而是让所有的线程都去完成自己的任务,然后到达所有线程都到达这个点才能够继续接下来的任务。

phaser 是java7之后出现的新特性。

phaser将一项任务划分为不同的阶段:

当并发的任务需要分多个步骤执行的时候,phaser机制能够得到很好的发挥。phaser类能够在每个阶段同步所有的任务线程。所有的线程都不执行下一阶段的任务指导大家都将第一阶段的任务执行完毕。

我们将使用任务数量初始化pahser类以便动态增减这个值。phaser类通过将任务按阶段同步达到多个线程同时执行任务的目的。phaser是控制线程执行阶段的机制。如果多个线程跑多个任务,那么控制线程“跑速”的就是pahser的phaser.arriveAndAwaitAdvance()方法,它控制线程等待其它线程一同完成这个阶段的任务进入下一阶段。如果线程到达了任务的最后阶段,准备跳出任务,那么执行pahser.arriveAndDeregister()方法跳出所有阶段。当main方法的phaser得知所有的任务都跳出了,那么它才使用interrupt方法打断自己。

如果要想在不同阶段做出不同的提示,那么则使用以下的方法:

public class MyPhaser extends Phaser{

 onAdvance(int phase,int registeredParties){

case(pahse){

 case 1:

return methods1();

case2 methods2();

..........

}

}

}

比如某次做练习题,一共三道题,每道题需要等到所有的同学完成才能够进入下一阶段,那么用phase控制多个线程在三个计算做同步。

exchanger可以在两个线程之间交换数据,不支持在多线程中不支持。可以在生产者-消费者模式中使用。

代码如下:

package com.china.zj;


import java.util.List;

import java.util.concurrent.Exchanger;


public class Productor implements Runnable {

 private List<String> buffer;

 private final Exchanger<List<String>> exchanger;

     public Productor(List<String> buffer,Exchanger<List<String>> exchanger){

       this.buffer=buffer;

       this.exchanger=exchanger;

     

     }

 @Override

 public void run() {

  int cycle=1;

  for(int i=0;i<10;i++){

   System.out.println("cycle"+cycle);

   for (int j=0;j<10;j++){

    String message="Event"+((i*10)+j);

    System.out.println("message"+message);

    buffer.add(message);

   }

   try {

    buffer=exchanger.exchange(buffer);

   } catch (InterruptedException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

   }

   System.out.println("productor size"+buffer.size());

   cycle++;

  }

  // TODO Auto-generated method stub


 }


}


------消费者-------------


package com.china.zj;


import java.util.List;

import java.util.concurrent.Exchanger;


public class Consumer implements Runnable {

    private List<String> buffer;

    private Exchanger<List<String>> exchanger;

    public Consumer(List<String> buffer,Exchanger<List<String>> exchanger){

     this.buffer=buffer;

     this.exchanger=exchanger;

    }

 

 public void run() {

  // TODO Auto-generated method stubri

 

   int cycle=1;

   for(int i=0;i<10;i++){

    System.out.println("cycle"+cycle);

    try {

     buffer=exchanger.exchange(buffer);

    } catch (InterruptedException e) {

     // TODO Auto-generated catch block

     e.printStackTrace();

    }

   

   

    for (int j=0;j<10;j++){

     String message=buffer.get(0);

     System.out.println("Consumer"+message);

     buffer.remove(0);

    }

   

    System.out.println("productor size"+buffer.size());

    cycle++;

   }

   // TODO Auto-generated method stub


  }


 }



-------core------------


package com.china.zj;


import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Exchanger;


public class Core {


 /**

  * @param  args

  */

 public static void main(String[] args) {

  // TODO Auto-generated method stub

        List<String> buffer1=new ArrayList<String>();

        List<String> buffer2=new ArrayList<String>();

        Exchanger<List<String>> exchanger=new Exchanger<List<String>>();

        Productor pro=new Productor(buffer1,exchanger);

        Consumer cosum=new Consumer(buffer2,exchanger);

        Thread threadProducer=new Thread(pro);

        Thread threadConsumer=new Thread(cosum);

        threadProducer.start();

        threadConsumer.start();

 }


}


注意红字部分代码的位置,如果放在后面会报错。!!


展开阅读全文
打赏
1
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
1
分享
返回顶部
顶部