java利用zookeeper的Leader选举实现主从的切换

原创
2017/05/16 23:03
阅读数 1W

在工作中需要一个程序来处理,但是业务逻辑有不能有多个程序同时处理,但也不能没有程序处理。

但是考虑到原程序可能会挂,所以要启动多个,但是第二个第三个是从,待主挂了后他再处理。

LeaderLatch和LeaderLatchListener方法介绍

LeaderLatch提供了如下方法:

 start()/close():启动/停止LeaderLatch

 addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener

 hasLeadership():如果LeaderLatch是Leader,那么返回true,否则false。

 getLeader():

 await:等待Leaderlatch成为Leader。


LeaderLatchListener提供了如下方法:

  isLeader():当LeaderLatch的hasLeaderShip()从false到true后,就会调用isLeader(),表明这个LeaderLatch成为leader了。

  notLeader():当LeaderLatch的hahLeaderShip从true到false后,就会调用notLeader(),表明这个LeaderLatch不再是leader了。
LeaderLatch在Master-Slave中的应用

在一个典型的master-slave场景下。你可以在isLeader中做如下处理:

  1.每一个master类都有一个state属性,初始值为standby.

  2.在isLeader中,从持久话引擎中读取要恢复的数据到一个临时的内存缓存中

  3.将这个master的state修改为recovering

  4.通知所有worker将其内部的master修改为当前master。

  5.将临时内存缓存中的数据恢复到master内部。

  6.将master状态修改为alive,然后这个master就可以对外服务了。

注意第5步,由于将持久话引擎中的数据添加到了master内部的内存中,所以需要确保之多恢复一次语义。

 

lolLeaderLatch 项目(maven)

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.imddy</groupId>
	<artifactId>lolLeaderLatch</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>lolLeaderLatch</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<scope>test</scope>
		</dependency>
		<!-- commons-logging -->
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.2</version>
		</dependency>
		<!-- log4j -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
		<dependency>
		    <groupId>com.github.sgroschupf</groupId>
		    <artifactId>zkclient</artifactId>
		    <version>0.1</version>
		</dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.5.0</version>
        </dependency>
		
	</dependencies>

	<build>
		<plugins>
			<!-- 资源文件 utf-8编码 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-resources-plugin</artifactId>
				<configuration>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<!-- 编译源码版本和目标版本 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
			<!-- shade -->
			<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.4.1</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<transformers>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<mainClass>com.imddy.lolLeaderLatch.test.TestMain</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
		</plugins>
	</build>
</project>

 

新建包名称:“package com.imddy.lolLeaderLatch.test;“

先创建 ZKClientInfo 类表示zk客户端的信息类

package com.imddy.lolLeaderLatch.test;

public class ZKClientInfo {

	// 是否是leader 默认为false
	public static boolean isLeader = false;

	// 客户端ID
	private String id;

	// 连接信息字符串
	private String connectString;

	// 节点路径
	private String path;

	// 连接超时时间
	private Integer connectTimeOut;

	// 最大连接次数
	private Integer maxRetries;

	// 重连休眠时间
	private Integer retrySleepTime;
	
	
	public String getConnectString() {
		return connectString == null ? null : connectString.replaceAll("\\s+", "");
	}

	public void setConnectString(String connectString) {
		this.connectString = connectString;
	}

	public String getPath() {
		return path;
	}

	public void setPath(String path) {
		this.path = path;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public Integer getConnectTimeOut() {
		return connectTimeOut;
	}

	public void setConnectTimeOut(Integer connectTimeOut) {
		this.connectTimeOut = connectTimeOut;
	}

	public Integer getMaxRetries() {
		return maxRetries;
	}

	public void setMaxRetries(Integer maxRetries) {
		this.maxRetries = maxRetries;
	}

	public Integer getRetrySleepTime() {
		return retrySleepTime;
	}

	public void setRetrySleepTime(Integer retrySleepTime) {
		this.retrySleepTime = retrySleepTime;
	}
	
	@Override
	public String toString() {
		return "ZKClientInfo{" +
				"id='" + id + '\'' +
				", isLeader=" + isLeader +
				", connectString='" + connectString + '\'' +
				", path='" + path + '\'' +
				", connectTimeOut=" + connectTimeOut +
				", maxRetries=" + maxRetries +
				", retrySleepTime=" + retrySleepTime +
				'}';
	}

}

 

ZKClient   zk的客服端类:

package com.imddy.lolLeaderLatch.test;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;


public class ZKClient {

    private LeaderLatch leader;

    private CuratorFramework client;
    
    public ZKClient (LeaderLatch leader,CuratorFramework client){
        this.client = client;
        this.leader = leader;
    }
    
    /**
     * 启动客户端
     * @throws Exception
     */
    public void startZKClient() throws Exception {
        client.start();
        leader.start();
    }

    /**
     * 关闭客户端
     * @throws Exception
     */
    public void closeZKClient() throws Exception {
        leader.close();
        client.close();
    }

    /**
     * 判断是否变为领导者
     * @return
     */
    public boolean hasLeadership(){
        return leader.hasLeadership() && ZKClientInfo.isLeader;
    }


	public LeaderLatch getLeader() {
		return leader;
	}

	public void setLeader(LeaderLatch leader) {
		this.leader = leader;
	}

	public CuratorFramework getClient() {
		return client;
	}

