文档章节

基于zookeeper的分布式锁实现

 梵蒂冈考虑过
发布于 2016/08/11 18:40
字数 1215
阅读 94
收藏 8
  • 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

     

    准备工作

    有几个帮助类,先把代码放上来

    ZKClient 对zk的操作做了一个简单的封装

     

    Java代码  

    1. package zk.lock;  
    2.   
    3. import org.apache.zookeeper.*;  
    4. import org.apache.zookeeper.data.Stat;  
    5. import zk.util.ZKUtil;  
    6.   
    7. import java.util.concurrent.CountDownLatch;  
    8. import java.util.concurrent.TimeUnit;  
    9.   
    10. /** 
    11.  * User: zhenghui 
    12.  * Date: 14-3-26 
    13.  * Time: 下午8:50 
    14.  * 封装一个zookeeper实例. 
    15.  */  
    16. public class ZKClient implements Watcher {  
    17.  
    18.   
    19.     private ZooKeeper zookeeper;  
    20.   
    21.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
    22.   
    23.   
    24.     public ZKClient(String connectString, int sessionTimeout) throws Exception {  
    25.         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
    26.         System.out.println("connecting zk server");  
    27.         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
    28.             System.out.println("connect zk server success");  
    29.         } else {  
    30.             System.out.println("connect zk server error.");  
    31.             throw new Exception("connect zk server error.");  
    32.         }  
    33.     }  
    34.   
    35.     public void close() throws InterruptedException {  
    36.         if (zookeeper != null) {  
    37.             zookeeper.close();  
    38.         }  
    39.     }  
    40.   
    41.     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
    42.         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
    43.         path = ZKUtil.normalize(path);  
    44.         if (!this.exists(path)) {  
    45.             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
    46.         }  
    47.     }  
    48.   
    49.     public boolean exists(String path) throws Exception {  
    50.         path = ZKUtil.normalize(path);  
    51.         Stat stat = zookeeper.exists(path, null);  
    52.         return stat != null;  
    53.     }  
    54.   
    55.     public String getData(String path) throws Exception {  
    56.         path = ZKUtil.normalize(path);  
    57.         try {  
    58.             byte[] data = zookeeper.getData(path, null, null);  
    59.             return new String(data);  
    60.         } catch (KeeperException e) {  
    61.             if (e instanceof KeeperException.NoNodeException) {  
    62.                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
    63.             } else {  
    64.                 throw new Exception(e);  
    65.             }  
    66.         } catch (InterruptedException e) {  
    67.             Thread.currentThread().interrupt();  
    68.             throw new Exception(e);  
    69.         }  
    70.     }  
    71.   
    72.     @Override  
    73.     public void process(WatchedEvent event) {  
    74.         if (event == null) return;  
    75.   
    76.         // 连接状态  
    77.         Watcher.Event.KeeperState keeperState = event.getState();  
    78.         // 事件类型  
    79.         Watcher.Event.EventType eventType = event.getType();  
    80.         // 受影响的path  
    81. //        String path = event.getPath();  
    82.         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
    83.             // 成功连接上ZK服务器  
    84.             if (Watcher.Event.EventType.None == eventType) {  
    85.                 System.out.println("zookeeper connect success");  
    86.                 connectedSemaphore.countDown();  
    87.             }  
    88.         }  
    89.         //下面可以做一些重连的工作.  
    90.         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
    91.             System.out.println("zookeeper Disconnected");  
    92.         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
    93.             System.out.println("zookeeper AuthFailed");  
    94.         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
    95.             System.out.println("zookeeper Expired");  
    96.         }  
    97.     }  
    98. }  

     ZKUtil 针对zk路径的一个工具类

    Java代码  

    1. package zk.util;  
    2.   
    3. /** 
    4.  * User: zhenghui 
    5.  * Date: 14-3-26 
    6.  * Time: 下午9:56 
    7.  */  
    8. public class ZKUtil {  
    9.   
    10.     public static final String SEPARATOR = "/";  
    11.   
    12.     /** 
    13.      * 转换path为zk的标准路径 以/开头,最后不带/ 
    14.      */  
    15.     public static String normalize(String path) {  
    16.         String temp = path;  
    17.         if(!path.startsWith(SEPARATOR)) {  
    18.             temp = SEPARATOR + path;  
    19.         }  
    20.         if(path.endsWith(SEPARATOR)) {  
    21.             temp = temp.substring(0, temp.length()-1);  
    22.             return normalize(temp);  
    23.         }else {  
    24.             return temp;  
    25.         }  
    26.     }  
    27.   
    28.     /** 
    29.      * 链接两个path,并转化为zk的标准路径 
    30.      */  
    31.     public static String contact(String path1,String path2){  
    32.         if(path2.startsWith(SEPARATOR)) {  
    33.             path2 = path2.substring(1);  
    34.         }  
    35.         if(path1.endsWith(SEPARATOR)) {  
    36.             return normalize(path1 + path2);  
    37.         } else {  
    38.             return normalize(path1 + SEPARATOR + path2);  
    39.         }  
    40.     }  
    41.   
    42.     /** 
    43.      * 字符串转化成byte类型 
    44.      */  
    45.     public static byte[] toBytes(String data) {  
    46.         if(data == null || data.trim().equals("")) return null;  
    47.         return data.getBytes();  
    48.     }  
    49. }  

     NetworkUtil 获取本机IP的工具方法

    Java代码  

    1. package zk.util;  
    2.   
    3. import java.net.InetAddress;  
    4. import java.net.NetworkInterface;  
    5. import java.util.Enumeration;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-4-1 
    10.  * Time: 下午4:47 
    11.  */  
    12. public class NetworkUtil {  
    13.   
    14.     static private final char COLON = ':';  
    15.   
    16.     /** 
    17.      * 获取当前机器ip地址 
    18.      * 据说多网卡的时候会有问题. 
    19.      */  
    20.     public static String getNetworkAddress() {  
    21.         Enumeration<NetworkInterface> netInterfaces;  
    22.         try {  
    23.             netInterfaces = NetworkInterface.getNetworkInterfaces();  
    24.             InetAddress ip;  
    25.             while (netInterfaces.hasMoreElements()) {  
    26.                 NetworkInterface ni = netInterfaces  
    27.                         .nextElement();  
    28.                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  
    29.                 while(addresses.hasMoreElements()){  
    30.                     ip = addresses.nextElement();  
    31.                     if (!ip.isLoopbackAddress()  
    32.                             && ip.getHostAddress().indexOf(COLON) == -1) {  
    33.                         return ip.getHostAddress();  
    34.                     }  
    35.                 }  
    36.             }  
    37.             return "";  
    38.         } catch (Exception e) {  
    39.             return "";  
    40.         }  
    41.     }  
    42. }  

     

    --------------------------- 正文开始  -----------------------------------

    这种实现非常简单,具体的流程如下



     对应的实现如下

    Java代码  

    1. package zk.lock;  
    2.   
    3.   
    4. import zk.util.NetworkUtil;  
    5. import zk.util.ZKUtil;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-3-26 
    10.  * Time: 下午8:37 
    11.  * 分布式锁实现. 
    12.  * 
    13.  * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得 
    14.  * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP 
    15.  */  
    16. public class DistributedLock01 {  
    17.   
    18.     private ZKClient zkClient;  
    19.   
    20.   
    21.     public static final String LOCK_ROOT = "/lock";  
    22.     private String lockName;  
    23.   
    24.   
    25.     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
    26.         //先创建zk链接.  
    27.         this.createConnection(connectString,sessionTimeout);  
    28.   
    29.         this.lockName = lockName;  
    30.     }  
    31.   
    32.     public boolean tryLock(){  
    33.         String path = ZKUtil.contact(LOCK_ROOT,lockName);  
    34.         String localIp = NetworkUtil.getNetworkAddress();  
    35.         try {  
    36.             if(zkClient.exists(path)){  
    37.                 String ownnerIp = zkClient.getData(path);  
    38.                 if(localIp.equals(ownnerIp)){  
    39.                     return true;  
    40.                 }  
    41.             } else {  
    42.                 zkClient.createPathIfAbsent(path,false);  
    43.                 if(zkClient.exists(path)){  
    44.                     String ownnerIp = zkClient.getData(path);  
    45.                     if(localIp.equals(ownnerIp)){  
    46.                         return true;  
    47.                     }  
    48.                 }  
    49.             }  
    50.         } catch (Exception e) {  
    51.             e.printStackTrace();  
    52.         }  
    53.         return false;  
    54.     }  
    55.   
    56.   
    57.     /** 
    58.      * 创建zk连接 
    59.      * 
    60.      */  
    61.     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
    62.         if(zkClient != null){  
    63.             releaseConnection();  
    64.         }  
    65.         zkClient = new ZKClient(connectString,sessionTimeout);  
    66.         zkClient.createPathIfAbsent(LOCK_ROOT,true);  
    67.     }  
    68.     /** 
    69.      * 关闭ZK连接 
    70.      */  
    71.     protected void releaseConnection() throws InterruptedException {  
    72.         if (zkClient != null) {  
    73.             zkClient.close();  
    74.         }  
    75.     }  
    76.   
    77. }  

     

    总结

    网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更

  • 核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 
    1.     项目核心代码结构截图

    分布式框架介绍 - kafkaee - kafkaee的博客

       项目模块依赖分布式框架介绍 - kafkaee - kafkaee的博客

    特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化

    2.    项目依赖介绍

       2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:
     

    分布式框架介绍 - kafkaee - kafkaee的博客

           2.2 Dubbo独立服务项目依赖如下图:

     分布式框架介绍 - kafkaee - kafkaee的博客

    3.  项目功能部分截图:

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客
     

    zookeeper、dubbo服务启动 

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客
     

    dubbo管控台 

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     REST服务平台

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

