文档章节

Redisson分布式锁深入解析(二)

JackY-Ji
 JackY-Ji
发布于 08/09 17:52
字数 1530
阅读 380
收藏 8

上一篇文章主要侧重如何获取锁以及所获取成功的场景,本文将着重对失败以及解锁的情况进行分析,探寻Redisson分布式锁最具艺术的地方。

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();

        // 订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
       // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 再次尝试一次申请锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 通过信号量(共享锁)阻塞,等待解锁消息(这一点设计的非常精妙:减少了其他分布式节点的等待或者空转等无效锁申请的操作,整体提高了性能)
                // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 
                // 否则就在wait time 时间范围内等待可以通过信号量
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            //无论是否获得锁,都要取消订阅解锁消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

我们看到当获取锁的时长超过请求等待时间,直接进入acquireFailed(进一步调用acquireFailedAsync),并同步返回false,获取锁失败。接下里我们直接进入该异步(异步处理IO,提高系统吞吐量)方法,对其进行解析:

@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "redis.call('zrem', KEYS[2], ARGV[1]); " +
                "redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
                Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId));
}

可以看到,这一步就是把该线程从获取锁操作的等待队列中直接删掉;

接着往下看,如果未达到请求超时时间,则首先订阅该锁的信息。当其他线程释放锁的时候,会同时根据锁的唯一通道publish一条分布式的解锁信息,接收到分布式消息后, 等待获取锁的Semaphore中的监听队列中的listenser线程可重新申请锁,这个后面会深入讲解。下面是订阅的具体细节:

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    //根据channelName拿到信号量,channelName=UUID+":"+name,对应一个锁。
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    //把生成的监听线程listenser加入到信号量的监听集合中去,后面发布解锁消息的时候,会唤醒
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

接着回到tryLock方法,看到finally里面:无论是否获得锁,都要取消订阅解锁消息,这里不做赘述。

接着我们一并分析一下解锁的过程

public void unlock() {
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            // 解锁成功之后取消更新锁expire的时间任务,针对于没有锁过期时间的
            cancelExpirationRenewal();
        }

//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

 

解锁的逻辑相对简单,具体步骤如下:

  1. 如果lock键不存在,发消息说锁已经可用

  2. 如果锁不是被当前线程锁定,则返回nil

  3. 由于支持可重入,在解锁时将重入次数需要减1

  4. 如果计算后的重入次数>0,则重新设置过期时间

  5. 如果计算后的重入次数<=0,则发消息说锁已经可用

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

下面我们再看一下Redisson是如何处理解锁消息的(LockPubSub.unlockMessage):

/**
 * 
 * @author Nikita Koksharov
 *
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
           // 释放一个许可,唤醒等待的entry.getLatch().tryAcquire去再次尝试获取锁。
            value.getLatch().release();

            while (true) {
                Runnable runnableToExecute = null;
                 // 如果entry还有其他Listeners回调,也唤醒执行。
                 synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }
                
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }

}

Redisson还有很多东西可以挖掘,不仅局限分布式锁(对于分布式锁的一些细节,本文摘抄了网络中比较靠谱的一些片段,方便大家理解)。作者Nikita Koksharov 把原来Conrrent包下很多同步类(比如:CountDownLatch,Semaphore),用分布式的方式实现了一遍,还是很厉害的。这些增强的实现,以后在工作都将大有用处。这些点,以后有空的时候再慢慢研究。

© 著作权归作者所有

共有 人打赏支持
JackY-Ji
粉丝 11
博文 35
码字总数 18318
作品 0
杭州
私信 提问
Redisson项目介绍

Redisson项目介绍 Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提...

jackygurui
2016/12/08
698
2
并发编程-锁的发展和主流分布式锁比较总结

一、锁的发展 系统结构由传统的“单应用服务--》SOA --》微服务 --》无服务器” 的演进过程中,场景越来越复杂,由单体应用的但进程中多线程并发的内存锁,随着互联网场景越来越复杂,在复杂...

贾浩v
2017/10/24
0
0
Redis集群下的RedLock算法(真分布式锁) 实践

在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段。 有很多三方库和文章描述如何用Redis实现一个分布式锁管理器,但是这些库实现的方式差别很大,而且很多简单的实现其...

搜云库技术团队
11/11
0
0
Redission实现分布式锁

版权声明:本文为博主原创文章,欢迎大家讨论,未经博主允许不得转载. https://blog.csdn.net/u010398771/article/details/84650976 分布式锁是啥,就是在分布式环境下来进行资源的锁定,在单台t...

长河
11/30
0
0
聊聊redisson的分布式锁

序 本文主要研究一下redisson的分布式锁 maven 实例 源码解析 RedissonLock.tryLock redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java 这里leaseTime没有设置的话,默认是-1,使...

go4it
09/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 钱不还,我就当你人不在了

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享Bigleaf的单曲《小鹿》 《小鹿》- Bigleaf 手机党少年们想听歌,请使劲儿戳(这里) 周日在家做什么? 做手工呀, @poorfis...

小小编辑
今天
27
3
EOS docker开发环境

使用eos docker镜像是部署本地EOS开发环境的最轻松愉快的方法。使用官方提供的eos docker镜像,你可以快速建立一个eos开发环境,可以迅速启动开发节点和钱包服务器、创建账户、编写智能合约....

汇智网教程
今天
19
0
《唐史原来超有趣》的读后感优秀范文3700字

《唐史原来超有趣》的读后感优秀范文3700字: 作者:花若离。我今天分享的内容《唐史原来超有趣》这本书的读后感,我将这本书看了一遍之后就束之高阁了,不过里面的内容一直在在脑海中回放,...

原创小博客
今天
24
0
IC-CAD Methodology知识图谱

CAD (Computer Aided Design),计算机辅助设计,指利用计算机及其图形设备帮助设计人员进行设计工作,这个定义同样可以用来近似描述IC公司CAD工程师这个岗位的工作。 早期IC公司的CAD岗位最初...

李艳青1987
今天
29
0
CompletableFuture get方法一直阻塞或抛出TimeoutException

问题描述 最近刚刚上线的服务突然抛出大量的TimeoutException,查询后发现是使用了CompletableFuture,并且在执行future.get(5, TimeUnit.SECONDS);时抛出了TimeoutException异常,导致接口响...

xiaolyuh
今天
14
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部