[Curator] Shared Reentrant Read Write Lock 的使用与分析
[Curator] Shared Reentrant Read Write Lock 的使用与分析
秋雨霏霏 发表于12个月前
[Curator] Shared Reentrant Read Write Lock 的使用与分析
  • 发表于 12个月前
  • 阅读 27
  • 收藏 0
  • 点赞 0
  • 评论 0

移动开发云端新模式探索实践 >>>   

摘要: Zookeeper Curator :Shared Reentrant Read Write Lock 分布式读写锁

Shared Reentrant Read Write Lock

一个可重入的分布式读写锁。其中读写锁就是由两把锁组成的,一把控制着只读操作,而另外一个控制着写操作。读锁是允许同时进行多个读操作的,但是写操作则不允许。写锁是独占的。

1. 关键 API

org.apache.curator.framework.recipes.locks.InterProcessLock

org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock

2. 机制说明

可重入性:

在可重入模式时,对于读锁和写锁都可以像重入锁一样重入。

不可重入模式时,读锁需要等待所有的写锁释放掉才可进入。

至于使用哪种模式,完全取决于使用的锁的类型。

另外,写锁持有者可以自动获得读锁。但是,反过来就不行了。 如果一个读锁的持有者申请写锁,那它永远也获取不到。

锁降级:

可重入锁允许从写锁降级为读锁;反之,不允许从读锁升级到写锁。

3. 用法

3.1 创建

public InterProcessReadWriteLock(CuratorFramework client,
                                 String basePath)

简单的构造器创建

3.2 使用

public InterProcessLock readLock()

public InterProcessLock writeLock()

两个api,分别对应着读/写锁。返回一个org.apache.curator.framework.recipes.locks.InterProcessLock。这两个方法返回的都是Shared Reentrant Lock

4. 错误处理

和其他锁一样,务必关注链接状态的变化。增加ConnectionStateListener。 参见:Shared Reentrant Lock

5. 源码分析

5.1 类定义

public class InterProcessReadWriteLock {}

没有继承任何父类,也没有实现什么接口。

5.2 成员变量

public class InterProcessReadWriteLock
{
    private final InterProcessMutex readMutex;
    private final InterProcessMutex writeMutex;

    // must be the same length. LockInternals depends on it
    private static final String READ_LOCK_NAME  = "__READ__";
    private static final String WRITE_LOCK_NAME = "__WRIT__";
}
  • readMutex
    • org.apache.curator.framework.recipes.locks.InterProcessMutex
    • 可重入
  • writeMutex
    • org.apache.curator.framework.recipes.locks.InterProcessMutex
    • 可重入

可见,读写锁本身并不是什么新的锁类型,只是两把锁的组合。 从锁的类型来看,可以发现,读写锁是可重入的。

5.3 内部类

public class InterProcessReadWriteLock {
    private static class SortingLockInternalsDriver extends StandardLockInternalsDriver
    {
        @Override
        public final String fixForSorting(String str, String lockName)
        {
            str = super.fixForSorting(str, READ_LOCK_NAME);
            str = super.fixForSorting(str, WRITE_LOCK_NAME);
            return str;
        }
    }

    private static class InternalInterProcessMutex extends InterProcessMutex
    {
        private final String lockName;
        private final byte[] lockData;

        InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver)
        {
            super(client, path, lockName, maxLeases, driver);
            this.lockName = lockName;
            this.lockData = lockData;
        }

        @Override
        public Collection<String> getParticipantNodes() throws Exception
        {
            Collection<String>  nodes = super.getParticipantNodes();
            Iterable<String>    filtered = Iterables.filter
            (
                nodes,
                new Predicate<String>()
                {
                    @Override
                    public boolean apply(String node)
                    {
                        return node.contains(lockName);
                    }
                }
            );
            return ImmutableList.copyOf(filtered);
        }

        @Override
        protected byte[] getLockNodeBytes()
        {
            return lockData;
        }
    }
}
  • SortingLockInternalsDriver
    • 定义了一个内部锁驱动类,这个类的作用可以参见Shared Reentrant Lock
    • 主要是对fixForSorting进行覆盖
      • 可以看到用两个常量字符串拼接在排序字段上
  • InternalInterProcessMutex
    • 继承与org.apache.curator.framework.recipes.locks.InterProcessMutex
    • 多了个lockName,用于区分读/写锁
    • getParticipantNodes中,加入了一个filter
      • 对于同一个path下的锁节点进行过滤
        • 说明读/写锁都放在同一个path下,用名字中的字符串来区分

5.4 构造器

public InterProcessReadWriteLock(CuratorFramework client, String basePath)
{
    this(client, basePath, null);
}

