【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的

原创
2019/06/01 19:19
阅读数 4.2K

一、了解 AbstractQueuedSynchronizer(AQS)

1、AQS 简介

AbstractQueuedSynchronizer 是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS 息息相关。

借用一下源码中的说法,AbstractQueuedSynchronizer 基于一个 FIFO 队列 提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers 的底层实现,它使用了一个原子int private volatile int state; 来表示当前状态。当在 AQSacquired (获取资源) 或被 release (释放资源)时,需要依据这个 state 来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)

2、实现最简单的 AQS

我们来看一个最简单的例子,我们有一个类 Sync 继承了 AbstractQueuedSynchronizer,并重写了其 tryAcquiretryRelease 方法。实现非常简单,我们通过调用父类的 compareAndSetState() 以及 setState() 来完成,简单来说(不是特别准确),就是 tryAcquire 返回 true,代表获取锁成功,否则就会阻塞。而 tryRelease 则负责锁的释放。

在例子中:将 state 设置为 100 代表当前状态为无锁,1 则代表已经有某个线程获取了该锁。当然这个 state 表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100 还是 200 还是 -100,都没有什么关系。

/**
 * Created by Anur IjuoKaruKas on 2019/5/7
 */
public class Mutex extends AbstractQueuedSynchronizer {

    public static class Sync extends AbstractQueuedSynchronizer {

        public Sync() {
            setState(100); // set the initial state, being unlocked.
        }

        @Override
        protected boolean tryAcquire(int ignore) {
            boolean result = compareAndSetState(100, 1);
            print("尝试获取锁" + (result ? "成功" : "失败"));
            return result;
        }

        @Override
        protected boolean tryRelease(int ignore) {
            setState(100);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(0);
    }

    public void unLock() {
        sync.release(0);
    }

    public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();
        mutex.lock();

        Thread thread = new Thread(() -> {
            print("调用 mutex.lock() 之前");
            mutex.lock();
            print("调用 mutex.lock() 之后");
        });

        thread.start();

        print("main 线程 Sleep 之前");
        Thread.sleep(5000);
        print("main 线程 Sleep 之后");
        mutex.unLock();
    }

    public static void print(String print) {
        System.out.println(String.format("时间 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print));
    }
}

========================================= 输出
时间 - Fri May 24 15:44:19 CST 2019		Thread[main,5,main]		尝试获取锁成功
时间 - Fri May 24 15:44:19 CST 2019		Thread[main,5,main]		main 线程 Sleep 之前
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之前
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 15:44:24 CST 2019		Thread[main,5,main]		main 线程 Sleep 之后
时间 - Fri May 24 15:44:24 CST 2019		Thread[Thread-0,5,main]		尝试获取锁成功
时间 - Fri May 24 15:44:24 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之后

我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock(); 释放锁之前,子线程是一直阻塞的,调用 mutex.lock() 之后 的日志输出发生在 main 线程 Sleep 之后 之后。

通过重写 tryAcquiretryRelease 方法,以及调用 acquirerelease 方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS 一个简单的了解。

看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire 控制的,那和 state 又有什么关系呢? 我们完全可以定义一个自定义变量,比如 signfalse 代表无锁,true 代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSetCAS操作了。

3、AQS 绕不过的话题: CAS Compare And Swap

前面说到,我们暂时认为 :tryAcquire 返回 true,代表获取到锁,反之只要 tryAcquire 返回 flase,线程就会被阻塞(不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:


  • ※ 无论何时,都只能有一个线程 tryAcquire 成功,且在某个线程 tryAcquire 成功之后,并在其 release 释放锁之前,任何线程进行 tryAcquire 都将返回 false

是的,就是并发问题!

下面这个例子我们简单使用一个自定义变量 sign 来实现 tryAcquire,看看会发生什么:

        private boolean sign;

        @Override
        protected boolean tryAcquire(int ignore) {
            boolean result = false;
            if (!sign) {
                sign = true;
                result =  true;
            }
            print("尝试获取锁" + (result ? "成功" : "失败"));
            return result;
        }

        @Override
        protected boolean tryRelease(int ignore) {
            sign = false;
            return true;
        }
========================================= 输出
时间 - Fri May 24 18:03:12 CST 2019		Thread[main,5,main]		尝试获取锁成功
时间 - Fri May 24 18:03:12 CST 2019		Thread[main,5,main]		main 线程 Sleep 之前
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之前
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 18:03:17 CST 2019		Thread[main,5,main]		main 线程 Sleep 之后
时间 - Fri May 24 18:03:17 CST 2019		Thread[Thread-0,5,main]		尝试获取锁成功
时间 - Fri May 24 18:03:17 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之后

看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire 实现是一个 "CompareThenSet" 操作,在并发的情况下,会出现不可预期的情况

  • 线程A 进来,发现 signfalse
  • 线程B 同时进来,也发现 signfalse
  • 两者同时将 sign 修改为 true,问题就来了。

到底是 线程A 获取到了锁,还是 线程B 呢?(实际上都获取到了)

我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock(); 获取锁成功则会输出语句 print("获取锁成功");,执行,发现,竟然有两个线程同时获取到了锁。有两个线程同时将 sign 修改为了 true

    public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            threads.add(new Thread(() -> {
                mutex.lock();
                print("获取锁成功");
            }));
        }

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        threads.forEach(executorService::submit);
        Thread.sleep(1000);
    }

如果我们使用 AQS 帮我们写好的 compareAndSetState 则没有这个问题。

Java9 之前,底层实现是调用 unsafe 包的 compareAndSwapInt 来实现的:

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

而在 Java9 之后,则是使用 VarHandle 来实现 VarHandleunSafe 的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。

