文档章节

Zookeerper实现分布式锁服务

满小茂
 满小茂
发布于 2016/04/12 13:57
字数 715
阅读 3K
收藏 8

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

利用Curator客户端API,实现分布式事务锁.

1.Maven 坐标配置

<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-recipes</artifactId>  
    <version>2.5.0</version>  
</dependency>  

2.利用API封装类 CuratorUtil.java


import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CuratorUtil implements InitializingBean {
    private static final Logger LOG = LoggerFactory.getLogger(CuratorUtil.class);

    //zookeeper.connection.url=172.30.0.177:2181,172.30.0.173:2181,172.30.0.171:2181
    //zookeeper.iread.lock.path=/iread/source/lock

    @Value("${zookeeper.connection.url}")
    private String zookeeperConnectionString;

    @Value("${zookeeper.lockPath.prefix}")
    private String lockPathPrefix;

    private CuratorFramework client;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    /**
     * 获取锁。返回不为null表示成功获取到锁,用完之后需要调用releaseLock方法释放
     * @param relativePath 锁的相对路径,Not start with '/'
     * @param waitSeconds 等待秒数
     * @return 未获取到锁返回null
     */
    public InterProcessMutex getLock(String relativePath, int waitSeconds) {
        InterProcessMutex lock = new InterProcessMutex(client, lockPathPrefix + relativePath);
        try {
            if (lock.acquire(waitSeconds, TimeUnit.SECONDS)) {
                return lock;
            }
        } catch (Exception e) {
            LOG.error("get lock error", e);
        }
        releaseLock(lock);
        return null;
    }

    /**
     * 释放锁
     */
    public void releaseLock(InterProcessMutex lock) {
        if (lock != null && lock.isAcquiredInThisProcess()) {
            try {
                lock.release();
            } catch (Exception e) {
                LOG.warn("release lock error", e);
            }
        }
    }
}

3.调用方法

import javax.annotation.Resource;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;


/**
 * 抽象分布式Job(自动获取和释放ZK分布式锁) <br>
 * 子类实现process()方法进行业务处理
 */
public abstract class AbstractDistributedJob {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /** 至少锁60秒 */
    private static final long LOCK_MIN_TIME = 60000;

    @Resource
    private CuratorUtil curatorUtil;

    public void run() {
        InterProcessMutex lock = curatorUtil.getLock(getClass().getSimpleName() + "/lock", 1);
        if (lock == null) {
            LOG.info("can not get lock, exit job.");
            return;
        }

        long st = System.currentTimeMillis();
        LOG.info("start job...");

        try {
            process();
        } catch (Exception e) {
            LOG.error("job error", e);
        } finally {
            long cost = System.currentTimeMillis() - st;
            LOG.info("job finished, cost {} ms.", cost);

            if (cost < LOCK_MIN_TIME) {
                try {
                    Thread.sleep(LOCK_MIN_TIME - cost);
                } catch (InterruptedException e) {}
            }
            curatorUtil.releaseLock(lock);
        }
    }

    public abstract void process() throws Exception;
}

4.使用案例

        使用场景:当部署多台服务器时,有一个任务需要,如果没有分布式锁,则多台服务器都会执行这个任务,但是我们往往只想让其中一台服务器执行这个任务。

                  1.    5台tomcat服务器,部署相同的war包,每个tomcat服务器都会在凌晨2点执行一次消息推送,为了防止5台服务器都推送消息,部署三台zookeeper 服务器,5台Tomcat服务器都连接上zookeeper服务器,然后在推送消息的时候,获取锁的那台服务器执行任务,从而保证了Tomcat服务器集群只有一台服务器获取锁,执行任务。

                 2.    分布式调度,一台消息队列服务器MQ,多个业务逻辑服务器,多个业务逻辑服务器可以使用一个分布式锁去竞争消息队列数据,获取到锁的服务器获取数据,保证了消息队列的每条数据只被一台服务器获取,从而保证多台服务器并发执行任务。

 

redis实现分布式锁  

  https://yq.aliyun.com/articles/307547?spm=5176.100239.blogrightarea309637.19.af4dc28ybSYCy

 redisson分布式锁

  https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#81-lock

 

© 著作权归作者所有

满小茂
粉丝 80
博文 122
码字总数 139030
作品 0
成都
程序员
私信 提问
加载中

评论(0)

2.Zookeeper-Kafka-Flume-Sqoop

Zookeeper知识点分为: 系统模型、客户端设计、会话管理、集群业务处理、序列化、持久化、服务器启动、选举流程 Zookeeper提供了什么: 一.文件系统(znode) 1)PERSISTENT-持久化目录节点 客...

osc_3rorki68
2018/07/26
2
0
安装kafka过程及出现的问题解决

第一步:下载kafka安装包 下载地址:http://kafka.apache.org/downloads 解压 到/usr/local 目录 tar -zxvf kafka_2.12-2.2.0 第二步 下载zookeerper(之所以下载 是因为我使用kafka自带的 会...

osc_uo9elnxq
2019/03/28
3
0
分布式改造剧集2---DIY分布式锁

前言: 好了,终于又开始播放分布式改造剧集了。前面一集中(http://www.cnblogs.com/Kidezyq/p/8748961.html)我们DIY了一个Hessian转发实现,最后我们也留下了一个展望方向:可以实现一个管理...

osc_6nds72mv
2018/05/01
2
0
一文弄懂“分布式锁”,一直以来你的选择依据正确吗?

我们本文主要会关注的问题是“分布式锁”的问题。 多线程情况下对共享资源的操作需要加锁,避免数据被写乱,在分布式系统中,这个问题也是存在的,此时就需要一个分布式锁服务。 常见的分布式...

向南
2018/12/04
0
0
java就业指南 zookeeper分布式系统 zookeeper实现分布式锁 有用

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个 分布式系统都无法同时满足一致性(Consistency)...

osc_jnyhv0pz
2018/05/09
4
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring中@Import的三种情况

我们在使用Spring框架中,特别是框架级的功能,经常看到有@Import导入功能, 我就介绍下它能导入什么,首先声明下@Import是注解,导入类型可分为三类: 1. 导入配置 @Configuration,类似于s...

董广明
3分钟前
3
0
Java中的类型推断和lambda表达式

简介 java是强类型的编程语言,每个java中使用到的变量都需要定义它的类型,否则会编译失败。强类型语言的好处就是可以尽可能的在编译期间就发现代码中可能出现的问题,从而减少在运行时出现...

flydean
4分钟前
7
1
How to install Docker on CentOS8

How to install Docker on CentOS8 Step 1, Prepare Update OS # yum update -y Install download tool wget # yum install wget -y Step 2, Install containerd.io To install docker in ......

Iridium
5分钟前
0
0
Java字节码

Java字节码对于虚拟机,就好像汇编语言对于计算机,属于基本执行指令。每一个Java字节码指令是一个byte数字,并且有一个对应的助记符。 Java虚拟机常用指令 常量入栈指令 常量入栈指令的功能...

算法之名
今天
66
0
腾讯副总裁魏颖:提瓢入市,倚杖而归

  魏颖,腾讯公司副总裁,2008 年加入腾讯,全面负责公司薪酬福利、绩效管理、员工关系以及海外业务人力资源。   ————————   很多人对人力资源(HR)工作的理解就是一些人事流...

alkcendkljk
今天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部