java.util.concurrent.locks.LockSupport
private static final sun.misc.Unsafe UNSAFE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}
sun.misc.Unsafe#getUnsafe
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”
这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的
注意,unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行
这一点,也恰巧是park/unpark的灵活的地方。可以解决多种场景应用。
有点类似,容量为1的信号量
一个线程它有可能在别的线程unPark之前,或者之后,或者同时调用了park,那么因为park的特性,它可以不用担心自己的park的时序问题,否则,如果park必须要在unpark之前,那么给编程带来很大的麻烦!!
Java线程对象中(vm中对应的Java线程对象,C++),有一个Parker类
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}
pthread_mutex_t
- Posix的mutex
pthread_cond_t
- Posix的condition
_counter
volatile
- 记录“许可”数量
- 在
park
时,尝试直接拿到“许可”_counter > 0
,则更新为0,并返回
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
利用了原子化更新
如果不成功:
ThreadBlockInVM tbivm(jt);
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
则更新_counter
为0,并加上互斥锁
再需要判断是否需要等待时间
if (time == 0) {
status = pthread_cond_wait (_cond, _mutex) ;
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
OrderAccess::fence();
利用pthread_cond_wait
进行等待。 在pthread_mutex_unlock
后,断言检查status状态
unpark时,如果_counter
为1,则释放互斥锁,再放回。 如果为0,说明之前线程是占用“许可”的。那就有可能其他线程正在等待。 所以在0的时候,还需要额外唤醒在等待的线程pthread_cond_signal
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1
值得注意的是在park函数里,调用pthread_cond_wait时,并没有用while来判断,所以posix condition里的"Spurious wakeup"一样会传递到上层Java的代码里。
if (time == 0) {
status = pthread_cond_wait (_cond, _mutex) ;
}
Java文档中
- Some other thread invokes unpark with the current thread as the target; or
- Some other thread interrupts the current thread; or
- The call spuriously (that is, for no reason) returns.
mutex和condition实际上是绑定在一起的,一个condition只能对应一个mutex。在Java的代码里,Condition对象只能通过lock.newCondition()的函数来获取。
- 伪唤醒
- Spurious wakeup
所谓的spurious wakeup,指的是一个线程调用pthread_cond_signal(),却有可能不止一个线程被唤醒。为什么会出现这种情况?wiki和其它的一些文档都只是说在多核的情况下,简化实现允许出现这种spurious wakeup
先unlock再signal,这有个好处,就是调用enqueue_msg的线程可以再次参与mutex的竞争中,这样意味着可以连续放入多个消息,这个可能会提高效率。类似Java里ReentrantLock的非公平模式。 网上有些文章说,先singal再unlock,有可能会出现一种情况是被singal唤醒的线程会因为不能马上拿到mutex(还没被释放),从而会再次休眠,这样影响了效率。
从而会有一个叫“wait morphing”优化,就是如果线程被唤醒但是不能获取到mutex,则线程被转移(morphing)到mutex的等待队列里。
引用: