文档章节

多线程模式(5):生产者-消费者模式

小七酱
 小七酱
发布于 2015/07/24 13:14
字数 855
阅读 16
收藏 0
  1. 定义共享数据封装

package com.xqi.p_c;

/**
 * 请求数据封装(以不变模式定义)
 * 
 * @author mike <br>
 *         2015年7月24日
 */
public final class PCData {

    private final int intData;

    public PCData(int d) {
        intData = d;
    }

    public PCData(String d) {
        intData = Integer.valueOf(d);
    }
    
    public int getData(){
        return this.intData;
    }
    
    @Override
    public String toString() {
        return "data:" + intData;
    }

}

 2. 定义生产者

package com.xqi.p_c;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 定义生产者
 * 
 * @author mike <br>
 *         2015年7月24日
 */
public class Produer implements Runnable {

    /* 用volatile来修饰的变量,表示随时都可能被其他的线程来改变,并使其他的线程直接访问此变量,而不保存其他的备份! */
    private volatile boolean isRunning = true;
    /* 定义内存缓冲区,来存放生产者提交的请求与数据, PCData就是数据的封装 */
    private BlockingQueue<PCData> queue;
    /* 总数,原子操作 ,AtomicInteger提供线程安全的加减操作接口 */
    private static AtomicInteger count = new AtomicInteger();
    /* 休眠时间 */
    private static final int SLEEPTIME = 1000;

    /**
     * 构造方法,注入缓冲队列
     * 
     * @param queue
     */
    public Produer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    public void run() {
        PCData data = null;
        Random r = new Random();
        System.out.println("start produer id = " + Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME)); // 使用随机产生时间差(消费者中一样)
                data = new PCData(count.incrementAndGet());// incrementAndGet表示+1
                System.out.println(data + " is put into queue!");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {// 设定等待的时间为2秒,如果在指定的时间内,还不能往队列中加入,则返回失败。
                    System.err.println("failed to put data : " + data);
                }
            }
        } catch (InterruptedException e) {// 如果发生了异常就中断这个线程
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    //
    public void stop() {
        isRunning = false;
    }
}

 3. 定义消费者

package com.xqi.p_c;

import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

/**
 * 消费者
 * 
 * @author mike <br>
 *         2015年7月24日
 */
public class Consumer implements Runnable {

    private BlockingQueue<PCData> queue;
    private static final int SLEEPTIME = 1000;

    public Consumer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    public void run() {
        System.out.println("start consumer id = " + Thread.currentThread().getId());
        Random r = new Random();

        try {
            while (true) {
                PCData data = queue.take();// 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
                if (null != data) {
                    int re = data.getData() * data.getData();
                    System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(), data.getData(), re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }

    }

}

 4. 测试主类

package com.xqi.p_c;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 测试主类
 * 
 * @author mike <br>
 *         2015年7月24日
 */
public class PCTest {

    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
        // 创建生产者
        Produer p1 = new Produer(queue);
        Produer p2 = new Produer(queue);
        Produer p3 = new Produer(queue);
        // 创建消费者
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);

        // 创建线程池,newCachedThreadPool 定义了短期可重用的线程池
        ExecutorService service = Executors.newCachedThreadPool();

        // 运行生产者和消费者
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        
        service.execute(c1);
        service.execute(c2);
        service.execute(c3);

        Thread.sleep(10 * 1000);
        // 终止线程中的while
        p1.stop();
        p2.stop();
        p3.stop();

        Thread.sleep(3000);
        // shutdown() This method does not wait for previously submitted tasks to complete execution.
        // shutdown() 这个方法不会等待任务执行完成。(有说:不再接受新的任务,如果有等待的任务,就执行完成)
        service.shutdown();

        // Attempts to stop all actively executing tasks, halts the
        // processing of waiting tasks, and returns a list of the tasks
        // that were awaiting execution. These tasks are drained (removed)
        // from the task queue upon return from this method.
        // 立即变为shutdown状态,如果有正在执行的任务,尝试停止,并返回未完成的任务列表,然后移除
        // List<Runnable> taskList = service.shutdownNow();

        System.out.println("程序结束!");
    }

}


Ps:感觉shutdown了以后,线程还是没有终止,也进行不了操作??????不知道为什么!


© 著作权归作者所有

共有 人打赏支持
小七酱
粉丝 1
博文 30
码字总数 17079
作品 0
武汉
程序员
私信 提问
Java多线程-工具篇-BlockingQueue

前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利...

空云万里晴
2016/06/16
54
0
[高并发Java 七] 并发设计模式

什么是设计模式 在软件工程中,设计模式(design pattern)是对软件设计中普遍存在(反复出现)的各种问题 ,所提出的解决方案。这个术语是由埃里希·伽玛(Erich Gamma)等人在1990年代从建...

Hosee
2016/02/14
6.7K
0
java中线程队列BlockingQueue的用法

认识BlockingQueue阻塞队列, 顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入...

landebug
2015/07/30
0
0
Java多线程-工具篇-BlockingQueue

前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利...

vshcxl
2016/12/12
5
1
concurrent 包 BlockingQueue

认识队列: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出; 常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类...

这些年了1990
2016/04/18
24
0

没有更多内容

加载失败,请刷新页面

加载更多

Confluence 6 升级中的一些常见问题

升级的时候遇到了问题了吗? 如果你想尝试重新进行升级的话,你需要首先重新恢复老的备份。不要尝试再次对 Confluence 进行升级或者在升级失败后重新启动老的 Confluence。 在升级过程中的一...

honeymoose
36分钟前
0
0
C++随笔(四)Nuget打包

首先把自己编译好的包全部准备到一个文件夹 像这样 接下来新建一个文本文档,后缀名叫.nuspec 填写内容 <?xml version="1.0"?><package xmlns="http://schemas.microsoft.com/packaging/201......

Pulsar-V
今天
2
0
再谈使用开源软件搭建数据分析平台

三年前,我写了这篇博客使用开源软件快速搭建数据分析平台, 当时收到了许多的反馈,有50个点赞和300+的收藏。到现在我还能收到一些关于dataplay2的问题。在过去的三年,开源社区和新技术的发...

naughty
今天
3
0
Python3的日期和时间

python 中处理日期时间数据通常使用datetime和time库 因为这两个库中的一些功能有些重复,所以,首先我们来比较一下这两个库的区别,这可以帮助我们在适当的情况下时候合适的库。 在Python文...

编程老陆
今天
2
0
分布式面试整理

并发和并行 并行是两个任务同时进行,而并发呢,则是一会做一个任务一会又切换做另一个任务。 临界区 临界区用来表示一种公共资源或者说是共享数据,可以被多个线程使用,但是每一次,只能有...

群星纪元
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部