阅读java.util.concurrent.PriorityBlockingQueue源码Note

原创
2020/05/03 22:40
阅读数 73

java.util.concurrent.BlockingQueue

在Queue的基础增加额外的功能:遍历队列时,若无元素则阻塞等待;插入元素时,无额外的空间则等待空间释放。

其方法可分为四种形式:根据对相同操作(对操作不能立即满足)的不同处理结果来分为以下四种

  1. 抛出异常
  2. 返回特定值,不同的方法返回的值不同
  3. 阻塞,直到条件满足
  4. 阻塞一段时间,而后再放弃
Summary of BlockingQueue methods
  Throws exception Special value Blocks Times out
Insert {@link #add add(e)} {@link #offer offer(e)} {@link #put put(e)} {@link #offer(Object, long, TimeUnit) offer(e, time, unit)}
Remove {@link #remove remove()} {@link #poll poll()} {@link #take take()} {@link #poll(long, TimeUnit) poll(time, unit)}
Examine {@link #element element()} {@link #peek peek()} not applicable not applicable
  1. 不允许插入Null, add \offer \ put 均应抛出异常
  2. 设计之初是为了方便:生产者-消费者队列
  3. 线程安全;但对于批量操作,比如addAll containsAll retainAll removeAll等,是未做要求的,除非实现类做了特殊说明。
  4. 内存可见性:Happen-before原则,

    Memory consistency effects: As with other concurrent
    collections, actions in a thread prior to placing an object into a
    {@code BlockingQueue}
    happen-before
    actions subsequent to the access or removal of that element from
    the {@code BlockingQueue} in another thread.

java.util.concurrent.PriorityBlockingQueue

阻塞优先级队列

  1. 功能同java.util.PriorityQueue
  2. 无容量限制,除非是资源耗尽,抛出OutOfMemory
  3. 不允许插入Null
  4. 若队列依赖元素自身的排序规则,则不允许插入非Comparable的元素
  5. iterator遍历时,不保证顺序。若要一致的顺序,需自行toArray后排序,再进行遍历
  6. priority相同的元素,顺序不固定。
  7. 每个public的方法均使用内部的lock来实现并发控制
  8. 所有的公共方法均是使用lock锁实现并发控制
  9. take和带有超时时间的poll,均使用了lock和notEmpty(condition)实现。
  10. 在构造器中,会对lock和notEmpty初始化,condition在等待时会释放锁,被唤醒时会重新获取锁!
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
  1. 在获取lock后,若支持阻塞,则应使用lockInterruptibly来获取锁!
  2. 四个疑问
    1. 使用SortedSet初始化时,为何不需要重新堆化呢?
    2. 初始的堆化时,为何指定了从half = (n >>> 1) - 1;开始呢?
    3. siftDown时,while的停止条件是 k < (n >>> 1)
    4. removeAt方法中,最后为何要判断array[i] == moved???

从Collection初始化一个PriorityBlockingQueue

    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            // sorted set 已经是堆化的???? 尤其是toArray()方法返回的数组
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                // 只相信 PriorityBlockingQueue 返回的数组是 堆化的。
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        // this.comparator != null: comparator不允许null
        // n == 1 ???? 兼容历史?
        // PriorityQueue也是如此的判断条件
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    //不允许存储Null
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }

从Collection初始化一个PriorityQueue

    public PriorityQueue(Collection<? extends E> c) {
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            initElementsFromCollection(ss);
        }
        else if (c instanceof PriorityQueue<?>) {
            PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            initFromPriorityQueue(pq);
        }
        else {
            this.comparator = null;
            initFromCollection(c);
        }
    }
    //复制底层数组和size,并重新进行堆化
    private void initFromCollection(Collection<? extends E> c) {
        initElementsFromCollection(c);
        heapify();
    }

    //不需要重新 堆化
    private void initElementsFromCollection(Collection<? extends E> c) {
        Object[] a = c.toArray();
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, a.length, Object[].class);
        int len = a.length;
        // 此处未能理解。。。。为何是在这种条件下进行校验空元素
        if (len == 1 || this.comparator != null)
            for (int i = 0; i < len; i++)
                if (a[i] == null)
                    throw new NullPointerException();
        this.queue = a;
        this.size = a.length;
    }
    // 只有是从另外一个PriorityQueue初始化时,才不需要重新堆化
    private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
        if (c.getClass() == PriorityQueue.class) {
            this.queue = c.toArray();
            this.size = c.size();
        } else {
            initFromCollection(c);
        }
    }

  1. 对于集合作为参数的构造器,其中有一个疑问:当集合类型为SortedSet时,是不需要重新堆化的。难道SortedSet底层的存储结构的存储顺序是和PriorityQueue是一样的逻辑? toArray返回的数组顺序满足PriortyQueue的要求,意即:权重最小的在最前面,父节点和子节点的位置关系满足如下要求:

