Zookeeper动态上下线感应(代码演示)

原创
2017/03/29 17:14
阅读数 89

客户端:

package com.dd171290.zk.line;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
 * 用于模拟客户端的启动操作
 * @author root
 *
 */
public class Client {
	private static final String CONNECTSTRING="sf-node1.hadoop.com:2181,sf-node2.hadoop.com:2181,sf-node3.hadoop.com:2181";
	private static final int SESSION_TIME_OUT=2000;
	private static final String SERVER_PATH="/servers";
	//使用volalite关键字时为了确保多线程在进行共享该变量操作时,都能够正确访问数据。
	//但是该关键字并不能确保线程安全,只有当该变量进行的是原子操作时,线程是安全的。
	private volatile List<String> serverList=new ArrayList<String>();
	private static ZooKeeper zk;
	static CountDownLatch latch=new CountDownLatch(1);
	public Client() {
		try {
			zk=new ZooKeeper(CONNECTSTRING, SESSION_TIME_OUT, new Watcher(){
				public void process(WatchedEvent event) {
					//获取服务器列表,并再次注册监听
					try {
						if(event.getState()==KeeperState.SyncConnected){
							latch.countDown();
						}
						getServerList();
						doBussiness();
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				
			});
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 获取已经注册服务器列表
	 * @throws Exception
	 */
	public void getServerList() throws Exception{
		//获取子节点,并监听该节点的变化,当该节点的子节点有增加或者删除时,触发监听
		List<String> children=zk.getChildren(SERVER_PATH, true);
		List<String> servers=new ArrayList<String>();
		for (String child : children) {
			//获取子节点存取的数据
			byte[] server=zk.getData(SERVER_PATH+"/"+child, false, null);
			servers.add(new String(server));
		}
		serverList=servers;
	}
	
	public void doBussiness(){
		System.out.println("client:"+serverList);
	}
	
	public static void main(String[] args) throws Exception {
		//实例化时已经建立zk连接
		Client client=new Client();
		if(States.CONNECTING==zk.getState()){
			latch.await();
		}
		//获取服务武器列表,并注册监听
		client.getServerList();
		//多个线程开始处理业务
		client.doBussiness();
		Thread.sleep(Long.MAX_VALUE);
		
	}
}

服务器端:

package com.dd171290.zk.line;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper.States;

public class Server {
	private static final String CONNECTSTRING="sf-node1.hadoop.com:2181,sf-node2.hadoop.com:2181,sf-node3.hadoop.com:2181";
	private static final int SESSION_TIME_OUT=2000;
	private static final String SERVER_PATH="/servers";
	private static ZooKeeper zk;
	static CountDownLatch latch=new CountDownLatch(1);
	public Server() {
		try {
			//获取zk连接
			zk=new ZooKeeper(CONNECTSTRING, SESSION_TIME_OUT, new Watcher(){
				public void process(WatchedEvent event) {
					if(event.getState()==KeeperState.SyncConnected){
						latch.countDown();
					}
				}
			});
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public void doBussiness(String hostname){
		System.out.println(hostname+" starts working....");
	}
	public static void main(String[] args) {
		Server server=new Server();
		if(args.length!=1){
			System.out.println("服务器参数名未指定!");
			System.exit(-1);
		}
		String hostname=args[0];
		try {
			//判断节点是否是否存在,不存在返回null值,并且创建该节点。这里容易出现错误: KeeperErrorCode = ConnectionLoss for /servers
			//原因在于此时正在与zookeeper服务集群连接,即zk的状态时CONNECTING,并没有真正的建立连接
			//就解决办法:CountDownLatch 倒数计数
			
			if(States.CONNECTING==zk.getState()){
				latch.await();
			}
			if(zk.exists(SERVER_PATH, false) == null){
				zk.create(SERVER_PATH, "server".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
			String path=zk.create(SERVER_PATH+"/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			System.out.println("服务器注册的路径是:"+path);
			Thread.sleep(Integer.MAX_VALUE);
		} catch (KeeperException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
}

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部