文档章节

zookeeper 分布式锁

果子核
 果子核
发布于 2017/06/05 09:54
字数 558
阅读 13
收藏 0
点赞 0
评论 0

     使用锁存在一个问题,在有限时间内释放,也可能被其他等待的线程获取锁,所以在锁内部仍然需要通过查询来判断是否已经被更新过;

获取锁和根据锁的获取结果回调函数

public class ZkDistributedLockTemplate implements DistributedLockTemplate {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(ZkDistributedLockTemplate.class);

    private CuratorFramework client;


    public ZkDistributedLockTemplate(CuratorFramework client) {
        this.client = client;
    }



    private boolean tryLock(ZkReentrantLock distributedReentrantLock,Long timeout) throws Exception {
        return distributedReentrantLock.tryLock(timeout, TimeUnit.MILLISECONDS);
    }

    public Object execute(String lockId, int timeout, Callback callback) {
        ZkReentrantLock distributedReentrantLock = null;
        boolean getLock=false;
        try {
            distributedReentrantLock = new ZkReentrantLock(client,lockId);
            if(tryLock(distributedReentrantLock,new Long(timeout))){
                getLock=true;
                return callback.onGetLock();
            }else{
                return callback.onTimeout();
            }
        }catch(InterruptedException ex){
            log.error(ex.getMessage(), ex);
            Thread.currentThread().interrupt();
        }catch (Exception e) {
            log.error(e.getMessage(), e);
        }finally {
            if(getLock){
                distributedReentrantLock.unlock();
            }
        }
        return null;
    }
}

 

分布式锁的核心实现:

public class ZkReentrantLock implements DistributedReentrantLock {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(ZkReentrantLock.class);

    /**
     * 线程池
     */
    private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

    /**
     * 所有PERSISTENT锁节点的根位置
     */
    public static final String ROOT_PATH = "/ROOT_LOCK/";

    /**
     * 每次延迟清理PERSISTENT节点的时间  Unit:MILLISECONDS
     */
    private long delayTimeForClean = 1000;

    /**
     * zk 共享锁实现
     */
    private InterProcessMutex interProcessMutex = null;


    /**
     * 锁的ID,对应zk一个PERSISTENT节点,下挂EPHEMERAL节点.
     */
    private String path;


    /**
     * zk的客户端
     */
    private CuratorFramework client;



    public ZkReentrantLock(CuratorFramework client, String lockId) {
        init(client, lockId);
    }

    public void init(CuratorFramework client, String lockId) {
        this.client = client;
        this.path = ROOT_PATH + lockId;
        interProcessMutex = new InterProcessMutex(client, this.path);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            return interProcessMutex.acquire(timeout, unit);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            log.error(e.getMessage(),e);
            throw new RuntimeException(e.getMessage(),e);
        }
    }

    @Override
    public void unlock() {
        try {
            interProcessMutex.release();
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        } finally {
            executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
        }
    }

    static class Cleaner implements Runnable {
        CuratorFramework client;
        String path;

        public Cleaner(CuratorFramework client, String path) {
            this.client = client;
            this.path = path;
        }

        public void run() {
            try {
                List list = client.getChildren().forPath(path);
                if (list == null || list.isEmpty()) {
                    client.delete().forPath(path);
                }
            } catch (KeeperException.NoNodeException e1) {
                //nothing
            } catch (KeeperException.NotEmptyException e2) {
                //nothing
            } catch (Exception e) {
                log.error(e.getMessage(), e);//准备删除时,正好有线程创建锁
            }
        }
    }


    private boolean isEmpty(List<String> list){
        return list==null||list.isEmpty();
    }
}

测试代码:

@Test
public void testTry() throws InterruptedException {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.2.6:2181",
        retryPolicy);
    client.start();

    final ZkDistributedLockTemplate template=new ZkDistributedLockTemplate(client);
    int size=100;
    final CountDownLatch startCountDownLatch = new CountDownLatch(1);
    final CountDownLatch endDownLatch=new CountDownLatch(size);
    for (int i =0;i<size;i++){
        new Thread() {
            public void run() {
                try {
                    startCountDownLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                final int sleepTime=ThreadLocalRandom.current().nextInt(5)*1000;
                template.execute("locktest",5000, new Callback() {
                    public Object onGetLock() throws InterruptedException {
                        System.out.println(Thread.currentThread().getName() + ":getLock");
                        Thread.currentThread().sleep(sleepTime);
                        System.out.println(Thread.currentThread().getName() + ":sleeped:"+sleepTime);
                        endDownLatch.countDown();
                        return null;
                    }
                    public Object onTimeout() throws InterruptedException {
                        System.out.println(Thread.currentThread().getName() + ":timeout");
                        endDownLatch.countDown();
                        return null;
                    }
                });
            }
        }.start();
    }
    startCountDownLatch.countDown();
    endDownLatch.await();
}

 

预览结果,当锁释放后,线程信息全部置空,locktest变回叶子节点

 

具体参考:

https://github.com/yujiasun/Distributed-Kit

© 著作权归作者所有

共有 人打赏支持
果子核
粉丝 2
博文 39
码字总数 10055
作品 0
海淀
程序员
分布式锁与实现(二)——基于ZooKeeper实现

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配...

刘祖鹏
05/08
0
0
分布式设计与开发(三)------高一致性服务ZooKeeper

分布式环境中大多数服务是允许部分失败,也允许数据不一致,但有些最基础的服务是需要高可靠性,高一致性的,这些服务是其他分布式服务运转的基础,比如naming service、分布式lock等,这些分...

山哥
2012/03/19
0
0
ZooKeeper学习第一期---Zookeeper简单介绍

一、分布式协调技术 在给大家介绍ZooKeeper之前先来给大家介绍一种技术——分布式协调技术。那么什么是分布式协调技术?那么我来告诉大家,其实分布式协调技术 主要用来解决分布式环境当中多...

卯金刀GG
2017/10/25
0
0
ZooKeeper分布式专题与Dubbo微服务入门

ZooKeeper分布式专题与Dubbo微服务入门 网盘地址:https://pan.baidu.com/s/1TN6BlftB2uvvyVR7IDmODQ 密码: e6zt 备用地址(腾讯微云):https://share.weiyun.com/5539X2S 密码:65b36i Zo...

人气王子333
04/17
0
0
ZooKeeper可以用来做什么(转)

在ZooKeeper的官网上有这么一句话:ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing grou......

easonjim
2017/09/05
0
0
分布式网站架构后续:zookeeper技术浅析

 Zookeeper是hadoop的一个子项目,虽然源自hadoop,但是我发现zookeeper脱离hadoop的范畴开发分布式框架的运用越来越多。今天我想谈谈zookeeper,本文不谈如何使用zookeeper,而是zookeeper...

蓝狐乐队
2014/04/21
0
0
阿里大数据攻城师教你怎样理解ZooKeeper(六)

Zookeeper是什么 是一个针对大型分布式系统的可靠协调系统; 提供的功能包括:配置维护、名字服务、分布式同步、组服务等; 目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效...

JAVA丶学习
04/13
0
0
ZooKeeper的应用场景(转)

应用场景1 :统一命名服务 分布式应用中,通常需要一套完备的命令机制,既能产生唯一的标识,又方便人识别和记忆。 我们知道,每个ZNode都可以由其路径唯一标识,路径本身也比较简洁直观,另...

easonjim
2017/09/05
0
0
基于Zookeeper的分布式锁

zookeeper中几个关于节点的有趣的性质: 有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock...

素雷
2017/10/25
0
0
ZooKeeper学习第四期---构建ZooKeeper应用

一、配置服务 配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。简单地说,ZooKeeper可以作为一个具有高可用性的配置存储器,允许分布式应用的...

卯金刀GG
2017/10/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

c++ qt 组播总结

每个人都有不同的认知规律和习惯, 有的人喜欢搞一套严密的大理论, 论述起来滔滔不绝, 不管自己懂不懂, 反正读者/听者是没搞懂。 有的人喜欢从实践出发, 没看到代码, 不运行一下, 不看...

backtrackx
7分钟前
0
0
Sublime text2安装json格式化插件SublimePrettyJson[Windows]

一、下载SublimePrettyJson插件包 https://github.com/dzhibas/SublimePrettyJson 二、将下载的文件解压放到在package目录下面 C:\Users\lucky\AppData\Roaming\Sublime Text 3\Packages 每个......

lazy~
7分钟前
0
0
安装vue-cli 报4058错误

1. 4058是网络代理错误。 安装淘宝源修改一下就可以了: npm --registry https://registry.npm.taobao.org info underscore 改为cnpm执行: cnpm install --global vue-cli 安装成功: 试试版...

MrBoyce
8分钟前
0
0
CPU飙升分析

1、top -----看具体的进程 2、top -H -p pid ------该进程的线程 3、printf 0x%x 15248 ------将线程改为16进制 4、jstack 进程...

北极之北
11分钟前
1
0
新生代Eden与两个Survivor区的解释

聊聊JVM的年轻代 1.为什么会有年轻代 我们先来屡屡,为什么需要把堆分代?不分代不能完成他所做的事情么?其实不分代完全可以,分代的唯一理由就是优化GC性能。你先想想,如果没有分代,那我...

浮躁的码农
13分钟前
0
0
【JVM】JSTATD结合Java VisualVM进行远程监控JVM运行情况(二)

内存泄露指的是程序中动态分配内存给一些临时对象,但是对象不会被GC(java垃圾回收机制gabage collection)所回收,它始终占用内存。即被分配的对象很大但已无用; 内存溢出指的是程序运行过...

大白来袭
16分钟前
2
0
聊聊ribbon的超时时间设置

序 本文主要研究一下ribbon的超时时间设置 配置 实例 ribbon: ReadTimeout: 10000 ConnectTimeout: 10000 MaxAutoRetries: 0 MaxAutoRetriesNextServer: 1 eureka: enabled: ......

go4it
24分钟前
0
0
一行代码结果叹为观止,能做到这么极致的也只有python了

Python 这门语言非常的有趣,不仅可以做高大上的人工智能、大数据、机器学习。还可以用来做 Web、爬虫。还有其它很多的应用。今天我就给大家展示下一行 Python 代码都可以做些什么。 一行打印...

猫咪编程
28分钟前
2
0
KingShard使用

对于kingshard的功能,在git中可以看到明确的功能说明 主要功能: 1. 基础功能 支持SQL读写分离。 支持透明的MySQL连接池,不必每次新建连接。 支持平滑上线DB或下线DB,前端应用无感知。 支...

mickelfeng
30分钟前
0
0
Linux 下 查找某个字符串

如果你想在当前项目下 查找 "test" 这个字符串,可以这样: grep -rn "test" * * : 表示当前目录所有文件,也可以是某个文件名-r 是递归查找-n 是显示行号-R ...

nsns
30分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部