基于zookeeper的分布式锁实现

原创
2016/08/11 18:40
阅读数 122
  • 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

     

    准备工作

    有几个帮助类,先把代码放上来

    ZKClient 对zk的操作做了一个简单的封装

     

    Java代码  

    1. package zk.lock;  
    2.   
    3. import org.apache.zookeeper.*;  
    4. import org.apache.zookeeper.data.Stat;  
    5. import zk.util.ZKUtil;  
    6.   
    7. import java.util.concurrent.CountDownLatch;  
    8. import java.util.concurrent.TimeUnit;  
    9.   
    10. /** 
    11.  * User: zhenghui 
    12.  * Date: 14-3-26 
    13.  * Time: 下午8:50 
    14.  * 封装一个zookeeper实例. 
    15.  */  
    16. public class ZKClient implements Watcher {  
    17.  
    18.   
    19.     private ZooKeeper zookeeper;  
    20.   
    21.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
    22.   
    23.   
    24.     public ZKClient(String connectString, int sessionTimeout) throws Exception {  
    25.         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
    26.         System.out.println("connecting zk server");  
    27.         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
    28.             System.out.println("connect zk server success");  
    29.         } else {  
    30.             System.out.println("connect zk server error.");  
    31.             throw new Exception("connect zk server error.");  
    32.         }  
    33.     }  
    34.   
    35.     public void close() throws InterruptedException {  
    36.         if (zookeeper != null) {  
    37.             zookeeper.close();  
    38.         }  
    39.     }  
    40.   
    41.     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
    42.         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
    43.         path = ZKUtil.normalize(path);  
    44.         if (!this.exists(path)) {  
    45.             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
    46.         }  
    47.     }  
    48.   
    49.     public boolean exists(String path) throws Exception {  
    50.         path = ZKUtil.normalize(path);  
    51.         Stat stat = zookeeper.exists(path, null);  
    52.         return stat != null;  
    53.     }  
    54.   
    55.     public String getData(String path) throws Exception {  
    56.         path = ZKUtil.normalize(path);  
    57.         try {  
    58.             byte[] data = zookeeper.getData(path, null, null);  
    59.             return new String(data);  
    60.         } catch (KeeperException e) {  
    61.             if (e instanceof KeeperException.NoNodeException) {  
    62.                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
    63.             } else {  
    64.                 throw new Exception(e);  
    65.             }  
    66.         } catch (InterruptedException e) {  
    67.             Thread.currentThread().interrupt();  
    68.             throw new Exception(e);  
    69.         }  
    70.     }  
    71.   
    72.     @Override  
    73.     public void process(WatchedEvent event) {  
    74.         if (event == null) return;  
    75.   
    76.         // 连接状态  
    77.         Watcher.Event.KeeperState keeperState = event.getState();  
    78.         // 事件类型  
    79.         Watcher.Event.EventType eventType = event.getType();  
    80.         // 受影响的path  
    81. //        String path = event.getPath();  
    82.         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
    83.             // 成功连接上ZK服务器  
    84.             if (Watcher.Event.EventType.None == eventType) {  
    85.                 System.out.println("zookeeper connect success");  
    86.                 connectedSemaphore.countDown();  
    87.             }  
    88.         }  
    89.         //下面可以做一些重连的工作.  
    90.         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
    91.             System.out.println("zookeeper Disconnected");  
    92.         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
    93.             System.out.println("zookeeper AuthFailed");  
    94.         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
    95.             System.out.println("zookeeper Expired");  
    96.         }  
    97.     }  
    98. }  

     ZKUtil 针对zk路径的一个工具类

    Java代码  

    1. package zk.util;  
    2.   
    3. /** 
    4.  * User: zhenghui 
    5.  * Date: 14-3-26 
    6.  * Time: 下午9:56 
    7.  */  
    8. public class ZKUtil {  
    9.   
    10.     public static final String SEPARATOR = "/";  
    11.   
    12.     /** 
    13.      * 转换path为zk的标准路径 以/开头,最后不带/ 
    14.      */  
    15.     public static String normalize(String path) {  
    16.         String temp = path;  
    17.         if(!path.startsWith(SEPARATOR)) {  
    18.             temp = SEPARATOR + path;  
    19.         }  
    20.         if(path.endsWith(SEPARATOR)) {  
    21.             temp = temp.substring(0, temp.length()-1);  
    22.             return normalize(temp);  
    23.         }else {  
    24.             return temp;  
    25.         }  
    26.     }  
    27.   
    28.     /** 
    29.      * 链接两个path,并转化为zk的标准路径 
    30.      */  
    31.     public static String contact(String path1,String path2){  
    32.         if(path2.startsWith(SEPARATOR)) {  
    33.             path2 = path2.substring(1);  
    34.         }  
    35.         if(path1.endsWith(SEPARATOR)) {  
    36.             return normalize(path1 + path2);  
    37.         } else {  
    38.             return normalize(path1 + SEPARATOR + path2);  
    39.         }  
    40.     }  
    41.   
    42.     /** 
    43.      * 字符串转化成byte类型 
    44.      */  
    45.     public static byte[] toBytes(String data) {  
    46.         if(data == null || data.trim().equals("")) return null;  
    47.         return data.getBytes();  
    48.     }  
    49. }  

     NetworkUtil 获取本机IP的工具方法

    Java代码  

    1. package zk.util;  
    2.   
    3. import java.net.InetAddress;  
    4. import java.net.NetworkInterface;  
    5. import java.util.Enumeration;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-4-1 
    10.  * Time: 下午4:47 
    11.  */  
    12. public class NetworkUtil {  
    13.   
    14.     static private final char COLON = ':';  
    15.   
    16.     /** 
    17.      * 获取当前机器ip地址 
    18.      * 据说多网卡的时候会有问题. 
    19.      */  
    20.     public static String getNetworkAddress() {  
    21.         Enumeration<NetworkInterface> netInterfaces;  
    22.         try {  
    23.             netInterfaces = NetworkInterface.getNetworkInterfaces();  
    24.             InetAddress ip;  
    25.             while (netInterfaces.hasMoreElements()) {  
    26.                 NetworkInterface ni = netInterfaces  
    27.                         .nextElement();  
    28.                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  
    29.                 while(addresses.hasMoreElements()){  
    30.                     ip = addresses.nextElement();  
    31.                     if (!ip.isLoopbackAddress()  
    32.                             && ip.getHostAddress().indexOf(COLON) == -1) {  
    33.                         return ip.getHostAddress();  
    34.                     }  
    35.                 }  
    36.             }  
    37.             return "";  
    38.         } catch (Exception e) {  
    39.             return "";  
    40.         }  
    41.     }  
    42. }  

     

    --------------------------- 正文开始  -----------------------------------

    这种实现非常简单,具体的流程如下



     对应的实现如下

    Java代码  

    1. package zk.lock;  
    2.   
    3.   
    4. import zk.util.NetworkUtil;  
    5. import zk.util.ZKUtil;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-3-26 
    10.  * Time: 下午8:37 
    11.  * 分布式锁实现. 
    12.  * 
    13.  * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得 
    14.  * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP 
    15.  */  
    16. public class DistributedLock01 {  
    17.   
    18.     private ZKClient zkClient;  
    19.   
    20.   
    21.     public static final String LOCK_ROOT = "/lock";  
    22.     private String lockName;  
    23.   
    24.   
    25.     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
    26.         //先创建zk链接.  
    27.         this.createConnection(connectString,sessionTimeout);  
    28.   
    29.         this.lockName = lockName;  
    30.     }  
    31.   
    32.     public boolean tryLock(){  
    33.         String path = ZKUtil.contact(LOCK_ROOT,lockName);  
    34.         String localIp = NetworkUtil.getNetworkAddress();  
    35.         try {  
    36.             if(zkClient.exists(path)){  
    37.                 String ownnerIp = zkClient.getData(path);  
    38.                 if(localIp.equals(ownnerIp)){  
    39.                     return true;  
    40.                 }  
    41.             } else {  
    42.                 zkClient.createPathIfAbsent(path,false);  
    43.                 if(zkClient.exists(path)){  
    44.                     String ownnerIp = zkClient.getData(path);  
    45.                     if(localIp.equals(ownnerIp)){  
    46.                         return true;  
    47.                     }  
    48.                 }  
    49.             }  
    50.         } catch (Exception e) {  
    51.             e.printStackTrace();  
    52.         }  
    53.         return false;  
    54.     }  
    55.   
    56.   
    57.     /** 
    58.      * 创建zk连接 
    59.      * 
    60.      */  
    61.     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
    62.         if(zkClient != null){  
    63.             releaseConnection();  
    64.         }  
    65.         zkClient = new ZKClient(connectString,sessionTimeout);  
    66.         zkClient.createPathIfAbsent(LOCK_ROOT,true);  
    67.     }  
    68.     /** 
    69.      * 关闭ZK连接 
    70.      */  
    71.     protected void releaseConnection() throws InterruptedException {  
    72.         if (zkClient != null) {  
    73.             zkClient.close();  
    74.         }  
    75.     }  
    76.   
    77. }  

     

    总结

    网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更

  • 核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 
    1.     项目核心代码结构截图

    分布式框架介绍 - kafkaee - kafkaee的博客

       项目模块依赖分布式框架介绍 - kafkaee - kafkaee的博客

    特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化

    2.    项目依赖介绍

       2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:
     

    分布式框架介绍 - kafkaee - kafkaee的博客

           2.2 Dubbo独立服务项目依赖如下图:

     分布式框架介绍 - kafkaee - kafkaee的博客

    3.  项目功能部分截图:

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客
     

    zookeeper、dubbo服务启动 

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客
     

    dubbo管控台 

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     REST服务平台

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

     

    分布式框架介绍 - kafkaee - kafkaee的博客

展开阅读全文
加载中
打赏
1 评论
8 收藏
1
分享
返回顶部
顶部