文档章节

Java并发(四)BlockingQueue的使用

摆渡者
 摆渡者
发布于 2015/10/12 10:57
字数 1222
阅读 634
收藏 5

    wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无届队列,你还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

    如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyAll()相比,则要简单并可靠许多。

    考虑下面这个BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序:

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Toast {
    /**
     * 吐司的状态:
     * DRY: 烘干的
     * BUTTERED: 涂了黄油的
     * JAMMED: 涂了果酱的
     * <p>吐司的状态只能由DRY->BUTTERED->JAMMED转变
     */
    public enum Status {DRY, BUTTERED, JAMMED}
    private Status status = Status.DRY;//默认状态为DRY
    private final int id;
    public Toast(int id) { this.id =  id;}
    public void butter() {status = Status.BUTTERED;}
    public void jam() {status = Status.JAMMED;}
    public Status getStatus() {return status;}
    public int getId() {return id;}
    public String toString() {
        return "Toast id: " + id + ", status: " + status;
    }
}

@SuppressWarnings("serial")
class ToastQueue extends LinkedBlockingQueue<Toast> {}

/**
 * 生产吐司的任务。
 */
class Toaster implements Runnable {
    private ToastQueue toastQueue;
    private int count = 0;
    private Random random = new Random(47);
    public Toaster(ToastQueue queue) {
        this.toastQueue = queue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500));
                //生产一片吐司,这些吐司是有序的
                Toast toast = new Toast(count++);
                System.out.println(toast);
                //放到toastQueue中
                toastQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Toaster interrupted.");
        }
        System.out.println("Toaster off.");
    }
}

/**
 * 涂黄油的任务。
 */
class Butterer implements Runnable {
    private ToastQueue dryQueue;
    private ToastQueue butteredQueue;
    public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
        this.dryQueue = dryQueue;
        this.butteredQueue = butteredQueue;
    }
    
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个吐司之前会一直阻塞
                Toast toast = dryQueue.take();
                toast.butter();
                System.out.println(toast);
                butteredQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Butterer interrupted.");
        }
        System.out.println("Butterer off.");
        
    }
}

/**
 * 涂果酱的任务。
 */
class Jammer implements Runnable {
    private ToastQueue butteredQueue;
    private ToastQueue finishedQueue;
    public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
        this.butteredQueue = butteredQueue;
    }
    
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个吐司之前会一直阻塞
                Toast toast = butteredQueue.take();
                toast.jam();
                System.out.println(toast);
                finishedQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Jammer interrupted.");
        }
        System.out.println("Jammer off.");
    }
}

/**
 * 吃吐司的人,消费者。
 */
class Eater implements Runnable {
    private ToastQueue finishedQueue;
    private int count = 0;
    public Eater (ToastQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个吐司之前会一直阻塞
                Toast toast = finishedQueue.take();
                //验证取得的吐司是有序的,而且状态是JAMMED的
                if (toast.getId() != count++ || 
                        toast.getStatus() != Toast.Status.JAMMED) {
                    System.out.println("Error -> " + toast);
                    System.exit(-1);
                } else {
                    //吃掉吐司
                    System.out.println(toast + "->Eaten");
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Eater interrupted.");
        }
        System.out.println("Eater off.");
    }
}