public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
    lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);

    writeMutex = new InternalInterProcessMutex
    (
        client,
        basePath,
        WRITE_LOCK_NAME,
        lockData,
        1,
        new SortingLockInternalsDriver()
        {
            @Override
            public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
            {
                return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
            }
        }
    );

    readMutex = new InternalInterProcessMutex
    (
        client,
        basePath,
        READ_LOCK_NAME,
        lockData,
        Integer.MAX_VALUE,
        new SortingLockInternalsDriver()
        {
            @Override
            public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
            {
                return readLockPredicate(children, sequenceNodeName);
            }
        }
    );
}

可以看见,构造器就是做了一件事,初始化读/写锁。 两者都是使用内部的InternalInterProcessMutex,而这个初始化过程完全可以参见Shared Reentrant Lock

但是,选哟注意的是:

  1. 租约数量不一样
    • writeMutex : 写锁
      • maxLeases = 1
        • 最多只有一个锁的租约
        • 排他锁
    • readMutex : 读锁
      • maxLeases = Integer.MAX_VALUE
        • “无限”
  2. 读锁驱动的getsTheLock逻辑被替换掉了

5.5 实现

上一节中提到过读锁驱动的getsTheLock逻辑被替换掉了。对于读锁是否获得锁的逻辑是一个特殊实现:org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock#readLockPredicate方法

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
    if ( writeMutex.isOwnedByCurrentThread() )
    {
        return new PredicateResults(null, true);
    }

    int         index = 0;
    int         firstWriteIndex = Integer.MAX_VALUE;
    int         ourIndex = -1;
    for ( String node : children )
    {
        if ( node.contains(WRITE_LOCK_NAME) )
        {
            firstWriteIndex = Math.min(index, firstWriteIndex);
        }
        else if ( node.startsWith(sequenceNodeName) )
        {
            ourIndex = index;
            break;
        }

        ++index;
    }

    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

    boolean     getsTheLock = (ourIndex < firstWriteIndex);
    String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
    return new PredicateResults(pathToWatch, getsTheLock);
}
  1. 如果当前线程已经持有了写锁,则直接判定获得了读锁

    • 读/写锁的重入性质降级逻辑
  2. 通过遍历锁节点列表,找到自己的顺位,以及写锁的顺位

  3. 校验自己的顺位

    • 主要是防止自己的读锁节点出现问题(误删,session失效等)
  4. 只要自身顺位在写锁顺位之前

    • 即:先加读锁,后加的写锁
    • 则判定获得读锁
  5. 如果没有获取读锁,则对写锁顺位的节点进行监听

    • 当写锁释放掉之后,重新进入判定过程

5.6 小结

所以,可以简单归纳一下读写锁的特性:

  1. 读写锁都可重入
  2. 可以同时有多个读锁持有者
  3. 写锁为排他锁,只能等所有的之前的锁持有者(无论读/写)都释放完
  4. 如果有写锁存在,读锁需要等待写锁释放

6. 测试

6.1 读锁重入

package com.roc.curator.demo.locks

import org.apache.commons.lang3.RandomStringUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit

/**
 * Created by roc on 2017/5/31.
 */
class ReadWriteLockTest {

    val LATCH_PATH: String = "/test/locks/rw"

    var client: CuratorFramework = CuratorFrameworkFactory.builder()
            .connectString("0.0.0.0:8888")
            .connectionTimeoutMs(5000)
            .retryPolicy(ExponentialBackoffRetry(1000, 10))
            .sessionTimeoutMs(3000)
            .build()

    @Before fun init() {
        client.start()
    }

    @Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var count: Int = 0;

        while (count < 10) {
            if (readLock.acquire(3, TimeUnit.SECONDS)) {
                println("$id $count 加读锁成功 $time")
            } else {
                println("$id $count 加读锁失败 $time")
            }
            TimeUnit.SECONDS.sleep(10)
            count++
        }

        println("$id 结束: $time")

    }
}

单线程,重复申请10次读锁。 输出:

id : tvyLxApqnS 
tvyLxApqnS 0 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 1 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 2 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 3 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 4 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 5 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 6 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 7 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 8 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 9 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 结束: Wed May 31 18:48:16 CST 2017

zookeeper节点:

ls /test/locks/rw
[_c_4ca5aad6-958a-41b3-a890-6991460978d0-__READ__0000000001]

get /test/locks/rw/_c_4ca5aad6-958a-41b3-a890-6991460978d0-__READ__0000000001
192.168.60.165
cZxid = 0x1e2cb
ctime = Wed May 31 18:48:17 CST 2017
mZxid = 0x1e2cb
mtime = Wed May 31 18:48:17 CST 2017
pZxid = 0x1e2cb
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07fd
dataLength = 14
numChildren = 0

可以看到,10次读锁,其实只有一个锁节点产生。所以,读锁可重入。

6.2 写锁重入

@Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var count: Int = 0;

        while (count < 10) {
            if (writeLock.acquire(3, TimeUnit.SECONDS)) {
                println("$id $count 加写锁成功 $time")
            } else {
                println("$id $count 加写锁失败 $time")
            }
            TimeUnit.SECONDS.sleep(10)
            count++
        }

        println("$id 结束: $time")

    }

单线程,重复申请10次写锁。 输出:

id : fiQYaoAGcN 
fiQYaoAGcN 0 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 1 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 2 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 3 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 4 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 5 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 6 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 7 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 8 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 9 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 结束: Wed May 31 18:55:14 CST 2017

zookeeper节点:

ls /test/locks/rw
[_c_dce473ea-580d-472d-aa5c-dfdcfba610f0-__WRIT__0000000002]


get /test/locks/rw/_c_dce473ea-580d-472d-aa5c-dfdcfba610f0-__WRIT__0000000002
192.168.60.165
cZxid = 0x1e2ce
ctime = Wed May 31 18:55:16 CST 2017
mZxid = 0x1e2ce
mtime = Wed May 31 18:55:16 CST 2017
pZxid = 0x1e2ce
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07fe
dataLength = 14
numChildren = 0

可以看到,10次写锁,其实只有一个锁节点产生。所以,写锁可重入。

6.3 写锁自动拥有读锁

@Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var count: Int = 0;

        if (writeLock.acquire(3, TimeUnit.SECONDS)) {
            println("$id $count 加写锁成功 $time")
            while (count < 10) {
                if (readLock.acquire(3, TimeUnit.SECONDS)) {
                    println("$id $count 加读锁成功 $time")
                }else{
                    println("$id $count 加读锁失败 $time")
                }
                TimeUnit.SECONDS.sleep(10)
                count++
            }
        }

        println("$id 结束: $time")

    }

同一线程先加写锁,再加读锁。

输出:

id : DFhGjYvdGI 
**DFhGjYvdGI 0 加写锁成功 Wed May 31 19:04:00 CST 2017**
DFhGjYvdGI 0 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 1 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 2 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 3 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 4 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 5 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 6 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 7 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 8 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 9 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 结束: Wed May 31 19:04:00 CST 2017

zookeeper节点:

ls /test/locks/rw
[_c_0e882376-24e3-48ee-ac92-f01a5de30130-__WRIT__0000000003, _c_3f71ef35-e744-49f8-93d3-c6f43d875768-__READ__0000000004]

get /test/locks/rw/_c_0e882376-24e3-48ee-ac92-f01a5de30130-__WRIT__0000000003
192.168.60.165
cZxid = 0x1e2d1
ctime = Wed May 31 19:04:01 CST 2017
mZxid = 0x1e2d1
mtime = Wed May 31 19:04:01 CST 2017
pZxid = 0x1e2d1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07ff
dataLength = 14
numChildren = 0

get /test/locks/rw/_c_3f71ef35-e744-49f8-93d3-c6f43d875768-__READ__0000000004
192.168.60.165
cZxid = 0x1e2d2
ctime = Wed May 31 19:04:01 CST 2017
mZxid = 0x1e2d2
mtime = Wed May 31 19:04:01 CST 2017
pZxid = 0x1e2d2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07ff
dataLength = 14
numChildren = 0

根据5.5节的介绍,如果读锁在写锁之后,是不会成功的。但是在同一线程中,已经持有了写锁,所以读锁可以直接成功。

6.3 多线程模拟

@Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var writeLockCount: Int = 0
        var readLockCount: Int = 0

        while (true) {
            if (Math.random() > 0.5) {
                println("$id 申请写锁 ${System.currentTimeMillis()}")
                if (acquire(writeLock, 10, TimeUnit.SECONDS)) {
                    println("$id 写锁成功 ${System.currentTimeMillis()}")
                    writeLockCount++
                } else {
                    println("$id 写锁失败 ${System.currentTimeMillis()}")
                }
            } else {
                println("$id 申请读锁 ${System.currentTimeMillis()}")
                if (acquire(readLock, 10, TimeUnit.SECONDS)) {
                    println("$id 读锁成功 ${System.currentTimeMillis()}")
                    readLockCount++
                } else {
                    println("$id 读锁失败 ${System.currentTimeMillis()}")
                }
            }
            TimeUnit.SECONDS.sleep(10)
            if (Math.random() > 0.95) {
                println("$id 退出 ${System.currentTimeMillis()}")
                release(writeLock)
                release(readLock)
                break
            }
        }
        println("$id 结束: ${System.currentTimeMillis()}")
    }

    fun acquire(lock: InterProcessMutex, time: Int, unit: TimeUnit): Boolean {
        return lock.acquire(time.toLong(), unit)
    }

    fun release(lock: InterProcessMutex) {
        while (lock.isAcquiredInThisProcess) {
            lock.release()
        }
    }

这里可以多运行几个实例,观察是否符合5.6节的描述~

  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 141
博文 87
码字总数 155365
×
秋雨霏霏
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: