文档章节

[Curator] Shared Reentrant Read Write Lock 的使用与分析

秋雨霏霏
 秋雨霏霏
发布于 2017/05/31 19:43
字数 2574
阅读 33
收藏 0

Shared Reentrant Read Write Lock

一个可重入的分布式读写锁。其中读写锁就是由两把锁组成的,一把控制着只读操作,而另外一个控制着写操作。读锁是允许同时进行多个读操作的,但是写操作则不允许。写锁是独占的。

1. 关键 API

org.apache.curator.framework.recipes.locks.InterProcessLock

org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock

2. 机制说明

可重入性:

在可重入模式时,对于读锁和写锁都可以像重入锁一样重入。

不可重入模式时,读锁需要等待所有的写锁释放掉才可进入。

至于使用哪种模式,完全取决于使用的锁的类型。

另外,写锁持有者可以自动获得读锁。但是,反过来就不行了。 如果一个读锁的持有者申请写锁,那它永远也获取不到。

锁降级:

可重入锁允许从写锁降级为读锁;反之,不允许从读锁升级到写锁。

3. 用法

3.1 创建

public InterProcessReadWriteLock(CuratorFramework client,
                                 String basePath)

简单的构造器创建

3.2 使用

public InterProcessLock readLock()

public InterProcessLock writeLock()

两个api,分别对应着读/写锁。返回一个org.apache.curator.framework.recipes.locks.InterProcessLock。这两个方法返回的都是Shared Reentrant Lock

4. 错误处理

和其他锁一样,务必关注链接状态的变化。增加ConnectionStateListener。 参见:Shared Reentrant Lock

5. 源码分析

5.1 类定义

public class InterProcessReadWriteLock {}

没有继承任何父类,也没有实现什么接口。

5.2 成员变量

public class InterProcessReadWriteLock
{
    private final InterProcessMutex readMutex;
    private final InterProcessMutex writeMutex;

    // must be the same length. LockInternals depends on it
    private static final String READ_LOCK_NAME  = "__READ__";
    private static final String WRITE_LOCK_NAME = "__WRIT__";
}
  • readMutex
    • org.apache.curator.framework.recipes.locks.InterProcessMutex
    • 可重入
  • writeMutex
    • org.apache.curator.framework.recipes.locks.InterProcessMutex
    • 可重入

可见,读写锁本身并不是什么新的锁类型,只是两把锁的组合。 从锁的类型来看,可以发现,读写锁是可重入的。

5.3 内部类

public class InterProcessReadWriteLock {
    private static class SortingLockInternalsDriver extends StandardLockInternalsDriver
    {
        @Override
        public final String fixForSorting(String str, String lockName)
        {
            str = super.fixForSorting(str, READ_LOCK_NAME);
            str = super.fixForSorting(str, WRITE_LOCK_NAME);
            return str;
        }
    }

    private static class InternalInterProcessMutex extends InterProcessMutex
    {
        private final String lockName;
        private final byte[] lockData;

        InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver)
        {
            super(client, path, lockName, maxLeases, driver);
            this.lockName = lockName;
            this.lockData = lockData;
        }

        @Override
        public Collection<String> getParticipantNodes() throws Exception
        {
            Collection<String>  nodes = super.getParticipantNodes();
            Iterable<String>    filtered = Iterables.filter
            (
                nodes,
                new Predicate<String>()
                {
                    @Override
                    public boolean apply(String node)
                    {
                        return node.contains(lockName);
                    }
                }
            );
            return ImmutableList.copyOf(filtered);
        }

        @Override
        protected byte[] getLockNodeBytes()
        {
            return lockData;
        }
    }
}
  • SortingLockInternalsDriver
    • 定义了一个内部锁驱动类,这个类的作用可以参见Shared Reentrant Lock
    • 主要是对fixForSorting进行覆盖
      • 可以看到用两个常量字符串拼接在排序字段上
  • InternalInterProcessMutex
    • 继承与org.apache.curator.framework.recipes.locks.InterProcessMutex
    • 多了个lockName,用于区分读/写锁
    • getParticipantNodes中,加入了一个filter
      • 对于同一个path下的锁节点进行过滤
        • 说明读/写锁都放在同一个path下,用名字中的字符串来区分

5.4 构造器

public InterProcessReadWriteLock(CuratorFramework client, String basePath)
{
    this(client, basePath, null);
}

