文档章节

ZooKeeper与Curator注册和监控

 如风达
发布于 2016/11/15 17:35
字数 1420
阅读 181
收藏 2

Curator提供了对zookeeper客户端的封装,并监控连接状态和会话session,特别是会话session过期后,curator能够重新连接zookeeper,并且创建一个新的session。

对于zk的使用者来说,session的概念至关重要,如果想了解更多session的说明,请访问:

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html

zk客户端和zk服务器间主要可能存在下面几种异常情况:

  1. 短暂失去连接:此时客户端检测到与服务端的连接已经断开,但是服务端维护的客户端session尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于session没有过期,zookeeper能够保证连接恢复后保持正常服务。
  2. 失去连接时间很长:此时服务器相对于客户端的session已经过期了,与先前session相关的watcher和ephemeral的路径和数据都会消失;当Curator重新创建了与zk的连接后,会获取到session expired异常,Curator会销毁先前的session,并且会创建一个新的session,需要注意的是,与之前session相关的watcher和ephemeral类型的路径和数据在新的session中也不会存在,需要开发者在CuratorFramework.getConnectionStateListenable().addListener()中添加状态监听事件,对ConnectionState.LOST事件进行监听,当session过期后,使得之前的session状态得以恢复。对于ephemeral类型,在客户端应该保持数据的状态,以便及时恢复。
  3. 客户端重新启动:不论先前的zk session是否已经过期,都需要重新创建临时节点、添加数据和watch事件,先前的session也会在稍后的一段时间内过期。
  4. Zk服务器重新启动:由于zk将session信息存放到了硬盘上,因此重启后,先前未过期的session仍然存在,在zk服务器启动后,客户端与zk服务器创建新的连接,并使用先前的session,与1相同。
  5. 需要注意的是,当session过期了,在session过期期间另外的客户端修改了zk的值,那么这个修改在客户端重新连接到zk上时,zk客户端不会接收到这个修改的watch事件(尽管添加了watch),如果需要严格的watch逻辑,就需要在curator的状态监控中添加逻辑。

特别提示:watcher仅仅是一次性的,zookeeper通知了watcher事件后,就会将这个watcher从session中删除,因此,如果想继续监控,就要添加新的watcher。

 

下面提供了对persistent和ephemeral两种类型节点的监控方法,其中get方法说明了persistent节点如何监控,而register方法说明了ephemeral类型的节点如何监控。

package demo;

import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorTest {
    private CuratorFramework zkTools;
    private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>();
    private static Charset charset = Charset.forName("utf-8");

    public CuratorTest() {
        zkTools = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.216:3306")
                .namespace("zk/test")
                .retryPolicy(new RetryNTimes(2000, 20000))
                .build();
        zkTools.start();
    }

    public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType,
            final CuratorWatcher watcher) {
        synchronized (this) {
            if (!watchers.contains(watcher.toString()))// 不要添加重复的监听事件
            {
                watchers.add(watcher.toString());
                System.out.println("add new watcher " + watcher);
                zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                        System.out.println(newState);
                        if (newState == ConnectionState.LOST) {// 处理session过期
                            try {
                                if (watcherType == ZookeeperWatcherType.EXITS) {
                                    zkTools.checkExists().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) {
                                    zkTools.getChildren().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.GET_DATA) {
                                    zkTools.getData().usingWatcher(watcher).forPath(path);
                                } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) {
                                    // ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中,
                                    // 会处理create事件,将路径值恢复到先前状态
                                    Stat stat = zkTools.checkExists().usingWatcher(watcher)
                                            .forPath(path);
                                    if (stat == null) {
                                        System.err.println("to create");
                                        zkTools.create().creatingParentsIfNeeded()
                                                .withMode(CreateMode.EPHEMERAL)
                                                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                                    }
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
        }
    }

    public void create() throws Exception {
        zkTools.create()// 创建一个路径
                .creatingParentsIfNeeded()// 如果指定的节点的父节点不存在,递归创建父节点
                .withMode(CreateMode.PERSISTENT)// 存储类型(临时的还是持久的)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 访问权限
                .forPath("zk/test");// 创建的路径
    }

    public void put() throws Exception {
        // 对路径节点赋值
        zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8")));
    }

    public void get() throws Exception {
        String path = "zk/test";
        ZKWatch watch = new ZKWatch(path);
        byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path);
        System.out.println(new String(buffer, charset));
        // 添加session过期的监控
        addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);
    }

    public void register() throws Exception {
        String ip = InetAddress.getLocalHost().getHostAddress();
        String registeNode = "zk/register/" + ip;// 节点路径
        byte[] data = "disable".getBytes(charset);// 节点值
        CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 创建一个register watcher
        Stat stat = zkTools.checkExists().forPath(registeNode);
        if (stat != null) {
            zkTools.delete().forPath(registeNode);
        }
        zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 创建的路径和值
        // 添加到session过期监控事件中
        addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher);
        data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);
        System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset));
    }

    public static void main(String[] args) throws Exception {
        CuratorTest test = new CuratorTest();
        test.get();
        test.register();
        Thread.sleep(10000000000L);
    }


    public class ZKWatch implements CuratorWatcher {
        private final String path;

        public String getPath() {
            return path;
        }

        public ZKWatch(String path) {
            this.path = path;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            if (event.getType() == EventType.NodeDataChanged) {
                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);
                System.out.println(path + ":" + new String(data, Charset.forName("utf-8")));
            }
        }
    }

    public class ZKWatchRegister implements CuratorWatcher {
        private final String path;
        private byte[] value;

        public String getPath() {
            return path;
        }

        public ZKWatchRegister(String path, byte[] value) {
            this.path = path;
            this.value = value;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            if (event.getType() == EventType.NodeDataChanged) {
                // 节点数据改变了,需要记录下来,以便session过期后,能够恢复到先前的数据状态
                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);
                value = data;
                System.out.println(path + ":" + new String(data, charset));
            } else if (event.getType() == EventType.NodeDeleted) {
                // 节点被删除了,需要创建新的节点
                System.out.println(path + ":" + path + " has been deleted.");
                Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);
                if (stat == null) {
                    zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                }
            } else if (event.getType() == EventType.NodeCreated) {
                // 节点被创建时,需要添加监听事件(创建可能是由于session过期后,curator的状态监听部分触发的)
                System.out.println(path + ":" + " has been created!" + "the current data is "
                        + new String(value));
                zkTools.setData().forPath(path, value);
                zkTools.getData().usingWatcher(this).forPath(path);
            }
        }
    }
    public enum ZookeeperWatcherType {
        GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS
    }

}

 