    // VarHandle mechanics
    private static final VarHandle STATE;

---------------------------------------------

    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

这里简单的说一下 CAS,即 CompareAndSwapCAS 可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS 来完成的。对并发编程有所了解的小伙伴应该都知道 CAS,一般情况下,Compare(比较)Swap(交换) 至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。CAS 则保证了 CompareSwap 为一个原子操作。


二、深入理解 AbstractQueuedSynchronizer(AQS) 资源锁定与解锁正向流程

上文说到,我们暂时认为 :tryAcquire 返回 true,代表获取到锁,反之只要 tryAcquire 返回 flase,线程就会被阻塞。

AQS 当然没有这么简单,但我们可以先看看加锁时调用的 acquire 方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们发现,tryAcquire 只是第一重判断,如果 tryAcquire 失败,紧接着还有另一个核心逻辑 acquireQueued。在简介里,我们说,AQS 除了使用一个 原子state 来作为状态判断以外,还有一个 FIFO 队列,此队列就和 acquireQueued 方法息息相关。另外,AQS 所控制的资源访问,还可以是共享的,或者独占的(addWaiter 参数 Node.EXCLUSIVE)。

以下的分析我们以一个简单的 独占式非公平 AQS 实现: java.util.concurrent.locks.ReentrantLock.NonfairSync 来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。

1、TryAcquire 与 TryRelease 的标准写法及其优化

先看看 NonfairSynctryAcuire 是怎么实现及优化的。首先,NonfairSync 中将 state == 0 定义为无锁状态。

