文档章节

JUC锁框架——ReadWriteLock

长头发-dawn
 长头发-dawn
发布于 2018/09/17 16:02
字数 3672
阅读 35
收藏 10

ReadWriteLock简单介绍

ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。相对于互斥锁而言,ReadWriteLoc允许更高的并发量。

所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。

ReadWriteLock接口

public interface ReadWriteLock {
    Lock readLock();//获取读锁
    Lock writeLock();//获取写锁
}

ReentrantReadWriteLock实现类

ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。

  1. 锁的获取获取模式
    • 非公平模式(默认):读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,比公平锁有更高的吞吐量。
    • 公平模式:线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写/读锁线程就会被分配写/读锁;当有写线程持有写锁或者有等待的写线程时,一个尝试获取公平的读锁(非重入)的线程就会阻塞。这个线程直到等待时间最长的写锁获得锁后并释放掉锁后才能获取到读锁。
  2. 可重入:允许读锁可写锁可重入。写锁可以获得读锁,读锁不能获得写锁。
  3. 锁降级:允许写锁降低为读锁。
  4. 中断锁的获取:在读锁和写锁的获取过程中支持中断。
  5. 支持Condition:写锁提供Condition实现。
  6. 监控:提供确定锁是否被持有等辅助方法

锁降低的简单就示例

class ReadWriteLockTest {
    String data;//缓存中的对象
    volatile boolean cacheValid;//缓存是否还有效
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() throws InterruptedException {
        rwl.readLock().lock();
        if (!cacheValid) {//缓存失效需要再次读取缓存
            //在读取缓存前,必须释放读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                //再次检查是否需要再次读取缓存,如果需要则
                if (!cacheValid) {
                    data = Thread.currentThread().getName()+":缓存数据测试,实际开发可能是一个对象!";
                    cacheValid = true;
                }
                //在释放之前,通过获取读锁降级写锁
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); //释放写锁,持有读锁
            }
        }
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+"【"+data+"】");
        } finally {
            rwl.readLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
        for(int i=0;i<10;i++){
           Thread thread = new Thread(()->{
               try {
                   readWriteLockTest.processCachedData();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });
           thread.start();
        }
    }
}

源码分析

构造方法

public ReentrantReadWriteLock() {
    this(false);//默认为非公平模式
}

public ReentrantReadWriteLock(boolean fair) {
    //决定了Sync是FairSync还是NonfairSync。Sync继承了AbstractQueuedSynchronizer,而Sync是一个抽象类,NonfairSync和FairSync继承了Sync,并重写了其中的抽象方法。
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

获取锁

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

Sync分析

FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。

对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。

NonfairSync

 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 写线程无需阻塞
        }
        final boolean readerShouldBlock() {
            //apparentlyFirstQueuedIsExclusive在当前线程是写锁占用的线程时,返回true;否则返回false。也就说明,如果当前有一个写线程正在写,那么该读线程应该阻塞。
            return apparentlyFirstQueuedIsExclusive();
        }
    }

ReentrantReadWriteLock中的state

继承AQS的类都需要使用state变量代表某种资源,ReentrantReadWriteLock中的state代表了读锁的数量和写锁的持有与否,整个结构如下: 可以看到state的高16位代表读锁的个数;低16位代表写锁的状态。

获取锁

读锁的获取

public void lock() {
    sync.acquireShared(1);
}

读锁使用的是AQS的共享模式,AQS的acquireShared方法如下:

if (tryAcquireShared(arg) < 0)
   doAcquireShared(arg);

当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到等待队列中。

Sync实现了tryAcquireShared方法,如下:

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //如果当前有写线程并且本线程不是写线程,不符合重入,失败.
            //在获取读锁时,如果有写线程,则获取失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //得到读锁的个数
            int r = sharedCount(c);
            //如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值,成功
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {//如果当前读锁为0
                    firstReader = current;//第一个读线程就是当前线程
                    firstReaderHoldCount = 1;//第一个线程持有读锁的个数
                }
                //如果当前线程重入了,记录firstReaderHoldCount
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                }
                //当前读线程和第一个读线程不同,记录每一个线程读的次数
                else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //否则,循环尝试
            return fullTryAcquireShared(current);
        }