本文转载自:http://blog.sina.com.cn/s/blog_616e189f0101891e.html

上一篇: windows下安装git
下一篇: C3P0连接池配置
粉丝 7
博文 256
码字总数 22313
作品 0
深圳
私信 提问
使用Curator实现的zookeeper分布式锁出现的Unimplemented for {root.path}

问题描述 Curator使用 ZooKeeper 作为分布式锁,启动时发生该异常。 Curator 客户端版本:curator-recipes-2.10.0 ZooKeeper 服务器版本:3.4.13 异常日志 以及 问题分析 UnimplementedExcep...

loubobooo
02/27
121
0
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh
2018/04/28
0
0
分布式利器Zookeeper(三)

前言 《分布式利器Zookeeper(一)》 《分布式利器Zookeeper(二):分布式锁》 本篇博客是分布式利器Zookeeper系列的最后一篇,涉及的话题是:Zookeeper分布式锁的代码实现、zkclient的使用、...

zfz_linux_boy
2018/07/01
0
0
Apache Curator 3.0.0 发布,ZooKeeper 客户端简化

Apache Curator 3.0.0 发布,该版本带来的对 ZooKeeper 的新的动态配置 APIs 的兼容,更新内容如下: 子任务 [CURATOR-160] - Support Dynamic Reconfig [CURATOR-161] - Support Watcher R...

oschina
2015/10/15
2K
3
Apache Curator 2.7.1 发布

Apache Curator 2.7.1 发布了,zookeeper 的客户端调用过于复杂,Curator提供了对zookeeper客户端的封装,Apache Curator 就是为了简化zookeeper客户端调用而生,利用它,可以更好的使用zoo...

凯文加内特
2015/03/02
133
0

没有更多内容

加载失败,请刷新页面

加载更多

关于AsyncTask的onPostExcute方法是否会在Activity重建过程中调用的问题

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/XG1057415595/article/details/86774575 假设下面一种情况...

shzwork
今天
6
0
object 类中有哪些方法?

getClass(): 获取运行时类的对象 equals():判断其他对象是否与此对象相等 hashcode():返回该对象的哈希码值 toString():返回该对象的字符串表示 clone(): 创建并返此对象的一个副本 wait...

happywe
今天
6
0
Docker容器实战(七) - 容器中进程视野下的文件系统

前两文中,讲了Linux容器最基础的两种技术 Namespace 作用是“隔离”,它让应用进程只能看到该Namespace内的“世界” Cgroups 作用是“限制”,它给这个“世界”围上了一圈看不见的墙 这么一...

JavaEdge
今天
8
0
文件访问和共享的方法介绍

在上一篇文章中,你了解到文件有三个不同的权限集。拥有该文件的用户有一个集合,拥有该文件的组的成员有一个集合,然后最终一个集合适用于其他所有人。在长列表(ls -l)中这些权限使用符号...

老孟的Linux私房菜
今天
7
0
面试套路题目

作者:抱紧超越小姐姐 链接:https://www.nowcoder.com/discuss/309292?type=3 来源:牛客网 面试时候的潜台词 抱紧超越小姐姐 编辑于 2019-10-15 16:14:56APP内打开赞 3 | 收藏 4 | 回复24 ...

MtrS
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部