文档章节

跟着实例学习ZooKeeper的用法: Curator扩展库

longbadx
 longbadx
发布于 2015/02/11 15:37
字数 1547
阅读 2184
收藏 11
点赞 0
评论 0

还记得Curator提供哪几个组件吗? 我们不妨回顾一下:

  • Recipes
  • Framework
  • Utilities
  • Client
  • Errors
  • Extensions

前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等。 还有最后一个组件我们还没有介绍,那就是Curator扩展组件。

Recipes组件包含了丰富的Curator应用的组件。 但是这些并不是ZooKeeper Recipe的全部。 大量的分布式应用已经抽象出了许许多多的的Recipe,其中有些还是可以通过Curator来实现。 如果不断都将这些Recipe都增加到Recipes中, Recipes会变得越来越大。 为了避免这种状况, Curator把一些其它的Recipe放在单独的包中, 命名方式就是curator-x-<name>,比如curator-x-discovery, curator-x-rpc。 本文就是介绍curator-x-discovery。

这是一个服务发现的Recipe。
我们在介绍临时节点Ephemeral Node的时候就讲到, 可以通过临时节点创建一个服务注册机制。 服务启动后创建临时节点, 服务断掉后临时节点就不存在了。 这个扩展抽象了这种功能,听过了一套API,可以实现服务发现机制。

服务类

我们先介绍一下例子中的服务类。InstanceDetails定义了服务实例的基本信息,实际中可能会定义更详细的信息。

package com.colobu.zkrecipe.discovery; import org.codehaus.jackson.map.annotate.JsonRootName; /**
 * In a real application, the Service payload will most likely be more detailed
 * than this. But, this gives a good example.
 */ @JsonRootName("details") public class InstanceDetails { private String description; public InstanceDetails() { this("");
    } public InstanceDetails(String description) { this.description = description;
    } public void setDescription(String description) { this.description = description;
    } public String getDescription() { return description;
    }
}

ExampleServer相当与你在分布式环境中的服务应用。 每个服务应用实例都类似这个类, 应用启动时调用start, 关闭时调用close。

package com.colobu.zkrecipe.discovery; import java.io.Closeable; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; /**
 * This shows a very simplified method of registering an instance with the
 * service discovery. Each individual instance in your distributed set of
 * applications would create an instance of something similar to ExampleServer,
 * start it when the application comes up and close it when the application
 * shuts down.
 */ public class ExampleServer implements Closeable { private final ServiceDiscovery<InstanceDetails> serviceDiscovery; private final ServiceInstance<InstanceDetails> thisInstance; public ExampleServer(CuratorFramework client, String path, String serviceName, String description) throws Exception { // in a real application, you'd have a convention of some kind for the // URI layout UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}");
        thisInstance = ServiceInstance.<InstanceDetails> builder().name(serviceName).payload(new InstanceDetails(description))
                .port((int) (65535 * Math.random())) // in a real application, // you'd use a common // port .uriSpec(uriSpec).build(); // if you mark your payload class with @JsonRootName the provided // JsonInstanceSerializer will work JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(path).serializer(serializer)
                .thisInstance(thisInstance).build();
    } public ServiceInstance<InstanceDetails> getThisInstance() { return thisInstance;
    } public void start() throws Exception {
        serviceDiscovery.start();
    } @Override public void close() throws IOException {
        CloseableUtils.closeQuietly(serviceDiscovery);
    }
}

发现中心

DiscoveryExample提供了增加,删除,显示,注册已有的服务的功能。 注意此处服务注册是由ExampleServer自己完成的, 这比较符合实际的情况。 实际情况是服务自己起来后主动注册服务。 但是此处启动又是由DiscoveryExample来调用, 纯粹为了演示使用。 你可以根据你自己的情况合理安排服务的注册和启动。

random命令提供了一个完全由DiscoveryExample控制的服务。 它负责注册一个服务并启动。

调用close就关闭了服务。

package com.colobu.zkrecipe.discovery; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.strategies.RandomStrategy; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class DiscoveryExample { private static final String PATH = "/discovery/example"; public static void main(String[] args) throws Exception { // This method is scaffolding to get the example up and running TestingServer server = new TestingServer();
        CuratorFramework client = null;
        ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
        Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap(); try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
            serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH).serializer(serializer).build();
            serviceDiscovery.start();
            processCommands(serviceDiscovery, providers, client);
        } finally { for (ServiceProvider<InstanceDetails> cache : providers.values()) {
                CloseableUtils.closeQuietly(cache);
            }
            CloseableUtils.closeQuietly(serviceDiscovery);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    } private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery, Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception { // More scaffolding that does a simple command line processor printHelp();
        List<ExampleServer> servers = Lists.newArrayList(); try {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); boolean done = false; while (!done) {
                System.out.print("> ");
                String line = in.readLine(); if (line == null) { break;
                }
                String command = line.trim();
                String[] parts = command.split("\\s"); if (parts.length == 0) { continue;
                }
                String operation = parts[0];
                String args[] = Arrays.copyOfRange(parts, 1, parts.length); if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("add")) {
                    addInstance(args, client, command, servers);
                } else if (operation.equals("delete")) {
                    deleteInstance(args, command, servers);
                } else if (operation.equals("random")) {
                    listRandomInstance(args, serviceDiscovery, providers, command);
                } else if (operation.equals("list")) {
                    listInstances(serviceDiscovery);
                }
            }
        } finally { for (ExampleServer server : servers) {
                CloseableUtils.closeQuietly(server);
            }
        }
    } private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery, Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception { // this shows how to use a ServiceProvider // in a real application you'd create the ServiceProvider early for the // service(s) you're interested in if (args.length != 1) {
            System.err.println("syntax error (expected random <name>): " + command); return;
        }
        String serviceName = args[0];
        ServiceProvider<InstanceDetails> provider = providers.get(serviceName); if (provider == null) {
            provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).providerStrategy(new RandomStrategy<InstanceDetails>()).build();
            providers.put(serviceName, provider);
            provider.start();
            Thread.sleep(2500); // give the provider time to warm up - in a real // application you wouldn't need to do this }
        ServiceInstance<InstanceDetails> instance = provider.getInstance(); if (instance == null) {
            System.err.println("No instances named: " + serviceName);
        } else {
            outputInstance(instance);
        }
    } private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception { // This shows how to query all the instances in service discovery try {
            Collection<String> serviceNames = serviceDiscovery.queryForNames();
            System.out.println(serviceNames.size() + " type(s)"); for (String serviceName : serviceNames) {
                Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery.queryForInstances(serviceName);
                System.out.println(serviceName); for (ServiceInstance<InstanceDetails> instance : instances) {
                    outputInstance(instance);
                }
            }
        } finally {
            CloseableUtils.closeQuietly(serviceDiscovery);
        }
    } private static void outputInstance(ServiceInstance<InstanceDetails> instance) {
        System.out.println("\t" + instance.getPayload().getDescription() + ": " + instance.buildUriSpec());
    } private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) { // simulate a random instance going down // in a real application, this would occur due to normal operation, a // crash, maintenance, etc. if (args.length != 1) {
            System.err.println("syntax error (expected delete <name>): " + command); return;
        } final String serviceName = args[0];
        ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() { @Override public boolean apply(ExampleServer server) { return server.getThisInstance().getName().endsWith(serviceName);
            }
        }, null); if (server == null) {
            System.err.println("No servers found named: " + serviceName); return;
        }
        servers.remove(server);
        CloseableUtils.closeQuietly(server);
        System.out.println("Removed a random instance of: " + serviceName);
    } private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers) throws Exception { // simulate a new instance coming up // in a real application, this would be a separate process if (args.length < 2) {
            System.err.println("syntax error (expected add <name> <description>): " + command); return;
        }
        StringBuilder description = new StringBuilder(); for (int i = 1; i < args.length; ++i) { if (i > 1) {
                description.append(' ');
            }
            description.append(args[i]);
        }
        String serviceName = args[0];
        ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString());
        servers.add(server);
        server.start();
        System.out.println(serviceName + " added");
    } private static void printHelp() {
        System.out.println("An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n");
        System.out.println("add <name> <description>: Adds a mock service with the given name and description");
        System.out.println("delete <name>: Deletes one of the mock services with the given name");
        System.out.println("list: Lists all the currently registered services");
        System.out.println("random <name>: Lists a random instance of the service with the given name");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

其它扩展

其它两个扩展Curator RPC Proxy(curator-x-rpc)扩展和Service Discovery Server(curator-x-discovery-server)是为了桥接非Java应用的扩展,本系列将不再介绍了。感兴趣的朋友可以看下面的 文档。 Curator Service Discovery Curator RPC Proxy

© 著作权归作者所有

共有 人打赏支持
longbadx
粉丝 6
博文 35
码字总数 35515
作品 0
杭州
程序员
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh ⋅ 04/28 ⋅ 0

[ZooKeeper]基于Java API 实践

前提 建立maven项目中 要导入zookeeper的依赖 我们同时可以打开linux中的zookeeper客户端来验证对比。输入 zkCli.sh 便可以进入zookeeper客户端 。 一、建立连接 直接建立连接后,不进行等待...

瑾兰 ⋅ 06/13 ⋅ 0

Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,...

ZeroOne01 ⋅ 04/29 ⋅ 0

Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,...

ZeroOne01 ⋅ 04/29 ⋅ 0

Spring Cloud Zookeeper 2.0.0 发布,更新 Curator

Spring Cloud Zookeeper 2.0.0 已发布,主要将使用的 Curator 版本更新至了 4.0.1 。 该版本还包含以下 3 个 issue: DiscoveryClient bean not found in DiscoveryClientConfigServiceBoot......

淡漠悠然 ⋅ 昨天 ⋅ 0

ZooKeeper分布式专题与Dubbo微服务入门

ZooKeeper分布式专题与Dubbo微服务入门 网盘地址:https://pan.baidu.com/s/1TN6BlftB2uvvyVR7IDmODQ 密码: e6zt 备用地址(腾讯微云):https://share.weiyun.com/5539X2S 密码:65b36i Zo...

人气王子333 ⋅ 04/17 ⋅ 0

分布式作业 Elastic-Job 快速上手指南,从理论到实战一文搞定!

Elastic-Job支持 JAVA API 和 Spring 配置两种方式配置任务,这里我们使用 JAVA API 的形式来创建一个简单的任务入门,现在都是 Spring Boot 时代了,所以不建议使用 Spring 配置文件的形式。...

Java技术栈 ⋅ 05/22 ⋅ 0

小柒2012/spring-boot-seckill

分布式秒杀系统 开发环境 JDK1.7、Maven、Mysql、Eclipse、SpringBoot1.5.10、zookeeper3.4.6、kafka_2.11、redis-2.8.4、curator-2.10.0 友情提示 由于工作原因,项目正在完善中(仅供参考)...

小柒2012 ⋅ 05/19 ⋅ 0

分布式定时任务Elastic-Job框架在SpringBoot工程中的应用实践(一)

摘要:如何构建具备作业分片和弹性扩缩容的定时任务系统是每个大型业务系统在设计时需要考虑的重要问题? 对于构建一般的业务系统来说,使用Quartz或者Spring Task即可基本满足我们的单体服用...

癫狂侠 ⋅ 05/12 ⋅ 0

阿里巴巴为什么不用 ZooKeeper 做服务发现?

站在未来的路口,回望历史的迷途,常常会很有意思,因为我们会不经意地兴起疯狂的念头,例如如果当年某事提前发生了,而另外一件事又没有发生会怎样?一如当年的奥匈帝国皇位继承人斐迪南大公...

阿里云头条 ⋅ 06/06 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

NFS介绍 NFS服务端安装配置 NFS配置选项

NFS介绍 NFS是Network File System的缩写;这个文件系统是基于网路层面,通过网络层面实现数据同步 NFS最早由Sun公司开发,分2,3,4三个版本,2和3由Sun起草开发,4.0开始Netapp公司参与并主导...

lyy549745 ⋅ 21分钟前 ⋅ 0

Spring AOP 源码分析 - 筛选合适的通知器

1.简介 从本篇文章开始,我将会对 Spring AOP 部分的源码进行分析。本文是 Spring AOP 源码分析系列文章的第二篇,本文主要分析 Spring AOP 是如何为目标 bean 筛选出合适的通知器(Advisor...

java高级架构牛人 ⋅ 44分钟前 ⋅ 0

HTML-标签手册

标签 描述 <!--...--> 定义注释。 <!DOCTYPE> 定义文档类型。 <a> 定义锚。超链接 <abbr> 定义缩写。 <acronym> 定义只取首字母的缩写。 <address> 定义文档作者或拥有者的联系信息。 <apple......

ZHAO_JH ⋅ 46分钟前 ⋅ 0

SylixOS在t_main中使用硬浮点方法

问题描述 在某些使用场景中,应用程序不使用动态加载的方式执行,而是跟随BSP在 t_main 线程中启动,此时应用代码是跟随 BSP 进行编译的。由于 BSP 默认使用软浮点,所以会导致应用代码中的浮...

zhywxyy ⋅ 53分钟前 ⋅ 0

JsBridge原理分析

看了这个Github代码 https://github.com/lzyzsd/JsBridge,想起N年前比较火的Hybrid方案,想看看现在跨平台调用实现有什么新的实现方式。代码看下来之后发现确实有点独特之处,这里先把核心的...

Kingguary ⋅ 今天 ⋅ 0

Intellij IDEA神器常用技巧五-真正常用快捷键(收藏级)

如果你觉得前面几篇博文太啰嗦,下面是博主多年使用Intellij IDEA真正常用快捷键,建议收藏!!! sout,System.out.println()快捷键 fori,for循环快捷键 psvm,main方法快捷键 Alt+Home,导...

Mkeeper ⋅ 今天 ⋅ 0

Java 静态代码分析工具简要分析与使用

本文首先介绍了静态代码分析的基本概念及主要技术,随后分别介绍了现有 4 种主流 Java 静态代码分析工具 (Checkstyle,FindBugs,PMD,Jtest),最后从功能、特性等方面对它们进行分析和比较,...

Oo若离oO ⋅ 今天 ⋅ 0

SpringBoot自动配置小记

spring-boot项目的特色就在于它的自动配置,自动配置就是开箱即用的本源。 不过支持一个子项目的自动配置,往往比较复杂,无论是sping自己的项目,还是第三方的,都是如此。刚接触会有点乱乱...

大_于 ⋅ 今天 ⋅ 0

React jsx 中写更优雅、直观的条件运算符

在这篇文字中我学到了很多知识,同时结合工作中的一些经验也在思考一些东西。比如条件运算符 Conditional Operator condition ? expr_if_true : expr_if_false 在jsx中书写条件语句我们经常都...

开源中国最帅没有之一 ⋅ 今天 ⋅ 0

vim编辑模式与命令模式

5.5 进入编辑模式 从编辑模式返回一般模式“Esc” 5.6 vim命令模式 命令 :“nohl”=no high light 无高亮,取消内容中高亮标记 "x":保存退出,和wq的区别是,当进入一个文件未进行编辑时,使...

弓正 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部