	public void setClient(CuratorFramework client) {
		this.client = client;
	}
    
    
	public static void main(String[] args) throws Exception {
		CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(new ExponentialBackoffRetry(5000, 3))
                        .connectionTimeoutMs(5000)
                        .build();

        LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatch", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        ZKClientListener zkClientListener = new ZKClientListener();
        leaderLatch.addListener(zkClientListener);


        ZKClient zkClient = new ZKClient(leaderLatch,client);
        zkClient.startZKClient();
        Thread.sleep(5000);

        int i = 0;
        while (i<15){
            //System.out.println("hasLeadership = "+zkClient.hasLeadership());
            Thread.sleep(1000);
            i++;
        }
        zkClient.closeZKClient();
        Thread.sleep(5000);
	}

}

 

ZKClientListener 类:

package com.imddy.lolLeaderLatch.test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;


public class ZKClientListener implements LeaderLatchListener {

	private static Log log = LogFactory.getLog(ZKClientListener.class);
	
	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");
	
	@Override
	public void isLeader() {
		log.info( simpleDateFormat.format(new Date()) + "当前服务已变为leader,将从事消费业务");
		 ZKClientInfo.isLeader = true;
		
	}

	@Override
	public void notLeader() {
		log.info( simpleDateFormat.format(new Date()) + "当前服务已退出leader,不再从事消费业务");
		 ZKClientInfo.isLeader = false;

	}

}

 

再来两个任务类:

Test01Thread 作为任务1:

package com.imddy.lolLeaderLatch.test;

import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class Test01Thread extends Thread {
	
	private ZKClient zkClient;
	
	public ZKClient getZkClient() {
		return zkClient;
	}

	public void setZkClient(ZKClient zkClient) {
		this.zkClient = zkClient;
	}


	private static Log log = LogFactory.getLog(Test01Thread.class);
	
	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");
	
	@Override
	public void run() {
		while (true) {
			
			try {
				//第一步leader验证
                if(!zkClient.hasLeadership()){
                    log.info("当前服务不是leader");
                    Thread.sleep(3000);
                    continue;
                }
                else {
                	log.info("当前服务是leader");
                }
				
                log.info("Test01 do it... ");
                
			} catch (Exception e) {
				
			}
			
		}
		
	}

}

 

Test02Thread 作为任务2:

package com.imddy.lolLeaderLatch.test;

import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class Test02Thread extends Thread {
	
	private ZKClient zkClient;
	
	public ZKClient getZkClient() {
		return zkClient;
	}

	public void setZkClient(ZKClient zkClient) {
		this.zkClient = zkClient;
	}


	private static Log log = LogFactory.getLog(Test02Thread.class);
	
	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");
	
	@Override
	public void run() {
		while (true) {
			
			try {
				//第一步leader验证
                if(!zkClient.hasLeadership()){
                    log.info("当前服务不是leader");
                    Thread.sleep(3000);
                    continue;
                }
                else {
                	log.info("当前服务是leader");
                }
				
                log.info("Test02 do it... ");
                
			} catch (Exception e) {
				
			}
			
		}
		
	}

}

 

 

TestMain 主任务类:

package com.imddy.lolLeaderLatch.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestMain {
	
	private static Log log = LogFactory.getLog(TestMain.class);

	public static void main(String[] args) throws Exception {
		CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(new ExponentialBackoffRetry(5000, 3))
                        .connectionTimeoutMs(5000)
                        .build();
		
		LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatch", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        ZKClientListener zkClientListener = new ZKClientListener();
        leaderLatch.addListener(zkClientListener);


        ZKClient zkClient = new ZKClient(leaderLatch,client);
        try {
        	zkClient.startZKClient();
		} catch (Exception e) {
			log.error("zk客户端连接失败");
			return;
		}
        log.info("zk客户端连接成功");
        
        Test01Thread test01Thread = new Test01Thread();
        test01Thread.setZkClient(zkClient);
        test01Thread.start();
        
        Test02Thread test02Thread = new Test02Thread();
        test02Thread.setZkClient(zkClient);
        test02Thread.start();
        
	}
}

 

分别打包,开启zookeeper后,然后执行连个lolLeaderLatch.jar文件 ,当关闭主后,从会自动接替主工作

结果如图:

 

那个可以是模拟程序关闭,还可以在程序中主动close直接切换(TestMain2):

package com.imddy.lolLeaderLatch.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestMain2 {
	
	private static Log log = LogFactory.getLog(TestMain2.class);

	public static void main(String[] args) throws Exception {
		CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(new ExponentialBackoffRetry(5000, 3))
                        .connectionTimeoutMs(5000)
                        .build();
		
		LeaderLatch leaderLatch = new LeaderLatch(client, "/leaderLatch", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        ZKClientListener zkClientListener = new ZKClientListener();
        leaderLatch.addListener(zkClientListener);


        ZKClient zkClient = new ZKClient(leaderLatch,client);
        try {
        	zkClient.startZKClient();
		} catch (Exception e) {
			log.error("zk客户端连接失败");
			return;
		}
        log.info("zk客户端连接成功");
        
        Test01Thread test01Thread = new Test01Thread();
        test01Thread.setZkClient(zkClient);
        test01Thread.start();
        
        Test02Thread test02Thread = new Test02Thread();
        test02Thread.setZkClient(zkClient);
        test02Thread.start();
        
        
        System.out.println("sleep 100s ");
        Thread.sleep(100000);
        System.out.println("切换");
        zkClient.closeZKClient();
	}
}

 

 

LeaderSelectorListener的使用,多个程序轮流

package com.imddy.lolLeaderLatch.test;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderSelectorTest {

	public static void main(String[] args) throws Exception {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
				.sessionTimeoutMs(5000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("text").build();
		client.start();

		final LeaderSelector leaderSelector =  new LeaderSelector(client, "/led", new LeaderSelectorListenerAdapter() {
			
			@Override
			public void takeLeadership(CuratorFramework client) throws Exception {
				System.err.println("work ing...");
				Thread.currentThread().sleep(3000);
				System.err.println("end"); 
				
			}
		});
		leaderSelector.autoRequeue();
		leaderSelector.start();
		System.in.read();
	}
}

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部