Redis——基于redis实现分布式锁

原创
10/16 18:22
阅读数 57

原生命令

# 只有在 key 不存在时设置 key 的值。
> setnx key value 

# 将 key 的过期时间设为 seconds (以秒为单位)。
> expire key seconds

RedisTemplate实现演进

此处业务逻辑全部是伪代码,请勿参考,只关注锁的实现

step1. 单体架构下无锁状态,容易造成重复扣件库存
public boolean deductStock(){
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock",realStock + "");
        System.out.println("真实库存为 " + realStock );
        return true;
    }else{
        System.out.println("库存不足");
        return false;
    }
}
step2. 使用java同步锁,可以保证单机状态下不会重复扣减库存
public String deductStock(){
     //同步块
     synchronized (this){
         int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
         if(stock > 0){
             int realStock = stock - 1;
             stringRedisTemplate.opsForValue().set("stock",realStock + "");
             System.out.println("真实库存为 " + realStock );
             return true;
         }else{
             System.out.println("库存不足");
             return false;
         }
     }
 }
Step3. 使用Redis中的setnx,简单实现分布式锁。
/**
 *此方式最大的问题在于,当执行到中间部分,整个应用挂掉的时候无法,此锁无人释放,将导致上游服务一直等待,最后造成线上事故。
 */
public boolean deductStock(String productId){
    try{
        //自旋等待,直到拿到锁再执行扣件库存,否则就一直等待
        for (;;){
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(productId, "redis_lock");
            if(result){
                break;
            }
        }
        return doDeductStock(productId);
    }finally {
        //释放锁
        stringRedisTemplate.delete(productId);
    }

}
Step4. 使用设置超时时间的方式,防止程序挂掉导致的死锁问题
/**
 * 通过设置超时时间,来解决整个应用挂掉导致的死锁,但是高并发场景下,因为线程之间混乱,
 * 当第一个线程执行到中途的时候因为下游服务的原因,导致业务还没有执行结束的时候,锁自动失效了,
 * 这个时候后面的线程发现可以加锁了,于是后面线程进入争夺锁并且加锁成功,于是当前线程在finally中释放的是后面线程加上的锁。
 * 前面的线程会释放后面的线程加上的锁,导致锁永久失效
 */
public boolean deductStock(String productId){

    try{
        //自旋等待,直到拿到锁再执行扣件库存,否则就一直等待, 并且设置超时时间,当超时以后自动释放锁
        for (;;){
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(productId, "redis_lock", 30, TimeUnit.SECONDS);
            if(result){
                break;
            }
        }
        return doDeductStock(productId);
    }finally {
        //释放锁
        stringRedisTemplate.delete(productId);
    }
}
Step5: 设置线程唯一ID防止锁永久失效问题
/**
 * 通过设置线程唯一ID,来确定当前线程是这把锁的持有者。
 * 但是这样设置,如果锁失效时间太久会导致效率底下,并发数无法提升
 */
public boolean deductStock(String productId){
    //判断当前线程是不是这个线程的持有者,并且判断当前线程中的client是否当前锁的value,可以实现可重入锁,
    String clientId = UUID.randomUUID().toString();
    try{
        //自旋等待,直到拿到锁再执行扣件库存,否则就一直等待, 并且设置超时时间,当超时以后自动释放锁
        for (;;){
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(productId, clientId, 30, TimeUnit.SECONDS);
            if(result){
                break;
            }
        }
        //执行业务扣减库存
        return doDeductStock(productId);
    }finally {
        //判断当前线程持有的clientId是当前的锁的value,如果是释放锁
        if(clientId.equals(stringRedisTemplate.opsForValue().get(productId))){
            stringRedisTemplate.delete(productId);
        }
    }
}

Redission实现分布式锁

代码实现
public boolean deductStock(String productId){
    RLock redissonLock = redisson.getLock(productId);
    redissonLock.lock(30, TimeUnit.SECONDS);
    try {
        return doDeductStock(productId);
    }finally {
        redissonLock.unlock();
    }
}
原理解析
  1. 获取锁
// 调用构造器,构建锁对象,设置锁的ID,
RLock redissonLock = redisson.getLock(productId);
  1. 尝试锁
//此命令执行后开始锁
redissonLock.lock(30, TimeUnit.SECONDS);
  1. 开启分线程强行续命
后台采用watch dog模式,如果发现当前线程没有执行完毕,就不断给当前锁执行 expire key 10 命令强行续命,续命时间为lock方法参数中第二个参数的1/3
  1. 释放锁
lock.unlock();
  1. 详情原理参考这篇帖子:https://www.cnblogs.com/AnXinliang/p/10019389.html

最后、本文参考了网上一些写法,也有个人的想法,如有侵权请联系我!

展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
返回顶部
顶部