阅读java.util.concurrent.SynchronousQueue源码Note

原创
2020/06/02 00:32
阅读数 68

java.util.concurrent.SynchronousQueue

同步队列,主要用去生产与消费!
采用CAS方式来实现并发控制,sun.misc.Unsafe

基于公平性做了两种实现,一公平策略,使用队列实现;二非公平策略,使用栈实现。

常用操作

操作  
put 一直阻塞,直至数据被消费
offer 分为两种,一种不阻塞,尝试阿静数据放入队列,无空间时,则直接返回false;一种指定阻塞时间,超时后仍无空间,返回false,否则返回true
take 一直阻塞,直至获取到数据
poll 分为两种,一种不阻塞,尝试从队列中获取数据,未获取到返回null;另一种指定阻塞时间,超时后仍未获取到数据,返回null

底层实现

两种策略的实现,底层均是使用了 java.util.concurrent.SynchronousQueue.Transferer, 此抽象类定义了如何放入数据和从队列中取数据,相当于是传输数据,方法名称也很贴切:transfer.

1.1 首先来看非公平策略的实现,此策略使用栈实现,java.util.concurrent.SynchronousQueue.TransferStack,栈中定义了栈顶节点head,类型为java.util.concurrent.SynchronousQueue.TransferStack.SNode,

1.2 节点包含以下属性:
next:此节点的下一个节点指针
match:此节点匹配的节点,匹配失败时会设置为自身
waiter:此节点的等待线程,阻塞时,使用LockSupport.park此线程,唤醒时unpark此线程。
item: 持有的数据,消费节点的item为空,生产节点的item为实际的数据
mode:节点状态,0-请求数据,代表了消费节点;1-生产数据,代表了生产节点;2-代表了匹配状态,前两个均为未匹配状态。

1.3 校验是否匹配:

static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

1.4 接下来看具体的实现方法

无论是放入数据还是请求数据,只会有三种执行场景。

1.4.1 空栈或者栈内节点类型与当前操作类型一致,则考虑将元素压入栈顶,而后阻塞等待匹配,或者不阻塞直接返回null,或者阻塞指定的时间后匹配到则返回数据,否则返回null。
1.4.2 栈内节点类型与当前操作类型不一致,为互补状态,则可以进行匹配,此时,将直接返回数据,同时弹出栈顶节点,唤醒节点的waiter线程
1.4.3 以上两种均不是,则帮助步骤2匹配,帮助出站,而后再次进入主循环。