public class ToastOMatic {
    public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue();
        ToastQueue butteredQueue = new ToastQueue();
        ToastQueue finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

执行结果(可能的结果):

Toast id: 0, status: DRY
Toast id: 0, status: BUTTERED
Toast id: 0, status: JAMMED
Toast id: 0, status: JAMMED->Eaten
Toast id: 1, status: DRY
Toast id: 1, status: BUTTERED
Toast id: 1, status: JAMMED
Toast id: 1, status: JAMMED->Eaten
Toast id: 2, status: DRY
Toast id: 2, status: BUTTERED
Toast id: 2, status: JAMMED
Toast id: 2, status: JAMMED->Eaten
Toast id: 3, status: DRY
Toast id: 3, status: BUTTERED
Toast id: 3, status: JAMMED
Toast id: 3, status: JAMMED->Eaten
Toast id: 4, status: DRY
Toast id: 4, status: BUTTERED
Toast id: 4, status: JAMMED
Toast id: 4, status: JAMMED->Eaten
Toast id: 5, status: DRY
Toast id: 5, status: BUTTERED
Toast id: 5, status: JAMMED
Toast id: 5, status: JAMMED->Eaten
Toast id: 6, status: DRY
Toast id: 6, status: BUTTERED
Toast id: 6, status: JAMMED
Toast id: 6, status: JAMMED->Eaten
Toast id: 7, status: DRY
Toast id: 7, status: BUTTERED
Toast id: 7, status: JAMMED
Toast id: 7, status: JAMMED->Eaten
Eater interrupted.
Eater off.
Butterer interrupted.
Toaster interrupted.
Toaster off.
Jammer interrupted.
Jammer off.
Butterer off.

    Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。

© 著作权归作者所有

共有 人打赏支持
摆渡者
粉丝 333
博文 171
码字总数 205876
作品 0
浦东
程序员
私信 提问
Java并发编程利用 Condition 实现阻塞队列

什么是阻塞队列 BlockingQueue 队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,...

行走在旅途中
2017/11/07
0
0
【Java并发专题】27篇文章详细总结Java并发基础知识

努力的意义,就是,在以后的日子里,放眼望去全是自己喜欢的人和事! github:https://github.com/CL0610/Java-concurrency,欢迎题issue和Pull request。所有的文档都是自己亲自码的,如果觉...

你听___
05/06
0
0
Java 并发工具包 java.util.concurrent 用户指南

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。 本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友...

pior
2015/10/26
0
0
聊一聊并发编程的那些事(内含源码及面试题)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/t4i2b10X4c22nF6A/article/details/82889698 导读:之前写了一系列关于并发编程的文章,也对今年的一些大型互...

JAVA高级架构v
09/28
0
0
Java面试:投行的15个多线程和并发面试题

本文由ImportNew -一杯哈希不加盐 翻译自dzone。欢迎加入翻译小组。转载请见文末要求。 多线程和并发问题已成为各种 Java 面试中必不可少的一部分。如果你准备参加投行的 Java 开发岗位面试,...

ImportNew
08/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

[转载]重磅消息:误区陷阱、软件分类和GNU

stallman376032123 Non classé 16 octobre 2018 2 Minutes (注:对原文有精简) GPL、AGPL 并非最严格的许可证,详情请继续阅读本篇文章。可能很多人看到本文好多字,被字数吓得关闭网页,...

ICE冰焰火灵X
15分钟前
0
0
Impala和Hive集成Sentry、Kerberos认证

关于 Kerberos 的安装和 HDFS 配置 kerberos 认证,请参考 HDFS配置kerberos认证。 关于 Kerberos 的安装和 YARN 配置 kerberos 认证,请参考 YARN配置kerberos认证。 关于 Kerberos 的安装和...

hblt-j
24分钟前
1
0
Ubuntu 18.04 PostgreSQL 11 apt 默认安装某些问题解析

首先默认安装软件(本文以PostgreSQL 11.1为例,其他版本类似)。 sudo apt install postgresql-11 等待软件自动安装并完成配置,启动服务。 服务状态如下: vmware@vmware-virtual-machine:...

白豆腐徐长卿
37分钟前
1
0
一步步动手实现高并发的Reactor模型 —— Kafka底层如何充分利用多线程优势去处理网络I/O与业务分发

一、从《Apeche Kafka源码剖析》上搬来的概念和图 Kafka网络采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解Java NIO提供了Reactor模式的API。常见的单线程Jav...

Anur
40分钟前
1
0
数字信号处理各种处理及图象

https://wenku.baidu.com/view/b1bb67f1f90f76c661371a75.html?sxts=1544696459935

whoisliang
44分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部