从上面的代码以及注释可以看到,分为三步:

  1. 如果当前有写线程并且本线程不是写线程,那么失败,返回-1
  2. 否则,说明当前没有写线程或者本线程就是写线程(可重入),接下来判断是否应该读线程阻塞并且读锁的个数是否小于最小值,并且CAS成功使读锁+1,成功,返回1。其余的操作主要是用于计数的
  3. 如果2中失败了,失败的原因有三,第一是应该读线程应该阻塞;第二是因为读锁达到了上线;第三是因为CAS失败,有其他线程在并发更新state,那么会调动fullTryAcquireShared方法。

fullTryAcquiredShared方法

  final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                //一旦有别的线程获得了写锁,并且获得写锁的线程不是本线程,返回-1,失败
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                }
                //如果读线程需要阻塞
                else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    }
                    //说明有别的读线程占有了锁
                    else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //如果读锁达到了最大值,抛出异常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //如果成功更改状态,成功返回
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

写锁的获取

public void lock() {
    sync.acquire(1);
}

AQS的acquire方法如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

从上面可以看到,写锁使用的是AQS的独占模式。首先尝试获取锁,如果获取失败,那么将会把该线程加入到等待队列中。

Sync实现了tryAcquire方法用于尝试获取一把锁,如下:

protected final boolean tryAcquire(int acquires) {
             //得到调用lock方法的当前线程
            Thread current = Thread.currentThread();
            int c = getState();
            //得到写锁的个数
            int w = exclusiveCount(c);
            //如果当前有写锁或者读锁.(对于读锁而言,如果当前写线程可以进行写操作,那么读线程读到的数据可能有误)
            if (c != 0) {
                // 如果写锁为0或者当前线程不是独占线程(不符合重入),返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //如果写锁的个数超过了最大值(65535),抛出异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 写锁重入,返回true
                setState(c + acquires);
                return true;
            }
            //如果当前没有写锁或者读锁,如果写线程应该阻塞或者CAS失败,返回false
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //否则将当前线程置为获得写锁的线程,返回true
            setExclusiveOwnerThread(current);
            return true;
        }

释放锁

读锁的释放

ReadLock的unlock方法如下:

 public void unlock() {
     sync.releaseShared(1);
 }

调用了Sync的releaseShared方法,该方法在AQS中提供,如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

调用tryReleaseShared方法尝试释放锁,如果释放成功,调用doReleaseShared尝试唤醒下一个节点。

AQS的子类需要实现tryReleaseShared方法,Sync中的实现如下:

protected final boolean tryReleaseShared(int unused) {
    //得到调用unlock的线程
    Thread current = Thread.currentThread();
    //如果是第一个获得读锁的线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    }
    //否则,是HoldCounter中计数-1
    else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //死循环
    for (;;) {
        int c = getState();
        //释放一把读锁
        int nextc = c - SHARED_UNIT;
        //如果CAS更新状态成功,返回读锁是否等于0;失败的话,则重试
        if (compareAndSetState(c, nextc))
            //释放读锁对读线程没有影响,但是可能会使等待的写线程解除挂起开始运行。所以,一旦没有锁了,就返回true,否则false;返回true后,那么则需要释放等待队列中的线程,这时读线程和写线程都有可能再获得锁。
            return nextc == 0;
    }
}

写锁的释放

WriteLock的unlock方法如下:

public void unlock() {
    sync.release(1);
}

Sync的release方法使用的AQS中的,如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)//如果等待队列中有线程再等待
            unparkSuccessor(h);//将下一个线程解除挂起。
        return true;
    }
    return false;
}

Sync需要实现tryRelease方法,如下:

protected final boolean tryRelease(int releases) {
    //如果没有线程持有写锁,但是仍要释放,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    //如果没有写锁了,那么将AQS的线程置为null
    if (free)
        setExclusiveOwnerThread(null);
    //更新状态
    setState(nextc);
    return free;//此处返回当且仅当free为0时返回,如果当前是写锁被占有了,只有当写锁的数据降为0时才认为释放成功;否则失败。因为只要有写锁,那么除了占有写锁的那个线程,其他线程即不可以获得读锁,也不能获得写锁
}

