文档章节

线程池+阻塞队列 模仿生产者消费者

l
 lintalkliu
发布于 2016/05/12 22:22
字数 447
阅读 174
收藏 1

入口点

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.concurrent.BlockingQueue;   
  4. import java.util.concurrent.ExecutorService;   
  5. import java.util.concurrent.Executors;   
  6. import java.util.concurrent.LinkedBlockingQueue;   
  7. import java.util.concurrent.TimeUnit;  
  8.     
  9. /**  
  10.  * Throws exception    Specical value        TimeOut                    Block 
  11.  * add(e)                       offer(e)               offer(e,time,unit)         put 
  12.  * remove                      poll                   poll(time,unit)             take 
  13.  */  
  14. public class BlockingQueueTest {   
  15.     
  16.     public static void main(String[] args) throws InterruptedException {   
  17.         // 声明一个容量为10的缓存队列   
  18.         //SynchronousQueue<String> 只有有人来拿的时候数据才能放的进去  
  19.         BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);   
  20.           
  21.         Producer producer1 = new Producer("rain",queue);   
  22.         Producer producer2 = new Producer("tom",queue);   
  23.         Producer producer3 = new Producer("jack",queue);   
  24.         Consumer consumer = new Consumer(queue);   
  25.     
  26.         // 借助Executors 线程池  
  27.         ExecutorService service = Executors.newCachedThreadPool();   
  28.         // 启动线程   
  29.         service.execute(producer1);   
  30.         service.execute(producer2);   
  31.         service.execute(producer3);   
  32.         service.execute(consumer);   
  33.     
  34.         // 执行3s   
  35.         TimeUnit.SECONDS.sleep(3);  
  36.         producer1.stop();   
  37.         producer2.stop();   
  38.         producer3.stop();   
  39.     
  40.         // 退出Executor   
  41.         service.shutdown();   
  42.     }   
  43. }   


 

 

生产者

 

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.Random;   
  4. import java.util.concurrent.BlockingQueue;   
  5. import java.util.concurrent.TimeUnit;   
  6. import java.util.concurrent.atomic.AtomicInteger;  
  7.   
  8. public class Producer implements Runnable {   
  9.     
  10.     private String name;  
  11.     private volatile boolean      isRunning               = true;   
  12.     private BlockingQueue queue;   
  13.     private static AtomicInteger  count                   = new AtomicInteger();   
  14.     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;   
  15.       
  16.     public Producer(String name,BlockingQueue queue) {   
  17.         this.queue = queue;   
  18.         this.name=name;  
  19.     }   
  20.     
  21.     public void run() {   
  22.         String data = null;   
  23.         Random r = new Random();   
  24.     
  25.         System.out.println("启动生产者线程!"+name);   
  26.         try {   
  27.             while (isRunning) {   
  28.                 System.out.println(name+"正在生产数据...");   
  29.                 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));   
  30.     
  31.                 data = "data:" + count.incrementAndGet();   
  32.                 System.out.println(name+"将数据:" + data + "放入队列...");   
  33.                 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {   
  34.                     System.out.println(name+"放入数据失败:" + data);   
  35.                 }   
  36.             }   
  37.         } catch (InterruptedException e) {   
  38.             e.printStackTrace();   
  39.             Thread.currentThread().interrupt();   
  40.         } finally {   
  41.             System.out.println(name+"退出生产者线程!");   
  42.         }   
  43.     }   
  44.     
  45.     public void stop() {   
  46.         isRunning = false;   
  47.     }   
  48.     
  49.   
  50.     
  51. }   


 

消费者

 

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.Random;   
  4. import java.util.concurrent.BlockingQueue;   
  5. import java.util.concurrent.TimeUnit;   
  6.   
  7.   
  8. public class Consumer implements Runnable {   
  9.     
  10.         
  11.     private BlockingQueue<String> queue;   
  12.     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;   
  13.       
  14.     public Consumer(BlockingQueue<String> queue) {   
  15.         this.queue = queue;   
  16.     }   
  17.     
  18.     public void run() {   
  19.         System.out.println("启动消费者线程!");   
  20.         Random r = new Random();   
  21.         boolean isRunning = true;   
  22.         try {   
  23.             while (isRunning) {   
  24.                 System.out.println("消费者正从队列获取数据...");   
  25.                 String data = queue.poll(5, TimeUnit.SECONDS);   
  26.                 if (null != data) {   
  27.                     System.out.println("消费者拿到数据:" + data);   
  28.                     System.out.println("消费者正在消费数据:" + data);   
  29.                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));   
  30.                 } else {   
  31.                     // 超过5s还没数据,认为所有生产线程都已经退出,自动退出消费线程。   
  32.                     isRunning = false;   
  33.                 }   
  34.             }   
  35.         } catch (InterruptedException e) {   
  36.             e.printStackTrace();   
  37.             Thread.currentThread().interrupt();   
  38.         } finally {   
  39.             System.out.println("退出消费者线程!");   
  40.         }   
  41.     }   
  42.   

本文转载自:http://blog.csdn.net/gaogaoshan/article/details/9312603

共有 人打赏支持
l
粉丝 0
博文 6
码字总数 729
作品 0
南京
私信 提问
并发编程(五)——生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式 在线程世界里...

whc20011
2016/10/31
34
0
关于Java多线程的一些常考知识点

前言 多线程也是面试中经常会提起到的一个点。面试官会问:实现多线程的两种方式以及区别,死锁发生的个条件以及如何避免发生死锁,死锁和活锁的区别,常见的线程池以及区别,怎么理解有界队...

cmazxiaoma
2017/12/11
0
0
BlockingQueue学习

引言 在Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时...

jiangmitiao
2015/08/30
101
0
聊聊并发(十)生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 在线程世界里,生产者就是生产数据的线程,消...

陶邦仁
2015/03/23
0
0
Java并发编程:阻塞队列BlockingQueue

阻塞队列BlockingQueue简介 阻塞队列BlockingQueue是JDK1.5并发新特性中的内容,阻塞队列首先是一个队列,同样实现了Collection接口。阻塞队列提供了可阻塞的put和take方法,以及支持定时的p...

TonyStarkSir
08/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

中高级面试知识点:缓存

前言 几乎所有的项目都做了缓存,但是缓存做的怎么样,其实只有我们自己知道。缓存做的好,没有网络也能流畅的使用;再多的数据请求都不会出现卡顿延迟等待很久的情况。 程序中除了图片缓存(...

Mr_zebra
28分钟前
2
0
Poco官方PPT_190-Applications双语对照翻译

因工作需要用到这一块的功能,所以直接翻译了一下 此PPT来源于官方文件,地址https://pocoproject.org/documentation.html

CHONGCHEN
32分钟前
5
1
使用idea开发servlet,引用maven后触发的class not found的问题的解决方案

需要将maven下载的依赖加入到lib目录,具体操作方法如下:打开Projrct Settings->Artifacts->右边的Output Layout,双击maven的依赖,就可以加载上去...

shatian
33分钟前
2
0
SpringMVC 拦截器

拦截器 是指通过统一拦截从浏览器发往服务器的请求来完成功能的增强 SpringMVC拦截器实现过程 1.编写拦截器,实现 org.springframework.web.servlet.HandlerInterceptor 接口 2.将拦截器注册...

晨猫
36分钟前
2
0
RabbitMQ+PHP演示实例

新建rabbit_consumer.php作为消费者 <?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'admin', 'password' => ......

hansonwong
37分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部