© 著作权归作者所有

粉丝 27
博文 53
码字总数 105771
作品 0
深圳
私信 提问
加载中

评论(1)

梵蒂冈考虑过 博主
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
分布式锁与实现(二)基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包...

rechardchensir
2018/10/08
900
2
Redis实现分布式锁与Zookeeper实现分布式锁区别

# Redis实现分布式锁与Zookeeper实现分布式锁区别 **前言: 在学习过程中,简单的整理了一些redis跟zookeeper实现分布式锁的区别,有需要改正跟补充的地方,希望各位大佬及时指出 Redis实现分...

Java周某人
07/24
0
0
ZooKeeper 实现分布式锁

ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。 节点 ...

BeckJin
06/16
0
0
分布式锁与实现(二)——基于ZooKeeper实现

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配...

刘祖鹏
2018/05/08
152
0
基于redis和zookeeper的分布式锁实现方式

先来说说什么是分布式锁,简单来说,分布式锁就是在分布式并发场景中,能够实现多节点的代码同步的一种机制。从实现角度来看,主要有两种方式:基于redis的方式和基于zookeeper的方式,下面分...

技术小能手
2018/06/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

移动深度学习:人工智能的深水区

人工智能技术经历6年的快速发展,重新定义了很多行业的用户体验,而这仅是开始。 随着5G商用大规模落地,以及智能手机硬件性能越来越强、AIoT设备的快速普及,基于云-边缘-端算法和算力结构的...

