文档章节

线程池+阻塞队列 模仿生产者消费者

l
 lintalkliu
发布于 2016/05/12 22:22
字数 447
阅读 106
收藏 1
点赞 2
评论 0

入口点

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.concurrent.BlockingQueue;   
  4. import java.util.concurrent.ExecutorService;   
  5. import java.util.concurrent.Executors;   
  6. import java.util.concurrent.LinkedBlockingQueue;   
  7. import java.util.concurrent.TimeUnit;  
  8.     
  9. /**  
  10.  * Throws exception    Specical value        TimeOut                    Block 
  11.  * add(e)                       offer(e)               offer(e,time,unit)         put 
  12.  * remove                      poll                   poll(time,unit)             take 
  13.  */  
  14. public class BlockingQueueTest {   
  15.     
  16.     public static void main(String[] args) throws InterruptedException {   
  17.         // 声明一个容量为10的缓存队列   
  18.         //SynchronousQueue<String> 只有有人来拿的时候数据才能放的进去  
  19.         BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);   
  20.           
  21.         Producer producer1 = new Producer("rain",queue);   
  22.         Producer producer2 = new Producer("tom",queue);   
  23.         Producer producer3 = new Producer("jack",queue);   
  24.         Consumer consumer = new Consumer(queue);   
  25.     
  26.         // 借助Executors 线程池  
  27.         ExecutorService service = Executors.newCachedThreadPool();   
  28.         // 启动线程   
  29.         service.execute(producer1);   
  30.         service.execute(producer2);   
  31.         service.execute(producer3);   
  32.         service.execute(consumer);   
  33.     
  34.         // 执行3s   
  35.         TimeUnit.SECONDS.sleep(3);  
  36.         producer1.stop();   
  37.         producer2.stop();   
  38.         producer3.stop();   
  39.     
  40.         // 退出Executor   
  41.         service.shutdown();   
  42.     }   
  43. }   


 

 

生产者

 

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.Random;   
  4. import java.util.concurrent.BlockingQueue;   
  5. import java.util.concurrent.TimeUnit;   
  6. import java.util.concurrent.atomic.AtomicInteger;  
  7.   
  8. public class Producer implements Runnable {   
  9.     
  10.     private String name;  
  11.     private volatile boolean      isRunning               = true;   
  12.     private BlockingQueue queue;   
  13.     private static AtomicInteger  count                   = new AtomicInteger();   
  14.     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;   
  15.       
  16.     public Producer(String name,BlockingQueue queue) {   
  17.         this.queue = queue;   
  18.         this.name=name;  
  19.     }   
  20.     
  21.     public void run() {   
  22.         String data = null;   
  23.         Random r = new Random();   
  24.     
  25.         System.out.println("启动生产者线程!"+name);   
  26.         try {   
  27.             while (isRunning) {   
  28.                 System.out.println(name+"正在生产数据...");   
  29.                 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));   
  30.     
  31.                 data = "data:" + count.incrementAndGet();   
  32.                 System.out.println(name+"将数据:" + data + "放入队列...");   
  33.                 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {   
  34.                     System.out.println(name+"放入数据失败:" + data);   
  35.                 }   
  36.             }   
  37.         } catch (InterruptedException e) {   
  38.             e.printStackTrace();   
  39.             Thread.currentThread().interrupt();   
  40.         } finally {   
  41.             System.out.println(name+"退出生产者线程!");   
  42.         }   
  43.     }   
  44.     
  45.     public void stop() {   
  46.         isRunning = false;   
  47.     }   
  48.     
  49.   
  50.     
  51. }   


 

消费者

 

[java] view plain copy

  1. package multithreading.BlockingQueue;  
  2.   
  3. import java.util.Random;   
  4. import java.util.concurrent.BlockingQueue;   
  5. import java.util.concurrent.TimeUnit;   
  6.   
  7.   
  8. public class Consumer implements Runnable {   
  9.     
  10.         
  11.     private BlockingQueue<String> queue;   
  12.     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;   
  13.       
  14.     public Consumer(BlockingQueue<String> queue) {   
  15.         this.queue = queue;   
  16.     }   
  17.     
  18.     public void run() {   
  19.         System.out.println("启动消费者线程!");   
  20.         Random r = new Random();   
  21.         boolean isRunning = true;   
  22.         try {   
  23.             while (isRunning) {   
  24.                 System.out.println("消费者正从队列获取数据...");   
  25.                 String data = queue.poll(5, TimeUnit.SECONDS);   
  26.                 if (null != data) {   
  27.                     System.out.println("消费者拿到数据:" + data);   
  28.                     System.out.println("消费者正在消费数据:" + data);   
  29.                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));   
  30.                 } else {   
  31.                     // 超过5s还没数据,认为所有生产线程都已经退出,自动退出消费线程。   
  32.                     isRunning = false;   
  33.                 }   
  34.             }   
  35.         } catch (InterruptedException e) {   
  36.             e.printStackTrace();   
  37.             Thread.currentThread().interrupt();   
  38.         } finally {   
  39.             System.out.println("退出消费者线程!");   
  40.         }   
  41.     }   
  42.   

