07.JUC 锁 - AQS - Condition
07.JUC 锁 - AQS - Condition
夕阳晒裤衩 发表于12个月前
07.JUC 锁 - AQS - Condition
  • 发表于 12个月前
  • 阅读 17
  • 收藏 0
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

##基本概念

Condition ,即条件(也称为条件队列或条件变量)。它主要是为了在 JUC 框架中提供和 Java 传统的监视器风格的 wait、notify、notifyAll 方法类似的功能。

Condition 自己也维护了一个队列,该队列的作用是维护一个等待 signal 信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个。将设有线程 1、2,下面来具体过程。

线程1:

  • 线程1 调用 reentrantLock.lock时,持有锁。

  • 线程1 调用 await 方法,进入[条件等待队列],同时释放锁。

  • 线程1 获取到线程2 signal 信号,从 [条件等待队列] 进入 [同步等待队列]。

线程2:

  • 线程2 调用 reentrantLock.lock时,由于锁被线程1 持有,进入 [同步等待队列]。

  • 由于线程1 释放锁,线程2 从 [同步等待队列] 移除,获取到锁。线程2 调用 signal 方法,导致线程 1 被唤醒。

  • 线程2 调用 reentrantLock.unlock 。线程1 获取锁,继续循环。


##条件等待队列

条件等待队列,指的是 Condition 内部自己维护的一个队列,不同于 AQS 的[同步等待队列]。它具有以下特点:

  • 要加入[条件等待队列]的节点,不能在 [同步等待队列]。
  • 从 [条件等待队列] 移除的节点,会进入[同步等待队列]。
  • 一个锁对象只能有一个[同步等待队列],但可以有多个[条件等待队列]。

这里以 AbstractQueuedSynchronizer 类(AQS)的内部类 ConditionObject 为例(该类是 Condition 的实现类)来分析下它的具体实现过程。

首先来看该类内部定义的几个成员变量:

private transient Node firstWaiter;
private transient Node lastWaiter;

它采用了 AQS 的 Node 节点构造,并定义了两个成员变量:firstWaiter、lastWaiter ,类似同步等待队列中 head、tail。说明在 ConditionObject 内部也维护着一个自己的等待队列。目前可知它的结构如下:

输入图片说明

不同于 AQS 等待队列的双向链表结构,条件等待队列的结构是单向链表。通过 Node 的 nextWaiter 来指向下一个节点。


###入队操作

Condition 的入队操作表示将节点添加进[条件等待队列]。该过程通过 AQS.ConditionObject 类 的addConditionWaiter 方法来完成。

具体过程如下:

private Node addConditionWaiter() {
	
	// 判断[条件等待队列]是否为空
	Node t = lastWaiter;
	if (t != null && t.waitStatus != Node.CONDITION) {
		// 清空[条件等待队列]中节点状态不为 CONDITION 的节点
		unlinkCancelledWaiters();
		t = lastWaiter;
	}

	// 创建节点并添加进条件等待队列
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null) {
		firstWaiter = node;
	} else {
		t.nextWaiter = node;
	}
	lastWaiter = node;
	return node;
}

private void unlinkCancelledWaiters() {
	Node t = firstWaiter;
	Node trail = null;
	
	// 条件等待队列不为空,则从头节点开始清除
	while (t != null) {
		Node next = t.nextWaiter;
		
		if (t.waitStatus != Node.CONDITION) {
			t.nextWaiter = null;
			
			// 第一次循环 trail 为空
			if (trail == null) {
				firstWaiter = next;
			} else {
				// 这里 trail 表示上一次循环的节点,即当前 t 的前节点
				trail.nextWaiter = next;
			}
			
			if (next == null) {
				lastWaiter = trail;
			}
		} else {
			trail = t;
		}
		
		t = next;
	}
}

分析代码可知,在 ConditionObject 中虽然也是采用链表构造,但是它的节点构造与 AQS 的等待队列存在一些差异,如下图所示:

输入图片说明

  • t ,即 thread,表示节点存放的线程
  • w,即 waitStatus,表示节点等待状态。[条件等待队列]中的节点[等待状态]都是 CONDITION,否则会被清除。
  • nextWaiter ,表示后指针,与 AQS 的同步等待队列不同,在同步等待队列中表示 mode,即节点的模式。

再来看看它添加过程:

输入图片说明


###2.出队操作

// 设置新的头节点
if ((firstWaiter = first.nextWaiter) == null) {
	lastWaiter = null;
}

// 将旧的头节点从[条件等待队列]中移除
first.nextWaiter = null;

整个过程如下:

输入图片说明


##await

