详解分布式锁的三种实现方式

原创
2021/01/21 21:25
阅读数 2.4K

前言

分布式锁的背景

我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的Java多线程的18般武艺进行处理,并且可以完美的运行,毫无Bug。

但注意这是单机应用,也就是所有的请求都会分配到当前服务器的JVM内部,然后映射为操作系统的线程进行处理!而这个共享变量只是在这个JVM内部的一块内存空间。为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。

但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

对分布式锁的要求

  1. 分布式互斥性:在分布式系统环境下,一个同步资源在同一时间只能被一个机器的一个线程执行;
  2. 高可用:高可用的获取锁与释放锁;
  3. 高性能:高性能的获取锁与释放锁;
  4. 可重入性:具备可重入特性;
  5. 可失效性:具备锁失效机制,防止死锁;
  6. 非阻塞性:具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

分布式锁的实现方向

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。

分布式的CAP理论告诉我们,任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。所以,很多系统在设计之初就要对这三者做出取舍。

在分布式场景下,CAP理论是很多架构设计的指导思想。CAP思想下有两个分支CP与AP:

  • CP模型强调不管什么情况下,都要求各服务之间的数据一致;CP模型仍然保持原有的一致性要求,保证了业务资源串行竞争,更加适合于金融交易场景的强数据要求。
  • AP模型强调高可用下的数据最终一致性。基于此,AP模式可以做到更高的并发性能。

这也导致了分布式锁的实现也分为了CP型和AP型两种方向。虽然传统含义的锁要求强一致性CP模型,但AP模型分布式锁也并非没有用武之地,其使用取决于业务场景对脏数据的最大容忍度。

1 基于数据库实现分布式锁

1.1 数据库分布式锁的设计实现

基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含lock_name等字段,并在lock_name字段上创建唯一索引,多个客户端同时向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。

