基于zookeeper的分布式锁实现

原创
2016/08/07 14:24
阅读数 73

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

 

准备工作

有几个帮助类,先把代码放上来

ZKClient 对zk的操作做了一个简单的封装

 

Java代码  收藏代码

  1. package zk.lock;  
  2.   
  3. import org.apache.zookeeper.*;  
  4. import org.apache.zookeeper.data.Stat;  
  5. import zk.util.ZKUtil;  
  6.   
  7. import java.util.concurrent.CountDownLatch;  
  8. import java.util.concurrent.TimeUnit;  
  9.   
  10. /** 
  11.  * User: zhenghui 
  12.  * Date: 14-3-26 
  13.  * Time: 下午8:50 
  14.  * 封装一个zookeeper实例. 
  15.  */  
  16. public class ZKClient implements Watcher {  
  17.   
  18.     private ZooKeeper zookeeper;  
  19.   
  20.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
  21.   
  22.   
  23.     public ZKClient(String connectString, int sessionTimeout) throws Exception {  
  24.         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
  25.         System.out.println("connecting zk server");  
  26.         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
  27.             System.out.println("connect zk server success");  
  28.         } else {  
  29.             System.out.println("connect zk server error.");  
  30.             throw new Exception("connect zk server error.");  
  31.         }  
  32.     }  
  33.   
  34.     public void close() throws InterruptedException {  
  35.         if (zookeeper != null) {  
  36.             zookeeper.close();  
  37.         }  
  38.     }  
  39.   
  40.     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
  41.         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
  42.         path = ZKUtil.normalize(path);  
  43.         if (!this.exists(path)) {  
  44.             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
  45.         }  
  46.     }  
  47.   
  48.     public boolean exists(String path) throws Exception {  
  49.         path = ZKUtil.normalize(path);  
  50.         Stat stat = zookeeper.exists(path, null);  
  51.         return stat != null;  
  52.     }  
  53.   
  54.     public String getData(String path) throws Exception {  
  55.         path = ZKUtil.normalize(path);  
  56.         try {  
  57.             byte[] data = zookeeper.getData(path, null, null);  
  58.             return new String(data);  
  59.         } catch (KeeperException e) {  
  60.             if (e instanceof KeeperException.NoNodeException) {  
  61.                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
  62.             } else {  
  63.                 throw new Exception(e);  
  64.             }  
  65.         } catch (InterruptedException e) {  
  66.             Thread.currentThread().interrupt();  
  67.             throw new Exception(e);  
  68.         }  
  69.     }  
  70.   
  71.     @Override  
  72.     public void process(WatchedEvent event) {  
  73.         if (event == null) return;  
  74.   
  75.         // 连接状态  
  76.         Watcher.Event.KeeperState keeperState = event.getState();  
  77.         // 事件类型  
  78.         Watcher.Event.EventType eventType = event.getType();  
  79.         // 受影响的path  
  80. //        String path = event.getPath();  
  81.         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
  82.             // 成功连接上ZK服务器  
  83.             if (Watcher.Event.EventType.None == eventType) {  
  84.                 System.out.println("zookeeper connect success");  
  85.                 connectedSemaphore.countDown();  
  86.             }  
  87.         }  
  88.         //下面可以做一些重连的工作.  
  89.         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
  90.             System.out.println("zookeeper Disconnected");  
  91.         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
  92.             System.out.println("zookeeper AuthFailed");  
  93.         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
  94.             System.out.println("zookeeper Expired");  
  95.         }  
  96.     }  
  97. }  

 ZKUtil 针对zk路径的一个工具类

Java代码  收藏代码

  1. package zk.util;  
  2.   
  3. /** 
  4.  * User: zhenghui 
  5.  * Date: 14-3-26 
  6.  * Time: 下午9:56 
  7.  */  
  8. public class ZKUtil {  
  9.   
  10.     public static final String SEPARATOR = "/";  
  11.   
  12.     /** 
  13.      * 转换path为zk的标准路径 以/开头,最后不带/ 
  14.      */  
  15.     public static String normalize(String path) {  
  16.         String temp = path;  
  17.         if(!path.startsWith(SEPARATOR)) {  
  18.             temp = SEPARATOR + path;  
  19.         }  
  20.         if(path.endsWith(SEPARATOR)) {  
  21.             temp = temp.substring(0, temp.length()-1);  
  22.             return normalize(temp);  
  23.         }else {  
  24.             return temp;  
  25.         }  
  26.     }  
  27.   
  28.     /** 
  29.      * 链接两个path,并转化为zk的标准路径 
  30.      */  
  31.     public static String contact(String path1,String path2){  
  32.         if(path2.startsWith(SEPARATOR)) {  
  33.             path2 = path2.substring(1);  
  34.         }  
  35.         if(path1.endsWith(SEPARATOR)) {  
  36.             return normalize(path1 + path2);  
  37.         } else {  
  38.             return normalize(path1 + SEPARATOR + path2);  
  39.         }  
  40.     }  
  41.   
  42.     /** 
  43.      * 字符串转化成byte类型 
  44.      */  
  45.     public static byte[] toBytes(String data) {  
  46.         if(data == null || data.trim().equals("")) return null;  
  47.         return data.getBytes();  
  48.     }  
  49. }  

 NetworkUtil 获取本机IP的工具方法

