文档章节

zk实战--rpc框架集群化

xpbob
 xpbob
发布于 2018/07/17 13:26
字数 979
阅读 609
收藏 8
ZK

在看此篇内容时需要浏览下面内容 netty实战--手写rpc框架

前文功能简介以及功能扩充

利用netty来实现一个点对点的rpc调用。客户端和服务端都是靠手写地址进行socket同学的,无法1对多,也无法把服务拆分到不同的机器上进行压力分摊,做不到调用的时候水平扩展。现实的场景是机器单台作战能力比较差劲,但是机器多,可以把不同的运算交给不同的机器来做,这样达到机器的利用。所以有了如下的改造。

  1. 支持服务水平扩展。
  2. 支持不同的服务分布在不同的机器上。

这时候就需要引入zookeeper来做管理。

主体思路

  1. 上线的数据都写入到临时节点里
  2. 通过临时节点来确保,服务器停止之后,节点消失
  3. 自动发现服务的功能,依靠zk的Watcher来发现服务的上线和下线功能。

数据组织形式

zk数据

   /xp
    |------ip:port---services
    |    
    |------ip:port---services
    |       
    |------ip:port---services

zk的数据按照如上情况进行组织,xp为一个永久节点,下面的ip:port组成一个临时节点。分别代表服务的ip和port,当服务停止或者意外被杀以后,对应的ip:host就会消失。

xp是一个永久节点,我们对他进行watch,只要有新的服务启动或者停止都会收到事件。

client维护的结构

|-service---client
|
|-service---client

client维护的结构是service和client连接的对应,一般情况下都是一个server里有多个服务,水平扩展的时候启动多个server即可。所以组织的时候是按照service组织方便查找。

代码实现要点

客户端对永久节点进行watch,这里加入了一个callback,当发生了节点变化的时候重新更新消息。

	public List<String> getInofAndWatcher(final String path, final InfoCallBack callBack) throws Exception {
		List<String> nodeList = zk.getChildren(path, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				if (event.getType() == Event.EventType.NodeChildrenChanged) {
					try {
						List<String> nodeList = zk.getChildren(path, false);
						callBack.getLastList(nodeList);
					} catch (Exception e) {
						e.printStackTrace();
					}

				}
			}
		});
		return nodeList;
	}

发布到zk的信息,就是ip,port,service。

public class ServiceInfo {
	private String ip;
	private int port;
	private List<String> interfaces;
}

会把这些数据通过json格式写入到zk,这里需要写明文,方便查看zk下有哪些服务

	public void addService(ServiceInfo info) {
		zkUtil.create(CommonContext.PATH + "/" + info.toString(), JSONObject.toJSONString(info).getBytes());

	}

客户端在启动或者事件发生的时候进行service更替。

	public void getLastList(List<String> lists) {
		Map<String, Set<Client>> serviceTmp = new HashMap<>();
		if (lists != null && !lists.isEmpty()) {
			for (String node : lists) {
				String info;
				try {
					info = new String(zkUtil.getData(CommonContext.PATH + "/" + node));
					ServiceInfo parse = JSONObject.parseObject(info, ServiceInfo.class);
					String address = parse.toString();
					List<String> interfaces = parse.getInterfaces();
					if (interfaces != null && !interfaces.isEmpty()) {
						for (String service : interfaces) {
							Set<Client> set = serviceTmp.get(service);
							if (set == null) {
								set = new HashSet<Client>();
							}
							if (!set.contains(address)) {
								Client client = new Client();
								client.connect(parse.getIp(), parse.getPort());
								set.add(client);
							}
							serviceTmp.put(service, set);

						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}

			}

		}

		services = serviceTmp;

	}

数据发送的时候选择随机找服务,这样不至于压力都在一个服务上。

	public static void send(RpcRequest request, String className) {
		Set<Client> set = services.get(className);
		if (set != null && !set.isEmpty()) {
			Client[] array = set.toArray(new Client[0]);
			//random 
			Client client = array[new Random().nextInt(array.length)];
			client.write(request);
		}
	}

测试使用

客户端

		new Observer("127.0.0.1:2181");
		ITest proxy = ProxyInterface.getProxy(ITest.class);
		String message = proxy.getMessage();
		System.out.println(message);

服务端

		ITest test= new TestImpl();	
		Invoker.put(test);
		List<String> list = new ArrayList<String>(Invoker.getServices());
		ServiceInfo info = new ServiceInfo("127.0.0.1", 6161, list);
		Publisher pub = new Publisher("127.0.0.1:2181");
		pub.addService(info);
		Server.bind(6161);

总结

利用zk的注册功能,让服务互相发现,并且做到水平扩展。重点就是一个事件机制和临时节点的机制。代码如下 https://github.com/xpbob/lightrpc

© 著作权归作者所有

共有 人打赏支持
xpbob
粉丝 98
博文 95
码字总数 76831
作品 0
高级程序员
私信 提问
XXL-RPC v1.2.2,分布式服务框架,内置原生轻量级注册中心

Release Notes 1、默认通讯方案切换为 Netty,可选方案依赖均调整为 provided 类型;提供强制依赖最小精简选型组合 "netty + hessian + 无注册中心(推荐采用:XXL-RPC原生注册中心)"; 2、X...

许雪里
2018/11/27
754
9
高性能 RPC 框架 Dubbo 从入门到深入-服务注册中心搭建(详细)

一、前言 整体来说,一个公司业务系统的演进流程基本都是从单体应用到多应用。在单体应用时,不同业务模块相互调用直接在本地 JVM 进程内就可以完成,而变为多个应用时,相互之间进行通信的方...

加多
2018/01/26
0
0
Commonrpc 1.0-stable 发布,分布式 RPC 框架

commonrpc 是 一个以netty 传输协议框架为基础。自定义 spring shcema标签的rpc框架,不侵入任何业务代码,插件模式,即插即用;一个高性能分布式rpc框架,支持tcp,http协议,扩展性强。 bu...

liubingsmile
2015/03/29
1K
27
XXL-RPC v1.2.1,分布式服务框架

Release Notes 1、内置注册中心选择ZK时逻辑优化,ZK初始化时unlock逻辑调整,优化断线重连特性; 2、除了springboot类型示例;新增无框架示例项目 "xxl-rpc-executor-sample-frameless"。不...

许雪里
2018/11/11
296
0
Zookeeper实现简单的分布式RPC框架

在分布式系统中,为了提供系统的可用性和稳定性一般都会将服务部署在多台服务器上,为了实现自动注册自动发现远程服务,通过ZK,和ProtocolBuffe 以及Nettyr实现一个简单的分布式RPC框架。 首...

闪电
2015/05/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

mariadb 内存占用优化

本文由云+社区发表 作者:工程师小熊 摘要:我们在使用mariadb的时候发现有时候不能启动起来,在使用过程中mariadb占用的内存很大,在这里学习下mariadb与内存相关的配置项,对mariadb进行调...

腾讯云加社区
53分钟前
2
0
spring security 自定义登录认证

spring security 自定义认证登录 1.概要 1.1.简介 spring security是一种基于 Spring AOP 和 Servlet 过滤器的安全框架,以此来管理权限认证等。 1.2.spring security 自定义认证流程 1)认证...

EasyProgramming
54分钟前
1
0
PAI通过流式机器学习算法解决实时热点新闻挖掘案例

(机器学习PAI Online Learning模块上线邀测,目前只支持华北2(北京)区域使用,本实验会用到流式机器学习算法) PAI地址:https://data.aliyun.com/product/learn 邀测申请地址:https://dat...

阿里云官方博客
57分钟前
1
0
Win下Jenkins-2.138源码编译及填坑笔记

源码编译篇 1、 安装JDK1.8-181,操作系统添加JDK环境变量。Java -version验证一下。 注:Jenkins2.138版本,JDK必须jkd1.8.0-101以上,不支持Java9,Maven必须3.5.3以上。 2、 解压Maven3....

编程SHA
59分钟前
2
0
Oracle数据库常用函数 转换函数 日期函数 字符型函数 数值函数

在讲解函数的功能和用法之前,先了解一下dual这个表。 dual这个表是一张只有一个字段,一行记录的表。它是一个虚拟表,用来构成select的语法规则。所以我们接下来会用到这个表来讲解常用函数。...

Sakura20
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部