文档章节

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

闪电
 闪电
发布于 2016/08/13 11:53
字数 2542
阅读 30
收藏 0
点赞 1
评论 0

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用。

1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程。BlockingQueue的核心方法有:

boolean add(E e) ,把 e 添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。
boolean offer(E e),表示如果可能的话,将 e 加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
void put(E e),把 e 添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
E poll(long timeout, TimeUnit unit) ,取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
E take() ,取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。
int drainTo(Collection<? super E> c) 和 int drainTo(Collection<? super E> c, int maxElements) ,一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取 数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

注意:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值。

2. BlockingQueue常用的四个实现类

ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2) LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的
3) PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
4) SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

 本文将从JDK源码层面分析对比ArrayBlockingQueue和LinkedBlockingQueue

3. ArrayBlockingQueue源码分析

    ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
     这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

    ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    ArrayBlockingQueue类中定义的变量有:

/** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本文主要讲解put()和take()操作,其他方法类似。

put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。

/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     */
    public void put(E e) throws InterruptedException {
		//不能存放 null  元素
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;	//数组队列
        final ReentrantLock lock = this.lock;
		//加锁
        lock.lockInterruptibly();
        try {
            try {
				//当队列满时,调用notFull.await()方法,使该线程阻塞。
				//直到take掉某个元素后,调用notFull.signal()方法激活该线程。
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
			//把元素 e 插入到队尾
            insert(e);
        } finally {
			//解锁
            lock.unlock();
        }
    }
insert(E e) 方法如下:

/**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;  
		//下标加1或者等于0
        putIndex = inc(putIndex);
        ++count;  //计数加1
		//若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
		//若没有take()线程陷入阻塞,则该操作无意义。
        notEmpty.signal();
    }
	
	/**
     * Circularly increment i.
     */
    final int inc(int i) {
		//此处可以看到使用了循环队列
        return (++i == items.length)? 0 : i;
    }
take()方法代码如下。take操作和put操作相反,故不作详细介绍。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();  //加锁
        try {
            try {
				//当队列空时,调用notEmpty.await()方法,使该线程阻塞。
				//直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
			//取出队头元素
            E x = extract();
            return x;
        } finally {
            lock.unlock();	//解锁
        }
    }
extract() 方法如下:

/**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }
小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!
4. LinkedBlockingQueue 源码分析

    基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

 LinkedBlockingQueue 类中定义的变量有:

/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
该类中定义了两个ReentrantLock锁:putLock和takeLock,分别用于put端和take端。也就是说,生成端和消费端各自独立拥有一把锁,避免了读(take)写(put)时互相竞争锁的情况。

/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); //加 putLock 锁
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
			//当队列满时,调用notFull.await()方法释放锁,陷入等待状态。
			//有两种情况会激活该线程
			//第一、 某个put线程添加元素后,发现队列有空余,就调用notFull.signal()方法激活阻塞线程
			//第二、 take线程取元素时,发现队列已满。则其取出元素后,也会调用notFull.signal()方法激活阻塞线程
            while (count.get() == capacity) { 
                    notFull.await();
            }
			// 把元素 e 添加到队列中(队尾)
            enqueue(e);
            c = count.getAndIncrement();
			//发现队列未满,调用notFull.signal()激活阻塞的put线程(可能存在)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
			//队列空,说明已经有take线程陷入阻塞,故调用signalNotEmpty激活阻塞的take线程
            signalNotEmpty();
    }
enqueue(E e)方法如下:

/**
     * Creates a node and links it at end of queue.
     * @param x the item
     */
    private void enqueue(E x) {
        // assert putLock.isHeldByCurrentThread();
        last = last.next = new Node<E>(x);
    }
take()方法代码如下。take操作和put操作相反,故不作详细介绍。
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
dequeue()方法如下:

/**
     * Removes a node from head of queue.
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

小结:take和put操作各有一把锁,可并行读取。


参考地址:

1). Java多线程-工具篇-BlockingQueuehttp://blog.csdn.net/xiaoliang_xie/article/details/6887115

2). Java多线程(五)之BlockingQueue深入分析http://blog.csdn.net/vernonzheng/article/details/8247564

本文转载自:http://blog.csdn.net/xin_jmail/article/details/26157971

共有 人打赏支持
闪电
粉丝 74
博文 390
码字总数 6789
作品 0
海淀
技术主管
源码|并发一枝花之BlockingQueue

今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。 JDK版本:oracle java 1.8.0_102 继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列...

猴子007 ⋅ 2017/10/25 ⋅ 0

源码|并发一枝花之BlockingQueue

今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。 JDK版本:oracle java 1.8.0_102 继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列...

monkeysayhi ⋅ 2017/11/27 ⋅ 0

【死磕Java并发】-----J.U.C之阻塞队列:BlockingQueue总结

原文出处http://cmsblogs.com/ 『chenssy』 经过前面六篇博客的阐述,我想各位应该对阻塞队列BlockingQueue有了较为深入的理解,下面来一个总结,先看整个类图: BlockingQueue BlockingQueu...

chenssy ⋅ 2017/10/04 ⋅ 0

Java集合--阻塞队列(LinkedBlockingQueue)

1. LinkedBlockingQueue 上篇中,说到了ArrayBlockingQueue阻塞队列。在ArrayBlockingQueue中,底层使用了数组结构来实现。 那么,提到数组了就不得不提及链表。作为两对成双成对的老冤家,链...

贾博岩 ⋅ 2017/12/10 ⋅ 0

阻塞队列之LinkedBlockingQueue源码分析

LinkedBlockingQueue是一个基于单向链表实现的可选容量的阻塞队列,队列的头节点是等待时间最长的元素,队列的尾节点是等待时间最短的元素。新元素直接插入到尾节点的后面,成为新的尾节点,...

qq_30572275 ⋅ 05/24 ⋅ 0

并发编程(四)——Java中的阻塞队列

什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可...

whc20011 ⋅ 2016/10/31 ⋅ 0

并发容器与框架——并发容器(二)

1.何为阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:意思是当队列满(无界队列除外)时,队列会阻塞...

江左煤郎 ⋅ 05/06 ⋅ 0

Java中的队列都有哪些,有什么区别。

Java中的队列都有哪些,实际上是问queue的实现有哪些,如:ConcurrentLinkedQueue、LinkedBlockingQueue 、ArrayBlockingQueue、LinkedList。 源于我经历过的一次生产事故,有个服务会收集业...

郑加威 ⋅ 02/27 ⋅ 0

Java集合之队列和栈

Java集合之队列 队列Queue Queue是Java中的一个接口,继承自Collection接口。 offer方法和add方法的区别: offer方法——{@code true} if the element was added to this queue, else {@cod...

秋风醉了 ⋅ 2014/08/09 ⋅ 0

并发十三:并发容器Queue实现分析

Queue J.U.C中分为阻塞队里和非阻塞队列。 阻塞队列在满时进行入列操作会被阻塞,空时进行出列操作会被阻塞,很适合并发编程中最常见的生产者-消费者模式。 非阻塞队使用CAS无锁算法避免锁竞...

wangjie2016 ⋅ 04/14 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

一篇文章学懂Shell脚本

Shell脚本,就是利用Shell的命令解释的功能,对一个纯文本的文件进行解析,然后执行这些功能,也可以说Shell脚本就是一系列命令的集合。 Shell可以直接使用在win/Unix/Linux上面,并且可以调用...

Jake_xun ⋅ 16分钟前 ⋅ 0

大数据工程师需要精通算法吗,要达到一个什么程度呢?

机器学习是人工智能的一个重要分支,而机器学习下最重要的就是算法,本文讲述归纳了入门级的几个机器学习算法,加大数据学习群:716581014一起加入AI技术大本营。 1、监督学习算法 这个算法由...

董黎明 ⋅ 49分钟前 ⋅ 0

Kylin 对维度表的的要求

1.要具有数据一致性,主键值必须是唯一的;Kylin 会进行检查,如果有两行的主键值相同则会报错。 2.维度表越小越好,因为 Kylin 会将维度表加载到内存中供查询;过大的表不适合作为维度表,默...

无精疯 ⋅ 52分钟前 ⋅ 0

58到家数据库30条军规解读

军规适用场景:并发量大、数据量大的互联网业务 军规:介绍内容 解读:讲解原因,解读比军规更重要 一、基础规范 (1)必须使用InnoDB存储引擎 解读:支持事务、行级锁、并发性能更好、CPU及...

kim_o ⋅ 55分钟前 ⋅ 0

代码注释中顺序更改 文件读写换行

`package ssh; import com.xxx.common.log.LogFactory; import com.xxx.common.log.LoggerUtil; import org.apache.commons.lang3.StringUtils; import java.io.*; public class DirErgodic ......

林伟琨 ⋅ 今天 ⋅ 0

linux实用操作命令

参考 http://blog.csdn.net/qwe6112071/article/details/50806734 ls [选项] [目录名 | 列出相关目录下的所有目录和文件 -a 列出包括.a开头的隐藏文件的所有文件-A 同-a,但不列出"."和"...

简心 ⋅ 今天 ⋅ 0

preg_match处理中文符号 url编码方法

之前想过直接用符号来替换,但失败了,或者用其他方式,但有有些复杂,这个是一个新的思路,亲测可用 <?php$str='637朗逸·超速新风王(300)(白光)'; $str=iconv("UTF-8","GBK",$s...

大灰狼wow ⋅ 今天 ⋅ 0

DevOps 资讯 | PostgreSQL 的时代到来了吗 ?

PostgreSQL是对象-关系型数据库,BSD 许可证。拼读为"post-gress-Q-L"。 作者: Tony Baer 原文: Has the time finally come for PostgreSQL?(有删节) 近30年来 PostgreSQL 无疑是您从未听...

RiboseYim ⋅ 今天 ⋅ 0

github太慢

1:用浏览器访问 IPAddress.com or http://tool.chinaz.com 使用 IP Lookup 工具获得github.com和github.global.ssl.fastly.net域名的ip地址 2:/etc/hosts文件中添加如下格式(IP最好自己查一...

whoisliang ⋅ 今天 ⋅ 0

非阻塞同步之 CAS

为解决线程安全问题,互斥同步相当于以时间换空间。多线程情况下,只有一个线程可以访问同步代码。这种同步也叫阻塞同步(Blocking Synchronization). 这种同步属于一种悲观并发策略。认为只...

长安一梦 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部