文档章节

java 分布式锁

w
 walle-Liao
发布于 2016/01/27 22:32
字数 2167
阅读 253
收藏 11

实现分布式锁的基本思路

采用 redis 和 zookeeper 的方式实现

  • 采用 redis 实现分布式锁的基本思路通过一个单节点的 redis 服务器,然后所有客户端线程通过设置同一个 key 来实现锁的获取,这里的关键是要区分开不同节点机器的不同线程(例如每个 JVM 里面的不同线程都通过一个单例对象来获取锁,该单例对象中对应一个 UUID,这样就区分开了不同的 JVM,然后一个 JVM 的不同线程通过线程 ID 来区分,UUID+线程ID 就区分开了不同进程的不同线程),对应缓存值需要保存两部分信息,一个就是线程的唯一标识(UUID+线程ID);另外一个就是该线程获取锁的次数,用来支持重入锁的特性,可以将这两个信息保存到一个 json 对象中,这样 key 和 value 就都有了,如果线程设置 key 成功,则表示锁获取成功,如果当前 key 已经被设置,则说明锁已经被人占用,如果当前 key 对应的 value 里面的线程标识正好好当前线程标识相同,则锁重入

  • 采用 zookeeper 方式实现分布式锁的基本思路是利用,ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名。例如:我们在Zookeeper目录 /test 目录下创建,两个客户端创建一个名为 Lock 节点,只有一个能够成功。利用名称唯一性,加锁操作时,只需要所有客户端一起创建 /test/Lock 节点,只有一个创建成功,成功者获得锁。解锁时,只需删除 /test/Lock 节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁。


RedissonLock 实现分布式锁部分源码介绍

参考 redisson 项目 

RLock 接口介绍

注:这里的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有区别

lockInterruptibly 方法中多了 leaseTime 参数,表示锁最大持有时间,即客户申请到锁之后不管是否手动 unlock 了,超过 leaseTime 设定的时间后都将自动释放锁,防止客户程序异常导致锁无法释放的问题

tryLock 方法除了 waitTime 参数外,也多了一个 leaseTime 时间,其原理也是一样的


下面是 RedissonLock 里面的部分源码分析

lockInterruptibly 获取锁的主要入口方法

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }
        // 锁获取成功,直接返回
        if (ttl == null) {
            return;
        }

        // redis 消息机制
        // "redisson_lock__channel__{" + getName() + "}"; 在该 channel 上订阅消息,当 unlock 发生时,将 channel 上所有监听者将收到通知
        Future<RedissonLockEntry> future = subscribe();
        future.sync();

        try {
            while (true) {
                // 再尝试一次获取锁,如果获取到了就直接返回
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // 阻塞等待,当其他线程调用 unlock 方法时被唤醒,或者 ttl 时间超时
                // 唤醒之后需要重新竞争锁,因为可能多个线程被同时唤醒,而每次只会有一个线程成功获取锁
                RedissonLockEntry entry = getEntry();
                if (ttl >= 0) {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    entry.getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future);
        }
    }

tryLockInner 方法的内部实现

    Long tryLockInner(long leaseTime, TimeUnit unit) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 这里通过一条 redis 的批处理命令来设置 key(这里由 redis 的特性来保证整条批处理命令的原子性)
        // 第一个 if 如果 key 不存在,则设置 uuid+线ID 对应的值为 1,并设值 key 对应的超时时间
        // 第二个 if 为重入锁特性的支持,并且刷新 key 的超时时间
        // 否则返回 key 对应超时时间,即锁获取失败
        return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +   // KEYS[1] == Collections.<Object>singletonList(getName())
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // ARGV[2] == getLockName()
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +   // ARGV[1] == internalLockLeaseTime 
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName());
    }

unlock 方法的内部实现

    @Override
    public void unlock() {
        // 这里需要先理解 redis 中缓存的 key 和 value 的结构,key 对应的就是锁的名称,所有节点的所有线程都是采用同一个 key (同一把锁)
        // value 对应的是一个对象的结构,其中有两个属性 {UUID+线程ID, 获取锁的次数}
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +   // 如果对应的 key 已经不存在,说明 key 已经超时,redis 自动删除了该 key,getChannelName() 发布 unlockMessage 通知其他在阻塞等待获取锁的线程
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  // key 存在,但是已经不是被当前线程占有
                            "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  // getLockName() 其实等于 UUID+线程ID,将这个值-1,如果结果大于0,说明锁存在重入,则重新刷新锁超时时间
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('del', KEYS[1]); " +  // -1之后=0,则正常删除 key,并且发布 unlockMessage 事件
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; "+
                        "end; " +
                        "return nil;",
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName());
        if (opStatus == null) {
            // IllegalMonitorStateException 表示当前调用 unlock 方法的线程不是持有 lock 的线程
            throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }


RedissonLock 分布式锁的基本使用

RedissonLock 使用起来非常简单,如果需要详细了解 RedissonLock 的使用,可以看看 Redisson 项目中相关的测试用例

public void testLock() {
	RLock lock = redisson.getLock("lockName");
	try {
		// .....
	} finally {
		if(lock.isHeldByCurrentThread())
			lock.unlock();
	}
}

// redisson 创建示例代码,一般实现成单例模式
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);


其他问题探讨

