文档章节

线程池+阻塞队列 模仿生产者消费者

l
 lintalkliu
发布于 2016/05/12 22:22
字数 447
阅读 144
收藏 1

入口点

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.concurrent.BlockingQueue;   
  4. import java.util.concurrent.ExecutorService;   
  5. import java.util.concurrent.Executors;   
  6. import java.util.concurrent.LinkedBlockingQueue;   
  7. import java.util.concurrent.TimeUnit;  
  8.     
  9. /**  
  10.  * Throws exception    Specical value        TimeOut                    Block 
  11.  * add(e)                       offer(e)               offer(e,time,unit)         put 
  12.  * remove                      poll                   poll(time,unit)             take 
  13.  */  
  14. public class BlockingQueueTest {   
  15.     
  16.     public static void main(String[] args) throws InterruptedException {   
  17.         // 声明一个容量为10的缓存队列   
  18.         //SynchronousQueue<String> 只有有人来拿的时候数据才能放的进去  
  19.         BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);   
  20.           
  21.         Producer producer1 = new Producer("rain",queue);   
  22.         Producer producer2 = new Producer("tom",queue);   
  23.         Producer producer3 = new Producer("jack",queue);   
  24.         Consumer consumer = new Consumer(queue);   
  25.     
  26.         // 借助Executors 线程池  
  27.         ExecutorService service = Executors.newCachedThreadPool();   
  28.         // 启动线程   
  29.         service.execute(producer1);   
  30.         service.execute(producer2);   
  31.         service.execute(producer3);   
  32.         service.execute(consumer);   
  33.     
  34.         // 执行3s   
  35.         TimeUnit.SECONDS.sleep(3);  
  36.         producer1.stop();   
  37.         producer2.stop();   
  38.         producer3.stop();   
  39.     
  40.         // 退出Executor   
  41.         service.shutdown();   
  42.     }   
  43. }   


 

 

生产者

 

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.Random;   
  4. import java.util.concurrent.BlockingQueue;   
  5. import java.util.concurrent.TimeUnit;   
  6. import java.util.concurrent.atomic.AtomicInteger;  
  7.   
  8. public class Producer implements Runnable {   
  9.     
  10.     private String name;  
  11.     private volatile boolean      isRunning               = true;   
  12.     private BlockingQueue queue;   
  13.     private static AtomicInteger  count                   = new AtomicInteger();   
  14.     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;   
  15.       
  16.     public Producer(String name,BlockingQueue queue) {   
  17.         this.queue = queue;   
  18.         this.name=name;  
  19.     }   
  20.     
  21.     public void run() {   
  22.         String data = null;   
  23.         Random r = new Random();   
  24.     
  25.         System.out.println("启动生产者线程!"+name);   
  26.         try {   
  27.             while (isRunning) {   
  28.                 System.out.println(name+"正在生产数据...");   
  29.                 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));   
  30.     
  31.                 data = "data:" + count.incrementAndGet();   
  32.                 System.out.println(name+"将数据:" + data + "放入队列...");   
  33.                 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {   
  34.                     System.out.println(name+"放入数据失败:" + data);   
  35.                 }   
  36.             }   
  37.         } catch (InterruptedException e) {   
  38.             e.printStackTrace();   
  39.             Thread.currentThread().interrupt();   
  40.         } finally {   
  41.             System.out.println(name+"退出生产者线程!");   
  42.         }   
  43.     }   
  44.     
  45.     public void stop() {   
  46.         isRunning = false;   
  47.     }   
  48.     
  49.   
  50.     
  51. }   


 