getOwner()

getOwner方法用于返回当前获得写锁的线程,如果没有线程占有写锁,那么返回null。实现如下:

protected Thread getOwner() {
    return sync.getOwner();
}

可以看到直接调用了Sync的getOwner方法,下面是Sync的getOwner方法:

final Thread getOwner() {
  // Must read state before owner to ensure memory consistency
  //如果独占锁的个数为0,说明没有线程占有写锁,那么返回null;否则返回占有写锁的线程。
  return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread());
}

getReadLockCount()

getReadLockCount()方法用于返回读锁的个数,实现如下:

public int getReadLockCount() {
    return sync.getReadLockCount();
}

Sync的实现如下:

final int getReadLockCount() {
    return sharedCount(getState());
}
//要想得到读锁的个数,就是看AQS的state的高16位。这和前面讲过的一样,高16位表示读锁的个数,低16位表示写锁的个数。
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

getReadHoldCount()

getReadHoldCount()方法用于返回当前线程所持有的读锁的个数,如果当前线程没有持有读锁,则返回0。直接看Sync的实现即可:

final int getReadHoldCount() {
   //如果没有读锁,自然每个线程都是返回0
   if (getReadLockCount() == 0)
       return 0;

   //得到当前线程
   Thread current = Thread.currentThread();
   //如果当前线程是第一个读线程,返回firstReaderHoldCount参数
   if (firstReader == current)
       return firstReaderHoldCount;
   //如果当前线程不是第一个读线程,得到HoldCounter,返回其中的count
   HoldCounter rh = cachedHoldCounter;
   //如果缓存的HoldCounter不为null并且是当前线程的HoldCounter,直接返回count
   if (rh != null && rh.tid == getThreadId(current))
       return rh.count;

   //如果缓存的HoldCounter不是当前线程的HoldCounter,那么从ThreadLocal中得到本线程的HoldCounter,返回计数
    int count = readHolds.get().count;
    //如果本线程持有的读锁为0,从ThreadLocal中移除
    if (count == 0) readHolds.remove();
    return count;
}

从上面的代码中,可以看到两个熟悉的变量,firstReader和HoldCounter类型。这两个变量在读锁的获取中接触过,前面没有细说,这里细说一下。HoldCounter类的实现如下:

static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

readHolds是ThreadLocalHoldCounter类,定义如下:

 static final class ThreadLocalHoldCounter
      extends ThreadLocal<HoldCounter> {
      public HoldCounter initialValue() {
          return new HoldCounter();
      }
  }

可以看到,readHolds存储了每一个线程的HoldCounter,而HoldCounter中的count变量就是用来记录线程获得的写锁的个数。所以可以得出结论:Sync维持总的读锁的个数,在state的高16位;由于读线程可以同时存在,所以每个线程还保存了获得的读锁的个数,这个是通过HoldCounter来保存的。 除此之外,对于第一个读线程有特殊的处理,Sync中有如下两个变量:

private transient Thread firstReader = null;//第一个得到读锁的线程
private transient int firstReaderHoldCount;//第一个线程获得的写锁

其余获取到读锁的线程的信息保存在HoldCounter中。

看完了HoldCounter和firstReader,再来看一下getReadLockCount的实现,主要有三步:

  1. 当前没有读锁,那么自然每一个线程获得的读锁都是0;
  2. 如果当前线程是第一个获取到读锁的线程,那么返回firstReadHoldCount;
  3. 如果当前线程不是第一个获取到读锁的线程,得到该线程的HoldCounter,然后返回其count字段。如果count字段为0,说明该线程没有占有读锁,那么从readHolds中移除。获取HoldCounter分为两步,第一步是与cachedHoldCounter比较,如果不是,则从readHolds中获取。

getWriteLockCount()

getWriteLockCount()方法返回写锁的个数,Sync的实现如下:

final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

