文档章节

ZooKeeper客户端Curator(监听篇)

星逝流
 星逝流
发布于 2015/12/01 10:49
字数 1758
阅读 1911
收藏 3

maven依赖

<dependency>

            <groupId>org.apache.curator</groupId>

            <artifactId>curator-recipes</artifactId>

            <version>2.8.0</version>   

        </dependency>

Path Cache

Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态。(此处需要注意,他只会监听一级子节点,不会循环监听子节点下面的child)

实际使用时会涉及到四个类:

  • PathChildrenCache

  • PathChildrenCacheEvent

  • PathChildrenCacheListener

  • ChildData

创建Path Cache

通过下面的构造函数创建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)

想使用cache,必须调用它的start方法,不用之后调用close方法。
start有两个, 其中一个可以传入StartMode,用来为初始的cache设置暖场方式(warm):

个人建议使用此构造方法,ExecutorService使用自己构建的线程池,保证线程可控

添加监控节点状态

添加监控节点状态的改变的Listener时,建议使用此方法,手动传入我们构建的线程池,保证线程可控。

childrenCache.getListenable().(T listener, Executor executor);

为什么要自己传入线程池呢?看下图和源码

 public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
    {
        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
    }
 public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

因为在zk丢失链接的时候,默认不传线程池的方法,在每次重连时都会会新new 出来一个线程池,线程一直处于活跃状态,如果zk服务端长时间未能恢复,会导致客户端现成打满

设置/更新、移除

设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:

client.setData().forPath(path, bytes);
client.create().creatingParentsIfNeeded().forPath(path, bytes);
client.delete().forPath(path);

而查询缓存使用下面的方法:

for (ChildData data : cache.getCurrentData()) {	
    System.out.println(data.getPath() + " = " + new String(data.getData()));
}

最后附上一段完整的代码

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZKClient
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:21
 */
public class ChildPathZkClient {
	// ip和端口url
	private String url;
	// 需要监听的base path
	private String basePath;

	private static CuratorFramework client = null;
	private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 5l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());

	public void init() throws Throwable {
		if (basePath == null) {
			basePath = "o2o/zk/cache";
		}
		client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 0)).build();
		client.start();
		/**
		 * 监听子节点的变化情况
		 */
		watchChild("/");
	}

	protected static void watchChild(String path) throws Exception {
		PathChildrenCache childrenCache = new PathChildrenCache(client, path, true, false, executor);
		ZkPathListener listener = new ZkPathListener();
		listener.setPathChildrenCache(childrenCache);
		childrenCache.getListenable().addListener(listener, executor);
		childrenCache.start();
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getBasePath() {
		return basePath;
	}

	public void setBasePath(String basePath) {
		this.basePath = basePath;
	}

	public static void main(String[] args) throws Throwable {
		CountDownLatch latch = new CountDownLatch(1);
		client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		client.start();

		/**
		 * 监听子节点的变化情况
		 */
		watchChild("/");
		latch.await();
	}

}
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

import com.jd.o2o.web.product.constants.zk.ZkSwitchEnum;
import com.jd.o2o.web.product.constants.zk.ZkValueEnum;
import com.jd.o2o.zk.cache.serializer.ZooKeeperSerializer;
import com.jd.o2o.zk.cache.serializer.impl.JdkSerializationZooKeeperSerializer;

/**
 * ZK监听器
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:38
 */
public class ZkPathListener implements PathChildrenCacheListener {
	private final Logger logger = LogManager.getLogger(ZkPathListener .class);
	private final ZooKeeperSerializer<?> defaultSerializer = new JdkSerializationZooKeeperSerializer();
	private PathChildrenCache pathChildrenCache;

	@Override
	public void childEvent(CuratorFramework paramCuratorFramework, PathChildrenCacheEvent event) throws Exception {
		switch (event.getType()) {
		case CHILD_ADDED:
			// TODO
			System.out.println(defaultSerializer.deserialize(event.getData().getData()));
			break;
		case CHILD_UPDATED:
			// TODO
			System.out.println(defaultSerializer.deserialize(event.getData().getData()));
			break;
		case CHILD_REMOVED:
			// TODO
			System.out.println(defaultSerializer.deserialize(event.getData().getData()));
			break;
		default:
			break;
		}
	}
}



Node Cache

Node Cache用来监控一个ZNode. 当节点的数据修改或者删除时,Node Cache能更新它的状态包含最新的改变。

涉及到下面的三个类:

  • NodeCache

  • NodeCacheListener

  • ChildData

构建NodeCache

 public NodeCache(CuratorFramework client, String path)

想使用cache,依然要调用它的start方法,不用之后调用close方法。

添加监控节点状态

添加监控节点状态的改变的Listener时,建议使用此方法,手动传入我们构建的线程池,保证线程可控。

NodeCache.getListenable().(T listener, Executor executor);

同样附一段完整的代码

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZK链接
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:21
 */
public class ZkNodeClient {
	// ip和端口url
	private String url;
	// 需要监听的base path
	private String basePath;
        private static NodeCache nodeCache;
	private static CuratorFramework client = null;
	private final static ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

	public void init() throws Throwable {
		if (basePath == null) {
			basePath = "o2o/zk/cache";
		}
		client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 0)).build();
		client.start();
		nodeCache = new NodeCache(client, "/");
		/**
		 * 监听子节点的变化情况
		 */
		watchChild();
		
	}

	protected static void watchChild() throws Exception {
		
		nodeCache.getListenable().addListener(new NodeCacheListener() {

			@Override
			public void nodeChanged() throws Exception {
				System.out.println(nodeCache.getCurrentData().getPath() + ":" + nodeCache.getCurrentData().getData());
			}
		}, EXECUTOR_SERVICE);
		nodeCache.start();
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getBasePath() {
		return basePath;
	}

	public void setBasePath(String basePath) {
		this.basePath = basePath;
	}

	public static void main(String[] args) throws Throwable {
		CountDownLatch latch = new CountDownLatch(1);
		client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		client.start();

		/**
		 * 监听子节点的变化情况
		 */
		watchChild("/");
		latch.await();
	}

}



Tree Node

这种类型的即可以监控节点的状态,还监控节点的子节点的状态, 类似上面两种cache的组合。 这也就是Tree的概念。 它监控整个树中节点的状态。(只要是所监听的路径下的所有叶子节点都会监听)

涉及到下面四个类。

  • TreeCache

  • TreeCacheListener

  • TreeCacheEvent

  • ChildData

构造TreeCache

而关键的TreeCache的构造函数为

public TreeCache(CuratorFramework client, String path)

添加监控节点状态

添加监控节点状态的改变的Listener时,建议使用此方法,手动传入我们构建的线程池,保证线程可控。

TreeCache.getListenable().(T listener, Executor executor);

想使用cache,依然要调用它的start方法,不用之后调用close方法。

getCurrentChildren()返回cache的状态,类型为Map。 而getCurrentData()返回监控的path的数据。

最后同样附一段完整代码

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZK链接
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:21
 */
public class TreeClient {
	// ip和端口url
	private String url;
	// 需要监听的base path
	private String basePath;

	private static CuratorFramework client = null;
	private static TreeCache cache = null;
	private static ZkTreeListener listener = new ZkTreeListener();
	private static ExecutorService executorService = Executors.newSingleThreadExecutor();

	public void init() throws Throwable {
		if (basePath == null) {
			basePath = "o2o/zk/cache";
		}
		// 修改重连次数,使用最初的线程进行重连监听,不重新新建线程 ExponentialBackoffRetry(1000, 0)
		client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
		client.start();
		/**
		 * 监听子节点的变化情况
		 */
		watchChild("/product");
		watchChild("/switch");

	}

	protected static void watchChild(String path) throws Exception {
		// 改用TreeCacheListener,免除循环监听子节点的问题
		cache = new TreeCache(client, path);
		cache.getListenable().addListener(listener, executorService);
		cache.start();
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getBasePath() {
		return basePath;
	}

	public void setBasePath(String basePath) {
		this.basePath = basePath;
	}

	public static void main(String[] args) throws Throwable {
		CountDownLatch latch = new CountDownLatch(1);
		client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 0)).build();
		client.start();

		/**
		 * 监听子节点的变化情况
		 */
		watchChild("/product");
                watchChild("/switch");
		latch.await();
	}

}
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

import com.jd.o2o.web.product.constants.zk.ZkSwitchEnum;
import com.jd.o2o.web.product.constants.zk.ZkValueEnum;
import com.jd.o2o.zk.cache.serializer.ZooKeeperSerializer;
import com.jd.o2o.zk.cache.serializer.impl.JdkSerializationZooKeeperSerializer;

/**
 * TreeCache ZK监听器
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:38
 */
public class ZkTreeListener implements TreeCacheListener {
	private final Logger logger = LogManager.getLogger(TreeCacheListener .class);
	private final ZooKeeperSerializer<?> defaultSerializer = new JdkSerializationZooKeeperSerializer();

	@Override
	public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
		System.out.println(event.getData().getPath());
		switch (event.getType()) {
		case NODE_ADDED:
			if (event.getData().getData() == null) {
				break;
			}
			//TODO
			break;
		case NODE_UPDATED:
			if (event.getData().getData() == null) {
				break;
			}
			//TODO
			break;
		default:
			break;
		}
	}
}


© 著作权归作者所有

星逝流
粉丝 14
博文 54
码字总数 84838
作品 0
昌平
高级程序员
私信 提问
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh
2018/04/28
0
0
ZooKeeper 客户端之 Curator

原文链接:ZooKeeper 客户端之 Curator ZooKeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现。它是集群的管理者,监视着集群中各个节点的状态...

glmapper
04/13
0
0
Apache Curator Zookeeper客户端

Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。 1.Zookeeper安装部署 Zookeeper的部署很简单,...

laigous
2017/03/02
105
0
使用Curator实现的zookeeper分布式锁出现的Unimplemented for {root.path}

问题描述 Curator使用 ZooKeeper 作为分布式锁,启动时发生该异常。 Curator 客户端版本:curator-recipes-2.10.0 ZooKeeper 服务器版本:3.4.13 异常日志 以及 问题分析 UnimplementedExcep...

loubobooo
02/27
102
0
Apache Curator 3.0.0 发布,ZooKeeper 客户端简化

Apache Curator 3.0.0 发布,该版本带来的对 ZooKeeper 的新的动态配置 APIs 的兼容,更新内容如下: 子任务 [CURATOR-160] - Support Dynamic Reconfig [CURATOR-161] - Support Watcher R...

oschina
2015/10/15
1K
3

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
808
11
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
15
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部