E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently(显然的) empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary(补充) mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            // e == null 请求数据 ,意即消费者
            // e != null 放入数据至栈,意即:生产者
            int mode = (e == null) ? REQUEST : DATA;
            // put: e false 0 插入元素,无问题则一直阻塞等待
            // offer: e true 0 无论是否有位置,都会立即返回
            // offer: e true xx 插入元素,无位置时,等待指定的时间
            // take: null false 0 若无可用元素,则一直等待,阻塞
            // poll: null true 0 无论是否有元素,都会立即返回;无阻塞
            // poll: null true xx 没有元素时,会等待指定的时间; 有阻塞
            for (;;) {
                SNode h = head;
                /**
                 * 如果是空栈或者栈顶元素与当前操作类型相同,则可能需要进入阻塞等待状态
                 */
                if (h == null || h.mode == mode) {  // empty or same-mode
                    // 空栈 或者 栈顶节点和正在操作的类型是一样的,消费者or生产者
                    // 有同类型的节点,说明有在等待的,设置了超时时间
                    /**
                     * 不可等待
                     */
                    if (timed && nanos <= 0) {      // can't wait
                        //设置了超时,但超时时间小于等于0,说明是不能等待!若timed是false,说明会等待
                        /**
                         * 检查下栈顶元素是否已取消,若取消,则将栈顶元素弹出,并再次进入主循环。
                         * 否则将直接返回null
                         */
                        if (h != null && h.isCancelled())
                            // 将无效的头节点去除,此操作为辅助功能!
                            casHead(h, h.next);     // pop cancelled node
                        else
                            // 如果是空栈或者头节点并未取消,则直接返回null,无论是请求获取数据或者生产数据
                            // 代表未插入成功或者未能获取到数据
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) { //timed肯定是false, 不会存在timed是true,但nanos大于0的情况
                        /**
                         * 代表可阻塞,有可能是一直阻塞或者指定了超时时间
                         * 进入阻塞之前,需先将操作包装为节点并压入栈顶,替换原来的栈顶节点
                         * 压入栈顶成功后,则进入阻塞状态,等待被匹配!构造节点s时,设置s的next为原栈顶节点
                         */
                        //timed=false,意味着可以等待
                        //当是空栈或者操作类型与头结点的类型一致,且可以等待时,则将此操作压入栈顶
                        //将节点压入栈顶
                        // 此时不管timed是true或者false,都需要进行阻塞;无论是插入元素或者获取元素
                        // 无论栈顶是否为空。只要mode是相同的,则代表了无法进行匹配!
                        // 等待被填充,或者说是匹配到。此时会进行自选
                        /**
                         * 返回时,s节点的match必定不为空
                         */
                        SNode m = awaitFulfill(s, timed, nanos);
                        // 返回的是自身,则代表未匹配到
                        /**
                         * 若匹配节点为自身,则代表超时未匹配到或者自旋次数耗尽或者阻塞时被打断,从而匹配失败
                         * 需将s节点弹出,并且辅助去除栈内其他匹配失败的节点
                         */
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        //匹配到了,此时头结点又被压入了另外一个元素,头结点的next指向了s
                        //将头结点指向s的next,意味着:将头结点和s同时弹出
                        /**
                         * 此处判定是为了防止其他线程已帮助弹出节点
                         */
                        if ((h = head) != null && h.next == s)
                            // head 和 s均pop, 将头节点设置为s 的next
                            casHead(h, s.next);     // help s's fulfiller
                        /**
                         * mode 是 request, 则代表是消费数据,需要返回一个数据;s代表消费,s匹配节点m 是生产
                         * mode 是 data, 则代表是生产数据,此时会返回传入的数据;是代表生产,s匹配节点m 是消费
                         *
                         * 进入此if代表了自旋或者阻塞,被匹配!
                         * s是当前请求压入的节点,m是其他请求匹配至当前节点。两者fullfill、互补
                         */
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    //进入此if代表了,头节点的类型和当前请求的操作类型不一样,可以进行互补
                    /**
                     * 头结点已时效,无法继续匹配,将头结点弹出,重新再来一遍
                     */
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    /**
                     * 0000 0010 | 0000 0000 = 0000 0010 = 2
                     * 0000 0010 | 0000 0001 = 0000 0011 = 3
                     * 将s设置为头节点,h为s的next
                     *
                     * 注意此处设置s的mode,isFulfilling为true
                     */
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            // s - m - mn
                            SNode mn = m.next;
                            //m 可能会被其他thread clean
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    //帮助完成互补,而后继续主循环
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

1.5 进入阻塞后,先进行自旋,超过自旋次数后,再进入阻塞状态


SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time.  This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 当前线程
            Thread w = Thread.currentThread();
            // 获取自旋次数
            /**
             * 如果允许自旋,可超时(不可阻塞),自旋次数为maxTimedSpins;不可超时(可阻塞),自旋次数为maxUntimedSpins;
             * 否则,自旋次数为0
             *
             * 允许自旋的条件:当前节点是头节点或者头结点为空或当前节点为fullfill状态,则允许自旋
             */
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            /**
             * 返回条件:
             * 1. s.match不为空,意即已匹配到;此匹配到可包包含:到达了超时时间、被打断,此时match指向了this;或正常的匹配到
             * 2. 超时时间已到
             * 3. 阻塞时被唤醒
             *
             * 进入阻塞前会先进行自旋
             */
            for (;;) {
                // 被打断时,也将取消
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                //指定了超时时间,检测超时时间是否到了
                if (timed) {
                    // 设置有超时时间
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        // 设置match 是自身
                        s.tryCancel();
                        continue;
                    }
                }
                //进入阻塞前,会先自旋,自旋次数耗尽后,再进入阻塞
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    // 阻塞
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    // 阻塞指定的时间,超时时间小于此阈值时,会不停的自旋,而不是进入park
                    LockSupport.parkNanos(this, nanos);
            }
        }

1.6 若匹配失败(超过了指定时间),则会将当前节点弹出栈顶,而后再顺便执行下栈内节点的清理操作,此处,只会清理当前节点之上的节点。

/**
         * Unlinks s from the stack.
         * 清理自身,并且和自身相关联 next节点
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel(哨兵).
             */

            /**
             * 栈顶节点1  节点2    节点3   s节点  节点4   节点5
             *   失败     未失败   失败    失败   未失败
             */

            /**
             * 找到一个终止检测节点
             * past为节点4
             */
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb(吸收) cancelled nodes at head
            /**
             * 从栈顶节点开始,依次检查节点,若发现节点已匹配失败,则弹出此节点。
             * 终止条件:遍历到了past节点,或者栈顶节点还未匹配失败
             *
             * 遍历到了节点2, 此时栈顶节点为 节点2
             */
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes 非拼接嵌入式节点
            // p初始时为head
            /**
             * 上一个循环,可能遍历到某个节点未匹配失败,则不继续往下遍历了。
             * 此循环将从上述截止处继续向下遍历,因为栈顶节点还未匹配失败,则不能将此节点弹出。因此只能替换栈顶节点的next,
             * 而非直接替换栈顶节点,直至p的next未匹配失败或者遍历到了past节点
             *
             * 将节点2的next指向为 节点4, 意即 past, 也有可能在s节点之前已停止
             */
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

1.7 来几张图演示下,只说明正常使用!

2.1 公平策略,使用队列实现,基本上与栈操作一致,细微处有所不同。实现类:java.util.concurrent.SynchronousQueue.TransferQueue, 队列节点类型: java.util.concurrent.SynchronousQueue.TransferQueue.QNode

添加节点时,从尾部添加,头部节点优先被匹配!节点包含以下属性:
next: 当前节点的next节点指针。以此属性来判定是否已出队列,若出队列,则将此属性设置为自身节点
item:持有的数据,以此属性来判定是否匹配失败,匹配失败时,此属性设置为自身节点
waiter: 阻塞等待线程
isData: true生产数据;false消费数据

队列包含了头部节点head指针和尾部节点tail指针。此外还有一个用于清理队列元素的属性:cleanMe。

2.2 初始化队列时,构建了一个假的节点,head和tail均指向此节点,此外head和tail相等时,代表了队列为空队列

TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node. 假节点
            head = h;
            tail = h;
        }