可以看到如果没有线程持有写锁,那么返回0;否则返回AQS的state的低16位。

总结

当分析ReentranctReadWriteLock时,或者说分析内部使用AQS实现的工具类时,需要明白的就是AQS的state代表的是什么。ReentrantLockReadWriteLock中的state同时表示写锁和读锁的个数。为了实现这种功能,state的高16位表示读锁的个数,低16位表示写锁的个数。AQS有两种模式:共享模式和独占模式,读写锁的实现中,读锁使用共享模式;写锁使用独占模式;另外一点需要记住的即使,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。

本文转载自:https://blog.csdn.net/qq_19431333/article/details/70568478

长头发-dawn
粉丝 9
博文 27
码字总数 51237
作品 0
西安
私信 提问
java程序猿技术栈

一、java 基础知识 1.1 java基础集合类 1.2 jdk1.5、1.6、1.7、1.8 特效比较 1.3 java异常处理 1.4 jvm原理及常见问题 1.5 log4j等日志收集 1.6 jdbc驱动 1.7 jdk反射机制使用和原理 1.8 ja...

南寒之星
2016/11/30
17
0
JUC 基础内容概述

Concurrent Programming in Java 的作者 Doug Lea 编写了一个极其优秀的、免费的并发实用程序包,它包括并发应用程序的锁、互斥、队列、线程池、轻量级任务、有效的并发集合、原子的算术操作...

暗之幻影
2016/12/17
70
0
显式锁(java.util.Concurrent)

一、前言   在分析完了集合框架后,很有必要接着分析java并发包下面的源码,JUC(java.util.concurrent)源码也是我们学习Java迈进一步的重要过程。我们分为几个模块进行分析,首先是对锁模...

狼王黄师傅
2018/11/27
45
0
22、Java并发性和多线程-Java中的读/写锁

以下内容转自http://ifeve.com/read-write-locks/: 相比Java中的锁(Locks in Java)里Lock实现,读写锁更复杂一些。假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么...

easonjim
2017/06/17
0
0
Java 多线程系列目录(共43篇)

Java多线程系列目录(共43篇) 最近,在研究Java多线程的内容目录,将其内容逐步整理并发布。 (一) 基础篇 01. Java多线程系列--“基础篇”01之 基本概念 02. Java多线程系列--“基础篇”02之 ...

foxeye
2016/02/29
277
0

没有更多内容

加载失败,请刷新页面

加载更多

springboot全家桶,集成shiro,rabbitmq,redis, mybatis,druid,swagger

搭建了一个SpringBoot的模板,下载即可作为单体应用的服务后台模板 项目地址:https://gitee.com/devilscode/springboot_template 项目集成框架 springboot 作为基础框架 shiro 权限管理 ra...

devils_os
14分钟前
2
0
云服务器的优势有哪些?

与传统的服务器相比,云服务器有多种显著的优势,因此近年来云服务器租用一直都很受消费者的欢迎。 那么云服务器都有哪些优势呢? 云服务器的稳定性很好 虚拟主机是很多用户建站的首选方案,这...

云漫网络Ruan
14分钟前
2
0
好程序员web前端学习路线分享CSS浮动-清除浮动篇

好程序员web前端学习路线分享CSS浮动-清除浮动篇,为什么要清除浮动   这里所说的清除浮动,并不是不要浮动了,而是清除浮动与浮动之间的影响。那么到底会有什么影响呢? 1.高度塌陷   ...

好程序员官方
25分钟前
1
0
我有酒,你有故事吗?

记录学习的每一秒,巨杉技术社区有奖征文全新启动! 如果, 你热爱数据技术,乐于尝(zhe)试(teng), 又喜欢写点什么~ 那么, 无论你是架构大佬,还是DBA大神, 只要你愿意通过文字分享你...

巨杉数据库
29分钟前
1
0
【Git】ignore文件不生效

简介 使用git的过程中, 某些文件可能不希望上传到服务器,例如日志、本地化配置等信息。这时候,我们可以通过配置.gitignore文件忽略这些文件的提交。 配置.gitignore 常见的配置如下: HELP...

Areya
31分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部