文档章节

Redis分布式锁全局锁(悲观锁,Redisson实现)

y
 yiqifendou
发布于 2016/10/08 14:14
字数 2283
阅读 143
收藏 4
点赞 0
评论 0

1. 前因

以前实现过一个Redis实现的全局锁, 虽然能用, 但是感觉很不完善, 不可重入, 参数太多等等.

最近看到了一个新的Redis客户端Redisson, 看了下源码, 发现了一个比较好的锁实现RLock, 于是记录下.

2. Maven依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>1.2.1</version>
</dependency>

3. 初试

Redisson中RLock的使用很简单, 来看看一个最简单的例子.

import org.redisson.Redisson;
import org.redisson.core.RLock;

public class Temp {

    public static void main(String[] args) throws Exception {
        Redisson redisson = Redisson.create();

        RLock lock = redisson.getLock("haogrgr");
        lock.lock();
        try {
            System.out.println("hagogrgr");
        }
        finally {
            lock.unlock();
        }

        redisson.shutdown();
    }

}

4. RLock接口

通过上面的例子可以看出, 使用起来和juc里面的Lock接口使用很类似, 那么来看看RLock这个接口.

Rlock
|
----------Lock

          |
          ----------void lock()

          |
          ----------void lockInterruptibly()

          |
          ----------boolean tryLock()

          |
          ----------boolean tryLock(long time, TimeUnit unit)

          |
          ----------void unlock()

          |
          ----------Condition newCondition()

|
----------RObject

          |
          ----------String getName()

          |
          ----------void delete()

|
----------void lockInterruptibly(long leaseTime, TimeUnit unit)

|
----------boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)

|
----------void lock(long leaseTime, TimeUnit unit)

|
----------void forceUnlock()

|
----------boolean isLocked();

|
----------boolean isHeldByCurrentThread()

|
----------int getHoldCount()

可以看到, 该接口主要继承了Lock接口, 然后扩展了部分方法, 比如:

 boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)

新加入的leaseTime主要是用来设置锁的过期时间, 形象的解释就是, 如果超过leaseTime还没有解锁的话, 我就强制解锁.

5. RLock接口的实现

具体的实现类是RedissonLock, 下面来大概看看实现原理. 先看看 (3) 中例子执行时, 所运行的命令(通过monitor命令):

127.0.0.1:6379> monitor
OK
1434959509.494805 [0 127.0.0.1:57911] "SETNX" "haogrgr" "{\"@class\":\"org.redisson.RedissonLock$LockValue\",\"counter\":1,\"id\":\"c374addc-523f-4943-b6e0-c26f7ab061e3\",\"threadId\":1}"
1434959509.494805 [0 127.0.0.1:57911] "GET" "haogrgr"
1434959509.524805 [0 127.0.0.1:57911] "MULTI"
1434959509.529805 [0 127.0.0.1:57911] "DEL" "haogrgr"
1434959509.529805 [0 127.0.0.1:57911] "PUBLISH" "redisson__lock__channel__{haogrgr}" "0"
1434959509.529805 [0 127.0.0.1:57911] "EXEC"

可以看到, 大概原理是, 通过判断Redis中是否有某一key, 来判断是加锁还是等待, 最后的publish是一个解锁后, 通知阻塞在lock的线程.

分布式锁的实现依赖的单点, 这里Redis就是单点, 通过在Redis中维护状态信息来实现全局的锁. 那么来看看RedissonLock如何

实现可重入, 保证原子性等等细节.

6. 加锁源码分析

从最简单的无参数的lock参数来看源码.

public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
    }
}

public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);    //leaseTime : -1 表示key不设置过期时间
}

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    Long ttl;
    if (leaseTime != -1) {
        ttl = tryLockInner(leaseTime, unit);
    } else {
        ttl = tryLockInner();
    }
    // lock acquired
    if (ttl == null) {
        return;
    }

    subscribe().awaitUninterruptibly();

    try {
        while (true) {
            if (leaseTime != -1) {
                ttl = tryLockInner(leaseTime, unit);
            } else {
                ttl = tryLockInner();
            }
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            RedissonLockEntry entry = ENTRIES.get(getEntryName());
            if (ttl >= 0) {
                entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                entry.getLatch().acquire();
            }
        }
    } finally {
        unsubscribe();
    }
}

