从ReentrantLock看AbstractQueuedSynchronized源码之Condition条件队列

原创
09/18 10:51
阅读数 8.7K

上一篇:

从ReentrantLock分析AbstractQueuedSynchronized源码

1.示例代码

演示一个生产者消费者的场景,await阻塞,signal唤醒

细节点:

aqs里面锁是什么,简单来说就是state的值,state>0线程持有了锁,state=0线程释放了锁。

aqs里面线程是什么,简单来说就是Node节点,Node节点通过构成链表结构来代表线程的执行的先后顺序,这个链表有双向的,有单向的,其属性prev和next以及全局head,tail属性用于构建双向链表,其nextWaiter以及firstWaiter,lastWaiter构成单向链表,waitStatus的属性用于表示当前线程的等待状态;

调用await的方法的线程,必然持有锁;

生产者put和消费者take的方法使用同一个lock,存在资源竞争;

await方法会阻塞当前线程,signal会唤醒对应的处于条件队列中等待的线程,并将处于条件队列中的的节点移动到双向同步队列里面;

有了这几个概念,有助于理解下面所说的。

final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putIndex, takeIndex, count;

public void put(Object item) throws InterruptedException {
    //这里会将state值修改,state值在aqs中是全局的,不管同步队列还是条件队列都共享一个state
    lock.lock();
    try {
        //items满了,阻塞
        while (count == items.length) {
            //调用await方法的线程必须持有锁
            //思考:这里调用了await方法是否应该释放当前线程的锁呢?因为全局共用一个state值。
            notFull.await();
        }
        items[putIndex] = item;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
        ++count;
        notEmpty.signal();//items中已经有值,通知notEmpty队列消费
    } finally {
        lock.unlock();
    }
}

public Object take() throws InterruptedException {
    lock.lock();
    try {
        //items为空
        while (count == 0) {
            //调用await方法的线程必须持有锁
            //思考:这里调用了await方法是否应该释放当前线程的锁呢?
            notEmpty.await();
        }
        Object item = items[takeIndex];
        if (++takeIndex == items.length) {
            takeIndex = 0;
        }
        --count;
        notFull.signal();//items中已经被消费掉,通知notFull生产
        return item;
    } finally {
        lock.unlock();
    }
}

2.双向同步队列和Condition条件队列

2.1.双向同步队列结构

这是lock加锁和unLock释放锁过程中形成的队列,有一个虚的头节点,虚节点不存储线程等相关信息,nextWaiter属性在这种队列中无实际作用。通过prev和next属性串联节点,thread保存当前线程,waitStatus表示当前线程的等待状态。

2.2.Condition条件队列结构

由示例代码可以看出,每个Condition对象都对应了一个Condition条件队列,而且每个条件队列是相互独立的。

Condition条件队列是一个单向的队列,而且firstWaiter也有实际作用,彼此之前是通过nextWaiter关联的,在Condition条件队列中,prev和next属性也并不会用到。

当当前线程调用了notFull.await()方法的时候,当前线程就会被包装成Node被加入到notFull队列的末尾。

在条件队列中,我们需要关注的是waitStatus的属性为CONDITION,当waitStatus=CONDITION的时候,我们就认为当前线程不需要等待,可以出队了。

2.3.双向同步队列和条件队列的联系

一般情况下,双向同步队列和条件队列是相互独立的。但是,当我们调用条件队列的signal方法的时候,会将条件队列中的处于等待的线程唤醒,被唤醒的线程同样和普通线程一样要去争抢锁资源。如果争抢失败,同样要被加到双向同步队列中去。这个时候,就需要将条件队列中的节点一个个转移到双向同步队列中去了。注意,这个节点从条件队列迁移到双向同步队列是一个一个迁移的。

3.分析ConditionObject

final Condition notFull = lock.newCondition();

该类位于AbstractQueuedSynchronized里面

核心属性:

//条件队列首节点
private transient Node firstWaiter;
//条件队列尾节点
private transient Node lastWaiter;

构造方法:

public ConditionObject() { }

3.1.分析ConditionObject.await()方法

await方法执行过程:

(1)lock加锁;

(2)await方法检测中断状态;

(3)fullyRelease方法释放锁;

(4)节点如果不位于同步队列,挂起线程,等待被唤醒;

(5)节点被signal唤醒或者中断唤醒;

(6)checkInterruptWhileWaiting检查中断模式,根据节点位于同步队列还是条件队列设置中断模式,并将节点加入到同步队列中;

(7)acquireQueued尝试加锁,加锁失败在同步队列中等待被唤醒,挂起线程;