本文转载自:http://blog.csdn.net/gaogaoshan/article/details/9312603

共有 人打赏支持
l
粉丝 0
博文 6
码字总数 729
作品 0
南京
并发编程(五)——生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式 在线程世界里...

whc20011
2016/10/31
34
0
关于Java多线程的一些常考知识点

前言 多线程也是面试中经常会提起到的一个点。面试官会问:实现多线程的两种方式以及区别,死锁发生的个条件以及如何避免发生死锁,死锁和活锁的区别,常见的线程池以及区别,怎么理解有界队...

cmazxiaoma
2017/12/11
0
0
聊聊并发(十)生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 在线程世界里,生产者就是生产数据的线程,消...

陶邦仁
2015/03/23
0
0
BlockingQueue学习

引言 在Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时...

jiangmitiao
2015/08/30
101
0
java中线程队列BlockingQueue的用法

认识BlockingQueue阻塞队列, 顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入...

landebug
2015/07/30
0
0
线程池的设计(一):半同步半异步线程池的设计

前置问答: 1.为什么要使用线程池? 避免大量创建和销毁线程,提升性能。 2.线程池主要工作流程?一是外部不断地向线程池添加任务,而是线程池内部不断地取任务执行,这是典型的生产者消费者...

苗永超
2016/04/07
466
2
Java并发包中的同步队列SynchronousQueue实现原理

介绍 Java 6的并发编程包中的是SynchronousQueue一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。 不像ArrayBlockingQueue或Lin...

空云万里晴
2016/06/16
51
0
并行化资源池队列 1 —— 部分有界队列

1前言 在并发系统中很多地方都要用到作为资源池的并行化队列,如在大多数应用中,一个或多个生产者线程生产数据,一个或多个消费者消费数据。这些数据元素可以是需要执行的的任务、要解释的键...

武祖林动
2017/06/12
64
0
生产者/消费者问题的多种Java实现方式

生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者/消费者问题的方法可分为两...

HenrySun
2016/05/04
89
0
阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) ...

天外飞鱼
2014/07/28
0
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Activiti - 新一代的开源 BPM 引擎

Activiti 背景简介、服务和功能介绍 董 娜, 狄 浩, 和 张 晓篱 2012 年 7 月 23 日发布 背景介绍 Activiti 其核心是 BPMN 2.0 的流程引擎。BPMN 是目前被各 BPM 厂商广泛接受的 BPM 标准,全...

孟飞阳
11分钟前
0
0
最有效的方式来适配

最有效的方式来进行屏幕适配 在上代码之前先把屏幕相关的几个概念搞清楚:Density、DensityDpi、ScaleDensity。这里我们不过多讲解这些概念知识。 1.0 获取设计图的屏幕尺寸:这里我以360dp...

android-key
13分钟前
0
0
微信授权代码翻译样本

var a,b,c,d = ngx.call(1,2,3)var e = [];var f ;var g = function () {}var h = 1;var c = "abcdefg" + "222";var d = "asdasdasd" + a;var a = ngx >>> log();//......

钟元OSS
15分钟前
0
0
5、二维码生成工具类

一、maven引入依赖jar包 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.1.0</version></dependency><dependency><groupId>com.......

丑陋的皮囊
15分钟前
0
0
正则表达式

以前用正则表达式很少,大部分情况下matches一下就好了,这次遇到的情况比较特殊,因为对截取出来的数据比较敏感(日期),所以就重新熟悉了一下,感觉有必要记录一下: /** * 从字符串中...

lost_keke
18分钟前
1
0
Java语言学习(十一):枚举类型和泛型

Java中一个重要的类型:枚举,它可以用来表示一组取值范围固定的变量,使用 enum 关键字定义枚举类型,其中元素不能重复,通常大写表示。利用Java的反射机制,可以在运行时分析类,如查看枚举...

海岸线的曙光
20分钟前
0
0
XGboost调参

参见 :https://blog.csdn.net/u010665216/article/details/78532619?utm_source=debugrun&utm_medium=referral 待补充...

小叮当_加V
28分钟前
0
0
Vue使用问题解决记录(持续更新)

1 this属性调用无效 this在methods中的方法表面调用,表明调用的是当前vue对象. 但在方法内部的方法中调用时,所指便不再是vue对象,可能式窗口本身. 此时建议,在methods中的方法开始时写: var...

社哥
28分钟前
0
0
美国最新超级计算机Summit顶替中国神威超算榜首位置[图]

美国最新超级计算机Summit顶替中国神威超算榜首位置[图]: 2018年6月8日,美国能源部橡树岭国家实验室宣布,制造出了全世界目前最快的超级计算机Summit,顶替了中国“神威太湖之光”在超算排...

原创小博客
31分钟前
1
0
WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!

背景 ssh登录的时候,出现的错误: zylMBP:Downloads zhangyalin$ ssh root@192.168.56.108@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ WARNING: REMOTE HOST IDENT......

亚林瓜子
33分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部