CREATE TABLE `lock_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `lock_name` varchar(50) DEFAULT NULL COMMENT '锁名称',
  `expire_time` bigint(20) DEFAULT NULL COMMENT '过期时间',
  `version` int(11) DEFAULT NULL COMMENT '版本号',
  `lock_owner` varchar(100) DEFAULT NULL COMMENT '锁拥有者',
  PRIMARY KEY (`id`),
  UNIQUE KEY `lock_name` (`lock_name`)
)

因为我们对lock_name做了唯一性约束,所以这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个INSERT操作可以成功,锁的互斥性得到了保证。

注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法。

1.2 数据库分布式锁的优化

使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:

  1. 因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,

    • 数据库需要双机部署、数据同步、主备切换;
  2. 不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据。

    • 需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
    • 或者仿照zk对可重入锁的实现,使用一个map记录当前获取的锁对象。
  3. 没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁。

    • 需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据,或者代码中判断失效时间,再使用乐观锁机制去互斥性的争锁。

基于数据库实现分布式锁,在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

一般来说,基于数据库实现分布式锁,不是一个最好的选择。

1.3 数据库分布式锁的逻辑实现

1.3.1 获取锁逻辑

// 定义一个map,用来做可重入,模仿zk的可重入实现。
private final ConcurrentMap<Thread, LockData> threadData;
/**
* @param lockName 锁名称
* @param lockTime 锁时间
* @return
*/
public boolean acquire(String lockName, Long lockTime) {
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    // 重入
    if (lockData != null) {
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // 先检查是否被其他客户端锁上了
    LockRecordDTO lockRecord = lockRecordMapperExt.selectByLockName(lockName);
    if (lockRecord == null) {// 锁还不存在
        String lockOwner = generatorOwner();
        // 尝试获取锁
        boolean acquired = tryAcquireIfLockNotExist(lockName, lockTime, lockOwner);
        if (acquired) {// 获取到了
            startExtendExpireTimeTask(lockName, lockOwner, lockTime);
            lockData = new LockData(currentThread, lockName, lockOwner);
            // 放入map
            threadData.put(currentThread, lockData);
        }
        return acquired;
    }
    // 锁已经存在了,检查它的过期时间
    long lockExpireTime = lockRecord.getExpireTime();
    // 如果已经过期,那么再次争锁,只存在于上一次获取锁的线程没有正确释放锁时
    if (lockExpireTime < System.currentTimeMillis()) {
        // 当上一次获取锁的线程没有正确释放锁时,其他线程获取锁时会走到这里
        String lockOwner = generatorOwner();
        boolean acquired = tryAcquireIfLockExist(lockRecord, lockTime, lockOwner);
        if (acquired) {
            lockData = new LockData(currentThread, lockName, lockOwner);
            threadData.put(currentThread, lockData);
        }
        return acquired;
    }
    return false;
}

/**
* 尝试获得锁,数据库表有设置唯一键约束,只有插入成功的线程才可以获取锁
*
* @param lockName  锁名称
* @param lockTime  锁的过期时间
* @param lockOwner 锁的拥有者
* @return
*/
private boolean tryAcquireIfLockNotExist(String lockName, long lockTime, String lockOwner) {
    try {
        LockRecordDTO lockRecord = new LockRecordDTO();
        lockRecord.setLockName(lockName);
        Long expireTime = System.currentTimeMillis() + lockTime;
        lockRecord.setExpireTime(expireTime);
        lockRecord.setLockOwner(lockOwner);
        lockRecord.setVersion(0);
        int insertCount = lockRecordMapperExt.insert(lockRecord);
        return insertCount == 1;
    } catch (Exception exp) {
        return false;
    }
}

/**
* 当上一次获取锁的线程没有正确释放锁时,下一次其他线程获取锁时会调用本方法
* 这时候也不用删除锁了,直接再利用就好,此时就是用乐观锁来控制互斥性了
* 当多个线程竞争获取锁时,有乐观锁控制,只有更新成功的线程才会获的锁
*
* @param lockRecord 锁记录,里面保存了上一次获取锁的拥有者信息
* @param lockTime   锁过期时间
* @param lockOwner  锁的拥有者
* @return
*/
private boolean tryAcquireIfLockExist(LockRecordDTO lockRecord, long lockTime, String lockOwner) {
    try {
        // 获取锁时,如果数据库中有记录且超时时间小于当前时间,说明持有锁的客户端崩溃退出了,
        // 没有正确释放锁,才会导致表中有过期的记录。
        // 这时,并发的获取锁时,只有更新成功的线程才可以获取锁。
        Long expireTime = System.currentTimeMillis() + lockTime;
        lockRecord.setExpireTime(expireTime);
        lockRecord.setLockOwner(lockOwner);
        int updateCount = lockRecordMapperExt.updateExpireTime(lockRecord);
        return updateCount == 1;
    } catch (Exception exp) {
        return false;
    }
}

对应乐观锁更新sql如下:

<update id="updateExpireTime" parameterType="com.iwill.db.model.LockRecordDTO">
    update lock_record
    set expire_time = #{expireTime},
     version = version + 1
    where lock_name = #{lockName} and version = #{version}
</update>

1.3.2 释放锁逻辑

释放锁时,只有持有锁的线程才可以释放锁,代码如下:

/**
* 释放锁
* 实现参考zookeeper的锁释放机制
*/
public void release() {
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if (lockData == null) {
        throw new RuntimeException("current thread do not own lock");
    }
    int newLockCount = lockData.lockCount.decrementAndGet();
    if (newLockCount > 0) {
        return;
    }
    if (newLockCount < 0) {
        throw new RuntimeException("Lock count has gone negative for lock :" + lockData.lockName);
    }
    try {
        lockRecordMapperExt.deleteByOwner(lockData.lockName, lockData.owner);
    } finally {
        threadData.remove(currentThread);
    }
}

对应的底层sql如下:

<delete id="deleteByOwner" parameterType="java.util.Map">
    delete from lock_record where lock_name = #{lockName} and lock_owner = #{lockOwner}
</delete>

2 基于Redis实现分布式锁

Redis具有很高的并发性能,并且Redis命令对分布式锁的支持较好,实现起来比较方便。这使得使用Redis实现分布式锁成为了一种主流的实现方案。

2.1 Redis分布式锁的设计实现

Redis分布式锁分为单节点和多节点两种类型,我们分开论述。

2.1.1 单节点下的设计实现

单节点指的是一个Redis节点,即多个jvm对应一个Redis实例。主要是使用set命令加上NX PX选项(又叫做setNX命令)来实现Redis单机的分布式锁。

  1. 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的随机数,通过此在释放锁的时候进行判断。
  2. 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
  3. 释放锁的时候,通过随机数判断是不是该锁,若是该锁,则执行delete进行锁释放。

setnx加锁语句如下:

SET key_name random_value NX PX 30000

早期版本的set命令没有PX选项,导致执行完set命令后,还要在执行一次expire命令,现在二者可以合二为一了。

  • setNX命令可以设置一个key,并且只有在这个key不存的情况下才能设置成功(由NX参数控制)。
  • 同时,对这个key设置了一个30000毫秒的过期时间(由PX参数控制)。避免客户端加锁后宕机无法释放锁从而引发死锁的情况。
  • 这个key对应的值被设置为一个随机的值,但是必须保证这个随机的值在所有的客户端上都是唯一的。

随机数的唯一性很重要,它被用来在释放锁时进行比较、判断,也可以作为锁重入的依据,即如果value是自己设置的随机数,那么就表示自己现在持有锁,可重入。

避免释放一个由其他客户端创建的锁是非常重要的

例如:有个客户端A获取了锁,但是执行了比较长时间的业务逻辑,以至于超过了锁的生命周期(TTL)而让锁自动释放掉了。

客户端B之后尝试获取锁,因为之前的锁过期了,所以客户端B成功获取到了。

不久后,客户端A执行完业务逻辑,再去释放这个锁的时候,如果没有检验随机数,那么客户端A会将其误认为当前的锁是自己之前持有的锁,再进行删除的话就会发生问题。

所以只使用单独的删除命令会误删除已被其他客户端获取的锁。

加入随机数就是为了防止这个问题:

  1. 客户端A执行setNX的时候,key=lock对应的value是random_A,一段时间后,锁过期失效。
  2. 客户端B执行setNX的时候,key=lock对应的value是random_B。
  3. 客户端A执行完业务逻辑,准备删除锁,此时他判断了一下key=lock的value,发现是random_B,那显然当前的锁不是自己加的锁,不能删除。

用一段脚本执行以下语义:只有在key存在并且key对应的value值与随机数的值相等时才能被删除

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

随机数的选择有很多,最简单的方法就是取unix的当前系统时间,转换成毫秒形式,然后粘连到客户端ID的后面作为唯一随机数,虽然这种方式不是很安全,但是能满足大部分需求了。

2.1.2 多节点的分布式锁实现——Redlock算法

单节点下的分布式锁实现不可避免会出现单点问题,即单机Redis挂了就没有灾备了。正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的多节点分布式锁的实现方式:Redlock。

antirez提出的Redlock算法大概是这样的:

  • 在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。

  • 我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位,记为start time。

  2. 尝试对5个实例依次使用setNX命令,它们相同的key和具有唯一性的value(例如UUID)获取锁。

  3. 当向Redis请求获取锁时,客户端应该设置一个connect timeout时间和read timeout时间,这个超时时间应该远远小于锁的过期时间。这样单个redis宕机,不会导致客户端一直等在那。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。

  4. 客户端使用当前时间current time减去start time,就得到获取锁使用的时间,记为work time。当且仅当从大多数((N/2)+1)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。

  5. 如果取到了锁,key的真正有效时间即为work time的时间。

  6. 如果因为某些原因,获取锁失败(没有在至少(N/2)+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁,即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点其实已经set成功,但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁。

  7. 获取锁失败后,需要稍等一段时间再重试,避免发生多个客户端同时申请锁的情况。最好的情况是一个客户端几乎同时地向多个Redis节点发起锁申请。

2.2 Redission封装的Redlock逻辑实现

Redisson是一个在Redis的基础上实现的Java驻内存数据网格,相较于暴露底层操作的Jedis,Redisson提供了一系列的分布式的Java常用对象,还提供了许多分布式服务

Redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.3.2</version>
</dependency>

我们来看一下Redission封装的Redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
 .setMasterName("masterName")
 .setPassword("password").setDatabase(0);

RedissonClient redissonClient = Redisson.create(config);

// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
    isLock = redLock.tryLock();

    // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);

    if (isLock) {
        //TODO if get lock success, do something;
    }
} catch (Exception e) {

} finally {
    // 无论如何, 最后都要解锁
    redLock.unlock();
}

2.3 Redis分布式锁的弱正确性

Redis实现的分布式锁,是AP模式的分布式锁,它强调并发的性能,而非数据的强一致性。在极端情况下,它甚至无法保证绝对的互斥性。或者说,无法保证分布式锁的绝对正确性。

如下这些极端场景,都可能导致Redis的分布式锁不正确。

2.3.1 主从同步场景

Redis集群中leader与slave之间的数据复制是采用异步的方式(因为需要满足高性能要求),即,leader将客户端发送的写请求记录下来后,就给客户端返回响应,后续该leader的slave节点就会从该leader节点复制数据。

那么就会存在这么一种可能性:leader接收了客户端的写请求,也给客户端响应了,但是该数据还没来得及复制到它对应的slave节点中,leader就宕机了。

注意,客户端不会给slave发送获取锁的请求,Redlock算法要求多节点之间是完全独立的,主从关系不可以存在。slave只是leader的灾备。

此时从slave节点中重新选举出来的leader也不包含之前leader最后写的数据了,这时,客户端来获取同样的锁就可以获取到,这样就会在同一时刻,两个客户端持有锁。

2.3.1 崩溃恢复场景

我们一般会开启AOF来做持久化,假设Redis集群中的某个master节点突然断电,导致setNX命令没来得及被AOF刷回磁盘就直接丢失了,导致重启后的该节点不存在锁数据。

这种情况几乎难以避免,除非我们将AOF策略设置为fsnyc = always,但这会损伤性能。

在Redlock官方文档中也提到了这个情况,Redlock 官方本身也是知道Redlock算法不是完全可靠的,官方为了解决这种问题建议使用延时启动。

当一个节点重启之后,我们规定在max TTL期间它是不可用的,这样它就不会干扰原本已经申请到的锁,等到它crash前的那部分锁都过期了,环境不存在历史锁了,那么再把这个节点加进来正常工作。

2.3.3 不可靠的时间

在不同的Redis节点上,它们的本地时间不是精确相同的,考虑到Redis使用 get time of day获取时间而不是单调的时钟,故而会受到系统时间的影响,可能会突然前进或者后退一段时间,这会导致一个key更快或更慢地过期。

在极端情况下,不精确一致的时间可能会带来如下的问题:

  1. client1想ABCDE五个节点发送获取锁的请求,从ABC三个节点处申请到锁,DE由于网络原因请求没有到达。
  2. 这之后,C节点的时钟往前推了,导致lock提前过期。
  3. 此时client2也向五个节点获取锁,并在CDE处获得了锁,AB由于lock还未过期,导致set失败。
  4. 此时client1和client2都获得了锁。

2.3.4 程序停顿

我们在假设一下程序停顿的情况:

  1. client1从ABCDE处获得了锁
  2. 当获得锁的response还没到达client1时,client1进入GC停顿
  3. 停顿期间锁已经过期了
  4. client2在ABCDE处获得了锁
  5. client1在GC完成后,也收到了获得锁的response,此时两个 client 又拿到了同一把锁

所以Redis实现的分布式锁不可用在对强一致性有严格要求的场景,如金融领域等。

2.4 总结

Martin Kleppmann在文章《How to do distributed locking》中认为Redlock实在不是一个好的选择,对于需求性能的分布式锁应用它太重了且成本高;对于需求正确性的应用来说它不够安全。

因为它对高危的时钟或者说其他上述列举的情况进行了不可靠的假设,如果你的应用只需要高性能的分布式锁不要求多高的正确性,那么单节点Redis够了;

如果你的应用想要保住正确性,那么不建议Redlock,建议使用一个合适的一致性协调系统,例如Zookeeper,可能会更好。

3 基于Zookeeper实现分布式锁

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。

作为基于AP模式实现的zk来说,它天然的适合用来实现AP模型的分布式锁,zk能够保证分布式锁的正确性,但因为需要频繁的创建和删除节点,性能上不如Redis优秀。

3.1 zk分布式锁的设计实现

Zookeeper实现分布式锁的原理就是:

  1. 创建一个目录mylock;

  2. 线程A想获取锁就在/lock-path目录下创建临时顺序节点;

  3. 获取/lock-path目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;

  4. 线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;

  5. 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

流程如下:

可以看到,zk的实现原理,天然是公平锁的原理,即客户端在获取锁时,就创建了临时会话顺序节点,那么它的顺序就固定了。而不像Redis和数据库实现的那样,是非公平的。

zk分布式锁的可失效性,是根据临时节点这个特性来的,如果某个客户端获取了锁,在执行业务代码期间宕机了,zk服务端的心跳检测到客户端失联,第一步就会删除这个客户端创建的临时节点。

3.2 Curator封装的zk分布式锁逻辑实现

Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度,Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。

Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

3.2.1 使用Curator分布式锁

应用依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

客户端注入:

@Configuration
public class CuratorBean {
    @Bean
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 1000);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        return client;
    }
}

使用案例:

package com.iwill.zookeeper.service;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.CloseableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class CuratorClient implements InitializingBean, DisposableBean {

    @Autowired
    private CuratorFramework client;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public void execute(String lockPath, BusinessService businessService) throws Exception {
        // 根据lockPath生产一个分布式锁的handler
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        try {
            // 尝试获取锁
            boolean acquireLockSuccess = lock.acquire(200, TimeUnit.MILLISECONDS);
            if (!acquireLockSuccess) {
                logger.warn("acquire lock fail , thread id : " + Thread.currentThread().getId());
                return;
            }
            logger.info("acquire lock success ,thread id : " + Thread.currentThread().getId());
            // 执行业务逻辑
            businessService.handle();
        } catch (Exception exp) {
            logger.error("execute throw exp", exp);
        } finally {
            if (lock.isOwnedByCurrentThread()) {
                // 释放锁
                lock.release();
            }
        }
    }
}

3.2.2 Curator分布式锁的可重入实现

跟踪获取锁的代码进入到org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock,代码如下:

private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        // 重入
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}

可以看见zookeeper的锁是可重入的,即同一个线程可以多次获取锁,只有第一次真正的去创建临时会话顺序节点,后面的获取锁都是对重入次数加1。

相应的,在释放锁的时候,前面都是对锁的重入次数减1,只有最后一次才是真正的去删除节点。代码见:


@Override
public void release() throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
        */

    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}

2.3 zk分布式锁的强正确性

zk分布式锁的正确性保证,是基于如下机制的:

2.3.1 zk的客户端故障检测机制

正常情况下,客户端会在会话的有效期内,会向服务器端发送PING请求,来进行心跳检查,说明自己还是存活的。

服务器端接收到客户端的请求后,会进行对应的客户端的会话激活,延长该会话的存活期。如果有会话一直没有激活,那么说明该客户端出问题了,服务器端的会话超时检测任务就会检查出那些一直没有被激活的与客户端的会话,然后进行清理,清理中有一步就是删除临时会话节点(包括临时会话顺序节点)。

这就保证了zookeeper分布锁的容错性,不会因为客户端的意外退出,导致锁一直不释放,其他客户端获取不到锁。

2.3.2 zk的强一致性机制

zk服务器集群一般由一个leader节点和其他的follower节点组成,数据的读写都是在leader节点上进行。

当一个写请求过来时,leader节点会发起一个proposal,待大多数follower节点都返回ack之后,再发起commit,待大多数follower节点都对这个proposal进行commit了,leader才会对客户端返回请求成功;

如果之后leader挂掉了,那么由于zookeeper集群的leader选举算法采用zab协议保证数据最新的follower节点当选为新的leader,所以,新的leader节点上都会有原来leader节点上提交的所有数据。

这样就保证了客户端请求数据的一致性了。不过要注意的是,zk选举期间,zk服务不可用。

更详细的细节,请见本站博客《ZAB协议分析》

综上所述,zookeeper分布式锁保证了锁的容错性、一致性。但是会产生空闲节点(/lock-path),并且有些时候不可用。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
1 收藏
0
分享
返回顶部
顶部