代码有点多, 但是没关系, 慢慢分解, 由于这里我们是调用无参数的lock方法, 所以最后执行到的方法是:

private Long tryLockInner() {
    final LockValue currentLock = new LockValue(id, Thread.currentThread().getId());    //保存锁的状态: 客户端UUID+线程ID来唯一标识某一JVM实例的某一线程
    currentLock.incCounter();    //用来保存重入次数, 实现可重入功能, 初始情况是1

    //Redisson封装了交互的细节, 具体的逻辑为execute方法逻辑.
    return connectionManager.write(getName(), new SyncOperation<LockValue, Long>() {

        @Override
        public Long execute(RedisConnection<Object, LockValue> connection) {
            Boolean res = connection.setnx(getName(), currentLock);    //如果key:haogrgr不存在, 就set并返回true, 否则返回false
            if (!res) {    //如果设置失败, 那么表示有锁竞争了, 于是获取当前锁的状态, 如果拥有者是当前线程, 就累加重入次数并set新值
                connection.watch(getName());    //通过watch命令配合multi来实现简单的事务功能
                LockValue lock = (LockValue) connection.get(getName());
                if (lock != null && lock.equals(currentLock)) {    //LockValue的equals实现为比较客户id和threadid是否一样
                    lock.incCounter();    //如果当前线程已经获取过锁, 则累加加锁次数, 并set更新
                    connection.multi();
                    connection.set(getName(), lock);
                    if (connection.exec().size() == 1) {
                        return null;    //set成功, 
                    }
                }
                connection.unwatch();

                //走到这里, 说明上面set的时候, 其他客户端在  watch之后->set之前 有其他客户端修改了key值
                //则获取key的过期时间, 如果是永不过期, 则返回-1, 具体处理后面说明
                Long ttl = connection.pttl(getName());
                return ttl;
            }
            return null;
        }
    });
}

tryLockInner的逻辑已经看完了, 可以知道, 有三种情况:

(1) key不存在, 加锁:

当key不存在时, 设置锁的初始状态并set, 具体来看就是 setnx haogrgr LockValue{ id: Redisson对象的id, threadId: 当前线程id, counter: 当前重入次数,这里为第一次获取,所以为1}

通过上面的操作. 达到获取锁的目的, 通过setnx来达到实现类似于 if(map.get(key) == null) { map.put(key) } 的功能, 防止多个客户端同时set时, 新值覆盖老值.

(2)key存在, 且获取锁的当前线程, 重入:

这里就是锁重入的情况, 也就是锁的拥有者第二次调用lock方法, 这时, 通过先get, 然后比较客户端ID和当前线程ID来判断拥有锁的线程是不是当前线程.(客户端ID+线程ID才能唯一定位锁拥有者线程)

判断发现当前是重入情况, 则累加LockValue的counter, 然后重新set回去, 这里使用到了watch和multi命令, 防止 get -> set 期间其他客户端修改了key的值.

(3)key存在, 且是其他线程获取的锁, 等待:

首先尝试获取锁(setnx), 失败后发现锁拥有者不是当前线程, 则获取key的过期时间, 返回过期时间