public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
    lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);

    writeMutex = new InternalInterProcessMutex
    (
        client,
        basePath,
        WRITE_LOCK_NAME,
        lockData,
        1,
        new SortingLockInternalsDriver()
        {
            @Override
            public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
            {
                return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
            }
        }
    );

    readMutex = new InternalInterProcessMutex
    (
        client,
        basePath,
        READ_LOCK_NAME,
        lockData,
        Integer.MAX_VALUE,
        new SortingLockInternalsDriver()
        {
            @Override
            public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
            {
                return readLockPredicate(children, sequenceNodeName);
            }
        }
    );
}

可以看见,构造器就是做了一件事,初始化读/写锁。 两者都是使用内部的InternalInterProcessMutex,而这个初始化过程完全可以参见Shared Reentrant Lock

但是,选哟注意的是:

  1. 租约数量不一样
    • writeMutex : 写锁
      • maxLeases = 1
        • 最多只有一个锁的租约
        • 排他锁
    • readMutex : 读锁
      • maxLeases = Integer.MAX_VALUE
        • “无限”
  2. 读锁驱动的getsTheLock逻辑被替换掉了

5.5 实现

上一节中提到过读锁驱动的getsTheLock逻辑被替换掉了。对于读锁是否获得锁的逻辑是一个特殊实现:org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock#readLockPredicate方法

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
    if ( writeMutex.isOwnedByCurrentThread() )
    {
        return new PredicateResults(null, true);
    }

    int         index = 0;
    int         firstWriteIndex = Integer.MAX_VALUE;
    int         ourIndex = -1;
    for ( String node : children )
    {
        if ( node.contains(WRITE_LOCK_NAME) )
        {
            firstWriteIndex = Math.min(index, firstWriteIndex);
        }
        else if ( node.startsWith(sequenceNodeName) )
        {
            ourIndex = index;
            break;
        }

        ++index;
    }

    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

    boolean     getsTheLock = (ourIndex < firstWriteIndex);
    String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
    return new PredicateResults(pathToWatch, getsTheLock);
}
  1. 如果当前线程已经持有了写锁,则直接判定获得了读锁

    • 读/写锁的重入性质降级逻辑
  2. 通过遍历锁节点列表,找到自己的顺位,以及写锁的顺位

  3. 校验自己的顺位

    • 主要是防止自己的读锁节点出现问题(误删,session失效等)
  4. 只要自身顺位在写锁顺位之前

    • 即:先加读锁,后加的写锁
    • 则判定获得读锁
  5. 如果没有获取读锁,则对写锁顺位的节点进行监听

    • 当写锁释放掉之后,重新进入判定过程

5.6 小结

所以,可以简单归纳一下读写锁的特性:

  1. 读写锁都可重入
  2. 可以同时有多个读锁持有者
  3. 写锁为排他锁,只能等所有的之前的锁持有者(无论读/写)都释放完
  4. 如果有写锁存在,读锁需要等待写锁释放

6. 测试

6.1 读锁重入

package com.roc.curator.demo.locks

import org.apache.commons.lang3.RandomStringUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit

/**
 * Created by roc on 2017/5/31.
 */
class ReadWriteLockTest {

    val LATCH_PATH: String = "/test/locks/rw"

    var client: CuratorFramework = CuratorFrameworkFactory.builder()
            .connectString("0.0.0.0:8888")
            .connectionTimeoutMs(5000)
            .retryPolicy(ExponentialBackoffRetry(1000, 10))
            .sessionTimeoutMs(3000)
            .build()

    @Before fun init() {
        client.start()
    }

    @Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var count: Int = 0;

        while (count < 10) {
            if (readLock.acquire(3, TimeUnit.SECONDS)) {
                println("$id $count 加读锁成功 $time")
            } else {
                println("$id $count 加读锁失败 $time")
            }
            TimeUnit.SECONDS.sleep(10)
            count++
        }

        println("$id 结束: $time")

    }
}

单线程,重复申请10次读锁。 输出:

id : tvyLxApqnS 
tvyLxApqnS 0 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 1 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 2 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 3 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 4 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 5 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 6 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 7 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 8 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 9 加读锁成功 Wed May 31 18:48:16 CST 2017
tvyLxApqnS 结束: Wed May 31 18:48:16 CST 2017

zookeeper节点:

ls /test/locks/rw
[_c_4ca5aad6-958a-41b3-a890-6991460978d0-__READ__0000000001]