Java代码  收藏代码

  1. package zk.util;  
  2.   
  3. import java.net.InetAddress;  
  4. import java.net.NetworkInterface;  
  5. import java.util.Enumeration;  
  6.   
  7. /** 
  8.  * User: zhenghui 
  9.  * Date: 14-4-1 
  10.  * Time: 下午4:47 
  11.  */  
  12. public class NetworkUtil {  
  13.   
  14.     static private final char COLON = ':';  
  15.   
  16.     /** 
  17.      * 获取当前机器ip地址 
  18.      * 据说多网卡的时候会有问题. 
  19.      */  
  20.     public static String getNetworkAddress() {  
  21.         Enumeration<NetworkInterface> netInterfaces;  
  22.         try {  
  23.             netInterfaces = NetworkInterface.getNetworkInterfaces();  
  24.             InetAddress ip;  
  25.             while (netInterfaces.hasMoreElements()) {  
  26.                 NetworkInterface ni = netInterfaces  
  27.                         .nextElement();  
  28.                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  
  29.                 while(addresses.hasMoreElements()){  
  30.                     ip = addresses.nextElement();  
  31.                     if (!ip.isLoopbackAddress()  
  32.                             && ip.getHostAddress().indexOf(COLON) == -1) {  
  33.                         return ip.getHostAddress();  
  34.                     }  
  35.                 }  
  36.             }  
  37.             return "";  
  38.         } catch (Exception e) {  
  39.             return "";  
  40.         }  
  41.     }  
  42. }  

 

--------------------------- 正文开始  -----------------------------------

这种实现非常简单,具体的流程如下



 对应的实现如下

Java代码  收藏代码

  1. package zk.lock;  
  2.   
  3.   
  4. import zk.util.NetworkUtil;  
  5. import zk.util.ZKUtil;  
  6.   
  7. /** 
  8.  * User: zhenghui 
  9.  * Date: 14-3-26 
  10.  * Time: 下午8:37 
  11.  * 分布式锁实现. 
  12.  * 
  13.  * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得 
  14.  * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP 
  15.  */  
  16. public class DistributedLock01 {  
  17.   
  18.     private ZKClient zkClient;  
  19.   
  20.   
  21.     public static final String LOCK_ROOT = "/lock";  
  22.     private String lockName;  
  23.   
  24.   
  25.     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
  26.         //先创建zk链接.  
  27.         this.createConnection(connectString,sessionTimeout);  
  28.   
  29.         this.lockName = lockName;  
  30.     }  
  31.   
  32.     public boolean tryLock(){  
  33.         String path = ZKUtil.contact(LOCK_ROOT,lockName);  
  34.         String localIp = NetworkUtil.getNetworkAddress();  
  35.         try {  
  36.             if(zkClient.exists(path)){  
  37.                 String ownnerIp = zkClient.getData(path);  
  38.                 if(localIp.equals(ownnerIp)){  
  39.                     return true;  
  40.                 }  
  41.             } else {  
  42.                 zkClient.createPathIfAbsent(path,false);  
  43.                 if(zkClient.exists(path)){  
  44.                     String ownnerIp = zkClient.getData(path);  
  45.                     if(localIp.equals(ownnerIp)){  
  46.                         return true;  
  47.                     }  
  48.                 }  
  49.             }  
  50.         } catch (Exception e) {  
  51.             e.printStackTrace();  
  52.         }  
  53.         return false;  
  54.     }  
  55.   
  56.   
  57.     /** 
  58.      * 创建zk连接 
  59.      * 
  60.      */  
  61.     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
  62.         if(zkClient != null){  
  63.             releaseConnection();  
  64.         }  
  65.         zkClient = new ZKClient(connectString,sessionTimeout);  
  66.         zkClient.createPathIfAbsent(LOCK_ROOT,true);  
  67.     }  
  68.     /** 
  69.      * 关闭ZK连接 
  70.      */  
  71.     protected void releaseConnection() throws InterruptedException {  
  72.         if (zkClient != null) {  
  73.             zkClient.close();  
  74.         }  
  75.     }  
  76.   
  77. }  

 

总结

网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加

前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更

框架简介:

本系统一款通用的SOA中间件平台,用来开发各类J2EE企业级应用,节省时间和人力成本。本系统采用MVC模式、AOP引擎、任务调度器、工作流、Ajax、拦截器、过滤器、缓存、日志监控、数据访问、表达式、国际化等技术。

用户权限系统:
组织结构:角色、用户、用户组、组织机构;权限点:页面、方法、按钮、数据权限、分级授权

项目管理新体验:
快速出原型系统、组件树、版本控制、模块移植、协同开发、实时监控、发布管理

可持续集成:
所有组件可移植、可定制、可扩充,开发成果不断积累,形成可持续发展的良性循环

框架/平台构成:
Springmvc + Mybatis + Shiro(权限)+SSO(单点登录) + Tiles(模板) +ActiveMQ(消息队列) + Rest(服务) + WebService(服务)+ EHcache(缓存) + Lucene(搜索引擎) + Quartz(定时调度)+ Html5(支持PC、IOS、Android)

支持平台平台: 
Windows XP、Windows 7 、Windows 10 、 Linux 、 Unix

服务器容器:
Tomcat 5/6/7 、Jetty、JBoss、WebSphere 8.5

项目源码结构截图:

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

项目运行截图:





JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台







JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台





JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台
JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台
JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台









JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台



JEESZ个人版 Maven+SpringMVC+Mybatis+shiro+restful开发平台

 

展开阅读全文
加载中
打赏
1 评论
0 收藏
0
分享
返回顶部
顶部