文档章节

java 分布式锁

w
 walle-Liao
发布于 2016/01/27 22:32
字数 2167
阅读 259
收藏 11

实现分布式锁的基本思路

采用 redis 和 zookeeper 的方式实现

  • 采用 redis 实现分布式锁的基本思路通过一个单节点的 redis 服务器,然后所有客户端线程通过设置同一个 key 来实现锁的获取,这里的关键是要区分开不同节点机器的不同线程(例如每个 JVM 里面的不同线程都通过一个单例对象来获取锁,该单例对象中对应一个 UUID,这样就区分开了不同的 JVM,然后一个 JVM 的不同线程通过线程 ID 来区分,UUID+线程ID 就区分开了不同进程的不同线程),对应缓存值需要保存两部分信息,一个就是线程的唯一标识(UUID+线程ID);另外一个就是该线程获取锁的次数,用来支持重入锁的特性,可以将这两个信息保存到一个 json 对象中,这样 key 和 value 就都有了,如果线程设置 key 成功,则表示锁获取成功,如果当前 key 已经被设置,则说明锁已经被人占用,如果当前 key 对应的 value 里面的线程标识正好好当前线程标识相同,则锁重入

  • 采用 zookeeper 方式实现分布式锁的基本思路是利用,ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名。例如:我们在Zookeeper目录 /test 目录下创建,两个客户端创建一个名为 Lock 节点,只有一个能够成功。利用名称唯一性,加锁操作时,只需要所有客户端一起创建 /test/Lock 节点,只有一个创建成功,成功者获得锁。解锁时,只需删除 /test/Lock 节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁。


RedissonLock 实现分布式锁部分源码介绍

参考 redisson 项目 

RLock 接口介绍

注:这里的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有区别

lockInterruptibly 方法中多了 leaseTime 参数,表示锁最大持有时间,即客户申请到锁之后不管是否手动 unlock 了,超过 leaseTime 设定的时间后都将自动释放锁,防止客户程序异常导致锁无法释放的问题

tryLock 方法除了 waitTime 参数外,也多了一个 leaseTime 时间,其原理也是一样的


下面是 RedissonLock 里面的部分源码分析

lockInterruptibly 获取锁的主要入口方法

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }
        // 锁获取成功,直接返回
        if (ttl == null) {
            return;
        }

        // redis 消息机制
        // "redisson_lock__channel__{" + getName() + "}"; 在该 channel 上订阅消息,当 unlock 发生时,将 channel 上所有监听者将收到通知
        Future<RedissonLockEntry> future = subscribe();
        future.sync();

        try {
            while (true) {
                // 再尝试一次获取锁,如果获取到了就直接返回
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // 阻塞等待,当其他线程调用 unlock 方法时被唤醒,或者 ttl 时间超时
                // 唤醒之后需要重新竞争锁,因为可能多个线程被同时唤醒,而每次只会有一个线程成功获取锁
                RedissonLockEntry entry = getEntry();
                if (ttl >= 0) {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    entry.getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future);
        }
    }

tryLockInner 方法的内部实现

    Long tryLockInner(long leaseTime, TimeUnit unit) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 这里通过一条 redis 的批处理命令来设置 key(这里由 redis 的特性来保证整条批处理命令的原子性)
        // 第一个 if 如果 key 不存在,则设置 uuid+线ID 对应的值为 1,并设值 key 对应的超时时间
        // 第二个 if 为重入锁特性的支持,并且刷新 key 的超时时间
        // 否则返回 key 对应超时时间,即锁获取失败
        return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +   // KEYS[1] == Collections.<Object>singletonList(getName())
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // ARGV[2] == getLockName()
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +   // ARGV[1] == internalLockLeaseTime 
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName());
    }

unlock 方法的内部实现

    @Override
    public void unlock() {
        // 这里需要先理解 redis 中缓存的 key 和 value 的结构,key 对应的就是锁的名称,所有节点的所有线程都是采用同一个 key (同一把锁)
        // value 对应的是一个对象的结构,其中有两个属性 {UUID+线程ID, 获取锁的次数}
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +   // 如果对应的 key 已经不存在,说明 key 已经超时,redis 自动删除了该 key,getChannelName() 发布 unlockMessage 通知其他在阻塞等待获取锁的线程
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  // key 存在,但是已经不是被当前线程占有
                            "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  // getLockName() 其实等于 UUID+线程ID,将这个值-1,如果结果大于0,说明锁存在重入,则重新刷新锁超时时间
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('del', KEYS[1]); " +  // -1之后=0,则正常删除 key,并且发布 unlockMessage 事件
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; "+
                        "end; " +
                        "return nil;",
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName());
        if (opStatus == null) {
            // IllegalMonitorStateException 表示当前调用 unlock 方法的线程不是持有 lock 的线程
            throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }


RedissonLock 分布式锁的基本使用

RedissonLock 使用起来非常简单,如果需要详细了解 RedissonLock 的使用,可以看看 Redisson 项目中相关的测试用例

public void testLock() {
	RLock lock = redisson.getLock("lockName");
	try {
		// .....
	} finally {
		if(lock.isHeldByCurrentThread())
			lock.unlock();
	}
}

// redisson 创建示例代码,一般实现成单例模式
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);


其他问题探讨

