文档章节

基于zookeeper的分布式锁实现

 梵蒂冈考虑过
发布于 2016/08/11 10:20
字数 1215
阅读 30
收藏 5
  • 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

     

    准备工作

    有几个帮助类,先把代码放上来

    ZKClient 对zk的操作做了一个简单的封装

     

    Java代码  

    1. package zk.lock;  
    2.   
    3. import org.apache.zookeeper.*;  
    4. import org.apache.zookeeper.data.Stat;  
    5. import zk.util.ZKUtil;  
    6.   
    7. import java.util.concurrent.CountDownLatch;  
    8. import java.util.concurrent.TimeUnit;  
    9.   
    10. /** 
    11.  * User: zhenghui 
    12.  * Date: 14-3-26 
    13.  * Time: 下午8:50 
    14.  * 封装一个zookeeper实例. 
    15.  */  
    16. public class ZKClient implements Watcher {  
    17.   
    18.     private ZooKeeper zookeeper;  
    19.   
    20.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
    21.   
    22.   
    23.     public ZKClient(String connectString, int sessionTimeout) throws Exception {  
    24.         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
    25.         System.out.println("connecting zk server");  
    26.         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
    27.             System.out.println("connect zk server success");  
    28.         } else {  
    29.             System.out.println("connect zk server error.");  
    30.             throw new Exception("connect zk server error.");  
    31.         }  
    32.     }  
    33.   
    34.     public void close() throws InterruptedException {  
    35.         if (zookeeper != null) {  
    36.             zookeeper.close();  
    37.         }  
    38.     }  
    39.   
    40.     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
    41.         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
    42.         path = ZKUtil.normalize(path);  
    43.         if (!this.exists(path)) {  
    44.             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
    45.         }  
    46.     }  
    47.   
    48.     public boolean exists(String path) throws Exception {  
    49.         path = ZKUtil.normalize(path);  
    50.         Stat stat = zookeeper.exists(path, null);  
    51.         return stat != null;  
    52.     }  
    53.   
    54.     public String getData(String path) throws Exception {  
    55.         path = ZKUtil.normalize(path);  
    56.         try {  
    57.             byte[] data = zookeeper.getData(path, null, null);  
    58.             return new String(data);  
    59.         } catch (KeeperException e) {  
    60.             if (e instanceof KeeperException.NoNodeException) {  
    61.                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
    62.             } else {  
    63.                 throw new Exception(e);  
    64.             }  
    65.         } catch (InterruptedException e) {  
    66.             Thread.currentThread().interrupt();  
    67.             throw new Exception(e);  
    68.         }  
    69.     }  
    70.   
    71.     @Override  
    72.     public void process(WatchedEvent event) {  
    73.         if (event == null) return;  
    74.   
    75.         // 连接状态  
    76.         Watcher.Event.KeeperState keeperState = event.getState();  
    77.         // 事件类型  
    78.         Watcher.Event.EventType eventType = event.getType();  
    79.         // 受影响的path  
    80. //        String path = event.getPath();  
    81.         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
    82.             // 成功连接上ZK服务器  
    83.             if (Watcher.Event.EventType.None == eventType) {  
    84.                 System.out.println("zookeeper connect success");  
    85.                 connectedSemaphore.countDown();  
    86.             }  
    87.         }  
    88.         //下面可以做一些重连的工作.  
    89.         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
    90.             System.out.println("zookeeper Disconnected");  
    91.         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
    92.             System.out.println("zookeeper AuthFailed");  
    93.         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
    94.             System.out.println("zookeeper Expired");  
    95.         }  
    96.     }  
    97. }  

     ZKUtil 针对zk路径的一个工具类

    Java代码  

    1. package zk.util;  
    2.   
    3. /** 
    4.  * User: zhenghui 
    5.  * Date: 14-3-26 
    6.  * Time: 下午9:56 
    7.  */  
    8. public class ZKUtil {  
    9.   
    10.     public static final String SEPARATOR = "/";  
    11.   
    12.     /** 
    13.      * 转换path为zk的标准路径 以/开头,最后不带/ 
    14.      */  
    15.     public static String normalize(String path) {  
    16.         String temp = path;  
    17.  
    18.         if(!path.startsWith(SEPARATOR)) {  
    19.             temp = SEPARATOR + path;  
    20.         }  
    21.         if(path.endsWith(SEPARATOR)) {  
    22.             temp = temp.substring(0, temp.length()-1);  
    23.             return normalize(temp);  
    24.         }else {  
    25.             return temp;  
    26.         }  
    27.     }  
    28.   
    29.     /** 
    30.      * 链接两个path,并转化为zk的标准路径 
    31.      */  
    32.     public static String contact(String path1,String path2){  
    33.         if(path2.startsWith(SEPARATOR)) {  
    34.             path2 = path2.substring(1);  
    35.         }  
    36.         if(path1.endsWith(SEPARATOR)) {  
    37.             return normalize(path1 + path2);  
    38.         } else {  
    39.             return normalize(path1 + SEPARATOR + path2);  
    40.         }  
    41.     }  
    42.   
    43.     /** 
    44.      * 字符串转化成byte类型 
    45.      */  
    46.     public static byte[] toBytes(String data) {  
    47.         if(data == null || data.trim().equals("")) return null;  
    48.         return data.getBytes();  
    49.     }  
    50. }  

     NetworkUtil 获取本机IP的工具方法

    Java代码  

    1. package zk.util;  
    2.   
    3. import java.net.InetAddress;  
    4. import java.net.NetworkInterface;  
    5. import java.util.Enumeration;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-4-1 
    10.  * Time: 下午4:47 
    11.  */  
    12. public class NetworkUtil {  
    13.   
    14.     static private final char COLON = ':';  
    15.   
    16.     /** 
    17.      * 获取当前机器ip地址 
    18.      * 据说多网卡的时候会有问题. 
    19.      */  
    20.     public static String getNetworkAddress() {  
    21.         Enumeration<NetworkInterface> netInterfaces;  
    22.         try {  
    23.             netInterfaces = NetworkInterface.getNetworkInterfaces();  
    24.             InetAddress ip;  
    25.             while (netInterfaces.hasMoreElements()) {  
    26.                 NetworkInterface ni = netInterfaces  
    27.                         .nextElement();  
    28.                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  
    29.                 while(addresses.hasMoreElements()){  
    30.                     ip = addresses.nextElement();  
    31.                     if (!ip.isLoopbackAddress()  
    32.                             && ip.getHostAddress().indexOf(COLON) == -1) {  
    33.                         return ip.getHostAddress();  
    34.                     }  
    35.                 }  
    36.             }  
    37.             return "";  
    38.         } catch (Exception e) {  
    39.             return "";  
    40.         }  
    41.     }  
    42. }  

     

    --------------------------- 正文开始  -----------------------------------

    这种实现非常简单,具体的流程如下



     对应的实现如下

    Java代码  

    1. package zk.lock;  
    2.   
    3.   
    4. import zk.util.NetworkUtil;  
    5. import zk.util.ZKUtil;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-3-26 
    10.  * Time: 下午8:37 
    11.  * 分布式锁实现. 
    12.  * 
    13.  * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得 
    14.  * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP 
    15.  */  
    16. public class DistributedLock01 {  
    17.   
    18.     private ZKClient zkClient;  
    19.   
    20.   
    21.     public static final String LOCK_ROOT = "/lock";  
    22.     private String lockName;  
    23.   
    24.   
    25.     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
    26.         //先创建zk链接.  
    27.         this.createConnection(connectString,sessionTimeout);  
    28.   
    29.         this.lockName = lockName;  
    30.     }  
    31.   
    32.     public boolean tryLock(){  
    33.         String path = ZKUtil.contact(LOCK_ROOT,lockName);  
    34.         String localIp = NetworkUtil.getNetworkAddress();  
    35.         try {  
    36.             if(zkClient.exists(path)){  
    37.                 String ownnerIp = zkClient.getData(path);  
    38.                 if(localIp.equals(ownnerIp)){  
    39.                     return true;  
    40.                 }  
    41.             } else {  
    42.                 zkClient.createPathIfAbsent(path,false);  
    43.                 if(zkClient.exists(path)){  
    44.                     String ownnerIp = zkClient.getData(path);  
    45.                     if(localIp.equals(ownnerIp)){  
    46.                         return true;  
    47.                     }  
    48.                 }  
    49.             }  
    50.         } catch (Exception e) {  
    51.             e.printStackTrace();  
    52.         }  
    53.         return false;  
    54.     }  
    55.   
    56.   
    57.     /** 
    58.      * 创建zk连接 
    59.      * 
    60.      */  
    61.     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
    62.         if(zkClient != null){  
    63.             releaseConnection();  
    64.         }  
    65.         zkClient = new ZKClient(connectString,sessionTimeout);  
    66.         zkClient.createPathIfAbsent(LOCK_ROOT,true);  
    67.     }  
    68.     /** 
    69.      * 关闭ZK连接 
    70.      */  
    71.     protected void releaseConnection() throws InterruptedException {  
    72.         if (zkClient != null) {  
    73.             zkClient.close();  
    74.         }  
    75.     }  
    76.   
    77. }  

     

    总结

    网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更

  • 核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 
    1.     项目核心代码结构截图

    分布式框架介绍 - kafkaee - kafkaee的博客

       项目模块依赖分布式框架介绍 - kafkaee - kafkaee的博客

    特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化

    2.    项目依赖介绍

       2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:
     

    分布式框架介绍 - kafkaee - kafkaee的博客

           2.2 Dubbo独立服务项目依赖如下图:

     分布式框架介绍 - kafkaee - kafkaee的博客

    3.  项目功能部分截图:

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客
     

    zookeeper、dubbo服务启动 

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客
     

    dubbo管控台 

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     REST服务平台

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

