文档章节

「从入门到放弃-ZooKeeper」ZooKeeper实战-分布式锁

阿里云官方博客
 阿里云官方博客
发布于 2019/09/23 14:13
字数 1496
阅读 9
收藏 0

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。

本文我们来一起写一个ZooKeeper的实现的分布式锁。

设计

参考之前学习的【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock,实现java.util.concurrent.locks.Lock接口。

我们通过重写接口中的方法实现一个可重入锁。

  • lock:请求锁,如果成功则直接返回,不成功则阻塞 直到获取锁。
  • lockInterruptibly:请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断
  • tryLock:1、尝试获取锁,获取失败的话 直接返回false,不会再等待。2、尝试获取锁,获取成功返回true,否则一直请求,直到超时返回false
  • unlock:释放锁

我们使用ZooKeeper的EPHEMERAL临时节点机制,如果能创建成功的话,则获取锁成功,释放锁或客户端断开连接后,临时节点自动删除,这样可以避免误删除或漏删除的情况。

获取锁失败后,这里我们使用轮询的方式来不断尝试创建。其实应该使用Watcher机制来实现,这样能避免大量的无用请求。在下一节更优雅的分布式锁实现机制中我们会用到。

DistributedLock

public class DistributedLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class); //ZooKeeper客户端,进行ZooKeeper操作 private ZooKeeper zooKeeper; //根节点名称 private String dir; //加锁节点 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; //要加锁节点 private String fullPath; //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。 private volatile int state; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedLock#init] error : " + e.toString(), e); } } } 
 

lock

public void lock() { //通过state实现重入机制,如果已经获取锁,则将state++即可。 if (addLockCount()) { return; } //一直尝试获取锁,知道获取成功 for (;;) { try { //创建临时节点 zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); //第一次获取锁,state++,这里不需要使用加锁机制保证原子性,因为同一时间,最多只有一个线程能create节点成功。 state++; break; } catch (InterruptedException ie) { //如果捕获中断异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lock] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } } 
 

lockInterruptibly

public void lockInterruptibly() throws InterruptedException { //通过state实现重入机制,如果已经获取锁,则将state++即可。 if (addLockCount()) { return; } for (;;) { //如果当前线程为中断状态,则抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(); } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; break; } catch (InterruptedException ie) { //如果捕获中断异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } } 
 

tryLock

public boolean tryLock() { //通过state实现重入机制,如果已经获取锁,则将state++即可。 if (addLockCount()) { return true; } //如果获取成功则返回true,失败则返回false try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (Exception e) { logger.error("[DistributedLock#tryLock] error : " + e.toString(), e); } return false; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //通过state实现重入机制,如果已经获取锁,则将state++即可。 if (addLockCount()) { return true; } //如果尝试获取超时,则返回false long nanosTimeout = unit.toNanos(time); if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; for (;;) { //如果当前线程为中断状态,则抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(); } //如果尝试获取超时,则返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { return false; } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (InterruptedException ie) { //如果捕获中断异常,则返回false logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie); return false; } catch (KeeperException ke) { //如果捕获到的异常是 节点已存在 外的其他异常,则返回false logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } } } 
 

unlock

public void unlock() { //通过state实现重入机制,如果已经获取锁,释放锁时,需要将state--。 delLockCount(); //如果state为0时,说明不再持有锁,需要将连接关闭,自动删除临时节点 if (state == 0 && zooKeeper != null) { try { zooKeeper.close(); } catch (InterruptedException e) { logger.error("[DistributedLock#unlock] error : " + e.toString(), e); } } } 
 

addLockCount

private boolean addLockCount() { //如果state大于0,即已持有锁,将state数量加一 if (state > 0) { synchronized (this) { if (state > 0) { state++; return true; } } } return false; } 
 

delLockCount

private boolean delLockCount() { //如果state大于0,即还持有锁,将state数量减一 if (state > 0) { synchronized (this) { if (state > 0) { state--; return true; } } } return false; } 
 

总结

上面就是一个通过ZooKeeper实现的分布式可重入锁,利用了临时节点的特性。源代码可见:aloofJr

其中有几个可以优化的点。

  • 轮询的方式换成Watcher机制
  • 可重入锁实现方式的优化
  • 所有线程竞争一个节点的创建,容易出现羊群效应,且是一种不公平的锁竞争模式

下节我们使用新的方式实现分布式锁来解决上面的几个问题,如果大家好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

 

原文链接

本文为云栖社区原创内容,未经允许不得转载。

© 著作权归作者所有

阿里云官方博客
粉丝 210
博文 2377
码字总数 5560474
作品 0
杭州
程序员
私信 提问
加载中

评论(0)

ZooKeeper教程资源收集(简介/原理/示例/解决方案)

菩提树下的杨过: ZooKeeper 笔记(1) 安装部署及hello world ZooKeeper 笔记(2) 监听数据变化 ZooKeeper 笔记(3) 实战应用之【统一配置管理】 ZooKeeper 笔记(4) 实战应用之【消除单点故障】...

easonjim
2017/09/05
0
0
ZooKeeper分布式专题与Dubbo微服务入门

ZooKeeper分布式专题与Dubbo微服务入门 网盘地址:https://pan.baidu.com/s/1TN6BlftB2uvvyVR7IDmODQ 密码: e6zt 备用地址(腾讯微云):https://share.weiyun.com/5539X2S 密码:65b36i Zo...

人气王子333
2018/04/17
0
0
【从入门到放弃-ZooKeeper】ZooKeeper入门

前言 ZooKeeper是一个分布式服务协调框架,可以用来维护分布式配置信息、服务注册中心、实现分布式锁等。在Hbase、Hadoop、kafka等项目中都有广泛的应用。随着分布式、微服务的普及,ZooKeep...

阿里云官方博客
2019/09/09
20
0
docker入门到实战(6)在docker中安装和使用kafka

下载镜像 这里使用了wurstmeister/kafka和wurstmeister/zookeeper这两个版本的镜像,在hub.docker.com中可以搜索到。 1、docker pull wurstmeister/zookeeper 2、docker pull wurstmeister/...

编程老司机
2018/05/14
0
0
分布式锁与实现(二)基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包...

rechardchensir
2018/10/08
1.1K
2

没有更多内容

加载失败,请刷新页面

加载更多

Tomcat优化配置

启动tomcat,访问配置: http://localhost:8080/manager/status

小小小施爷
30分钟前
71
0
如何更改为旧版本的Node.js

我在Ubuntu 10.10上运行Node.js版本v0.5.9-pre 。 我想使用v0.5.0-pre版本。 如何回滚到旧版本的节点? #1楼 用于管理Node的多个版本的另一个好的库是N: https : //github.com/visionmedia...

技术盛宴
36分钟前
86
0
三极管工作原理

随着科学技的发展,电子技术的应用几乎渗透到了人们生产生活的方方面面。晶体三极管作为电子技术中一个最为基本的常用器件,其原理对于学习电子技术的人自然应该是一个重点。三极管原理的关键...

黑客画家
39分钟前
105
0
WordCounter for mac(字数统计器) v1.6.2

想要快速的完成mac电脑上的文档字数统计,字数统计器WordCounter mac版是您的首先,该软件可以查看您的写作统计,如字数,字数,句数,行数,段数等简单而强大,别犹豫了来macdown下载体验吧...

云不若
41分钟前
47
0
mybatis-generator-maven-plugin:Communications link failure

使用 mybatis-generator 的 maven 插件可能会报出如下问题 Failed to execute goal org.mybatis.generator:mybatis-generator-maven-plugin:1.3.7:generate (default-cli) on project MyProj......

恒宝乐园
41分钟前
83
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部