那么接下来看看tryLockInner调用完成后的处理代码.

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    Long ttl;
    if (leaseTime != -1) {
        ttl = tryLockInner(leaseTime, unit);
    } else {
        ttl = tryLockInner();   //lock()方法调用会走的逻辑
    }
    // lock acquired
    if (ttl == null) {   //加锁成功(新获取锁, 重入情况) tryLockInner会返回null, 失败会返回key超时时间, 或者-1(key未设置超时时间)
        return;   //加锁成功, 返回
    }

    //subscribe这个方法代码有点多, Redisson通过netty来和redis通讯, 然后subscribe返回的是一个Future类型,
    //Future的awaitUninterruptibly()调用会阻塞, 然后Redisson通过Redis的pubsub来监听unlock的topic(getChannelName())
    //例如, 5中所看到的命令 "PUBLISH" "redisson__lock__channel__{haogrgr}" "0"
    //当解锁时, 会向名为 getChannelName() 的topic来发送解锁消息("0")
    //而这里 subscribe() 中监听这个topic, 在订阅成功时就会唤醒阻塞在awaitUninterruptibly()的方法. 
    //所以线程在这里只会阻塞很短的时间(订阅成功即唤醒, 并不代表已经解锁)
    subscribe().awaitUninterruptibly();

    try {
        while (true) {    //循环, 不断重试lock
            if (leaseTime != -1) {
                ttl = tryLockInner(leaseTime, unit);
            } else {
                ttl = tryLockInner();   //不多说了
            }
            // lock acquired
            if (ttl == null) {
                break;
            }

            
            // 这里才是真正的等待解锁消息, 收到解锁消息, 就唤醒, 然后尝试获取锁, 成功返回, 失败则阻塞在acquire().
            // 收到订阅成功消息, 则唤醒阻塞上面的subscribe().awaitUninterruptibly();
            // 收到解锁消息, 则唤醒阻塞在下面的entry.getLatch().acquire();
            RedissonLockEntry entry = ENTRIES.get(getEntryName());
            if (ttl >= 0) {
                entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                entry.getLatch().acquire();
            }
        }
    } finally {
        unsubscribe();  //加锁成功或异常,解除订阅
    }
}

主要的代码都加上了详细的注释, subscribe() 方法的代码复杂些, 但具体就是利用redis的pubsub提供一个通知机制来减少不断的重试.

很多的Redis锁实现都是失败后sleep一定时间后重试, 在锁被占用时间较长时, 不断的重试是浪费, 而sleep也会导致不必要的时间浪费(在sleep期间可能已经解锁了), sleep时间太长, 时间浪费, 太短, 重试次数会增加~~~.

到这里lock的逻辑已经看完了, 其他的比如tryLock方法逻辑和lock类似, 不过加了超时时间, 然后还有一种lock方法就是对key加上了过期时间.

7. 解锁源码

unlock的逻辑相对简单.

public void unlock() {
    connectionManager.write(getName(), new SyncOperation<Object, Void>() {
        @Override
        public Void execute(RedisConnection<Object, Object> connection) {
            LockValue lock = (LockValue) connection.get(getName());
            if (lock != null) {
                LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
                if (lock.equals(currentLock)) {
                    if (lock.getCounter() > 1) {
                        lock.decCounter();
                        connection.set(getName(), lock);
                    } else {
                        unlock(connection);
                    }
                } else {
                    throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
                            + id + " thread-id: " + Thread.currentThread().getId());
                }
            } else {
                // could be deleted
            }
            return null;
        }
    });
}

private void unlock(RedisConnection<Object, Object> connection) {
    int counter = 0;
    while (counter < 5) {
        connection.multi();
        connection.del(getName());
        connection.publish(getChannelName(), unlockMessage);
        List<Object> res = connection.exec();
        if (res.size() == 2) {
            return;
        }
        counter++;
    }
    throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: "
            + id + " thread-id: " + Thread.currentThread().getId());
}

具体的逻辑比较简单, 我就不注释了, 大概就是, 如果是多次重入的, 就以此递减然后 set, 如果是只lock一次的, 就删除, 然后publish一条解锁的message到getChannelName() tocpic.

这里解锁会重试五次, 失败就抛异常.

8.总结

逻辑并不复杂, 但是通过记录客户端ID和线程ID来唯一标识线程, 实现重入功能, 通过pub sub功能来减少空转.

优点: 实现了Lock的大部分功能, 提供了特殊情况方法(如:强制解锁, 判断当前线程是否已经获取锁, 超时强制解锁等功能), 可重入, 减少重试.

