java 分布式锁
java 分布式锁
walle-Liao 发表于2年前
java 分布式锁
  • 发表于 2年前
  • 阅读 212
  • 收藏 11
  • 点赞 2
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

摘要: 常见的实现分布式锁的方式包括采用 redis 和 zookeeper 的方式实现 本文主要介绍基于 redisson 实现分布式锁

实现分布式锁的基本思路

采用 redis 和 zookeeper 的方式实现

  • 采用 redis 实现分布式锁的基本思路通过一个单节点的 redis 服务器,然后所有客户端线程通过设置同一个 key 来实现锁的获取,这里的关键是要区分开不同节点机器的不同线程(例如每个 JVM 里面的不同线程都通过一个单例对象来获取锁,该单例对象中对应一个 UUID,这样就区分开了不同的 JVM,然后一个 JVM 的不同线程通过线程 ID 来区分,UUID+线程ID 就区分开了不同进程的不同线程),对应缓存值需要保存两部分信息,一个就是线程的唯一标识(UUID+线程ID);另外一个就是该线程获取锁的次数,用来支持重入锁的特性,可以将这两个信息保存到一个 json 对象中,这样 key 和 value 就都有了,如果线程设置 key 成功,则表示锁获取成功,如果当前 key 已经被设置,则说明锁已经被人占用,如果当前 key 对应的 value 里面的线程标识正好好当前线程标识相同,则锁重入

  • 采用 zookeeper 方式实现分布式锁的基本思路是利用,ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名。例如:我们在Zookeeper目录 /test 目录下创建,两个客户端创建一个名为 Lock 节点,只有一个能够成功。利用名称唯一性,加锁操作时,只需要所有客户端一起创建 /test/Lock 节点,只有一个创建成功,成功者获得锁。解锁时,只需删除 /test/Lock 节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁。


RedissonLock 实现分布式锁部分源码介绍

参考 redisson 项目 

RLock 接口介绍

注:这里的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有区别

lockInterruptibly 方法中多了 leaseTime 参数,表示锁最大持有时间,即客户申请到锁之后不管是否手动 unlock 了,超过 leaseTime 设定的时间后都将自动释放锁,防止客户程序异常导致锁无法释放的问题

tryLock 方法除了 waitTime 参数外,也多了一个 leaseTime 时间,其原理也是一样的


下面是 RedissonLock 里面的部分源码分析

lockInterruptibly 获取锁的主要入口方法

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }
        // 锁获取成功,直接返回
        if (ttl == null) {
            return;
        }

        // redis 消息机制
        // "redisson_lock__channel__{" + getName() + "}"; 在该 channel 上订阅消息,当 unlock 发生时,将 channel 上所有监听者将收到通知
        Future<RedissonLockEntry> future = subscribe();
        future.sync();

        try {
            while (true) {
                // 再尝试一次获取锁,如果获取到了就直接返回
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // 阻塞等待,当其他线程调用 unlock 方法时被唤醒,或者 ttl 时间超时
                // 唤醒之后需要重新竞争锁,因为可能多个线程被同时唤醒,而每次只会有一个线程成功获取锁
                RedissonLockEntry entry = getEntry();
                if (ttl >= 0) {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    entry.getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future);
        }
    }

tryLockInner 方法的内部实现

    Long tryLockInner(long leaseTime, TimeUnit unit) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 这里通过一条 redis 的批处理命令来设置 key(这里由 redis 的特性来保证整条批处理命令的原子性)
        // 第一个 if 如果 key 不存在,则设置 uuid+线ID 对应的值为 1,并设值 key 对应的超时时间
        // 第二个 if 为重入锁特性的支持,并且刷新 key 的超时时间
        // 否则返回 key 对应超时时间,即锁获取失败
        return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +   // KEYS[1] == Collections.<Object>singletonList(getName())
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // ARGV[2] == getLockName()
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +   // ARGV[1] == internalLockLeaseTime 
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName());
    }

unlock 方法的内部实现

    @Override
    public void unlock() {
        // 这里需要先理解 redis 中缓存的 key 和 value 的结构,key 对应的就是锁的名称,所有节点的所有线程都是采用同一个 key (同一把锁)
        // value 对应的是一个对象的结构,其中有两个属性 {UUID+线程ID, 获取锁的次数}
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +   // 如果对应的 key 已经不存在,说明 key 已经超时,redis 自动删除了该 key,getChannelName() 发布 unlockMessage 通知其他在阻塞等待获取锁的线程
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  // key 存在,但是已经不是被当前线程占有
                            "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  // getLockName() 其实等于 UUID+线程ID,将这个值-1,如果结果大于0,说明锁存在重入,则重新刷新锁超时时间
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('del', KEYS[1]); " +  // -1之后=0,则正常删除 key,并且发布 unlockMessage 事件
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; "+
                        "end; " +
                        "return nil;",
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName());
        if (opStatus == null) {
            // IllegalMonitorStateException 表示当前调用 unlock 方法的线程不是持有 lock 的线程
            throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }


RedissonLock 分布式锁的基本使用

RedissonLock 使用起来非常简单,如果需要详细了解 RedissonLock 的使用,可以看看 Redisson 项目中相关的测试用例

public void testLock() {
	RLock lock = redisson.getLock("lockName");
	try {
		// .....
	} finally {
		if(lock.isHeldByCurrentThread())
			lock.unlock();
	}
}

// redisson 创建示例代码,一般实现成单例模式
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);


其他问题探讨

本文详细介绍了采用 RedissonLock 方式实现分布式锁的相关源码和使用方式,但是这种方式也存在一个问题是,用于实现分布式锁的获取和释放是一个单实例的 redis 实例,如果该实例宕机,系统中所有的分布式锁获取程序都无法正常工作,那么第一个问题:

如何通过一个集群环境的 redis 实例来实现分布式锁的管理,以实现分布式锁的高可用性

一个可行的解决思路如下(参考:http://www.open-open.com/lib/view/open1415107259996.html):

我们假设有 N 个 Redis 主节点。这些节点是相互独立的,因此我们不使用复制或其他隐式同步机制。我们已经描述过在单实例情况下如何安全地获取锁。我们也指出此算法将使用这种方法从单实例获取和释放锁。在以下示例中,我们设置N=5(这是个比较适中的值),这样我们需要在不同物理机或虚拟机上运行 5 个 Redis 主节点,以确保它们的出错是尽可能独立的。

为了获取锁,客户端执行以下操作:

  1. 获取当前时间,以毫秒为单位。

  2. 以串行的方式尝试从所有的N个实例中获取锁,使用的是相同的key值和相同的随机value值。在从每个实例获取锁时,客户端会设置一个连接超时,其时长相比锁的自动释放时间要短得多。例如,若锁的自动释放时间是10秒,那么连接超时大概设在5到50毫秒之间。这可以避免当Redis节点挂掉时,会长时间堵住客户端:如果某个节点没及时响应,就应该尽快转到下个节点。

  3. 客户端计算获取所有锁耗费的时长,方法是使用当前时间减去步骤1中的时间戳。当且仅当客户端能从多数节点(至少3个)中获得锁,并且耗费的时长小于锁的有效期时,可认为锁已经获得了。

  4. 如果锁获得了,它的最终有效时长将重新计算为其原时长减去步骤3中获取锁耗费的时长。

  5. 如果锁获取失败了(要么是没有锁住N/2+1个节点,要么是锁的最终有效时长为负数),客户端会对所有实例进行解锁操作(即使对那些没有加锁成功的实例也一样)。


  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 2
博文 14
码字总数 13621
×
walle-Liao
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: