文档章节

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

闪电
 闪电
发布于 2016/08/13 11:53
字数 2542
阅读 31
收藏 0

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用。

1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程。BlockingQueue的核心方法有:

boolean add(E e) ,把 e 添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。
boolean offer(E e),表示如果可能的话,将 e 加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
void put(E e),把 e 添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
E poll(long timeout, TimeUnit unit) ,取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
E take() ,取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。
int drainTo(Collection<? super E> c) 和 int drainTo(Collection<? super E> c, int maxElements) ,一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取 数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

注意:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值。

2. BlockingQueue常用的四个实现类

ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2) LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的
3) PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
4) SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

 本文将从JDK源码层面分析对比ArrayBlockingQueue和LinkedBlockingQueue

3. ArrayBlockingQueue源码分析

    ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
     这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

    ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    ArrayBlockingQueue类中定义的变量有:

/** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本文主要讲解put()和take()操作,其他方法类似。

put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。

/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     */
    public void put(E e) throws InterruptedException {
		//不能存放 null  元素
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;	//数组队列
        final ReentrantLock lock = this.lock;
		//加锁
        lock.lockInterruptibly();
        try {
            try {
				//当队列满时,调用notFull.await()方法,使该线程阻塞。
				//直到take掉某个元素后,调用notFull.signal()方法激活该线程。
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
			//把元素 e 插入到队尾
            insert(e);
        } finally {
			//解锁
            lock.unlock();
        }
    }
insert(E e) 方法如下:

/**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;  
		//下标加1或者等于0
        putIndex = inc(putIndex);
        ++count;  //计数加1
		//若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
		//若没有take()线程陷入阻塞,则该操作无意义。
        notEmpty.signal();
    }
	
	/**
     * Circularly increment i.
     */
    final int inc(int i) {
		//此处可以看到使用了循环队列
        return (++i == items.length)? 0 : i;
    }
take()方法代码如下。take操作和put操作相反,故不作详细介绍。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();  //加锁
        try {
            try {
				//当队列空时,调用notEmpty.await()方法,使该线程阻塞。
				//直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
			//取出队头元素
            E x = extract();
            return x;
        } finally {
            lock.unlock();	//解锁
        }
    }
extract() 方法如下:

/**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }
小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!
4. LinkedBlockingQueue 源码分析

    基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

 LinkedBlockingQueue 类中定义的变量有:

/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
该类中定义了两个ReentrantLock锁:putLock和takeLock,分别用于put端和take端。也就是说,生成端和消费端各自独立拥有一把锁,避免了读(take)写(put)时互相竞争锁的情况。

/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); //加 putLock 锁
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
			//当队列满时,调用notFull.await()方法释放锁,陷入等待状态。
			//有两种情况会激活该线程
			//第一、 某个put线程添加元素后,发现队列有空余,就调用notFull.signal()方法激活阻塞线程
			//第二、 take线程取元素时,发现队列已满。则其取出元素后,也会调用notFull.signal()方法激活阻塞线程
            while (count.get() == capacity) { 
                    notFull.await();
            }
			// 把元素 e 添加到队列中(队尾)
            enqueue(e);
            c = count.getAndIncrement();
			//发现队列未满,调用notFull.signal()激活阻塞的put线程(可能存在)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
			//队列空,说明已经有take线程陷入阻塞,故调用signalNotEmpty激活阻塞的take线程
            signalNotEmpty();
    }
enqueue(E e)方法如下:

/**
     * Creates a node and links it at end of queue.
     * @param x the item
     */
    private void enqueue(E x) {
        // assert putLock.isHeldByCurrentThread();
        last = last.next = new Node<E>(x);
    }
take()方法代码如下。take操作和put操作相反,故不作详细介绍。
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
dequeue()方法如下:

/**
     * Removes a node from head of queue.
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

小结:take和put操作各有一把锁,可并行读取。


参考地址:

1). Java多线程-工具篇-BlockingQueuehttp://blog.csdn.net/xiaoliang_xie/article/details/6887115

2). Java多线程(五)之BlockingQueue深入分析http://blog.csdn.net/vernonzheng/article/details/8247564

本文转载自:http://blog.csdn.net/xin_jmail/article/details/26157971

共有 人打赏支持
闪电
粉丝 74
博文 392
码字总数 6789
作品 0
海淀
技术主管
源码|并发一枝花之BlockingQueue

今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。 JDK版本:oracle java 1.8.0_102 继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列...

猴子007
2017/10/25
0
0
源码|并发一枝花之BlockingQueue

今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。 JDK版本:oracle java 1.8.0_102 继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列...

monkeysayhi
2017/11/27
0
0
聊聊 JDK 阻塞队列源码(ReentrantLock实现)

项目中用到了一个叫做 Disruptor 的队列,今天楼主并不是要介绍 Disruptor 而是想巩固一下基础扒一下 JDK 中的阻塞队列,听到队列相信大家对其并不陌生,在我们现实生活中队列随处可见,最经...

haifeiWu
08/03
0
0
JDK容器学习之Queue:LinkedBlockingQueue

基于链表阻塞队列LinkedBlockingQueue 基于链表的无边界阻塞队列,常用与线程池创建中作为任务缓冲队列使用 I. 底层数据结构 先看一下内部定义,与 做一下对比,顺带看下这两者的区别及不同的...

小灰灰Blog
2017/11/03
0
0
Java集合--阻塞队列(LinkedBlockingQueue)

1. LinkedBlockingQueue 上篇中,说到了ArrayBlockingQueue阻塞队列。在ArrayBlockingQueue中,底层使用了数组结构来实现。 那么,提到数组了就不得不提及链表。作为两对成双成对的老冤家,链...

贾博岩
2017/12/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

SharedPreferences 的使用,commit和apply两个方法的区别

SharedPreferences sp = getSharedPreferences("config",MODE_PRIVATE); //第一个参数:存储文件的名称,第二个参数文件的访问权限,通常MODE_PRIVATE是私有存储 sp.edit().putString("USERNAM...

lanyu96
30分钟前
1
0
02-《Apache Tomcat 9 User Guide》之简介

1.Introduction - 介绍 For administrators and web developers alike, there are some important bits of information you should familiarize yourself with before starting out. This d......

飞鱼说编程
34分钟前
1
0
关于maven的使用,这一篇基本就够了

2.1 关于maven 每一个工具的出现都有其历史意义,而Maven的出现则是开发者对于不同的项目都要有自己的Ant构建文件,而这些文件都各不相同,而且JAR被检入CVS(Concurrent Version System),...

小小明童鞋
36分钟前
16
0
从xtrabackup完整备份恢复单个innodb表

现在大多数同学在线上采取的备份策略都是xtrabackup全备+binlog备份,那么当某天某张表意外的删除那么如何从xtrabackup全备中恢复呢?从mysql 5.6版本开始,支持可移动表空间(Transportable...

IT--小哥
41分钟前
1
0
百度AI攻城狮,用TensorFlow API训练目标检测模型(浣熊超可爱)

今天,人工智能正影响我们生产、生活的方方面面。10月10日,为期三天的2018华为全联接大会在上海拉开帷幕,此次大会以“+智能,见未来”为主题,发布了AI战略及全球领先的全栈全场景AI解决方...

Python唱情歌
41分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部