  1. 竞争优化: 如果当前无锁(state == 0),再调用 CAS。这实际上对性能是一个很好的优化,假设当前取 state 不为 0,实际上 CompareAndSetState 成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改 state 这个状态。
  2. 重入设计: 试想如果我们不判断当前线程是否持有锁,就去进行 CAS 操作,会发生什么?毫无疑问是 CAS 失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个 int nextc = c + acquires; 操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将无法精确的控制加锁和解锁的层级,难免会出现一些意料之外的情况。简单来说:lock 几次,就要 unLock 几次。当然我们也可以做到 aquire 多次,一次性 release 掉,或者反过来,取决于怎么我们实现 tryAquiretryRlease 方法。
  3. 偏向优化: 这个优化实际上很简单,如果说要获取锁的线程就是锁的持有线程,我们无需去进行任何 CAS 操作,返回 true 即可。
        @ReservedStackAccess
        final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 避免过多的线程竞争 CAS 操作
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++`
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true; // 偏向优化
            }
            return false;
        }



        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

2、AcquireQueued 解析

①、addWaiter阶段

如果 tryAquire 失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))addWaiter 方法创建了一个新的 Node 实例,Node 实例中主要保存了当前线程信息,并将 nextWaiter 赋值为 Node.EXCLUSIVE, 这个 nextWaiter 后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。

操作比较简单,原理是将 node 塞入双向链表尾端,也就是前面提到的 FIFO队列。就是利用 CAS 操作将新创建的、带有本线程信息的 node 设置为双向链表新的 tail,并且修改两者的 ‘指针’ prevnext

        /** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }
      Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。
                                       // 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。
            }
        }
②、acquireQueued阶段(自旋)

入队成功后,进入 acquireQueued 方法,抛开线程被 interrupt 的情况acquireQueued 的代码其实也很简单,我们不看 interrupt 相关逻辑,其实逻辑还是很简单的。这是一个无限循环(或者说自旋),只要没有 tryAcquire 成功,就会一直循环下去,逻辑如下:

  1. 如果上一个节点是 FIFO 队列头,则进行一次 tryAquire,如果成功,则跳出循环。
  2. 检测是否需要阻塞,如果需要阻塞,则阻塞等待唤醒,parkAndCheckInterrupt 便是阻塞直到被唤醒(或者被 interrupt ,暂时先不考虑这个情况)。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquire()parkAndCheckInterrupt() 都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。

③、阻塞阶段

我们先说说 shouldParkAfterFailedAcquire,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0,另一个是 pred.waitStatus == SIGNAL == -1

代码中我们可以很容易看出,在 CASprev 节点的 waitStatus 设置为 SIGNAL : -1 之前,都将返回 false,如果设置成功,下一次自旋进入该方法就是 true 了,也就是说,会进入 parkAndCheckInterrupt() 方法,阻塞直到被唤醒。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
自旋阶段图解:

阻塞阶段图解:

3、release 解析

①、唤醒 FIFO 的下一个节点

阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release 做了什么,release 除了调用了我们自己实现的 tryRelease 之外,其实关键的就是这个 unparkSuccessor

tryRelease 上面也说过了,就是改改原子 state,这里不多赘述。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

代码中可以看出,当 FIFO 队列不为空且头结点的 waitStatu 被修改过,就会进入 unparkSuccessorunparkSuccessor 传入了当前 FIFO 的队列头,逻辑如下:

  1. 如果当前节点 waitStatus 为负(可能为 SIGNALCONDITION 或者 PROPAGATE),我们这里简单先看成只有 SIGNAL 状态,则 CAS 将其设置为 0。其他几个状态我们后面会说到。
  2. 如果 !(s == null || s.waitStatus > 0),也就是说 node.nextwaitStatus <= 0 ,则简单的直接将其唤醒:LockSupport.unpark(s.thread);
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

②、被唤醒后

被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue 进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev 是头结点,如果 tryAcquire 成功,我们看到其实很简单,只是将自己设为头部即可。

                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }


这篇文章只是简单的说说 AQS 的正向获取资源,释放资源流程,后续会继续解析 conditioncancel 等基于 AQS 的线程调度解析 ~~ 以及各个锁是如何实现 AQS 的 ~~

文章皆是基于源码一步步分析,没有参考过多资料,如有错误,请指出!!


另外欢迎来 Q 群讨论技术相关(目前基本没人)[左二维码]~

如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码]~


参考资料:

JDK12 源码
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example

另外小伙伴可以思考一下:

  1. 如果说阶段7:ThreadB 被唤醒后,继续自旋时,另一个线程ThreadC tryAcquire成功了会发生什么。
  2. 如果说第一个问题了解了,那应该就很清楚为什么说本文解析的这个锁叫做:非公平锁了
  3. 众所周知,只要是 CAS 操作,都有 ABA 问题,如果说修改 waitStatus 发生了 ABA 问题,会发生什么?
展开阅读全文
打赏
2
22 收藏
分享
加载中
Anur博主

引用来自“大不列颠菠菜”的评论

牛皮🐒

谢谢!ヾ(❀╹◡╹)ノ゙❀~
2019/06/13 19:47
回复
举报
该评论暂时无法显示,详情咨询 QQ 群:912889742
更多评论
打赏
2 评论
22 收藏
2
分享
返回顶部
顶部