文档章节

ReentrantReadWriteLock实现分析(源码)

AbeJeffrey
 AbeJeffrey
发布于 2017/03/26 22:34
字数 3489
阅读 100
收藏 3

ReentrantReadWriteLock是一个可重入读写锁,内部提供了读锁和写锁的单独实现。其中读锁用于只读操作,可被多个线程共享;写锁用于写操作,只能互斥访问。ReentrantReadWriteLock尤其适合读多写少的应用场景,下面将做详细分析。

ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock定义了readLock()和writeLock()用于获取读锁和写锁。

ReentrantReadWriteLock的核心在于内部Sync的实现,Sync继承自AbstractQueuedSynchronizer,关于AQS核心原理参见http://my.oschina.net/7001/blog/862143

ReentrantReadWriteLock特性

公平性

支持两种策略获取锁,非公平模式(默认)和公平模式。

非公平模式不按照线程请求锁的顺序分配锁,新加入的线程总是和等待队列中的线程竞争锁,竞争失败才会进入等待队列。非公平锁的吞吐量高于公平锁,不会导致写线程饿死,但是连续的锁竞争可能会导致队列中的线程长时间等待。

公平模式则按照FIFO的顺序获取锁,主要的竞争在于加入队列的顺序,具体可阅读AQS的源码addWaiter。公平模式下不存在写线程一直等待的问题。

重入性

AQS使用了int state来保存锁的状态,state的数量(大于1)则代表锁的重入数。在读写锁中,则用state的高16位用作读锁,低16位用作写锁,因此读锁和写锁都最多只能被持有65535次。单个读线程的重入次数则ThreadLocalHoldCounter来保存。

锁降级

ReentrantReadWriteLock共享锁实现中,支持当前持有写锁的线程可继续申请读锁,实现锁降级。所谓锁降级,即当前线程持有写锁,再申请读锁,然后释放写锁,这个时候则实现了锁降级,其他线程也可获得读锁,最后再释放读锁。当线程申请读锁后,先释放读锁,最后再释放写锁,实际并没有实现锁降级,此时仍然是独占锁,当然,实际编写代码中应该坚决避免,浪费资源。

condition支持

写锁提供了一个Condition实现,与ReentrantLock.newCondition()提供的Condition行为相同,读锁不支持 Condition,readLock().newCondition()会抛出UnsupportedOperationException。

锁中断

无论读锁或写锁均支持在获取锁期间被中断。

同步器Sync

Sync核心属性源码说明:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * 锁的state状态在逻辑上被分为两个无符号short:低16位表示一个独占(写)锁定计数,              
         * 高16位则表示共享(读)锁定计数
         */
        //读锁计数与写锁计数提取常数
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);//用于计算高16位的值
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//最大持有数65535
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//写锁掩码,用于计算低16位有效值

        /** 读锁计数  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** 写锁技术,即重入数  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

        /**
         * 每个线程的读锁计数器,即每个线程读锁的重入数,保存在ThreadLocal中,缓存在cachedHoldCounter中
         */
        static final class HoldCounter {
            int count = 0;
            // 使用id而非引用,避免保留垃圾
            final long tid = getThreadId(Thread.currentThread());
        }

        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        /**
         * 当前线程持有读锁的可重入数,仅在构造方法和readObject中初始化。可重入数变为0时被移除
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * 最近一个成功获取读锁的线程的计数,可避免从ThreadLocal中查找。通常情况下,下一个释放锁的线程
         * 是最后一个获取线程。由于它仅用作启发式,因此是非volatile的,对线程进行缓存是非常好的。
         * 
         * 用于缓存存活线程读锁的持有量,同时避免线程引用产生的垃圾
         * 
         *  通过良性数据竞争获得,依靠内存模型的final域和out-of-thin-air保证
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * firstReader代表第一个已经获取读锁的线程,firstReaderHoldCount则是持有计数
         *
         * 更准确的说,firstReader是最后一个把共享计数从0改为1,并且还没有释放锁。 
         * 如果没有这样的线程,firstReader为null;
         *  
         *firstReader不会导致垃圾堆积,因为在tryReleaseShared中将它置空了,除非 
         * 线程异常终止,没有释放读锁。
         * 
         * 通过良性数据竞争获得,依靠内存模型的final域和out-of-thin-air保证
         * 
         * 跟踪无竞争的读锁计数时,代价很低 
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }
        /*
         * 公平锁和非公平锁均使用相同的代码获取锁和释放锁,唯一的不同是,当等待队列非空时锁的竞争方式
         */

        /**
         * 是否阻塞读线程,具体行为由公平锁/非公平锁提供实现
         */
        abstract boolean readerShouldBlock();

        /**
         * 是否阻塞写线程,具体行为由公平锁/非公平锁提供实现
         */
        abstract boolean writerShouldBlock();
}

ReadLock

读锁是共享模式持有锁,下面将使用读锁的非公平实现来说明lock/unlock的过程。