本文详细介绍了采用 RedissonLock 方式实现分布式锁的相关源码和使用方式,但是这种方式也存在一个问题是,用于实现分布式锁的获取和释放是一个单实例的 redis 实例,如果该实例宕机,系统中所有的分布式锁获取程序都无法正常工作,那么第一个问题:

如何通过一个集群环境的 redis 实例来实现分布式锁的管理,以实现分布式锁的高可用性

一个可行的解决思路如下(参考:http://www.open-open.com/lib/view/open1415107259996.html):

我们假设有 N 个 Redis 主节点。这些节点是相互独立的,因此我们不使用复制或其他隐式同步机制。我们已经描述过在单实例情况下如何安全地获取锁。我们也指出此算法将使用这种方法从单实例获取和释放锁。在以下示例中,我们设置N=5(这是个比较适中的值),这样我们需要在不同物理机或虚拟机上运行 5 个 Redis 主节点,以确保它们的出错是尽可能独立的。

为了获取锁,客户端执行以下操作:

  1. 获取当前时间,以毫秒为单位。

  2. 以串行的方式尝试从所有的N个实例中获取锁,使用的是相同的key值和相同的随机value值。在从每个实例获取锁时,客户端会设置一个连接超时,其时长相比锁的自动释放时间要短得多。例如,若锁的自动释放时间是10秒,那么连接超时大概设在5到50毫秒之间。这可以避免当Redis节点挂掉时,会长时间堵住客户端:如果某个节点没及时响应,就应该尽快转到下个节点。

  3. 客户端计算获取所有锁耗费的时长,方法是使用当前时间减去步骤1中的时间戳。当且仅当客户端能从多数节点(至少3个)中获得锁,并且耗费的时长小于锁的有效期时,可认为锁已经获得了。

  4. 如果锁获得了,它的最终有效时长将重新计算为其原时长减去步骤3中获取锁耗费的时长。

  5. 如果锁获取失败了(要么是没有锁住N/2+1个节点,要么是锁的最终有效时长为负数),客户端会对所有实例进行解锁操作(即使对那些没有加锁成功的实例也一样)。


© 著作权归作者所有

共有 人打赏支持
w
粉丝 3
博文 15
码字总数 15775
作品 0
深圳
使用zookeeper序列节点实现不可重入分布式锁

一、前言 在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源时候,juc包的锁就...

加多
01/12
0
0
Java开发:错过金三银四 你还要错过金九银十吗?面试大纲总结

前言: 一年之计在于春 金三银四已过,2018也已经年过一半多,作为一个开发人员,你是否面上了自己理想的公司,薪资达到心中理想的高度? 面试:如果不准备充分的面试,完全是浪费时间,更是...

Java大蜗牛
08/16
0
0
Java高级程序员面试大纲——错过了金三,你还要错过银四吗

跳槽时时刻刻都在发生,但是我建议大家跳槽之前,先想清楚为什么要跳槽。切不可跟风,看到同事一个个都走了,自己也盲目的开始面试起来(期间也没有准备充分),到底是因为技术原因(影响自己...

Java高级架构
04/27
0
0
Java程序员面试大纲—错过了金三银四,你还要错过2018吗?

跳槽时时刻刻都在发生,但是我建议大家跳槽之前,先想清楚为什么要跳槽。切不可跟风,看到同事一个个都走了,自己也盲目的开始面试起来(期间也没有准备充分),到底是因为技术原因(影响自己...

java高级架构牛人
04/27
0
0
分布式锁的理解,java自带的锁为什么会失效

前段时间在发送短信的代码块上通过网上找的工具类基于Redis实现了分布式锁的功能 对应的链接https://www.cnblogs.com/c-h-y/p/9391602.html 周末想细细看一下。 之后郁闷的是为什么java自带的...

陈灬大灬海
08/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

内存模型是怎么解决缓存一致性的?

在再有人问你Java内存模型是什么,就把这篇文章发给他。这篇文章中,我们介绍过关于Java内容模型的来龙去脉。 我们在文章中提到过,由于CPU和主存的处理速度上存在一定差别,为了匹配这种差距...

Java填坑之路
17分钟前
1
0
vue-cli 3.0 初体验

最近复习了下vue,突然发现vue-cli已经更新到3.0版本了,并且变化蛮大,看来要不停的学习,真是一入前端深似海。 安装步骤: 1、全局安装 npm install -g @vue/cli Vue CLI 的包名称由 vue-...

tianyawhl
19分钟前
0
0
Angular进阶之路

【初级】会写页面,能出东西。 给定环境和 rest API,不用第三方库,能在十分钟内完成一个 master/detail 结构的带路由的应用(可以不管美观)。 知识点:Angular CLI、组件、路由、HTTP 服务...

陆小七的主页
21分钟前
0
0
Redis缓存数据库安全加固指导(一)

背景 在众多开源缓存技术中,Redis无疑是目前功能最为强大,应用最多的缓存技术之一,参考2018年国外数据库技术权威网站DB-Engines关于key-value数据库流行度排名,Redis暂列第一位,但是原生...

中间件小哥
22分钟前
0
0
百万级数据mysql分区

1. 什么是表分区? 表分区,是指根据一定规则,将数据库中的一张表分解成多个更小的,容易管理的部分。从逻辑上看,只有一张表,但是底层却是由多个物理分区组成。 2. 表分区与分表的区别 分表...

罗文浩
24分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部