文档章节

zookeeper 分布式锁

果子核
 果子核
发布于 2017/06/05 09:54
字数 558
阅读 15
收藏 0

     使用锁存在一个问题,在有限时间内释放,也可能被其他等待的线程获取锁,所以在锁内部仍然需要通过查询来判断是否已经被更新过;

获取锁和根据锁的获取结果回调函数

public class ZkDistributedLockTemplate implements DistributedLockTemplate {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(ZkDistributedLockTemplate.class);

    private CuratorFramework client;


    public ZkDistributedLockTemplate(CuratorFramework client) {
        this.client = client;
    }



    private boolean tryLock(ZkReentrantLock distributedReentrantLock,Long timeout) throws Exception {
        return distributedReentrantLock.tryLock(timeout, TimeUnit.MILLISECONDS);
    }

    public Object execute(String lockId, int timeout, Callback callback) {
        ZkReentrantLock distributedReentrantLock = null;
        boolean getLock=false;
        try {
            distributedReentrantLock = new ZkReentrantLock(client,lockId);
            if(tryLock(distributedReentrantLock,new Long(timeout))){
                getLock=true;
                return callback.onGetLock();
            }else{
                return callback.onTimeout();
            }
        }catch(InterruptedException ex){
            log.error(ex.getMessage(), ex);
            Thread.currentThread().interrupt();
        }catch (Exception e) {
            log.error(e.getMessage(), e);
        }finally {
            if(getLock){
                distributedReentrantLock.unlock();
            }
        }
        return null;
    }
}

 

分布式锁的核心实现:

public class ZkReentrantLock implements DistributedReentrantLock {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(ZkReentrantLock.class);

    /**
     * 线程池
     */
    private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

    /**
     * 所有PERSISTENT锁节点的根位置
     */
    public static final String ROOT_PATH = "/ROOT_LOCK/";

    /**
     * 每次延迟清理PERSISTENT节点的时间  Unit:MILLISECONDS
     */
    private long delayTimeForClean = 1000;

    /**
     * zk 共享锁实现
     */
    private InterProcessMutex interProcessMutex = null;


    /**
     * 锁的ID,对应zk一个PERSISTENT节点,下挂EPHEMERAL节点.
     */
    private String path;


    /**
     * zk的客户端
     */
    private CuratorFramework client;



    public ZkReentrantLock(CuratorFramework client, String lockId) {
        init(client, lockId);
    }

    public void init(CuratorFramework client, String lockId) {
        this.client = client;
        this.path = ROOT_PATH + lockId;
        interProcessMutex = new InterProcessMutex(client, this.path);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            return interProcessMutex.acquire(timeout, unit);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            log.error(e.getMessage(),e);
            throw new RuntimeException(e.getMessage(),e);
        }
    }

    @Override
    public void unlock() {
        try {
            interProcessMutex.release();
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        } finally {
            executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
        }
    }

    static class Cleaner implements Runnable {
        CuratorFramework client;
        String path;

        public Cleaner(CuratorFramework client, String path) {
            this.client = client;
            this.path = path;
        }

        public void run() {
            try {
                List list = client.getChildren().forPath(path);
                if (list == null || list.isEmpty()) {
                    client.delete().forPath(path);
                }
            } catch (KeeperException.NoNodeException e1) {
                //nothing
            } catch (KeeperException.NotEmptyException e2) {
                //nothing
            } catch (Exception e) {
                log.error(e.getMessage(), e);//准备删除时,正好有线程创建锁
            }
        }
    }


    private boolean isEmpty(List<String> list){
        return list==null||list.isEmpty();
    }
}

测试代码:

@Test
public void testTry() throws InterruptedException {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.2.6:2181",
        retryPolicy);
    client.start();

    final ZkDistributedLockTemplate template=new ZkDistributedLockTemplate(client);
    int size=100;
    final CountDownLatch startCountDownLatch = new CountDownLatch(1);
    final CountDownLatch endDownLatch=new CountDownLatch(size);
    for (int i =0;i<size;i++){
        new Thread() {
            public void run() {
                try {
                    startCountDownLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                final int sleepTime=ThreadLocalRandom.current().nextInt(5)*1000;
                template.execute("locktest",5000, new Callback() {
                    public Object onGetLock() throws InterruptedException {
                        System.out.println(Thread.currentThread().getName() + ":getLock");
                        Thread.currentThread().sleep(sleepTime);
                        System.out.println(Thread.currentThread().getName() + ":sleeped:"+sleepTime);
                        endDownLatch.countDown();
                        return null;
                    }
                    public Object onTimeout() throws InterruptedException {
                        System.out.println(Thread.currentThread().getName() + ":timeout");
                        endDownLatch.countDown();
                        return null;
                    }
                });
            }
        }.start();
    }
    startCountDownLatch.countDown();
    endDownLatch.await();
}

 

预览结果,当锁释放后,线程信息全部置空,locktest变回叶子节点

 

具体参考:

https://github.com/yujiasun/Distributed-Kit

© 著作权归作者所有

共有 人打赏支持
果子核
粉丝 2
博文 39
码字总数 10055
作品 0
海淀
程序员
私信 提问
大数据教程(3.3):zookeeper简介

一、概念 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功...

em_aaron
07/28
0
0
分布式锁与实现(二)基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包...

rechardchensir
10/08
0
0
分布式锁与实现(二)——基于ZooKeeper实现

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配...

刘祖鹏
05/08
0
0
Zookeeper 概述及应用场景

一、概述 分布式协调技术,主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成"脏数据"的后果。 ZooKeeper是一个分布式的,开放源码的分布式应用...

PeakFang-BOK
10/20
0
0
分布式设计与开发(三)------高一致性服务ZooKeeper

分布式环境中大多数服务是允许部分失败,也允许数据不一致,但有些最基础的服务是需要高可靠性,高一致性的,这些服务是其他分布式服务运转的基础,比如naming service、分布式lock等,这些分...

山哥
2012/03/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nuc970 uboot nand-boot,kernel, filesystem 烧录位置

一 烧写到Nand Flash **1.1 **相关文件说明 l BSP版本:nuc970bsp-release-20150519.zip l NuWriter版本:2015/04/28-V01,nuvoTon Nu-Writer V1.0 l 烧写文件: u-boot-spl.bin:负责将u-b......

CookieDemo
今天
1
0
python中sort和sorted函数小结

L.sort(cmp=None, key=None, reverse=False) sorted(iterable, cmp=None, key=None, reverse=False) 这样看,sorted函数只比sort函数多一个iterable参数,其余没什么不同,iterable是一个迭代......

上官夏洛特
今天
4
0
thinkphp 常用SQL执行语句总结

第一条:Db::tablera('vr_panomas')->where(['delete_time'=>0,'id'=>['in',$pids]])->field(['id'=>'id','post_thumb'=>'thumb','post_title'=>'title','post_tags'=>'tags','post_price'=>......

koothon
今天
6
0
支付宝返回状态resultStatus意思

上一篇集成支付宝的时候,会有一些支付宝返回的resultStatus,具体意思是: 9000 订单支付成功 8000 正在处理中 4000 订单支付失败 6001 用户中途取消 6002 网络连接出错 还有memo,意思就是...

RainOrz
今天
3
0
electron webview 页面加载事件顺序

1.did-start-loading 页面开始加载 2.load-commit 主页面文档加载 3.page-title-updated title 4.dom-ready 主页面 dom 加载完成 5.load-commit frame文档加载 6.did-frame-finish-load fram......

dubox
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部