文档章节

Zookeerper实现分布式锁服务

满小茂
 满小茂
发布于 2016/04/12 13:57
字数 682
阅读 590
收藏 7

利用Curator客户端API,实现分布式事务锁.

1.Maven 坐标配置

<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-recipes</artifactId>  
    <version>2.5.0</version>  
</dependency>  

2.利用API封装类 CuratorUtil.java


import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CuratorUtil implements InitializingBean {
    private static final Logger LOG = LoggerFactory.getLogger(CuratorUtil.class);

    //zookeeper.connection.url=172.30.0.177:2181,172.30.0.173:2181,172.30.0.171:2181
    //zookeeper.iread.lock.path=/iread/source/lock

    @Value("${zookeeper.connection.url}")
    private String zookeeperConnectionString;

    @Value("${zookeeper.lockPath.prefix}")
    private String lockPathPrefix;

    private CuratorFramework client;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    /**
     * 获取锁。返回不为null表示成功获取到锁,用完之后需要调用releaseLock方法释放
     * @param relativePath 锁的相对路径,Not start with '/'
     * @param waitSeconds 等待秒数
     * @return 未获取到锁返回null
     */
    public InterProcessMutex getLock(String relativePath, int waitSeconds) {
        InterProcessMutex lock = new InterProcessMutex(client, lockPathPrefix + relativePath);
        try {
            if (lock.acquire(waitSeconds, TimeUnit.SECONDS)) {
                return lock;
            }
        } catch (Exception e) {
            LOG.error("get lock error", e);
        }
        releaseLock(lock);
        return null;
    }

    /**
     * 释放锁
     */
    public void releaseLock(InterProcessMutex lock) {
        if (lock != null && lock.isAcquiredInThisProcess()) {
            try {
                lock.release();
            } catch (Exception e) {
                LOG.warn("release lock error", e);
            }
        }
    }
}

3.调用方法

import javax.annotation.Resource;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;


/**
 * 抽象分布式Job(自动获取和释放ZK分布式锁) <br>
 * 子类实现process()方法进行业务处理
 */
public abstract class AbstractDistributedJob {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /** 至少锁60秒 */
    private static final long LOCK_MIN_TIME = 60000;

    @Resource
    private CuratorUtil curatorUtil;

    public void run() {
        InterProcessMutex lock = curatorUtil.getLock(getClass().getSimpleName() + "/lock", 1);
        if (lock == null) {
            LOG.info("can not get lock, exit job.");
            return;
        }

        long st = System.currentTimeMillis();
        LOG.info("start job...");

        try {
            process();
        } catch (Exception e) {
            LOG.error("job error", e);
        } finally {
            long cost = System.currentTimeMillis() - st;
            LOG.info("job finished, cost {} ms.", cost);

            if (cost < LOCK_MIN_TIME) {
                try {
                    Thread.sleep(LOCK_MIN_TIME - cost);
                } catch (InterruptedException e) {}
            }
            curatorUtil.releaseLock(lock);
        }
    }

    public abstract void process() throws Exception;
}

4.使用案例

        使用场景:当部署多台服务器时,有一个任务需要,如果没有分布式锁,则多台服务器都会执行这个任务,但是我们往往只想让其中一台服务器执行这个任务。

                  1.    5台tomcat服务器,部署相同的war包,每个tomcat服务器都会在凌晨2点执行一次消息推送,为了防止5台服务器都推送消息,部署三台zookeeper 服务器,5台Tomcat服务器都连接上zookeeper服务器,然后在推送消息的时候,获取锁的那台服务器执行任务,从而保证了Tomcat服务器集群只有一台服务器获取锁,执行任务。

                 2.    分布式调度,一台消息队列服务器MQ,多个业务逻辑服务器,多个业务逻辑服务器可以使用一个分布式锁去竞争消息队列数据,获取到锁的服务器获取数据,保证了消息队列的每条数据只被一台服务器获取,从而保证多台服务器并发执行任务。

 

redis实现分布式锁  

  https://yq.aliyun.com/articles/307547?spm=5176.100239.blogrightarea309637.19.af4dc28ybSYCy

 

© 著作权归作者所有

共有 人打赏支持
满小茂
粉丝 75
博文 119
码字总数 131884
作品 0
成都
程序员
私信 提问
一文弄懂“分布式锁”,一直以来你的选择依据正确吗?

我们本文主要会关注的问题是“分布式锁”的问题。 多线程情况下对共享资源的操作需要加锁,避免数据被写乱,在分布式系统中,这个问题也是存在的,此时就需要一个分布式锁服务。 常见的分布式...

向南
12/04
0
0
分布式锁与实现(二)基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包...

rechardchensir
10/08
0
0
Redis与Zookeeper实现分布式锁的区别

简介 一般而言,大多数系统实现分布式锁服务都会优先使用Redis;但阅读Zookeeper时可知,Zookeeper的一个很重要应用方向就是分布式锁。那么两者实现分布式锁服务的区别是什么呢。 实现难度 ...

沈渊
2017/10/15
0
0
分布式锁与实现(二)——基于ZooKeeper实现

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配...

刘祖鹏
05/08
0
0
使用数据库悲观锁实现不可重入的分布式锁

一、前言 在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源时候,juc包的锁就...

阿里加多
06/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

jquery通过id显示隐藏

var $div3 = $('#div3'); 显示 $div3.show(); 隐藏 $div3.hide();

yan_liu
今天
3
0
《乱世佳人》读书笔记及相关感悟3900字

《乱世佳人》读书笔记及相关感悟3900字: 之前一直听「荔枝」,后来不知怎的转向了「喜马拉雅」,一听就是三年。上班的时候听房产,买房了以后听装修,兴之所至时听旅行,分手后听亲密关系,...

原创小博客
今天
3
0
大数据教程(9.6)map端join实现

上一篇文章讲了mapreduce配合实现join,本节博主将讲述在map端的join实现; 一、需求 实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志...

em_aaron
今天
3
0
cookie与session详解

session与cookie是什么? session与cookie属于一种会话控制技术.常用在身份识别,登录验证,数据传输等.举个例子,就像我们去超市买东西结账的时候,我们要拿出我们的会员卡才会获取优惠.这时...

士兵7
今天
3
0
十万个为什么之为什么大家都说dubbo

Dubbo是什么? 使用背景 dubbo为什么这么流行, 为什么大家都这么喜欢用dubbo; 通过了解分布式开发了解到, 为适应访问量暴增,业务拆分后, 子应用部署在多台服务器上,而多台服务器通过可以通过d...

尾生
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部