缺点: 使用依赖Redisson, 而Redisson依赖netty, 如果简单使用, 引入了较多的依赖, pub sub的实时性需要测试, 没有监控等功能, 查问题麻烦, 统计功能也没有(例如慢lock日志, 2333333).

本文转载自:https://my.oschina.net/haogrgr/blog/469439?p={{page}}

共有 人打赏支持
y
粉丝 5
博文 51
码字总数 8974
作品 0
南京
Redis分布式锁 基于GETSET SETNX REDISSON 的实现

Redis分布式锁的应用 有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多...

Lien6o ⋅ 2017/09/21 ⋅ 0

Redisson项目介绍

Redisson项目介绍 Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提...

jackygurui ⋅ 2016/12/08 ⋅ 2

Redis实现分布式锁全局锁—Redis客户端Redisson中分布式锁RLock实现

前因 以前实现过一个Redis实现的全局锁, 虽然能用, 但是感觉很不完善, 不可重入, 参数太多等等. 最近看到了一个新的Redis客户端Redisson, 看了下源码, 发现了一个比较好的锁实现RLock, 于是记...

德胜 ⋅ 2015/06/22 ⋅ 7

redisson实现分布式锁原理

Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的。 不同版本实现锁的机制并不相同 引用的...

技术小阿哥 ⋅ 2017/11/27 ⋅ 0

redis分布式锁

关于分布式锁的概念网上太多了,这里就不罗嗦了。对于开发者来说,最关心的应该是什么情况下使用分布式锁。 使用分布式锁,一般要满足以下几个条件: · 分布式系统(关键是分布式) · 共享资...

明舞 ⋅ 2015/10/16 ⋅ 3

Redis 客户端--Redisson

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】 Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Jav...

君枫 ⋅ 2014/08/10 ⋅ 2

如何用Redlock实现分布式锁

转载请标明出处: http://blog.csdn.net/forezp/article/details/70305336 本文出自方志朋的博客 之前写过一篇文章《如何在springcloud分布式系统中实现分布式锁?》,由于自己仅仅是阅读了相...

forezp ⋅ 2017/04/20 ⋅ 0

使用数据库悲观锁实现不可重入的分布式锁

一、前言 在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源时候,juc包的锁就...

阿里加多 ⋅ 06/12 ⋅ 0

zookeeper分布式锁

1、pom.xml中添加zookeeper依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version></dependency> 2、DistributedLock.j......

chaun ⋅ 2015/11/08 ⋅ 0

Redis 分布式锁的正确实现方式( Java 版 )

原文出处:吴兆锋 前言 分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已...

吴兆锋 ⋅ 2017/12/02 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Jenkins实践3 之脚本

#!/bin/sh# export PROJ_PATH=项目路径# export TOMCAT_PATH=tomcat路径killTomcat(){pid=`ps -ef | grep tomcat | grep java|awk '{print $2}'`echo "tom...

晨猫 ⋅ 今天 ⋅ 0

Spring Bean的生命周期

前言 Spring Bean 的生命周期在整个 Spring 中占有很重要的位置,掌握这些可以加深对 Spring 的理解。 首先看下生命周期图: 再谈生命周期之前有一点需要先明确: Spring 只帮我们管理单例模...

素雷 ⋅ 今天 ⋅ 0

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 今天 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

Java正无穷大、负无穷大以及NaN

问题来源:用Java代码写了一个计算公式,包含除法和对数和取反,在页面上出现了-infinity,不知道这是什么问题,网上找答案才明白意思是负的无穷大。 思考:为什么会出现这种情况呢?这是哪里...

young_chen ⋅ 昨天 ⋅ 0

前台对中文编码,后台解码

前台:encodeURI(sbzt) 后台:String param = URLDecoder.decode(sbzt,"UTF-8");

west_coast ⋅ 昨天 ⋅ 0

实验楼—MySQL基础课程-挑战3实验报告

按照文档要求创建数据库 sudo sercice mysql startwget http://labfile.oss.aliyuncs.com/courses/9/createdb2.sqlvim /home/shiyanlou/createdb2.sql#查看下数据库代码 代码创建了grade......

zhangjin7 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部