加锁

    public void lock() {
         sync.acquireShared(1);//使用共享模式
    }
    //由AQS实现,使用共享模式,忽略中断
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    protected final int tryAcquireShared(int unused) {
            /*
             * 过程:
             * 1. 如果写锁被其他线程持有,fail;也即说,如果当前线程持有写锁,那么仍然可以申请读锁(锁降级)
             * 2. 否则,该线程有资格申请锁,因此,查看是否因为队列策略而被阻塞,如果不阻塞,尝试
             *    通过CAS更新状态和计数器。注意:这里不检查可重入获取,将在fullTryAcquireShared中
             *    做该操作,以避免在更典型的非可重入的情况下检查计数
             * 3. 如果步骤2因为read应该阻塞,共享计数达到65535或CAS失败而失败,则进入fullTryAcquireShared
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {//无任何线程持有读锁
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {//当前线程是第一次持有读锁的线程
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();//更新缓存
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
    }
    //非公平锁实现
    final boolean readerShouldBlock() {
            /* 作为避免写线程饿死的启发式。
             * 如果队列第一个线程正在等待写锁,则应该阻塞
             * 如果其他已经启动但还未加入队列的读线程之后有写线程,新的读线程将不会阻塞
             */
            return apparentlyFirstQueuedIsExclusive();
    }
    //如果列队中第一个线程正在以独占模式(这里即写锁)等待,返回true。为了我防止写线程饿死。
    /* 如果返回true,表示当前线程正在使用共享模式尝试获取锁(即正在调用tryAcquireShared),
     * 那么应该保证当前线程不是队列中的第一个线程
     * 为什么这么说呢?如果返回true,阻塞当前线程,当前线程又是队列中的第一个线程,那岂不玩完啦
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
    final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else 持有独占锁,在这里阻塞将导致死锁
                } else if (readerShouldBlock()) {
                    // 当前没有线程持有读锁时,确保不重新获取读锁,如果当前线程或其他线程已经持有读锁,不应该阻塞
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)//没有线程持有读锁,可安全阻塞
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

由代码可知,tryAcquireShared尝试获取共享锁失败主要有以下几种情况:

1 其他线程持有写锁,可安全阻塞

2 共享计数达到65535 (抛出error,不会进行后续操作)

3 根据队列策略,当前线程应该阻塞,确保当前线程或其他线程没有持有读锁,可安全阻塞

接下来调用doAcquireShared将当前线程加入等待队列:

   private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //设置新的队列head,如果node的后继节点在共享模式下等待,如果propagate>0或PROPAGATE已被设置,则传播
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

当前线程在doAcquireShared中被阻塞和唤醒,被唤醒后,若为等待队列中的第一个节点,则尝试获取读锁。获取锁成功则执行setHeadAndPropagate,该方法的主要作用是设置新的head并向后传播共享特性,唤醒共享模式下的后继节点,本文末尾将结合实际场景详细说明如何向后传播。

解锁

    public void unlock() {
        sync.releaseShared(1);
    }
    //在AQS中实现
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//只有读锁和写锁都空闲才会返回true
            doReleaseShared();
            return true;
        }
        return false;
    }
    protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {//自旋操作,直到成功
                int c = getState();
                int nextc = c - SHARED_UNIT;//高16位减1
                if (compareAndSetState(c, nextc))
                    // 释放读锁对读线程没有影响,如果读锁和写锁都是空闲,则应该允许等待中的写线程尝试获取锁
                    return nextc == 0;
            }
      }

ReadLock的公平锁实现主要在于readerShouldBlock的实现不同,公平锁实现中,只要当前等待队列中有线程在等待锁,则会返回true,进入队列中排队。

WriteLock

WriteLock是独占锁,其lock/unlock原理和ReentrantLock实现基本相同,主要的区别在于lock的tryAcquire实现不同,而unlock的tryRelease实现也不同,这里将详细分析tryAcquire和tryRelease,整个lock和unlock的实现可参见https://my.oschina.net/7001/blog/778625

lock的tryAcquire实现:

    protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. 如果读锁计数非0 或 写锁计数非0且持有写锁的线程非当前线程, fail.
             * 2. 如果写锁计数超过65535, fail. (仅会在count非0的情况下发生)
             * 3. 否则,如果当前线程是重入获取锁或队列策略允许获取锁,则进行锁定,并更新状态和设置所有者
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)//加锁后写锁计数超过65535
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);//重入体现
                return true;
            }
            //非公平锁writerShouldBlock总是返回false;
            //公平锁则判断当前等待队列中有无等待线程,即调用hasQueuedPredecessors
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
      }

unlock的tryRelease实现:

       protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())//锁持有者非当前线程
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;//发生重入则需经历相应次数的释放
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
      }

在WriteLock和ReadLock的实现中,使用中断方式获取锁时,发生中断则抛出异常,不再参与锁的获取中,由客户端代码处理中断异常。

共享锁传播

考虑如下场景:

当前等待队列中第一个等待线程为写线程,则此时申请共享锁的线程都将进入等待队列。

当写线程释放独占锁之后,唤醒后继节点,若后继线程使用共享模式获取锁,在doAcquireShared中调用tryAcquireShared尝试获取锁,成功则执行setHeadAndPropagate,在setHeadAndPropagate中,首先将当前获取共享锁的节点设置为新的head,且新的head节点的waitStatus应该为SIGNAL。如果当前节点的后继节点仍然以共享模式获取锁,则调用doReleaseShared();在doReleaseShared中主要做以下几件事:

1 等待队列非空,若head节点的waitStatus为SIGNAL,则修改waitStatus为0,成功则唤醒继任节点

2 head节点的waitStatus为0,则修改waitStatus为PROPAGATE,表示可向后传播。

此时,共享模式继任节点被唤醒,在doAcquireShared中调用tryAcquireShared尝试获取锁,成功则继续调用setHeadAndPropagate,将自己设为新的head节点,继任节点若以共享模式获取锁,则唤醒;如此重复,直到队尾或继任节点是以独占模式申请锁。

综上分析,共享锁的传播主要是通过移动head指针,修改head指针的waitStatus并唤醒继任节点来完成,使队列中获取共享锁的连续节点均被唤醒。

总结:ReentrantReadWriteLock同时实现了读锁和写锁,读锁和写锁共享state来保存状态,其中高16位用于读锁,低16位用于写锁;读锁和写锁均支持重入,且有公平模式和非公平模式两种实现;ReentrantReadWriteLock锁的获取与释放由核心内部类Sync实现,且读锁与写锁之间满足如下约束:

1 当任一线程持有写锁或读锁时,其他线程不能获得写锁; 

2 多个线程可以同时持有读锁。

3 当前线程持有写锁后,仍然可以持有读锁,实现锁降级。

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/867991

© 著作权归作者所有

上一篇: GC日志参数详解
下一篇: Java并发编程之AQS
AbeJeffrey
粉丝 43
博文 43
码字总数 116095
作品 0
杭州
高级程序员
私信 提问
Java并发编程之读写锁 ReentrantReadWriteLock 源码分析

我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求...

狂小白
2018/06/14
0
0
Java锁之ReentrantReadWriteLock

一、前言 上一篇Java锁之ReentrantLock(二)分析了ReentrantLock实现利器AQS同步器,通过AQS源码分析,我们知道了同步器通过sate状态进行锁的获取与释放,同时构造了双向FIFO双向链表进行线...

木木匠
2018/08/26
583
2
Java并发编程-读写锁(ReentrantReadWriteLock)

章节目录 ReentrantReadWriteLock 特性 读写锁接口示例 读写锁的实现分析 1. ReentrantReadWriteLock 特性 1.1 读写锁定义 1.2 读写锁使用场景 1.3 读写锁的优点 2.读写锁接口示例 如下为使用...

markfork
2018/05/27
0
0
Java并发编程-CountDownLatch

1、 应用场景   CountDownLatch是并发包中用来控制一个或者多个线程等待其他线程完成操作的并发工具类。现以工作中的一个场景来描述下CountDownLatch的应用,代码如下: /*模拟工作中的一个...

Ala6
2018/10/03
591
1
Java Lock接口分析之ReentantReadWriteLock

ReentantReadWriteLock读写锁,在读线程多余写线程的并发环境中能体现出优异的性能,相比于synchronized与ReentrantLock这种独占式锁的模型,ReentantReadWriteLock采用独占式写锁与共享式读...

我爱春天的毛毛雨
2018/09/21
86
0

没有更多内容

加载失败,请刷新页面

加载更多

PostgreSQL 11.3 locking

rudi
今天
5
0
Mybatis Plus sql注入器

一、继承AbstractMethod /** * @author beth * @data 2019-10-23 20:39 */public class DeleteAllMethod extends AbstractMethod { @Override public MappedStatement injectMap......

一个yuanbeth
今天
10
1
一次写shell脚本的经历记录——特殊字符惹的祸

本文首发于微信公众号“我的小碗汤”,扫码文末二维码即可关注,欢迎一起交流! redis在容器化的过程中,涉及到纵向扩pod实例cpu、内存以及redis实例的maxmemory值,statefulset管理的pod需要...

码农实战
今天
4
0
为什么阿里巴巴Java开发手册中不建议在循环体中使用+进行字符串拼接?

之前在阅读《阿里巴巴Java开发手册》时,发现有一条是关于循环体中字符串拼接的建议,具体内容如下: 那么我们首先来用例子来看看在循环体中用 + 或者用 StringBuilder 进行字符串拼接的效率...

武培轩
今天
8
0
队列-链式(c/c++实现)

队列是在线性表功能稍作修改形成的,在生活中排队是不能插队的吧,先排队先得到对待,慢来得排在最后面,这样来就形成了”先进先出“的队列。作用就是通过伟大的程序员来实现算法解决现实生活...

白客C
今天
81
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部