文档章节

Zookeerper实现分布式锁服务

满小茂
 满小茂
发布于 2016/04/12 13:57
字数 682
阅读 585
收藏 7

利用Curator客户端API,实现分布式事务锁.

1.Maven 坐标配置

<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-recipes</artifactId>  
    <version>2.5.0</version>  
</dependency>  

2.利用API封装类 CuratorUtil.java


import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CuratorUtil implements InitializingBean {
    private static final Logger LOG = LoggerFactory.getLogger(CuratorUtil.class);

    //zookeeper.connection.url=172.30.0.177:2181,172.30.0.173:2181,172.30.0.171:2181
    //zookeeper.iread.lock.path=/iread/source/lock

    @Value("${zookeeper.connection.url}")
    private String zookeeperConnectionString;

    @Value("${zookeeper.lockPath.prefix}")
    private String lockPathPrefix;

    private CuratorFramework client;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    /**
     * 获取锁。返回不为null表示成功获取到锁,用完之后需要调用releaseLock方法释放
     * @param relativePath 锁的相对路径,Not start with '/'
     * @param waitSeconds 等待秒数
     * @return 未获取到锁返回null
     */
    public InterProcessMutex getLock(String relativePath, int waitSeconds) {
        InterProcessMutex lock = new InterProcessMutex(client, lockPathPrefix + relativePath);
        try {
            if (lock.acquire(waitSeconds, TimeUnit.SECONDS)) {
                return lock;
            }
        } catch (Exception e) {
            LOG.error("get lock error", e);
        }
        releaseLock(lock);
        return null;
    }

    /**
     * 释放锁
     */
    public void releaseLock(InterProcessMutex lock) {
        if (lock != null && lock.isAcquiredInThisProcess()) {
            try {
                lock.release();
            } catch (Exception e) {
                LOG.warn("release lock error", e);
            }
        }
    }
}

3.调用方法

import javax.annotation.Resource;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;


/**
 * 抽象分布式Job(自动获取和释放ZK分布式锁) <br>
 * 子类实现process()方法进行业务处理
 */
public abstract class AbstractDistributedJob {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /** 至少锁60秒 */
    private static final long LOCK_MIN_TIME = 60000;

    @Resource
    private CuratorUtil curatorUtil;

    public void run() {
        InterProcessMutex lock = curatorUtil.getLock(getClass().getSimpleName() + "/lock", 1);
        if (lock == null) {
            LOG.info("can not get lock, exit job.");
            return;
        }

        long st = System.currentTimeMillis();
        LOG.info("start job...");

        try {
            process();
        } catch (Exception e) {
            LOG.error("job error", e);
        } finally {
            long cost = System.currentTimeMillis() - st;
            LOG.info("job finished, cost {} ms.", cost);

            if (cost < LOCK_MIN_TIME) {
                try {
                    Thread.sleep(LOCK_MIN_TIME - cost);
                } catch (InterruptedException e) {}
            }
            curatorUtil.releaseLock(lock);
        }
    }

    public abstract void process() throws Exception;
}

4.使用案例

        使用场景:当部署多台服务器时,有一个任务需要,如果没有分布式锁,则多台服务器都会执行这个任务,但是我们往往只想让其中一台服务器执行这个任务。

                  1.    5台tomcat服务器,部署相同的war包,每个tomcat服务器都会在凌晨2点执行一次消息推送,为了防止5台服务器都推送消息,部署三台zookeeper 服务器,5台Tomcat服务器都连接上zookeeper服务器,然后在推送消息的时候,获取锁的那台服务器执行任务,从而保证了Tomcat服务器集群只有一台服务器获取锁,执行任务。

                 2.    分布式调度,一台消息队列服务器MQ,多个业务逻辑服务器,多个业务逻辑服务器可以使用一个分布式锁去竞争消息队列数据,获取到锁的服务器获取数据,保证了消息队列的每条数据只被一台服务器获取,从而保证多台服务器并发执行任务。

 

redis实现分布式锁  

  https://yq.aliyun.com/articles/307547?spm=5176.100239.blogrightarea309637.19.af4dc28ybSYCy

 

© 著作权归作者所有

共有 人打赏支持
满小茂
粉丝 68
博文 119
码字总数 131754
作品 0
成都
程序员
分布式锁与实现(二)——基于ZooKeeper实现

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配...

刘祖鹏
05/08
0
0
分布式锁与实现(一)——基于Redis实现

概述 目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consisten...

飓风2000
09/12
0
0
Redis与Zookeeper实现分布式锁的区别

简介 一般而言,大多数系统实现分布式锁服务都会优先使用Redis;但阅读Zookeeper时可知,Zookeeper的一个很重要应用方向就是分布式锁。那么两者实现分布式锁服务的区别是什么呢。 实现难度 ...

沈渊
2017/10/15
0
0
分布式锁原理及常用实现

起因 前段时间,看到redis作者发布的一篇文章《Is Redlock safe?》,Redlock是redis作者基于redis设计的分布式锁的算法。文章起因是有一位分布式的专家写了一篇文章《How to do distributed...

偶尔诗文
2016/11/09
46
0
一句话说清分布式锁,进程锁,线程锁

  在分布式集群系统的开发中,线程锁往往并不能支持全部场景的使用,必须引入新的技术方案分布式锁。 线程锁,进程锁,分布式锁   线程锁:大家都不陌生,主要用来给方法、代码块加锁。当...

刘洋intsmaze
2017/09/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

GO 数组相关操作

package mainimport("fmt""math/rand""time")func main() {//数组的几种定义方式var arr1 [3]int = [3]int{1,2,3}var arr2 = [3]int{4,5,6}arr3 := [3]string{"h", "w", ......

汤汤圆圆
22分钟前
0
0
JAVA 中interrupt、interrupted和isInterrupted的区别

首先,我们说明下三个方法的功能 interrupt() 向当前调用者线程发出中断信号 isinterrupted() 查看当前中断信号是true还是false interrupted() 是静态方法,查看返回当前中断信号并将中断信号...

我爱春天的毛毛雨
26分钟前
0
0
Coding and Paper Letter(二十二)

资源整理。 1 Coding: 1.开源项目openeo api。oponEO开发了一个开放的API,以简单统一的方式将R,python和javascript客户端连接到对地观测大数据云平台的后台。 此存储库包含此API,即oponE...

胖胖雕
52分钟前
1
0
RxJS的另外四种实现方式(三)——性能最高的库

接上篇 RxJS的另外四种实现方式(二)——代码最小的库(续) 代码最小的库rx4rx-lite虽然在性能测试中超过了callbag,但和most库较量的时候却落败了,于是我下载了most库,要解开most库性能...

一个灰
今天
4
0
马太效应

马太效应

yizhichao
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部