文档章节

Redisson分布式锁深入解析(二)

JackY-Ji
 JackY-Ji
发布于 08/09 17:52
字数 1530
阅读 360
收藏 8

上一篇文章主要侧重如何获取锁以及所获取成功的场景,本文将着重对失败以及解锁的情况进行分析,探寻Redisson分布式锁最具艺术的地方。

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();

        // 订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
       // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 再次尝试一次申请锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 通过信号量(共享锁)阻塞,等待解锁消息(这一点设计的非常精妙:减少了其他分布式节点的等待或者空转等无效锁申请的操作,整体提高了性能)
                // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 
                // 否则就在wait time 时间范围内等待可以通过信号量
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            //无论是否获得锁,都要取消订阅解锁消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

我们看到当获取锁的时长超过请求等待时间,直接进入acquireFailed(进一步调用acquireFailedAsync),并同步返回false,获取锁失败。接下里我们直接进入该异步(异步处理IO,提高系统吞吐量)方法,对其进行解析:

@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "redis.call('zrem', KEYS[2], ARGV[1]); " +
                "redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
                Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId));
}

可以看到,这一步就是把该线程从获取锁操作的等待队列中直接删掉;

接着往下看,如果未达到请求超时时间,则首先订阅该锁的信息。当其他线程释放锁的时候,会同时根据锁的唯一通道publish一条分布式的解锁信息,接收到分布式消息后, 等待获取锁的Semaphore中的监听队列中的listenser线程可重新申请锁,这个后面会深入讲解。下面是订阅的具体细节:

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    //根据channelName拿到信号量,channelName=UUID+":"+name,对应一个锁。
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    //把生成的监听线程listenser加入到信号量的监听集合中去,后面发布解锁消息的时候,会唤醒
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

接着回到tryLock方法,看到finally里面:无论是否获得锁,都要取消订阅解锁消息,这里不做赘述。

接着我们一并分析一下解锁的过程

public void unlock() {
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            // 解锁成功之后取消更新锁expire的时间任务,针对于没有锁过期时间的
            cancelExpirationRenewal();
        }

//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

 

解锁的逻辑相对简单,具体步骤如下:

  1. 如果lock键不存在,发消息说锁已经可用

  2. 如果锁不是被当前线程锁定,则返回nil

  3. 由于支持可重入,在解锁时将重入次数需要减1

  4. 如果计算后的重入次数>0,则重新设置过期时间

  5. 如果计算后的重入次数<=0,则发消息说锁已经可用

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

下面我们再看一下Redisson是如何处理解锁消息的(LockPubSub.unlockMessage):

/**
 * 
 * @author Nikita Koksharov
 *
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
           // 释放一个许可,唤醒等待的entry.getLatch().tryAcquire去再次尝试获取锁。
            value.getLatch().release();

            while (true) {
                Runnable runnableToExecute = null;
                 // 如果entry还有其他Listeners回调,也唤醒执行。
                 synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }
                
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }

}

Redisson还有很多东西可以挖掘,不仅局限分布式锁(对于分布式锁的一些细节,本文摘抄了网络中比较靠谱的一些片段,方便大家理解)。作者Nikita Koksharov 把原来Conrrent包下很多同步类(比如:CountDownLatch,Semaphore),用分布式的方式实现了一遍,还是很厉害的。这些增强的实现,以后在工作都将大有用处。这些点,以后有空的时候再慢慢研究。

© 著作权归作者所有

共有 人打赏支持
JackY-Ji
粉丝 12
博文 35
码字总数 18318
作品 0
杭州
Redisson项目介绍

Redisson项目介绍 Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提...

jackygurui
2016/12/08
698
2
并发编程-锁的发展和主流分布式锁比较总结

一、锁的发展 系统结构由传统的“单应用服务--》SOA --》微服务 --》无服务器” 的演进过程中,场景越来越复杂,由单体应用的但进程中多线程并发的内存锁,随着互联网场景越来越复杂,在复杂...

贾浩v
2017/10/24
0
0
聊聊redisson的分布式锁

序 本文主要研究一下redisson的分布式锁 maven 实例 源码解析 RedissonLock.tryLock redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java 这里leaseTime没有设置的话,默认是-1,使...

go4it
09/21
0
0
阿里云专访Redisson作者Rui Gu:构建开源企业级Redis客户端之路

当初为什么参与设计开发Redisson? 自04年从事工业自动化、工业IoT工作至今,涉及到很多场景需要对一系列设备进行监控和信号处理等工作。该类场景对实时处理能力,系统稳定性,高可用性,容灾...

白宸
06/25
0
0
Redis 客户端--Redisson

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】 Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Jav...

君枫
2014/08/10
15K
2

没有更多内容

加载失败,请刷新页面

加载更多

webSocket前台实现

webSocket前台实现 简单实现: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <script type="application/javascript" src="js/base64.js"></script......

Airship
13分钟前
0
0
从零到一,使用实时音视频 SDK 一起开发一款 Zoom 吧

zoom(zoom.us) 是一款受到广泛使用的在线会议软件。相信各位一定在办公、会议、聊天等各种场景下体验或者使用过,作为一款成熟的商业软件,zoom 提供了稳定的实时音视频通话质量,以及白板、...

七牛云
14分钟前
0
0
Linux学习-10月16

9.1 正则介绍_grep 9.2 grep中 9.3 grep下 一、什么是正则 正则就是一串有规律的字符串,包括各种特殊符号 掌握正则对于编写shell有很大帮助 各种编程中都有正则,原理是一样的 二、grep简介...

wxy丶
21分钟前
0
0
设计模式学习与应用——单例模式

单例模式 作用:一个类只有一个实例,并且提供访问该实例的全局访问点 创建方式 1.懒汉方式 public class Singleton{//使外部无法访问这个变量,而要使用公共方法来获取private static ...

隔壁老余在这
30分钟前
0
0
亿级爆款背后,网易云音乐的生长之道

两年时间,破亿;四年时间,破4亿…… 据国内知名移动大数据监测平台Trustdata发布的《2017年下半年中国移动互联网发展分析报告》显示,2017年12月,网易云音乐MAU同比增长达43.1%,是移动音...

安卓绿色联盟
33分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部