文档章节

Redisson分布式锁深入解析(二)

JackY-Ji
 JackY-Ji
发布于 08/09 17:52
字数 1530
阅读 330
收藏 5

上一篇文章主要侧重如何获取锁以及所获取成功的场景,本文将着重对失败以及解锁的情况进行分析,探寻Redisson分布式锁最具艺术的地方。

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();

        // 订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
       // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 再次尝试一次申请锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 通过信号量(共享锁)阻塞,等待解锁消息(这一点设计的非常精妙:减少了其他分布式节点的等待或者空转等无效锁申请的操作,整体提高了性能)
                // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 
                // 否则就在wait time 时间范围内等待可以通过信号量
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            //无论是否获得锁,都要取消订阅解锁消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

我们看到当获取锁的时长超过请求等待时间,直接进入acquireFailed(进一步调用acquireFailedAsync),并同步返回false,获取锁失败。接下里我们直接进入该异步(异步处理IO,提高系统吞吐量)方法,对其进行解析:

@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "redis.call('zrem', KEYS[2], ARGV[1]); " +
                "redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
                Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId));
}

可以看到,这一步就是把该线程从获取锁操作的等待队列中直接删掉;

接着往下看,如果未达到请求超时时间,则首先订阅该锁的信息。当其他线程释放锁的时候,会同时根据锁的唯一通道publish一条分布式的解锁信息,接收到分布式消息后, 等待获取锁的Semaphore中的监听队列中的listenser线程可重新申请锁,这个后面会深入讲解。下面是订阅的具体细节:

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    //根据channelName拿到信号量,channelName=UUID+":"+name,对应一个锁。
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    //把生成的监听线程listenser加入到信号量的监听集合中去,后面发布解锁消息的时候,会唤醒
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

接着回到tryLock方法,看到finally里面:无论是否获得锁,都要取消订阅解锁消息,这里不做赘述。

接着我们一并分析一下解锁的过程

public void unlock() {
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            // 解锁成功之后取消更新锁expire的时间任务,针对于没有锁过期时间的
            cancelExpirationRenewal();
        }

//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

 

解锁的逻辑相对简单,具体步骤如下:

  1. 如果lock键不存在,发消息说锁已经可用

  2. 如果锁不是被当前线程锁定,则返回nil

  3. 由于支持可重入,在解锁时将重入次数需要减1

  4. 如果计算后的重入次数>0,则重新设置过期时间

  5. 如果计算后的重入次数<=0,则发消息说锁已经可用

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

下面我们再看一下Redisson是如何处理解锁消息的(LockPubSub.unlockMessage):

/**
 * 
 * @author Nikita Koksharov
 *
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
           // 释放一个许可,唤醒等待的entry.getLatch().tryAcquire去再次尝试获取锁。
            value.getLatch().release();

            while (true) {
                Runnable runnableToExecute = null;
                 // 如果entry还有其他Listeners回调,也唤醒执行。
                 synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }
                
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }

}

Redisson还有很多东西可以挖掘,不仅局限分布式锁(对于分布式锁的一些细节,本文摘抄了网络中比较靠谱的一些片段,方便大家理解)。作者Nikita Koksharov 把原来Conrrent包下很多同步类(比如:CountDownLatch,Semaphore),用分布式的方式实现了一遍,还是很厉害的。这些增强的实现,以后在工作都将大有用处。这些点,以后有空的时候再慢慢研究。

© 著作权归作者所有

共有 人打赏支持
JackY-Ji
粉丝 11
博文 35
码字总数 18318
作品 0
杭州
Redisson项目介绍

Redisson项目介绍 Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提...

jackygurui
2016/12/08
698
2
并发编程-锁的发展和主流分布式锁比较总结

一、锁的发展 系统结构由传统的“单应用服务--》SOA --》微服务 --》无服务器” 的演进过程中,场景越来越复杂,由单体应用的但进程中多线程并发的内存锁,随着互联网场景越来越复杂,在复杂...

贾浩v
2017/10/24
0
0
阿里云专访Redisson作者Rui Gu:构建开源企业级Redis客户端之路

当初为什么参与设计开发Redisson? 自04年从事工业自动化、工业IoT工作至今,涉及到很多场景需要对一系列设备进行监控和信号处理等工作。该类场景对实时处理能力,系统稳定性,高可用性,容灾...

白宸
06/25
0
0
Redis 客户端--Redisson

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】 Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Jav...

君枫
2014/08/10
15K
2
redisson实现分布式锁原理

Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的。 不同版本实现锁的机制并不相同 引用的...

技术小阿哥
2017/11/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

windbg调试C源码级驱动

联机方式不多说了。我博客里有,英文的。 windbg联机文档 https://docs.microsoft.com/zh-cn/windows-hardware/drivers/debugger/debug-universal-drivers---step-by-step-lab--echo-kernel......

simpower
25分钟前
0
0
redis快照和AOF简介

数据持久化到硬盘:一是快照(snapshotting),二是只追加文件(append-only file AOF) 快照 核心原理:redis某个时间内存内的所有数据写入硬盘 场景:redis快照内存里面的数据 1. 用户发送bgsav...

拐美人
25分钟前
0
0
这个七夕,送你一份程序员教科书级别的告白指南

给广大爱码士们的高能预警: 今天,就是七夕了…… (单身非作战人群请速速退场!) 时常有技术GG向个推君抱怨 经过网民多年的教育 以及技术人持之以恒的自黑 冲锋衣狂热分子·格子衫骨灰级粉...

个推
30分钟前
0
0
python爬虫日志(15)cookie详解

转载:原文地址 早期Web开发面临的最大问题之一是如何管理状态。服务器端没有办法知道两个请求是否来自于同一个浏览器。那时的办法是在请求的页面中插入一个token,并且在下一次请求中将这个...

茫羽行
31分钟前
0
0
qlv视频格式转换器

  腾讯视频中的视频影视资源有很多,小编经常在里面下载视频观看,应该也有很多朋友和小编一样吧,最近热播的电视剧也不少,如《香蜜沉沉烬如霜》、《夜天子》还有已经完结的《扶摇》,这么...

萤火的萤火
35分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部