© 著作权归作者所有

粉丝 27
博文 53
码字总数 105771
作品 0
深圳
私信 提问
加载中

评论(1)

梵蒂冈考虑过 博主
zan
分布式锁与实现(二)基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包...

rechardchensir
2018/10/08
904
2
Redis实现分布式锁与Zookeeper实现分布式锁区别

# Redis实现分布式锁与Zookeeper实现分布式锁区别 **前言: 在学习过程中,简单的整理了一些redis跟zookeeper实现分布式锁的区别,有需要改正跟补充的地方,希望各位大佬及时指出 Redis实现分...

Java周某人
07/24
0
0
ZooKeeper 实现分布式锁

ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。 节点 ...

BeckJin
06/16
0
0
分布式锁与实现(二)——基于ZooKeeper实现

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配...

刘祖鹏
2018/05/08
153
0
基于redis和zookeeper的分布式锁实现方式

先来说说什么是分布式锁,简单来说,分布式锁就是在分布式并发场景中,能够实现多节点的代码同步的一种机制。从实现角度来看,主要有两种方式:基于redis的方式和基于zookeeper的方式,下面分...

技术小能手
2018/06/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nginx学习笔记

中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。 是连接两个独立应用程序或独立系统的软件。 web请求通过中间件可以直接调用操作系统,也可以经过中间件把请求分发到多...

码农实战
今天
5
0
Spring Security 实战干货:玩转自定义登录

1. 前言 前面的关于 Spring Security 相关的文章只是一个预热。为了接下来更好的实战,如果你错过了请从 Spring Security 实战系列 开始。安全访问的第一步就是认证(Authentication),认证...

码农小胖哥
今天
12
0
JAVA 实现雪花算法生成唯一订单号工具类

import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.util.Calendar;/** * Default distributed primary key generator. * * <p> * Use snowflake......

huangkejie
昨天
12
0
PhotoShop 色调:RGB/CMYK 颜色模式

一·、 RGB : 三原色:红绿蓝 1.通道:通道中的红绿蓝通道分别对应的是红绿蓝三种原色(RGB)的显示范围 1.差值模式能模拟三种原色叠加之后的效果 2.添加-颜色曲线:调整图像RGB颜色----R色增强...

东方墨天
昨天
11
1
将博客搬至CSDN

将博客搬至CSDN

算法与编程之美
昨天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部