get /test/locks/rw/_c_4ca5aad6-958a-41b3-a890-6991460978d0-__READ__0000000001
192.168.60.165
cZxid = 0x1e2cb
ctime = Wed May 31 18:48:17 CST 2017
mZxid = 0x1e2cb
mtime = Wed May 31 18:48:17 CST 2017
pZxid = 0x1e2cb
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07fd
dataLength = 14
numChildren = 0

可以看到,10次读锁,其实只有一个锁节点产生。所以,读锁可重入。

6.2 写锁重入

@Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var count: Int = 0;

        while (count < 10) {
            if (writeLock.acquire(3, TimeUnit.SECONDS)) {
                println("$id $count 加写锁成功 $time")
            } else {
                println("$id $count 加写锁失败 $time")
            }
            TimeUnit.SECONDS.sleep(10)
            count++
        }

        println("$id 结束: $time")

    }

单线程,重复申请10次写锁。 输出:

id : fiQYaoAGcN 
fiQYaoAGcN 0 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 1 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 2 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 3 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 4 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 5 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 6 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 7 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 8 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 9 加写锁成功 Wed May 31 18:55:14 CST 2017
fiQYaoAGcN 结束: Wed May 31 18:55:14 CST 2017

zookeeper节点:

ls /test/locks/rw
[_c_dce473ea-580d-472d-aa5c-dfdcfba610f0-__WRIT__0000000002]


get /test/locks/rw/_c_dce473ea-580d-472d-aa5c-dfdcfba610f0-__WRIT__0000000002
192.168.60.165
cZxid = 0x1e2ce
ctime = Wed May 31 18:55:16 CST 2017
mZxid = 0x1e2ce
mtime = Wed May 31 18:55:16 CST 2017
pZxid = 0x1e2ce
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07fe
dataLength = 14
numChildren = 0

可以看到,10次写锁,其实只有一个锁节点产生。所以,写锁可重入。

6.3 写锁自动拥有读锁

@Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var count: Int = 0;

        if (writeLock.acquire(3, TimeUnit.SECONDS)) {
            println("$id $count 加写锁成功 $time")
            while (count < 10) {
                if (readLock.acquire(3, TimeUnit.SECONDS)) {
                    println("$id $count 加读锁成功 $time")
                }else{
                    println("$id $count 加读锁失败 $time")
                }
                TimeUnit.SECONDS.sleep(10)
                count++
            }
        }

        println("$id 结束: $time")

    }

同一线程先加写锁,再加读锁。

输出:

id : DFhGjYvdGI 
**DFhGjYvdGI 0 加写锁成功 Wed May 31 19:04:00 CST 2017**
DFhGjYvdGI 0 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 1 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 2 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 3 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 4 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 5 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 6 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 7 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 8 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 9 加读锁成功 Wed May 31 19:04:00 CST 2017
DFhGjYvdGI 结束: Wed May 31 19:04:00 CST 2017

zookeeper节点:

ls /test/locks/rw
[_c_0e882376-24e3-48ee-ac92-f01a5de30130-__WRIT__0000000003, _c_3f71ef35-e744-49f8-93d3-c6f43d875768-__READ__0000000004]

get /test/locks/rw/_c_0e882376-24e3-48ee-ac92-f01a5de30130-__WRIT__0000000003
192.168.60.165
cZxid = 0x1e2d1
ctime = Wed May 31 19:04:01 CST 2017
mZxid = 0x1e2d1
mtime = Wed May 31 19:04:01 CST 2017
pZxid = 0x1e2d1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07ff
dataLength = 14
numChildren = 0

get /test/locks/rw/_c_3f71ef35-e744-49f8-93d3-c6f43d875768-__READ__0000000004
192.168.60.165
cZxid = 0x1e2d2
ctime = Wed May 31 19:04:01 CST 2017
mZxid = 0x1e2d2
mtime = Wed May 31 19:04:01 CST 2017
pZxid = 0x1e2d2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07ff
dataLength = 14
numChildren = 0

根据5.5节的介绍,如果读锁在写锁之后,是不会成功的。但是在同一线程中,已经持有了写锁,所以读锁可以直接成功。

6.3 多线程模拟

@Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        var rw: InterProcessReadWriteLock = InterProcessReadWriteLock(client, LATCH_PATH)
        var readLock: InterProcessMutex = rw.readLock()
        var writeLock: InterProcessMutex = rw.writeLock()
        var writeLockCount: Int = 0
        var readLockCount: Int = 0

        while (true) {
            if (Math.random() > 0.5) {
                println("$id 申请写锁 ${System.currentTimeMillis()}")
                if (acquire(writeLock, 10, TimeUnit.SECONDS)) {
                    println("$id 写锁成功 ${System.currentTimeMillis()}")
                    writeLockCount++
                } else {
                    println("$id 写锁失败 ${System.currentTimeMillis()}")
                }
            } else {
                println("$id 申请读锁 ${System.currentTimeMillis()}")
                if (acquire(readLock, 10, TimeUnit.SECONDS)) {
                    println("$id 读锁成功 ${System.currentTimeMillis()}")
                    readLockCount++
                } else {
                    println("$id 读锁失败 ${System.currentTimeMillis()}")
                }
            }
            TimeUnit.SECONDS.sleep(10)
            if (Math.random() > 0.95) {
                println("$id 退出 ${System.currentTimeMillis()}")
                release(writeLock)
                release(readLock)
                break
            }
        }
        println("$id 结束: ${System.currentTimeMillis()}")
    }

    fun acquire(lock: InterProcessMutex, time: Int, unit: TimeUnit): Boolean {
        return lock.acquire(time.toLong(), unit)
    }

    fun release(lock: InterProcessMutex) {
        while (lock.isAcquiredInThisProcess) {
            lock.release()
        }
    }

这里可以多运行几个实例,观察是否符合5.6节的描述~

© 著作权归作者所有

共有 人打赏支持
秋雨霏霏
粉丝 149
博文 91
码字总数 160620
作品 0
杭州
CTO(技术副总裁)
使用Zookeeper解决微服务架构下分布式事务问题

准备工作 单机调试zookeeper集群的话,我们需要在虚拟机里虚拟出几台“微服务器“,做这一步操作之前需要在系统中预留出来8G以上磁盘空间,4G以上物理内存。 [if !supportLists]1. [endif]虚...

A尚学堂Nancy老师
09/17
0
0
分布式利器Zookeeper(三)

前言 《分布式利器Zookeeper(一)》 《分布式利器Zookeeper(二):分布式锁》 本篇博客是分布式利器Zookeeper系列的最后一篇,涉及的话题是:Zookeeper分布式锁的代码实现、zkclient的使用、...

zfz_linux_boy
07/01
0
0
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh
04/28
0
0
跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下: Recipes Framework Utilities Client Errors Extensions 前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Clien...

longbadx
2015/02/11
0
0
ZooKeeper典型使用场景

先需要下载 curator 的依赖包 curator-framework curator-recipes Guava-14.0.1 分布式锁 在分布式环境中,为了保证数据的一致性,经常在程序的某个运行点需要进行同步控制。 类是一个同步计...

兔之
2015/10/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

可爱的python测试开发库(python测试开发工具库汇总)

欢迎转载,转载请注明来源: github地址 谢谢点赞 本文地址 相关书籍下载 测试开发 Web UI测试自动化 splinter - web UI测试工具,基于selnium封装。 链接 selenium - web UI自动化测试。 链...

python测试开发人工智能安全
53分钟前
2
0
Shiro | 实现权限验证完整版

写在前面的话 提及权限,就会想到安全,是一个十分棘手的话题。这里只是作为学校Shiro的一个记录,而不是,权限就应该这样设计之类的。 Shiro框架 1、Shiro是基于Apache开源的强大灵活的开源...

冯文议
今天
1
0
linux 系统的运行级别

运行级别 运行级别 | 含义 0 关机 1 单用户模式,可以想象为windows 的安全模式,主要用于修复系统 2 不完全的命令模式,不含NFS服务 3 完全的命令行模式,就是标准的字符界面 4 系统保留 5 ...

Linux学习笔记
今天
2
0
学习设计模式——命令模式

任何模式的出现,都是为了解决一些特定的场景的耦合问题,以达到对修改封闭,对扩展开放的效果。命令模式也不例外: 命令模式是为了解决命令的请求者和命令的实现者之间的耦合关系。 解决了这...

江左煤郎
今天
3
0
字典树收集(非线程安全,后续做线程安全改进)

将500W个单词放进一个数据结构进行存储,然后进行快速比对,判断一个单词是不是这个500W单词之中的;来了一个单词前缀,给出500w个单词中有多少个单词是该前缀. 1、这个需求首先需要设计好数据结...

算法之名
昨天
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部