[Repost] Distributed Lock Implementation (Part 2): Zookeeper


Design and Implementation

I. Basic Algorithm

1. Create an ephemeral sequential node under a given parent node.
2. Check whether the node just created has the smallest sequence number among all children of the parent.
3. If it does, the lock is acquired; otherwise, watch the child with the next smaller sequence number, and when that node is deleted, the current client is notified and tries to acquire the lock again.
4. To unlock, delete the current node.

II. Key Points

Ephemeral sequential nodes

The key to implementing a distributed lock on Zookeeper is its ephemeral sequential node type. Zookeeper has four node types:
1. PERSISTENT: a persistent node that exists until it is deleted explicitly
2. PERSISTENT_SEQUENTIAL: a persistent sequential node; Zookeeper appends a monotonically increasing sequence number to the node name
3. EPHEMERAL: an ephemeral node that is deleted automatically when the client session ends
4. EPHEMERAL_SEQUENTIAL: an ephemeral sequential node; Zookeeper appends a monotonically increasing sequence number to the node name
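
These four types map to org.apache.zookeeper.CreateMode. As a minimal sketch with the raw ZooKeeper API (the connect string and paths are illustrative, and for brevity we do not wait for the session to finish connecting):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateModeDemo {
    public static void main(String[] args) throws Exception {
        // connect; a real client would wait for the SyncConnected event first
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });

        // persistent node: survives until deleted explicitly (assumes /demo is absent)
        zk.create("/demo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // ephemeral sequential node: ZooKeeper appends a 10-digit counter,
        // e.g. /demo/lock-0000000000, and removes the node when the session ends
        String path = zk.create("/demo/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("created: " + path);

        zk.close();
    }
}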

Watches

Zookeeper provides an event-notification mechanism: watches can be registered on a node, on its data, and on its children. This watcher mechanism is what we use to implement the waiting part of the lock.
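
For example, with the ZkClient library used below, registering a data/deletion listener looks like this minimal, illustrative sketch (ZkClient re-registers the underlying one-shot ZooKeeper watch after each event):

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class WatchDemo {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 30000);
        // fires on data changes and on deletion of /some_node
        zkClient.subscribeDataChanges("/some_node", new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) {
                System.out.println(dataPath + " changed to " + data);
            }

            @Override
            public void handleDataDeleted(String dataPath) {
                System.out.println(dataPath + " was deleted");
            }
        });
    }
}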

III. Code Implementation

The implementation below is based on the ZkClient client library; the raw Zookeeper API could be used in much the same way.
The Maven coordinates are:
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>

The code is as follows:

public class MyDistributedLock {


    private ZkClient zkClient;
    private String name;
    private String currentLockPath;
    private CountDownLatch countDownLatch;

    private static final String PARENT_LOCK_PATH = "/distribute_lock";

    public MyDistributedLock(ZkClient zkClient, String name) {
        this.zkClient = zkClient;
        this.name = name;
    }

	// acquire the lock
    public void lock() {
    	// create the parent node if it does not exist yet
        if (!zkClient.exists(PARENT_LOCK_PATH)) {
            try {
            	// among concurrent clients only one create succeeds; the rest throw and the exception is ignored
                zkClient.createPersistent(PARENT_LOCK_PATH);
            } catch (Exception ignored) {
            }
        }
        // create an ephemeral sequential node under the parent directory
        currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
        // check whether our node has the smallest sequence number
        checkMinNode(currentLockPath);
    }

	// release the lock by deleting our node
    public void unlock() {
        System.out.println("delete : " + currentLockPath);
        zkClient.delete(currentLockPath);
    }


    private boolean checkMinNode(String lockPath) {
		// fetch all children under the parent node
        List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
        Collections.sort(children);
        int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1));
        if (index == 0) {
            System.out.println(name + ":success");
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            return true;
        } else {
            String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
            // watch the previous node and wait for it to be released
            waitForLock(waitPath);
            return false;
        }
    }


    private void waitForLock(String prev) {
        System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev);
        countDownLatch = new CountDownLatch(1);
        zkClient.subscribeDataChanges(prev, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("prev node is done");
                checkMinNode(currentLockPath);
            }
        });
        // re-check after subscribing: the previous node may already have been
        // deleted before our watch was registered, leaving nothing to wait for
        if (!zkClient.exists(prev)) {
            return;
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch = null;
    }
}

Acquiring the lock

  1. zkClient.exists first checks whether the parent node exists and creates it if not; Zookeeper guarantees that only one of the concurrent creates succeeds

  2. zkClient.createEphemeralSequential creates an ephemeral sequential node under the parent directory; we then check whether our node has the smallest sequence number there. If so, the lock is acquired; otherwise we take the node with the next smaller number and watch it

  3. waitForLock waits on that next smaller node: subscribeDataChanges watches the node for changes, and handleDataDeleted runs the checkMinNode check again

  4. After registering the watch, we check once more whether that node still exists, because it may already have released the lock while the watch was being set up; if it no longer exists there is no need to wait. The waiting itself is implemented with a CountDownLatch

Releasing the lock

Unlocking simply deletes the current node via zkClient's delete; in practice, pair the two calls in a try/finally, as sketched below.
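
Pairing lock() and unlock() in a try/finally ensures the node is deleted even when the critical section throws. A minimal sketch using the class above (the lock name is illustrative):

MyDistributedLock lock = new MyDistributedLock(zkClient, "worker-1");
lock.lock();
try {
    // critical section: at most one client at a time executes here
} finally {
    // delete our node so the next waiter gets notified
    lock.unlock();
}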

Test case

We start multiple threads to exercise the lock/unlock cycle and check from the output that everything happens in order:

public class MyDistributedLockTest {


    public static void main(String[] args) {

        ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000);

        for (int i = 0; i < 20; i++) {

            String name = "thread" + i;
            Thread thread = new Thread(() -> {
                MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name);
                myDistributedLock.lock();
//                try {
//                    Thread.sleep(1 * 1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                myDistributedLock.unlock();
            });
            thread.start();
        }

    }
}

The output is shown below; with multiple threads, lock/unlock and the watch notifications all behave correctly:

thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006
thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005
thread3:success
delete : /distribute_lock2/0000000000
thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004
thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003
thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008
thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007
thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000
thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001
thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002
delete : /distribute_lock2/0000000001
prev node is done
thread8:success
delete : /distribute_lock2/0000000002
prev node is done
thread4:success
delete : /distribute_lock2/0000000003
prev node is done
thread7:success
delete : /distribute_lock2/0000000004
prev node is done
thread2:success
delete : /distribute_lock2/0000000005
prev node is done
thread6:success
delete : /distribute_lock2/0000000006
prev node is done
thread1:success
delete : /distribute_lock2/0000000007
prev node is done
thread5:success
delete : /distribute_lock2/0000000008
prev node is done
thread9:success
delete : /distribute_lock2/0000000009

Curator Source Code Analysis

I. Basic Usage

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/test");

try {
    lock.acquire();
    // business logic
} catch (Exception e) {
    e.printStackTrace();
} finally {
    lock.release();
}
  1. CuratorFrameworkFactory.newClient obtains the Zookeeper client, with retryPolicy specifying the retry strategy; the client is then started

  2. Curator itself provides several lock implementations; here we take InterProcessMutex, a reentrant lock, as the example. lock.acquire() acquires the lock and lock.release() releases it; acquire also has an overload that takes a wait timeout, as shown in the sketch after this list
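
The timed overload returns a boolean rather than blocking indefinitely. A minimal sketch, reusing the lock object from the snippet above:

// wait at most 3 seconds; gives up instead of blocking forever
if (lock.acquire(3, TimeUnit.SECONDS)) {
    try {
        // business logic
    } finally {
        lock.release();
    }
} else {
    // could not obtain the lock within the timeout
}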

II. Source Code Analysis

Acquiring the lock

acquire internally just calls internalLock, passing -1 as the wait time, i.e. wait indefinitely:

 public void acquire() throws Exception {
        if(!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }

internalLock first handles reentrancy: a ConcurrentMap keeps a per-thread entry with an atomic counter, and if the current thread already holds the lock the counter is simply incremented. Otherwise attemptLock is called to actually acquire the lock:

 private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
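
Reentrancy in practice: per the code above, the same thread can call acquire() again without another round trip to Zookeeper, as long as every acquire is matched by a release. A small illustration:

lock.acquire();      // first acquire: creates the lock node, lockCount = 1
lock.acquire();      // same thread re-enters: only lockCount is bumped to 2
try {
    // business logic
} finally {
    lock.release();  // lockCount drops to 1, the node is kept
    lock.release();  // lockCount reaches 0, the node is deleted
}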

attemptLock contains the retry loop: createsTheLock creates the lock node, and internalLockLoop determines whether the current node is the smallest:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

createsTheLock simply uses Curator's fluent API to create the ephemeral sequential node:

   public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

internalLockLoop performs the lock check. Internally, driver.getsTheLock decides whether our node is the smallest under the directory; if so, the lock is acquired. Otherwise a watch is placed on previousSequencePath, and once the watch fires the remaining wait time is recomputed before trying again:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

Releasing the lock

release is comparatively simple. It first checks whether the map holds a lock count for the current thread, throwing if it does not; otherwise the counter is decremented atomically. A count still above zero means the lock is held reentrantly and nothing more happens; a negative count indicates an unbalanced release and throws; exactly zero means releaseLock deletes the node and the entry is removed from the map:

  public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

Afterword

Today the mainstream distributed-lock implementations are built on Redis and Zookeeper. Compared with rolling your own, the designs of Redisson and Curator are considerably more refined and are well worth studying and borrowing from.

A journey of a thousand miles is made up of single steps; a voyage of ten thousand miles depends on the compass. Let us encourage each other.

Reposted from: https://my.oschina.net/u/3057247/blog/1928125
