文档章节

zookeepr监听器和api应用(高级)

zhanggong
 zhanggong
发布于 2017/03/08 23:21
字数 1393
阅读 11
收藏 0

##Zookeeper的监听器工作机制

监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
监听器的注册是在获取数据的操作中实现:
getData(path,watch?)监听的事件是:节点数据变化事件
getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件

#zookeeper应用案例(分布式应用HA||分布式锁) ##1 实现分布式应用的(主节点HA)及客户端动态更新主节点状态 某分布式系统中,主节点可以有多台,可以动态上下线
任意一台客户端都能实时感知到主节点服务器的上下线

###A、客户端实现 public class AppClient { private String groupNode = "sgroup"; private ZooKeeper zk; private Stat stat = new Stat(); private volatile List<String> serverList;

	/**
	 * 连接zookeeper
	 */
	public void connectZookeeper() throws Exception {
		zk 
= new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {
			public void process(WatchedEvent event) {
				// 如果发生了"/sgroup"节点下的子节点变化事件, 更新server列表, 并重新注册监听
				if (event.getType() == EventType.NodeChildrenChanged 
					&& ("/" + groupNode).equals(event.getPath())) {
					try {
						updateServerList();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		});

		updateServerList();
	}

	/**
	 * 更新server列表
	 */
	private void updateServerList() throws Exception {
		List<String> newServerList = new ArrayList<String>();

		// 获取并监听groupNode的子节点变化
		// watch参数为true, 表示监听子节点变化事件. 
		// 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
		List<String> subList = zk.getChildren("/" + groupNode, true);
		for (String subNode : subList) {
			// 获取每个子节点下关联的server地址
			byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
			newServerList.add(new String(data, "utf-8"));
		}

		// 替换server列表
		serverList = newServerList;

		System.out.println("server list updated: " + serverList);
	}

	/**
	 * client的工作逻辑写在这个方法中
	 * 此处不做任何处理, 只让client sleep
	 */
	public void handle() throws InterruptedException {
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception {
		AppClient ac = new AppClient();
		ac.connectZookeeper();

		ac.handle();
	}
}

###B、服务器端实现 public class AppServer { private String groupNode = "sgroup"; private String subNode = "sub";

	/**
	 * 连接zookeeper
	 * [@param](https://my.oschina.net/u/2303379) address server的地址
	 */
	public void connectZookeeper(String address) throws Exception {
		ZooKeeper zk = new ZooKeeper(
"localhost:4180,localhost:4181,localhost:4182", 
5000, new Watcher() {
			public void process(WatchedEvent event) {
				// 不做处理
			}
		});
		// 在"/sgroup"下创建子节点
		// 子节点的类型设置为EPHEMERAL_SEQUENTIAL, 表明这是一个临时节点, 且在子节点的名称后面加上一串数字后缀
		// 将server的地址数据关联到新创建的子节点上
		String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"), 
			Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("create: " + createdPath);
	}
	
	/**
	 * server的工作逻辑写在这个方法中
	 * 此处不做任何处理, 只让server sleep
	 */
	public void handle() throws InterruptedException {
		Thread.sleep(Long.MAX_VALUE);
	}
	
	public static void main(String[] args) throws Exception {
		// 在参数中指定server的地址
		if (args.length == 0) {
			System.err.println("The first argument must be server address");
			System.exit(1);
		}
		
		AppServer as = new AppServer();
		as.connectZookeeper(args[0]);
		as.handle();
	}
}

##2分布式共享锁的简单实现 ###客户端A public class DistributedClient { // 超时时间 private static final int SESSION_TIMEOUT = 5000; // zookeeper server列表 private String hosts = "localhost:4180,localhost:4181,localhost:4182"; private String groupNode = "locks"; private String subNode = "sub";

    private ZooKeeper zk;
    // 当前client创建的子节点
    private String thisPath;
    // 当前client等待的子节点
    private String waitPath;

    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * 连接zookeeper
     */
    public void connectZookeeper() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                try {
                    // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
                    if (event.getState() == KeeperState.SyncConnected) {
                        latch.countDown();
                    }

                    // 发生了waitPath的删除事件
                    if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                        doSomething();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待连接建立
        latch.await();

        // 创建子节点
        thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        // wait一小会, 让结果更清晰一些
        Thread.sleep(10);

        // 注意, 没有必要监听"/locks"的子节点的变化情况
        List<String> childrenNodes = zk.getChildren("/" + groupNode, false);

        // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
        if (childrenNodes.size() == 1) {
            doSomething();
        } else {
            String thisNode = thisPath.substring(("/" + groupNode + "/").length());
            // 排序
            Collections.sort(childrenNodes);
            int index = childrenNodes.indexOf(thisNode);
            if (index == -1) {
                // never happened
            } else if (index == 0) {
                // inddx == 0, 说明thisNode在列表中最小, 当前client获得锁
                doSomething();
            } else {
                // 获得排名比thisPath前1位的节点
                this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
                // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
                zk.getData(waitPath, true, new Stat());
            }
        }
    }

    private void doSomething() throws Exception {
        try {
            System.out.println("gain lock: " + thisPath);
            Thread.sleep(2000);
            // do something
        } finally {
            System.out.println("finished: " + thisPath);
            // 将thisPath删除, 监听thisPath的client将获得通知
            // 相当于释放锁
            zk.delete(this.thisPath, -1);
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                public void run() {
                    try {
                        DistributedClient dl = new DistributedClient();
                        dl.connectZookeeper();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }

        Thread.sleep(Long.MAX_VALUE);
    }
}

##分布式多进程模式实现:

public class DistributedClientMy {
	

	// 超时时间
	private static final int SESSION_TIMEOUT = 5000;
	// zookeeper server列表
	private String hosts = "spark01:2181,spark02:2181,spark03:2181";
	private String groupNode = "locks";
	private String subNode = "sub";
	private boolean haveLock = false;

	private ZooKeeper zk;
	// 当前client创建的子节点
	private volatile String thisPath;

	/**
	 * 连接zookeeper
	 */
	public void connectZookeeper() throws Exception {
		zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {
			public void process(WatchedEvent event) {
				try {

					// 子节点发生变化
					if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
						// thisPath是否是列表中的最小节点
						List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
						String thisNode = thisPath.substring(("/" + groupNode + "/").length());
						// 排序
						Collections.sort(childrenNodes);
						if (childrenNodes.indexOf(thisNode) == 0) {
							doSomething();
							thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
									CreateMode.EPHEMERAL_SEQUENTIAL);
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});

		// 创建子节点
		thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
				CreateMode.EPHEMERAL_SEQUENTIAL);

		// wait一小会, 让结果更清晰一些
		Thread.sleep(new Random().nextInt(1000));

		// 监听子节点的变化
		List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

		// 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
		if (childrenNodes.size() == 1) {
			doSomething();
			thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
					CreateMode.EPHEMERAL_SEQUENTIAL);
		}
	}

	/**
	 * 共享资源的访问逻辑写在这个方法中
	 */
	private void doSomething() throws Exception {
		try {
			System.out.println("gain lock: " + thisPath);
			Thread.sleep(2000);
			// do something
		} finally {
			System.out.println("finished: " + thisPath);
			// 将thisPath删除, 监听thisPath的client将获得通知
			// 相当于释放锁
			zk.delete(this.thisPath, -1);
		}
	}

	public static void main(String[] args) throws Exception {
		DistributedClientMy dl = new DistributedClientMy();
		dl.connectZookeeper();
		Thread.sleep(Long.MAX_VALUE);
	}

	
}

需要大数据视频资料的同学可以加QQ群:105067810向群主索取。

© 著作权归作者所有

zhanggong
粉丝 5
博文 51
码字总数 26791
作品 0
深圳
程序员
私信 提问
基于zookeeper的配置管理中心

废话不多说,先睹为快: https://github.com/ruanjianlxm/panda-config 基于同事的stc-config整理 解决问题:在分布式系统中,一般一个服务部署在多个机子上,多实例的部署的确提高了服务的整...

明舞
2015/11/06
2.9K
2
分布式异步编程框架--Koper

Koper Koper是一个基于消息和事件驱动的分布式异步编程框架, Koper的设计目标是为大型场景提供高性能高吞吐的简单编程方案。 Koper为分布式环境提供了简化的消息监听和数据事件监听模型,它...

raymondkk
2016/11/18
819
1
iNexus 0.1 发布,基于 Raft 的分布式协调组件

iNexus 0.1 发布,更新内容如下: v0.1 功能列表: 基本的Get,Put,Scan,Delete Watch(注册变更通知),Lock(分布式抢锁) 高可用:集群选主、数据一致性复制 过期日志整理、数据压缩 C++客户端...

fxsjy
2015/08/19
1K
6
zookeepr--概览

zk 一个分布式应用协调服务 zk是一个分布式,开源的,分布式协调服务,他提供了一组简单的原生接口,分布式应用可以基于它实现,高水准的同步,集群,配置管理和命名服务。它基于开发,使用简...

wannshan
2016/01/22
34
0
ZooKeeper教程资源收集(简介/原理/示例/解决方案)

菩提树下的杨过: ZooKeeper 笔记(1) 安装部署及hello world ZooKeeper 笔记(2) 监听数据变化 ZooKeeper 笔记(3) 实战应用之【统一配置管理】 ZooKeeper 笔记(4) 实战应用之【消除单点故障】...

easonjim
2017/09/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0
简述TCP的流量控制与拥塞控制

1. TCP流量控制 流量控制就是让发送方的发送速率不要太快,要让接收方来的及接收。 原理是通过确认报文中窗口字段来控制发送方的发送速率,发送方的发送窗口大小不能超过接收方给出窗口大小。...

鏡花水月
今天
10
0
OSChina 周日乱弹 —— 别问,问就是没空

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @tom_tdhzz :#今日歌曲推荐# 分享容祖儿/彭羚的单曲《心淡》: 《心淡》- 容祖儿/彭羚 手机党少年们想听歌,请使劲儿戳(这里) @wqp0010 :周...

小小编辑
今天
1K
11
golang微服务框架go-micro 入门笔记2.1 micro工具之micro api

micro api micro 功能非常强大,本文将详细阐述micro api 命令行的功能 重要的事情说3次 本文全部代码https://idea.techidea8.com/open/idea.shtml?id=6 本文全部代码https://idea.techidea8....

非正式解决方案
今天
5
0
Spring Context 你真的懂了吗

今天介绍一下大家常见的一个单词 context 应该怎么去理解,正确的理解它有助于我们学习 spring 以及计算机系统中的其他知识。 1. context 是什么 我们经常在编程中见到 context 这个单词,当...

Java知其所以然
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部