(8)加锁成功持有锁,reportInterruptAfterWait处理中断;

 

能够调用到await方法的线程都已经获取到锁。

调用await方法这里有两种情况:

1.中断发生时,线程还没有被signal过,则线程恢复后,抛出异常;

2.中断发生时,线程已经被signal过,那么线程已经被从condition队列中移动到同步队列中了,那么await方法调用之后,

只是再补一下中断,也就仅仅改变了线程的中断状态。

从await方法我们可以看出来,这个中断在这里仅仅只判断中断状态。我们直到thread.interrupt()方法只改变线程的中断状态为true而并不能真正意义上的停止线程,再配上LockSupport.unpark唤醒线程,就可以使得await方法抛出异常。

await方法退出的时候根据interruptMode来处理这两种情况:

先看interruptMode属性值:

//如果是默认值0,代表线程整个过程中没有发生中断

//=1表示退出await方法时再自我中断,这种模式发生在signal方法调用之后,因为当前节点被加入到了同步队列
//所以还需要中断
private static final int REINTERRUPT =  1;
//=-1说明退出await方法的时候抛出InterruptedException
private static final int THROW_IE    = -1;
public final void await() throws InterruptedException {
    //判断当前线程是否是中断状态,如果是直接抛出异常
    //await方法中发现了线程中断,则抛出异常,后续reportInterruptAfterWait方法也会处理中断模式
    if (Thread.interrupted())
        throw new InterruptedException();
    //新建一个节点并加入到条件队列中
    Node node = addConditionWaiter();
    //完全释放当前线程的锁,等待其他线程争抢锁
    long savedState = fullyRelease(node);
    int interruptMode = 0;//中断属性默认值,代表不需要中断
    //判断节点是否处于同步队列中,返回false则表示在条件队列中
    //在调用signal方法之前,新加的节点都是位于条件队列中
    //调用signal方法会将处于条件队列中的节点迁移到同步队列中
    while (!isOnSyncQueue(node)) {
        //节点位于条件队列则挂起当前线程,等待signal方法唤醒,线程当前被挂起在这个位置
        LockSupport.park(this);
        //线程如何被唤醒:
        //1.线程被signal唤醒后,立刻走到这里
        //3.被其他线程中断+唤醒,thread.interrupt() + LockSupport.unpark()方法;
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //能执行到这里,说明node已经被迁移到同步队列中了,而且当前线程也被唤醒了
    //条件1:尝试加锁
    //条件2:中断属性不为-1
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        //尝试加锁成功或中断属性不为-1,则设置属性为1,代表需要自我中断
        interruptMode = REINTERRUPT;
    //node被加入到同步队列中的时候,并没有设置nextWaiter=null
    //如果有条件队列中还有其他节点,清除条件队列中取消状态的节点
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    //根据中断模式处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

3.1.1.分析ConditionObject.addConditionWaiter方法

//创建一个新节点并将它加入到条件队列中,并返回该节点
//执行过程中剔除了已取消的节点
private Node addConditionWaiter() {
    //获取条件队列最后一个节点
    Node t = lastWaiter;
    //条件1:如果t!=null说明条件队列中已经有Node节点了
    //条件2:t.waitStatus != Node.CONDITION成立,说明当前尾节点取消了等待
    //那么我们新加的节点就不应该在当前尾节点的后面了
    if (t != null && t.waitStatus != Node.CONDITION) {
        //剔除所有取消状态的节点
        unlinkCancelledWaiters();
        //更新临时变量的值,可能因为上一个方法改变尾节点
        t = lastWaiter;
    }
    //创建一个新节点并设置waitStatus=CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //如果没有最终节点
    if (t == null)
        //设置第一个节点为当前节点
        firstWaiter = node;
    else
        //如果有最终节点,设置当前节点的后续节点为新创建的节点
        t.nextWaiter = node;
    //更新最终节点为当前节点
    lastWaiter = node;
    return node;
}

3.1.2.分析unlinkCancelledWaiters方法

//剔除取消等待的节点
private void unlinkCancelledWaiters() {
    //获取首节点
    Node t = firstWaiter;
    Node trail = null;
    //循环遍历链表
    while (t != null) {
        //获取下一个节点
        Node next = t.nextWaiter;
        //条件成立,说明当前节点取消了等待
        if (t.waitStatus != Node.CONDITION) {
            //将首节点的的nextWaiter设置为空,断链操作,帮助gc
            t.nextWaiter = null;
            //如果链表还没有找到正常状态的节点
            if (trail == null)
                //将next节点设置为首节点
                firstWaiter = next;
            else
                //让上一个正常节点指向取消等待节点的下一个节点
                trail.nextWaiter = next;
            //如果不存在下一个节点,当前尾节点设置为正常节点
            if (next == null)
                lastWaiter = trail;
        }
        //条件不成立说明当前t节点是正常节点
        else
            //给正常节点赋值,这里就找出了所有的正常节点
            trail = t;
        //循环继续向下遍历
        t = next;
    }
}

3.1.3.分析ConditionObject.fullRelease方法

//完全释放锁,当failed=true的时候,说明当前线程未持有锁调用了await方法
//调用await方法必须持有锁
//假如failed=true,finally中会将加入到队列中的节点状态设置为取消状态
//后续节点入队的过程会调用unlinkCancelledWaiters方法清除取消状态的节点
//释放锁成功后返回当前锁的state的值
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //获取当前线程state值的总值
        int savedState = getState();
        //直接调用release方法释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

3.1.4.分析ConditionObject.isOnSyncQueue方法

判断节点是否处于同步队列中,前面已经说过,当调用signal方法的时候,可能会将条件队列中的节点迁移到同步队列中。

final boolean isOnSyncQueue(Node node) {
    //当前节点的状态为CONDITION,说明处于条件队列中
    //当前节点的前驱节点为空,说明处于条件队列中,因为双向同步队列存在虚的首节点
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //当前节点的后续节点不为空,说明处于同步队列中,因为条件队列的prev和next属性为空
    if (node.next != null)
        return true;
    //如果上面条件都不成立,说明当前节点满足成为同步队列节点的属性,即prev和next属性不为空
    //循环链表从后向前遍历,看是否能找到当前节点,找到则返回true
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

3.1.5.分析checkInterruptWhileWaiting方法

该方法返回线程的中断的模式

private int checkInterruptWhileWaiting(Node node) {
    //Thread.interrupted返回当前线程的中断状态并将中断状态清除
    //被唤醒的线程当前中断状态还是中断,需要清除
    //之后调用transferAfterCancelledWait方法判断节点是由signal唤醒还是中断唤醒
    //位于condition队列说明是中断唤醒,然后中断模式返回THROW_IE
    //位于同步队列则说明是signal唤醒,返回中断模式REINTERRUPT
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

3.1.6.分析transferAfterCancelledWait方法

该方法可以判断当前节点是否位于条件队列,位于条件队列,则需要将节点加入到同步队列,并返回true;

如果位于同步队列则返回false;

final boolean transferAfterCancelledWait(Node node) {
    //条件成立,说明当前node的waitStatus值为CONDITION,当前node位于条件队列中
    //为什么会出现这种情况,被signal方法唤醒后的线程不是在同步队列中了吗?
    //如果我们是通过中断唤醒,那么这里还是需要将
    //节点加入到同步队列中的
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //中断唤醒的node也会被加到阻塞队列中
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

3.1.7.分析reportInterruptAfterWait方法

根据中断模式处理中断

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();//线程自我中断,仅仅改变了中断状态
}

3.2.分析ConditionObject.signal方法

public final void signal() {
    //是否独占锁线程,不是则抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    //条件队列有node节点,则进行条件队列迁移到同步队列的操作。
    if (first != null)
        doSignal(first);
}
//将条件队列中所有节点迁移到同步队列中
private void doSignal(Node first) {
    do {
        //firstWaiter = first.nextWaiter,处理完当前节点,则让firstWaiter指向下一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            //条件队列为空,则更新lastWaiter为空
            lastWaiter = null;
        
        //如果下一个节点为空,则出while循环
        first.nextWaiter = null;
        //while循环迁移某个节点成功和条件队列为空才会退出循环
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    //通过CAS操作修改当前条件队列中节点状态为0,因为节点要被迁移到同步队列了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //将节点加入到同步队列尾节点并返回其前一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //在同步队列中,waitStatus只有1,0,-1三种状态,大于0说明前驱节点是取消状态
    //如果前驱节点是取消状态,则唤醒当前线程
    //如果前驱节点不是取消状态,则设置前驱节点状态为-1,代表需要唤醒后续节点
    //如果compareAndSetWaitStatus(p, ws, Node.SIGNAL)返回false,说明当前node是
    //lockInterrupt入队的node,是会响应中断的,如果外部线程中断该node,前驱node会
    //将节点状态修改为取消状态,并执行出队逻辑
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
展开阅读全文
打赏
1
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
1
分享
返回顶部
顶部