本文详细介绍了采用 RedissonLock 方式实现分布式锁的相关源码和使用方式,但是这种方式也存在一个问题是,用于实现分布式锁的获取和释放是一个单实例的 redis 实例,如果该实例宕机,系统中所有的分布式锁获取程序都无法正常工作,那么第一个问题:

如何通过一个集群环境的 redis 实例来实现分布式锁的管理,以实现分布式锁的高可用性

一个可行的解决思路如下(参考:http://www.open-open.com/lib/view/open1415107259996.html):

我们假设有 N 个 Redis 主节点。这些节点是相互独立的,因此我们不使用复制或其他隐式同步机制。我们已经描述过在单实例情况下如何安全地获取锁。我们也指出此算法将使用这种方法从单实例获取和释放锁。在以下示例中,我们设置N=5(这是个比较适中的值),这样我们需要在不同物理机或虚拟机上运行 5 个 Redis 主节点,以确保它们的出错是尽可能独立的。

为了获取锁,客户端执行以下操作:

  1. 获取当前时间,以毫秒为单位。

  2. 以串行的方式尝试从所有的N个实例中获取锁,使用的是相同的key值和相同的随机value值。在从每个实例获取锁时,客户端会设置一个连接超时,其时长相比锁的自动释放时间要短得多。例如,若锁的自动释放时间是10秒,那么连接超时大概设在5到50毫秒之间。这可以避免当Redis节点挂掉时,会长时间堵住客户端:如果某个节点没及时响应,就应该尽快转到下个节点。

  3. 客户端计算获取所有锁耗费的时长,方法是使用当前时间减去步骤1中的时间戳。当且仅当客户端能从多数节点(至少3个)中获得锁,并且耗费的时长小于锁的有效期时,可认为锁已经获得了。

  4. 如果锁获得了,它的最终有效时长将重新计算为其原时长减去步骤3中获取锁耗费的时长。

  5. 如果锁获取失败了(要么是没有锁住N/2+1个节点,要么是锁的最终有效时长为负数),客户端会对所有实例进行解锁操作(即使对那些没有加锁成功的实例也一样)。


© 著作权归作者所有

共有 人打赏支持
w
粉丝 3
博文 15
码字总数 15775
作品 0
深圳
使用zookeeper序列节点实现不可重入分布式锁

一、前言 在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源时候,juc包的锁就...

加多
01/12
0
0
阿里的面试官都喜欢问哪些技术问题?

金九银十是招聘的旺季,小编在这里也给大家整理了一套阿里面试官最喜欢问的问题或者出场率较高的面试题,助校招或者社招路上的你一臂之力! 首先我们需要明白一个事实,招聘的一个很关键的因...

Java填坑之路
08/26
0
0
Java开发:错过金三银四 你还要错过金九银十吗?面试大纲总结

前言: 一年之计在于春 金三银四已过,2018也已经年过一半多,作为一个开发人员,你是否面上了自己理想的公司,薪资达到心中理想的高度? 面试:如果不准备充分的面试,完全是浪费时间,更是...

Java大蜗牛
08/16
0
0
Java高级程序员面试大纲——错过了金三,你还要错过银四吗

跳槽时时刻刻都在发生,但是我建议大家跳槽之前,先想清楚为什么要跳槽。切不可跟风,看到同事一个个都走了,自己也盲目的开始面试起来(期间也没有准备充分),到底是因为技术原因(影响自己...

Java高级架构
04/27
0
0
分布式锁的理解,java自带的锁为什么会失效

前段时间在发送短信的代码块上通过网上找的工具类基于Redis实现了分布式锁的功能 对应的链接https://www.cnblogs.com/c-h-y/p/9391602.html 周末想细细看一下。 之后郁闷的是为什么java自带的...

陈灬大灬海
08/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

wordpress 汉化

在 wp-config.php 添加 define('WPLANG','zh_CN'); 在后台,更新 now ,即可。

james_laughing
28分钟前
1
0
Android JNI开发系列(十一) JNI 访问父类的构造方法和父类实例方法

JNI 访问父类的构造方法和父类实例方法 构造方法和父类实例方法 先看一段Java代码, Java package org.professor.jni.animal;import android.util.Log;public class Animal {protecte...

蔡小鹏
35分钟前
2
0
腾讯投资最高1.75亿美元正式进军菲律宾移动支付市场

菲律宾长途电话公司(PLDT)公司今日宣布,中国互联网巨头腾讯和私募股权公司KKR将获得该公司旗下金融科技公司Voyager Innovations的少数股权。 PLDT在一份声明中称:“腾讯和KKR最多将分别收...

linuxCool
今天
3
0
正则介绍及grep/egrep用法

10月16日任务 9.1 正则介绍_grep上 9.2 grep中 9.3 grep下 扩展 把一个目录下,过滤所有*.php文档中含有eval的行 grep -r --include="*.php" 'eval' /data 正则介绍 正则就是一串有规律的字符...

hhpuppy
今天
4
0
J2Cache 中使用 Lettuce 替代 Jedis 管理 Redis 连接

一直以来 J2Cache 都是使用 Jedis 连接 Redis 服务的。Jedis 是一个很老牌的 Redis 的 Java 开发包,使用很稳定,作者维护很勤勉,社区上能搜到的文章也非常非常多。算是使用范围最广的 Redi...

红薯
今天
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部