2.3 消费数据or生产数据

只有两种操作场景。一队列为空或者队列中节点类型与当前操作类型一致,则根据是否阻塞,决定是直接返回null或者创建节点放入队列,而后自旋,自旋失败,进入等待阻塞!二,与头部节点互补,注意此处有个细节,
互补的是head的next指向的节点,而后唤醒匹配节点的阻塞线程,最后返回数据。

E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * 每种情景下,都会帮助advanced head and tail
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);// e not null : 生产者  e is null : 消费者

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                // h == t 初始化时,头节点和尾节点,赋值了同样的节点,代表了空队列
                // 从尾部添加新的节点
                /**
                 * 空队列或者队列中的节点与当前操作类型相同,均为消费数据或者生产数据
                 */
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail 滞后的尾节点 帮助入队
                        advanceTail(t, tn);
                        continue;
                    }
                    /**
                     * 若设置的超时时间小于等于0,则代表无需进入阻塞状态,此时会直接返回null
                     */
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    //尾节点的next指向新节点
                    if (!t.casNext(null, s))        // failed to link in
                        continue;
                    /**
                     * 经过一系列的重重校验,最终将新创建的s节点设置为尾节点,同时将原尾节点的next指向s节点
                     * 而后,s节点进入阻塞等待匹配状态
                     */
                    // 设置新的尾节点
                    advanceTail(t, s);              // swing tail and wait
                    // 等待互补
                    /**
                     * 返回匹配到信息,若为自身,则代表匹配失败;否则匹配成功!
                     *
                     * 匹配失败时,会执行清理动作
                     */
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    /**
                     * 出队前,item设置为自身,代表s节点不再匹配
                     * s节点的前任出队,同时将t的next指向自身
                     *
                     * s成为头结点
                     */
                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    /**
                     * 注意,此处取的是头结点的next指向节点,非头节点
                     */
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

2.4 进入阻塞等待操作,与栈基本一致. 先自旋,而后进入等待阻塞。超时后将item设置为自身,而后返回!

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            // 因为是公平策略,所以s节点不是头结点的next指向时,不进行自旋
            // 否则,根据是否允许阻塞,设置不同的自旋次数
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

2.5 来个图示

  1. 其他的方法就不用看了。上个类图吧

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部