博文视点Bv
17分钟前
2
0
vim 分屏 操作

$vim -On file file2 #大写O垂直分屏打开文件 $vim -on file file2 #小写水平打开 # n 表示分屏数,直接n等于文件个数 如果n小于文件,按顺序打开前面的n个,如果大于,打开空编辑页面 分屏快...

突突突酱
19分钟前
2
0
MySQL/Mariadb设置中文字符集(linux)

编辑/etc/my.cnf,添加以下设置 [mysql]default-character-set=utf8[mysqld]character_set_server=utf8[mysqld.safe]default-character-set=utf8[client]default-chara...

编程老陆
22分钟前
2
0
关于linux常用的挂载命令

挂载:就把一块磁盘(可以是光盘,U盘)绑定到一个空目录下面 一般情况下会挂载到mnt目录下面 挂载光盘(把光盘挂载到/mnt/cdrom这个目录中) mount -t iso9660 /dev/cdrom /mnt/cdrom 退出当前目录...

chenhongjiang
23分钟前
3
0
如何分享brain内容外部共享?几个需要知道的TheBrain问答

TheBrain(点击下载)是一款与众不同的思维导图软件,其所有信息通过一个又一个的节点进行联系,最终形成一个杂而不乱的网状结构。从头开始设计,让您捕获您的想法和信息在一个网络的联想,匹...

mnrssj
23分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部