Priority queue represented as a balanced binary heap: the two
children of queue[n] are queue[2n+1] and queue[2(n+1)]. The
priority queue is ordered by comparator, or by the elements'
natural ordering, if comparator is null: For each node n in the
heap and each descendant d of n, n <= d. The element with the
lowest value is in queue[0], assuming the queue is nonempty.

  1. 添加元素时的扩容过程

在扩容的分配空间阶段会释放锁,分配完成后再重新获取锁。在分配空间时也需要对并发做控制,为了不影响其他的操作,此处使用了UNSAFE来实现并发控制。

UNSAFE简介


    /*
     * The implementation uses an array-based binary heap, with public
     * operations protected with a single lock. However, allocation
     * during resizing uses a simple spinlock (used only while not
     * holding main lock) in order to allow takes to operate
     * concurrently with allocation.  This avoids repeated
     * postponement of waiting consumers and consequent element
     * build-up. The need to back away from lock during allocation
     * makes it impossible to simply wrap delegated
     * java.util.PriorityQueue operations within a lock, as was done
     * in a previous version of this class. To maintain
     * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the expense of
     * transiently doubling overhead.
     * 此队列实现使用了基于数组的二元堆,并用单一的锁来实现公共并发操作的保护!但是,在扩容过程中
     * 用了一个简单的自旋锁(仅在没有获取主锁时使用),从而实现了在扩容的过程中也能允许take并发操作。
     * 这样避免了重复的推迟等待的消费者以及后续元素的建立。在扩容期间,退出锁的动作,不能再简单的在获取锁的情况下包装委托至
     * java.util.PriorityQueue操作,这也是上个版本的实现。为了兼容,在序列化时,仍然使用了一个简单的PriorityQueue, 在保持
     * 兼容性的同时,暂时增加了一倍的开销!
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        /**
        * // 该方法用户比较及替换值
          // 第一个参数为要替换的对象本身,第二个参数为值的内存地址
          // 第三个参数为变量的预期值,第四个参数为变量要换的值
          // 如果变量目前的值等于预期值(第三个参数),则会将变量的值换成新值(第四个参数),返回 true
          // 如果不等于预期,则不会改变,并返回 false
        *
        */
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                //queue 可能已变更
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // 释放资源;重新获取CPU资源
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        //再次检查queue == array, 必须保证queue是最新的
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }    
  1. 两种方式来平衡堆,用于不同的场景!

出队时,头部节点移除,根节点空缺,此时将最后一个元素,放置于根节点,不停的下沉,直至比其子节点小或者相等或者变为叶
子节点;
入队时,将元素直接插至最后一个元素的后面,不停的与父节点进行对比,冒泡;直至比其父节点大或者相等或者变为根节点。
在其上两种方法中,都需要根据当前节点获取父节点或者子节点的位置;
当前节点n的子节点:2n+1,2n+2
当前节点n的父节点:(n-1)/2

  1. 初始化建立二元堆的过程
    //初始化堆
    private void heapify() {
        Object[] array = queue;
        int n = size;
        // 为何从此处开始呢?代表什么意义?
        int half = (n >>> 1) - 1;
        Comparator<? super E> cmp = comparator;
        if (cmp == null) {
            for (int i = half; i >= 0; i--)
                siftDownComparable(i, (E) array[i], array, n);
        }
        else {
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
        }
    }
    //将元素x插入数组array的位置k,数组大小为n
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            //保证为非叶子节点,如果是叶子节点,则停止循环
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
  1. take方法,没有元素时会被阻塞;使用了lock和lock.condition来实现阻塞!
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //可被打断,要么获取锁,要么被打断;否则将一直处于休眠状态,等待获取锁
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                // 释放锁资源,被唤醒时需重新获取锁。 被打断或者接到信号,否则将一直等待
                //signal, signAll, interupte, spurious wakeup
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

  1. poll有两种方法,区别在于是否会返回null。 空参数的poll,在队列为空时会立即返回null;支持超时时间的poll,允许等待超时时间。
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }
  1. 移除i位置的元素
    /**
     * Removes the ith element from queue.
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            //节点指针相同 ???
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }

  1. iterator方法,遍历时是遍历的一份array备份;但在移除元素时,会移除真正的array里的元素。iterator本身拷贝的数组不会删除元素。
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部