文档章节

【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版

阿里云官方博客
 阿里云官方博客
发布于 2019/12/09 11:01
字数 955
阅读 7
收藏 0

设计

我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。

DistributedFairLock

public class DistributedFairLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //加锁节点
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
    private volatile int state;

    //当前锁创建的节点id
    private String id;

    //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
    private CountDownLatch countDownLatch;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();
    }

    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
        }
    }
}

lock

public void lock() {
    try {
        //加锁
        synchronized (this) {
            //如果当前未持有锁
            if (state <= 0) {
                //创建节点
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                //获取当前路径下所有的节点
                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }

                //获取所有id小于当前节点顺序的节点
                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    //监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的  
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await();
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
        Thread.currentThread().interrupt();
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            Thread.currentThread().interrupt();
        }
    }
}

tryLock

public boolean tryLock() {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    return false;
                }
            }
            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await(time, unit);
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

unlock

public void unlock() {
    synchronized (this) {
        if (state > 0) {
            state--;
        }
        //当不再持有锁时,删除创建的临时节点
        if (state == 0 && zooKeeper != null) {
            try {
                zooKeeper.delete(id, -1);
                id = null;
            } catch (Exception e) {
                logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
            }
        }
    }
}

LockWatcher

private class LockWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        synchronized (this) {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}

总结

上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。

本文转载自:https://yq.aliyun.com/articles/738391?utm_content=g_1000092254

阿里云官方博客
粉丝 206
博文 2332
码字总数 5560474
作品 0
杭州
程序员
私信 提问
「从入门到放弃-ZooKeeper」ZooKeeper实战-分布式锁

前言 上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。 本文我们来一起写一个ZooKeeper的实现的分布式锁。...

阿里云官方博客
2019/09/23
9
0
ZooKeeper教程资源收集(简介/原理/示例/解决方案)

菩提树下的杨过: ZooKeeper 笔记(1) 安装部署及hello world ZooKeeper 笔记(2) 监听数据变化 ZooKeeper 笔记(3) 实战应用之【统一配置管理】 ZooKeeper 笔记(4) 实战应用之【消除单点故障】...

easonjim
2017/09/05
0
0
ZooKeeper分布式专题与Dubbo微服务入门

ZooKeeper分布式专题与Dubbo微服务入门 网盘地址:https://pan.baidu.com/s/1TN6BlftB2uvvyVR7IDmODQ 密码: e6zt 备用地址(腾讯微云):https://share.weiyun.com/5539X2S 密码:65b36i Zo...

人气王子333
2018/04/17
0
0
【从入门到放弃-ZooKeeper】ZooKeeper入门

前言 ZooKeeper是一个分布式服务协调框架,可以用来维护分布式配置信息、服务注册中心、实现分布式锁等。在Hbase、Hadoop、kafka等项目中都有广泛的应用。随着分布式、微服务的普及,ZooKeep...

阿里云官方博客
2019/09/09
18
0
docker入门到实战(6)在docker中安装和使用kafka

下载镜像 这里使用了wurstmeister/kafka和wurstmeister/zookeeper这两个版本的镜像,在hub.docker.com中可以搜索到。 1、docker pull wurstmeister/zookeeper 2、docker pull wurstmeister/...

编程老司机
2018/05/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

六、Spring Cloud之配置中心config

前言 前面我们讲了微服务的注册中心、负载均衡、熔断处理、网管服务。接下来我们讲配置中心,为什么要用配置中心呢? 其实我们接触一段时间就可以发现,我们的项目还是非常多的,每个项目都有...

quellanan2
10分钟前
19
0
在Android的EditText视图中允许多行?

如何在Android的EditText视图中允许多行? #1楼 这对我有用 ,实际上这两个属性很重要: inputType和lines 。 此外,您可能需要一个滚动条,下面的代码显示了如何制作一个: <EditText ...

技术盛宴
14分钟前
13
0
分享自己写的JS版日期格式化和解析工具类,绝对好用!

前言 本来想模仿Java里面的SimpleDateFormat()对象的,但是感觉这样用起来不方便,所以还是直接写成单独的方法算了。 原文链接 日期格式化 2.1. 使用说明 formatDate(date, fmt),其中fmt支持...

SuShine
24分钟前
27
0
快递鸟api物流查询接口实现订阅物流轨迹单号查询功能对接调用

背景: 分享一篇关于在电商系统中同步物流轨迹到本地服务器的文章,当前方案使用了快递鸟集成api做为数据来源接口,这个接口是免费使用的,不过提供的功能还是非常强大的,有专门的售后维护团...

程序的小猿
28分钟前
34
0
Day08多态,abstract,接口

1.A:多态的概述:事物存在的多种形态。 B:多态前提:要有继承关系,方法重写和父类引用子类对象。 父类引用子类对象:Animal a = new Cat(); a.eat(); //效果等同于c.eat(); 2.多态中的...

Lao鹰
34分钟前
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部