消费者

 

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.Random;   
  4. import java.util.concurrent.BlockingQueue;   
  5. import java.util.concurrent.TimeUnit;   
  6.   
  7.   
  8. public class Consumer implements Runnable {   
  9.     
  10.         
  11.     private BlockingQueue<String> queue;   
  12.     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;   
  13.       
  14.     public Consumer(BlockingQueue<String> queue) {   
  15.         this.queue = queue;   
  16.     }   
  17.     
  18.     public void run() {   
  19.         System.out.println("启动消费者线程!");   
  20.         Random r = new Random();   
  21.         boolean isRunning = true;   
  22.         try {   
  23.             while (isRunning) {   
  24.                 System.out.println("消费者正从队列获取数据...");   
  25.                 String data = queue.poll(5, TimeUnit.SECONDS);   
  26.                 if (null != data) {   
  27.                     System.out.println("消费者拿到数据:" + data);   
  28.                     System.out.println("消费者正在消费数据:" + data);   
  29.                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));   
  30.                 } else {   
  31.                     // 超过5s还没数据,认为所有生产线程都已经退出,自动退出消费线程。   
  32.                     isRunning = false;   
  33.                 }   
  34.             }   
  35.         } catch (InterruptedException e) {   
  36.             e.printStackTrace();   
  37.             Thread.currentThread().interrupt();   
  38.         } finally {   
  39.             System.out.println("退出消费者线程!");   
  40.         }   
  41.     }   
  42.   

本文转载自:http://blog.csdn.net/gaogaoshan/article/details/9312603

共有 人打赏支持
l
粉丝 0
博文 6
码字总数 729
作品 0
南京
并发编程(五)——生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式 在线程世界里...

whc20011
2016/10/31
34
0
关于Java多线程的一些常考知识点

前言 多线程也是面试中经常会提起到的一个点。面试官会问:实现多线程的两种方式以及区别,死锁发生的个条件以及如何避免发生死锁,死锁和活锁的区别,常见的线程池以及区别,怎么理解有界队...

cmazxiaoma
2017/12/11
0
0
BlockingQueue学习

引言 在Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时...

jiangmitiao
2015/08/30
101
0
聊聊并发(十)生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 在线程世界里,生产者就是生产数据的线程,消...

陶邦仁
2015/03/23
0
0
Java并发编程:阻塞队列BlockingQueue

阻塞队列BlockingQueue简介 阻塞队列BlockingQueue是JDK1.5并发新特性中的内容,阻塞队列首先是一个队列,同样实现了Collection接口。阻塞队列提供了可阻塞的put和take方法,以及支持定时的p...

TonyStarkSir
08/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java 并发编程原理

为什么要使用多线程? 通过多线程提高吞吐量 伸缩性比较好,可以增加 CPU 核心来提高程序性能 什么场景下使用多线程? 如:tomcat BIO Java 如何实现多线程? Thread、Runnable、ExecutorSer...

aelchao
37分钟前
2
0
谨慎的覆盖clone方法

说在前面 有些专家级程序员干脆从来不去覆盖clone方法,也从来不去调用它,除非拷贝数组。 其他方式 可以提供一个构造函数或者工厂去实现clone功能。 相比于clone,它们有如下优势: 不依赖于...

XuePeng77
38分钟前
1
0
什么是最适合云数据库的架构设计?

分布式数据库技术发展多年,但是在应用、业务的驱动下,分布式数据库的架构一直在不断发展和演进。 开源金融级分布式数据库SequoiaDB,经过6年的研发,坚持从零开始打造数据库核心引擎。在技...

巨杉数据库
46分钟前
4
0
源码模仿之RPC

源码模仿之RPC RPC - 远程过程调用,概念不多赘述,可自行百度。 场景 统一api接口 生产者(提供远程接口调用方) 使用者(主动调用远程接口) 代码实现 API接口(公共依赖包) DemoEntity (...

GMarshal
47分钟前
1
0
Linux之安装Tomcat8

最近要在Linux上安装Tomcat,记录下 1.进入tomcat8的安装目录 List-1 root@iZwz9bjiawhqzfsklyht4rZ bin]# pwd/opt/app/tomcat8/bin[root@iZwz9bjiawhqzfsklyht4rZ bin]# ll总用量 83......

克虏伯
48分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部