public final void await() throws InterruptedException {
	if (Thread.interrupted()) {
		// 抛出异常...
	}

	// 创建包含当前线程的节点并添加到[条件等待队列]
	Node node = addConditionWaiter();

	// 释放锁的所有重入计数,失败则中断线程,并将节点的状态置为 CANCELD
	long savedState = fullyRelease(node);
	int interruptMode = 0;

	// 判断该节点是否在[同步等待队列]
	while (!isOnSyncQueue(node)) {
		// 不在的话,则线程进入阻塞状态
		LockSupport.park(this);

		// 表示线程被唤醒的操作:确定中断模式并退出循环
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
			break;
		}
	}

	// 成功获取独占锁后,并判断  interruptMode 的值
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
		interruptMode = REINTERRUPT;
	}


	if (node.nextWaiter != null) {
		// 清除条件等待队列上节点状态不为 CONDITION 的节点
		unlinkCancelledWaiters();
	}

	if (interruptMode != 0) {
		// 根据 interruptMode 作出对应的动作
		// 若为 THROW_IE 则抛出异常中断线程
		// 若为 REINTERRUPT 则设置线程中断标记位
		reportInterruptAfterWait(interruptMode);
	}
}

###1.fullyRelease

该方法表示释放独占锁的所有重入计数。

final int fullyRelease(Node node) {
	boolean failed = true;
	try {
		// 释放独占锁
		int savedState = getState();
		if (release(savedState)) {
			failed = false;
			return savedState;
		} else {
			// 失败则中断该线程
			throw new IllegalMonitorStateException();
		}
	} finally {
		// 并将该节点的状态置为 CANCELD
		if (failed){
			node.waitStatus = Node.CANCELLED;
		}	
	}
}

###2.isOnSyncQueue

该方法表示判断节点是否在同步等待队列上。

final boolean isOnSyncQueue(Node node) {
	// 当 node 的状态为 CONDITION 或同步等待队列为空,则返回 false
	if (node.waitStatus == Node.CONDITION || node.prev == null){
		return false;
	}
	
	// 后继节点不为空,说明该节点肯定在同步等待队列中
	if (node.next != null){
		return true;
	}
		
	// 查询同步等待队列的末尾节点
	return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
	Node t = tail;
	// 从同步等待队列的尾节点往前开始查找
	for (;;) {
		if (t == node){
			return true;
		}
		if (t == null){
			return false;
		}
		t = t.prev;
	}
}

###3.checkInterruptWhileWaiting

该过程表示

// 在退出等待时重新中断
private static final int REINTERRUPT =  1;

// 在退出等待时抛出异常
private static final int THROW_IE    = -1;

private int checkInterruptWhileWaiting(Node node) {
	// 判断线程的中断标记位,若为 true,则再选择对应的动作;若为 false 返回 0;
	return Thread.interrupted() ? 
		(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
	// 修改节点的等待状态,并将其加入[同步等待队列]
	if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
		enq(node);
		return true;
	}
	
	// 直到节点在[同步等待队列]后退出
	while (!isOnSyncQueue(node)){
		Thread.yield();
	}	
	return false;
}

###4.unlinkCancelledWaiters

该方法用于清除条件等待队列上节点状态不为 CONDITION 的节点

private void unlinkCancelledWaiters() {
	Node t = firstWaiter;
	Node trail = null;
	while (t != null) {
		Node next = t.nextWaiter;
		if (t.waitStatus != Node.CONDITION) {
			t.nextWaiter = null;
			if (trail == null){
				firstWaiter = next;
			}else{
				trail.nextWaiter = next;
			}
				
			if (next == null){
				lastWaiter = trail;
			}
		} else{
			trail = t;
		}
		t = next;
	}
}

###5.reportInterruptAfterWait

该操作是根据中断模式作出相对于的动作

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
	if (interruptMode == THROW_IE){
		throw new InterruptedException();
	}else if (interruptMode == REINTERRUPT){
		selfInterrupt();
	}
}

##signal

public final void signal() {
	// 该线程是否持有独占锁
	if (!isHeldExclusively()) {
		// 抛出异常...
	}

	// 唤醒[条件等待队列]中的第一个节点
	Node first = firstWaiter;
	if (first != null) {
		doSignal(first);
	}
}
  • doSignal
private void doSignal(Node first) {
	do {
		// 设置新的头节点,并将节点从[条件等待队列]中移除
		if ((firstWaiter = first.nextWaiter) == null) {
			lastWaiter = null;
		}
		first.nextWaiter = null;

		// 直到将该节点加入[同步等待队列]或[条件等待队列]为空时跳出循环
	} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
  • transferForSignal
final boolean transferForSignal(Node node) {
	
	// 若不能修改节点状态,说明该节点已被取消
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
		return false;
	}
		
	// 将节点添加进[同步等待队列]
	Node p = enq(node);
	int ws = p.waitStatus;

	//如果该节点的状态为 cancel 或者修改waitStatus失败,则直接唤醒。
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
		LockSupport.unpark(node.thread);
	}
		
	return true;
}
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 2
博文 15
码字总数 18485
×
夕阳晒裤衩
Thanks
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: