文档章节

java利用zookeeper的Leader选举实现主从的切换

独钓渔
 独钓渔
发布于 2017/05/16 23:03
字数 1621
阅读 200
收藏 0
点赞 0
评论 0

在工作中需要一个程序来处理,但是业务逻辑有不能有多个程序同时处理,但也不能没有程序处理。

但是考虑到原程序可能会挂,所以要启动多个,但是第二个第三个是从,待主挂了后他再处理。

LeaderLatch和LeaderLatchListener方法介绍

LeaderLatch提供了如下方法:

 start()/close():启动/停止LeaderLatch

 addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener

 hasLeadership():如果LeaderLatch是Leader,那么返回true,否则false。

 getLeader():

 await:等待Leaderlatch成为Leader。


LeaderLatchListener提供了如下方法:

  isLeader():当LeaderLatch的hasLeaderShip()从false到true后,就会调用isLeader(),表明这个LeaderLatch成为leader了。

  notLeader():当LeaderLatch的hahLeaderShip从true到false后,就会调用notLeader(),表明这个LeaderLatch不再是leader了。
LeaderLatch在Master-Slave中的应用

在一个典型的master-slave场景下。你可以在isLeader中做如下处理:

  1.每一个master类都有一个state属性,初始值为standby.

  2.在isLeader中,从持久话引擎中读取要恢复的数据到一个临时的内存缓存中

  3.将这个master的state修改为recovering

  4.通知所有worker将其内部的master修改为当前master。

  5.将临时内存缓存中的数据恢复到master内部。

  6.将master状态修改为alive,然后这个master就可以对外服务了。

注意第5步,由于将持久话引擎中的数据添加到了master内部的内存中,所以需要确保之多恢复一次语义。

 

lolLeaderLatch 项目(maven)

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.imddy</groupId>
	<artifactId>lolLeaderLatch</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>lolLeaderLatch</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<scope>test</scope>
		</dependency>
		<!-- commons-logging -->
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.2</version>
		</dependency>
		<!-- log4j -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
		<dependency>
		    <groupId>com.github.sgroschupf</groupId>
		    <artifactId>zkclient</artifactId>
		    <version>0.1</version>
		</dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.5.0</version>
        </dependency>
		
	</dependencies>

	<build>
		<plugins>
			<!-- 资源文件 utf-8编码 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-resources-plugin</artifactId>
				<configuration>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<!-- 编译源码版本和目标版本 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
			<!-- shade -->
			<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.4.1</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<transformers>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<mainClass>com.imddy.lolLeaderLatch.test.TestMain</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
		</plugins>
	</build>
</project>

 

新建包名称:“package com.imddy.lolLeaderLatch.test;“

先创建 ZKClientInfo 类表示zk客户端的信息类

package com.imddy.lolLeaderLatch.test;

public class ZKClientInfo {

	// 是否是leader 默认为false
	public static boolean isLeader = false;

	// 客户端ID
	private String id;

	// 连接信息字符串
	private String connectString;

	// 节点路径
	private String path;

	// 连接超时时间
	private Integer connectTimeOut;

	// 最大连接次数
	private Integer maxRetries;

	// 重连休眠时间
	private Integer retrySleepTime;
	
	
	public String getConnectString() {
		return connectString == null ? null : connectString.replaceAll("\\s+", "");
	}

	public void setConnectString(String connectString) {
		this.connectString = connectString;
	}

	public String getPath() {
		return path;
	}

	public void setPath(String path) {
		this.path = path;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public Integer getConnectTimeOut() {
		return connectTimeOut;
	}

	public void setConnectTimeOut(Integer connectTimeOut) {
		this.connectTimeOut = connectTimeOut;
	}

	public Integer getMaxRetries() {
		return maxRetries;
	}

	public void setMaxRetries(Integer maxRetries) {
		this.maxRetries = maxRetries;
	}

	public Integer getRetrySleepTime() {
		return retrySleepTime;
	}

	public void setRetrySleepTime(Integer retrySleepTime) {
		this.retrySleepTime = retrySleepTime;
	}
	
	@Override
	public String toString() {
		return "ZKClientInfo{" +
				"id='" + id + '\'' +
				", isLeader=" + isLeader +
				", connectString='" + connectString + '\'' +
				", path='" + path + '\'' +
				", connectTimeOut=" + connectTimeOut +
				", maxRetries=" + maxRetries +
				", retrySleepTime=" + retrySleepTime +
				'}';
	}

}

 

ZKClient   zk的客服端类:

package com.imddy.lolLeaderLatch.test;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;


public class ZKClient {

    private LeaderLatch leader;

    private CuratorFramework client;
    
    public ZKClient (LeaderLatch leader,CuratorFramework client){
        this.client = client;
        this.leader = leader;
    }
    
    /**
     * 启动客户端
     * @throws Exception
     */
    public void startZKClient() throws Exception {
        client.start();
        leader.start();
    }

    /**
     * 关闭客户端
     * @throws Exception
     */
    public void closeZKClient() throws Exception {
        leader.close();
        client.close();
    }

    /**
     * 判断是否变为领导者
     * @return
     */
    public boolean hasLeadership(){
        return leader.hasLeadership() && ZKClientInfo.isLeader;
    }


	public LeaderLatch getLeader() {
		return leader;
	}

	public void setLeader(LeaderLatch leader) {
		this.leader = leader;
	}

	public CuratorFramework getClient() {
		return client;
	}

	public void setClient(CuratorFramework client) {
		this.client = client;
	}
    
    
	public static void main(String[] args) throws Exception {
		CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(new ExponentialBackoffRetry(5000, 3))
                        .connectionTimeoutMs(5000)
                        .build();

        LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatch", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        ZKClientListener zkClientListener = new ZKClientListener();
        leaderLatch.addListener(zkClientListener);


        ZKClient zkClient = new ZKClient(leaderLatch,client);
        zkClient.startZKClient();
        Thread.sleep(5000);

        int i = 0;
        while (i<15){
            //System.out.println("hasLeadership = "+zkClient.hasLeadership());
            Thread.sleep(1000);
            i++;
        }
        zkClient.closeZKClient();
        Thread.sleep(5000);
	}

}

 

ZKClientListener 类:

package com.imddy.lolLeaderLatch.test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;


public class ZKClientListener implements LeaderLatchListener {

	private static Log log = LogFactory.getLog(ZKClientListener.class);
	
	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");
	
	@Override
	public void isLeader() {
		log.info( simpleDateFormat.format(new Date()) + "当前服务已变为leader,将从事消费业务");
		 ZKClientInfo.isLeader = true;
		
	}

	@Override
	public void notLeader() {
		log.info( simpleDateFormat.format(new Date()) + "当前服务已退出leader,不再从事消费业务");
		 ZKClientInfo.isLeader = false;

	}

}

 

再来两个任务类:

Test01Thread 作为任务1:

package com.imddy.lolLeaderLatch.test;

import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class Test01Thread extends Thread {
	
	private ZKClient zkClient;
	
	public ZKClient getZkClient() {
		return zkClient;
	}

	public void setZkClient(ZKClient zkClient) {
		this.zkClient = zkClient;
	}


	private static Log log = LogFactory.getLog(Test01Thread.class);
	
	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");
	
	@Override
	public void run() {
		while (true) {
			
			try {
				//第一步leader验证
                if(!zkClient.hasLeadership()){
                    log.info("当前服务不是leader");
                    Thread.sleep(3000);
                    continue;
                }
                else {
                	log.info("当前服务是leader");
                }
				
                log.info("Test01 do it... ");
                
			} catch (Exception e) {
				
			}
			
		}
		
	}

}

 

Test02Thread 作为任务2:

package com.imddy.lolLeaderLatch.test;

import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class Test02Thread extends Thread {
	
	private ZKClient zkClient;
	
	public ZKClient getZkClient() {
		return zkClient;
	}

	public void setZkClient(ZKClient zkClient) {
		this.zkClient = zkClient;
	}


	private static Log log = LogFactory.getLog(Test02Thread.class);
	
	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");
	
	@Override
	public void run() {
		while (true) {
			
			try {
				//第一步leader验证
                if(!zkClient.hasLeadership()){
                    log.info("当前服务不是leader");
                    Thread.sleep(3000);
                    continue;
                }
                else {
                	log.info("当前服务是leader");
                }
				
                log.info("Test02 do it... ");
                
			} catch (Exception e) {
				
			}
			
		}
		
	}

}

 

 

TestMain 主任务类:

package com.imddy.lolLeaderLatch.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestMain {
	
	private static Log log = LogFactory.getLog(TestMain.class);

	public static void main(String[] args) throws Exception {
		CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(new ExponentialBackoffRetry(5000, 3))
                        .connectionTimeoutMs(5000)
                        .build();
		
		LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatch", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        ZKClientListener zkClientListener = new ZKClientListener();
        leaderLatch.addListener(zkClientListener);


        ZKClient zkClient = new ZKClient(leaderLatch,client);
        try {
        	zkClient.startZKClient();
		} catch (Exception e) {
			log.error("zk客户端连接失败");
			return;
		}
        log.info("zk客户端连接成功");
        
        Test01Thread test01Thread = new Test01Thread();
        test01Thread.setZkClient(zkClient);
        test01Thread.start();
        
        Test02Thread test02Thread = new Test02Thread();
        test02Thread.setZkClient(zkClient);
        test02Thread.start();
        
	}
}

 

分别打包,开启zookeeper后,然后执行连个lolLeaderLatch.jar文件 ,当关闭主后,从会自动接替主工作

结果如图:

 

那个可以是模拟程序关闭,还可以在程序中主动close直接切换(TestMain2):

package com.imddy.lolLeaderLatch.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestMain2 {
	
	private static Log log = LogFactory.getLog(TestMain2.class);

	public static void main(String[] args) throws Exception {
		CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(new ExponentialBackoffRetry(5000, 3))
                        .connectionTimeoutMs(5000)
                        .build();
		
		LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatch", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        ZKClientListener zkClientListener = new ZKClientListener();
        leaderLatch.addListener(zkClientListener);


        ZKClient zkClient = new ZKClient(leaderLatch,client);
        try {
        	zkClient.startZKClient();
		} catch (Exception e) {
			log.error("zk客户端连接失败");
			return;
		}
        log.info("zk客户端连接成功");
        
        Test01Thread test01Thread = new Test01Thread();
        test01Thread.setZkClient(zkClient);
        test01Thread.start();
        
        Test02Thread test02Thread = new Test02Thread();
        test02Thread.setZkClient(zkClient);
        test02Thread.start();
        
        
        System.out.println("sleep 100s ");
        Thread.sleep(100000);
        System.out.println("切换");
        zkClient.closeZKClient();
	}
}

 

 

LeaderSelectorListener的使用,多个程序轮流

package com.imddy.lolLeaderLatch.test;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderSelectorTest {

	public static void main(String[] args) throws Exception {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
				.sessionTimeoutMs(5000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("text").build();
		client.start();

		final LeaderSelector leaderSelector =  new LeaderSelector(client, "/led", new LeaderSelectorListenerAdapter() {
			
			@Override
			public void takeLeadership(CuratorFramework client) throws Exception {
				System.err.println("work ing...");
				Thread.currentThread().sleep(3000);
				System.err.println("end"); 
				
			}
		});
		leaderSelector.autoRequeue();
		leaderSelector.start();
		System.in.read();
	}
}

 

© 著作权归作者所有

共有 人打赏支持
独钓渔
粉丝 47
博文 373
码字总数 142587
作品 0
沙坪坝
系统管理员
Dubbo分布式框架:(二)Zookeeper实战

一.Zookeeper简介 讲到Dubbo不得不说它的核心组件Zookeeper,ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它...

HaleyLiu ⋅ 05/04 ⋅ 0

安装ZooKeeper(单机、伪集群、集群)

安装ZooKeeper(单机、伪集群、集群)博客分类: 大数据平台架构移动互联网 关键字:安装ZooKeeper(单机、伪集群、集群) 推荐学习列表: zookeeper jvm设置:http://www.th7.cn/Program/j...

qq_27264789 ⋅ 04/17 ⋅ 0

ZooKeeper学习笔记八 ZooKeeper典型应用场景——命名服务

《从Paxos到ZooKeeper分布式一致性原理与实践》 电子工业出版社 命名服务是分布式系统中比较常见的一类场景。命名服务是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体通常可...

xundh ⋅ 05/02 ⋅ 0

高性能 RPC 框架 Dubbo 从入门到深入-服务注册中心搭建(详细)

一、前言 整体来说,一个公司业务系统的演进流程基本都是从单体应用到多应用。在单体应用时,不同业务模块相互调用直接在本地 JVM 进程内就可以完成,而变为多个应用时,相互之间进行通信的方...

加多 ⋅ 01/26 ⋅ 0

Spark1.6.0功能扩展——为HiveThriftServer2增加HA

前言 HiveThriftServer2是Spark基于HiveServer2实现的多Session管理的Thrift服务,提供对Hive的集中式管理服务。HiveThriftServer2作为Yarn上的Application,目前只支持yarn-client模式——即...

beliefer ⋅ 04/16 ⋅ 0

zookeeper单机多实例部署

主题介绍 介绍zookeeper单机多实例部署,更多适合于实验性质;ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,...

computer306 ⋅ 04/22 ⋅ 0

ZooKeeper的伪分布式集群搭建以及真分布式集群搭建

zk集群的一些基本概念 zookeeper集群搭建: zk集群,主从节点,心跳机制(选举模式) 配置数据文件 myid 1/2/3 对应 server.1/2/3 通过 zkCli.sh -server [ip]:[port] 命令检测集群是否配置成...

ZeroOne01 ⋅ 04/24 ⋅ 0

使用 Docker 一步搞定 ZooKeeper 集群的搭建

ZooKeeper为分布式应用提供高效、高可用的分布式协调服务,它有三种运行模式:单机模式、伪集群模式和集群模式。本文通过探索ZooKeeper的官方Docker镜像,来看看怎样快速搭建一个ZooKeeper的...

qq_41587243 ⋅ 05/25 ⋅ 0

linux下zookeeper的集群搭建

第一次写博客 写的不好不要骂┗|`O′|┛ 嗷~~ 首先说一下Zookeeper的概念: Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务 A、zookeeper是为别的分布式程序服务的...

zhaobocan ⋅ 05/06 ⋅ 0

docker容器中搭建kafka集群环境

Kafka集群管理、状态保存是通过zookeeper实现,所以先要搭建zookeeper集群 zookeeper集群搭建 一、软件环境: zookeeper集群需要超过半数的的node存活才能对外服务,所以服务器的数量应该是2...

qq_41587243 ⋅ 05/25 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

android -------- 颜色的半透明效果配置

最近有朋友问我 Android 背景颜色的半透明效果配置,我网上看资料,总结了一下, 开发中也是常常遇到的,所以来写篇博客 常用的颜色值格式有: RGB ARGB RRGGBB AARRGGBB 这4种 透明度 透明度...

切切歆语 ⋅ 7分钟前 ⋅ 0

CentOS开机启动subversion

建立自启动脚本: vim /etc/init.d/subversion 输入如下内容: #!/bin/bash## subversion startup script for the server## chkconfig: 2345 90 10# description: start the subve......

随风而飘 ⋅ 10分钟前 ⋅ 0

Nginx + uwsgi @ubuntu

uwsgi 安装 sudo apt-get install python3-pip # 注意 ubuntu python3默认没有安装pippython3 -m pip install uwsgi 代码(test.py) def application(env, start_response): start_res......

袁祾 ⋅ 11分钟前 ⋅ 0

版本控制工具

CSV , SVN , GIT ,VSS

颖伙虫 ⋅ 13分钟前 ⋅ 0

【2018.06.19学习笔记】【linux高级知识 13.1-13.3】

13.1 设置更改root密码 13.2 连接mysql 13.3 mysql常用命令

lgsxp ⋅ 21分钟前 ⋅ 0

LVM

LVM: 硬盘划分分区成物理卷->物理卷组成卷组->卷组划分逻辑分区。 1.磁盘分区: fdisk /dev/sdb 划分几个主分区 输入t更改每个分区类型为8e(LVM) 使用partprobe生成分区的文件:如/dev/sd...

ZHENG-JY ⋅ 49分钟前 ⋅ 0

彻底删除Microsoft Office的方法

参照此链接彻底删除Office https://support.office.com/zh-cn/article/%e4%bb%8e-pc-%e5%8d%b8%e8%bd%bd-office-9dd49b83-264a-477a-8fcc-2fdf5dbf61d8?ui=zh-CN&rs=zh-CN&ad=CN......

Kampfer ⋅ 今天 ⋅ 0

大盘与个股之间关系

大盘走多:积极出手 顺势加码 大盘走空: 少量出手 退场观望 大盘做头:逆势减码 少量操作 大盘做底 : 小量建仓 小量试单

guozenhua ⋅ 今天 ⋅ 0

Day16 LVM(逻辑卷管理)与磁盘故障小案例

lvm详解 简述 LVM的产生是因为传统的分区一旦分区好后就无法在线扩充空间,也存在一些工具能实现在线扩充空间但是还是会面临数据损坏的风险;传统的分区当分区空间不足时,一般的解决办法是再...

杉下 ⋅ 今天 ⋅ 0

rsync实现多台linux服务器的文件同步

一、首先安装rsync,怎样安装都行,rpm,yum,还是你用源码安装都可以。因为我用的是阿里云的ESC,yum install rsync就ok了。 二、配置rsync服务 1.先建立个同步数据的帐号 123 groupadd r...

在下头真的很硬 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部