文档章节

RPC的简单实现

w
 wall--e
发布于 2016/05/14 12:02
字数 1968
阅读 324
收藏 20
点赞 5
评论 3

RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

 

源码托管在 Git@OSC

 

原理

1. Client端获取一个 RPC 代理对象 proxy

2. 调用 proxy 上的方法, 被 InvocationHandler 实现类 Invoker 的 invoke() 方法捕获

3. invoke() 方法内将 RPC 请求封装成 Invocation 实例, 再向 Server 发送 RPC请求

4. Server端循环接收 RPC请求, 对每一个请求都创建一个 Handler线程处理

5. Handler线程从输入流中反序列化出 Invocation实例, 再调用 Server端的实现方法

6. 调用结束, 向 Client端返回调用结果

 

一. Invoker 类

    InvocationHandler 的实现类

/**
 * InvocationHandler 接口的实现类 <br>
 * Client端代理对象的方法调用都会被 Invoker 的 invoke() 方法捕获
 */
public class Invoker implements InvocationHandler {
	/** RPC协议接口的 Class对象 */
	private Class<?> intface;
	/** Client 端 Socket */
	private Socket client;
	/** 用于向 Server端发送 RPC请求的输出流 */
	private ObjectOutputStream oos;
	/** 用于接收 Server端返回的 RPC请求结果的输入流 */
	private ObjectInputStream ois;

	/**
	 * 构造一个 Socket实例 client, 并连接到指定的 Server端地址, 端口
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param serverAdd
	 *            Server端地址
	 * @param serverPort
	 *            Server端监听的端口
	 */
	public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException {
		this.intface = intface;
		client = new Socket(serverAdd, serverPort);
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

		try {
			// 封装 RPC请求
			Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args);
			// 打开 client 的输出流
			oos = new ObjectOutputStream(client.getOutputStream());
			// 序列化, 将 RPC请求写入到 client 的输出流中
			oos.writeObject(invocation);
			oos.flush();

			// 等待 Server端返回 RPC请求结果 //

			// 打开 client 的输入流
			ois = new ObjectInputStream(client.getInputStream());
			// 反序列化, 从输入流中读取 RPC请求结果
			Object res = ois.readObject();
			// 向 client 返回 RPC请求结果
			return res;
		} finally { // 关闭资源
			CloseUtil.closeAll(ois, oos);
			CloseUtil.closeAll(client);
		}
	}
}

 

二. Invocation 类

    Serializable 的实现类, RPC请求的封装

/**
 * RPC调用的封装, 包括以下字段: <br>
 * methodName: 方法名 <br>
 * parameterTypes: 方法参数列表的 Class 对象数组 <br>
 * params: 方法参数列表
 */
@SuppressWarnings("rawtypes")
public class Invocation implements Serializable {
	private static final long serialVersionUID = -7311316339835834851L;
	/** RPC协议接口的 Class对象 */
	private Class<?> intface;
	/** 方法名 */
	private String methodName;
	/** 方法参数列表的 Class 对象数组 */
	private Class[] parameterTypes;
	/** 方法的参数列表 */
	private Object[] params;

	public Invocation() {
	}

	/**
	 * 构造一个 RPC请求的封装
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param methodName
	 *            方法名
	 * @param parameterTypes
	 *            方法参数列表的 Class 对象数组
	 * @param params
	 *            方法的参数列表
	 */
	public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) {
		this.intface = intface;
		this.methodName = methodName;
		this.parameterTypes = parameterTypes;
		this.params = params;
	}

	public Class getIntface() {
		return intface;
	}

	public String getMethodName() {
		return methodName;
	}

	public Class[] getParameterTypes() {
		return parameterTypes;
	}

	public Object[] getParams() {
		return params;
	}
}

 

三.RPC 类

    构造 Client端代理对象, Server端实例

/**
 * 一个构造 Server 端实例与 Client 端代理对象的类
 */
public class RPC {

	/**
	 * 获取一个 Client 端的代理对象
	 * 
	 * @param intface
	 *            RPC协议接口, Client 与 Server 端共同遵守
	 * @param serverAdd
	 *            Server 端地址
	 * @param serverPort
	 *            Server 端监听的端口
	 * @return Client 端的代理对象
	 */
	public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort)
			throws UnknownHostException, IOException {

		Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface },
				new Invoker(intface, serverAdd, serverPort));
		return proxy;
	}

	/**
	 * 获取 RPC 的 Server 端实例
	 * 
	 * @param intface
	 *            RPC协议接口
	 * @param intfaceImpl
	 *            Server 端 RPC协议接口的实现
	 * @param port
	 *            Server 端监听的端口
	 * @return RPCServer 实例
	 */
	public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
		return new RPCServer(intface, intfaceImpl, port);
	}

}

 

四. RPCServer 类

    Server端接收 RPC请求, 处理请求

/**
 * RPC 的 Server端
 */
public class RPCServer {
	/** Server端的 ServerSocket实例 */
	private ServerSocket server;
	/** Server端 RPC协议接口的实现缓存, 一个接口对应一个实现类的实例 */
	private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>();

	/**
	 * 构造一个 RPC 的 Server端实例
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param intfaceImpl
	 *            Server端 RPC协议接口的实现
	 * @param port
	 *            Server端监听的端口
	 */
	public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
		server = new ServerSocket(port);
		RPCServer.intfaceImpls.put(intface, intfaceImpl);
	}

	/**
	 * 循环监听并接收 Client端连接, 处理 RPC请求, 向 Client端返回结果
	 */
	public void start() {
		try {
			while (true) {
				// 接收 Client端连接, 创建一个 Handler线程, 处理 RPC请求
				new Handler(server.accept()).start();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally { // 关闭资源
			CloseUtil.closeAll(server);
		}
	}

	/**
	 * 向 RPC协议接口的实现缓存中添加缓存
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param intfaceImpl
	 *            Server端 RPC协议接口的实现
	 */
	public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) {
		RPCServer.intfaceImpls.put(intface, intfaceImpl);
	}

	/**
	 * 处理 RPC请求的线程类
	 */
	private static class Handler extends Thread {
		/** Server端接收到的 Client端连接 */
		private Socket client;
		/** 用于接收 client 的 RPC请求的输入流 */
		private ObjectInputStream ois;
		/** 用于向 client 返回 RPC请求结果的输出流 */
		private ObjectOutputStream oos;
		/** RPC请求的封装 */
		private Invocation invocation;

		/**
		 * 用 Client端连接构造 Handler线程
		 * 
		 * @param client
		 */
		public Handler(Socket client) {
			this.client = client;
		}

		@Override
		public void run() {
			try {
				// 打开 client 的输入流
				ois = new ObjectInputStream(client.getInputStream());
				// 反序列化, 从输入流中读取 RPC请求的封装
				invocation = (Invocation) ois.readObject();
				// 从 RPC协议接口的实现缓存中获取实现
				Object intfaceImpl = intfaceImpls.get(invocation.getIntface());
				// 获取 Server端 RPC协议接口的方法实现
				Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(),
						invocation.getParameterTypes());
				// 跳过安全检查
				method.setAccessible(true);
				// 调用具体的实现方法, 用 res 接收方法返回结果
				Object res = method.invoke(intfaceImpl, invocation.getParams());
				// 打开 client 的输出流
				oos = new ObjectOutputStream(client.getOutputStream());
				// 序列化, 向输出流中写入 RPC请求的结果
				oos.writeObject(res);
				oos.flush();
			} catch (Exception e) {
				e.printStackTrace();
			} finally { // 关闭资源
				CloseUtil.closeAll(ois, oos);
				CloseUtil.closeAll(client);
			}
		}
	}
}

 

五. 测试类

    Login类, RPC协议接口

/**
 * RPC协议接口, Client 与 Server端共同遵守
 */
public interface Login {
	/**
	 * 抽象方法 login(), 模拟用户登录传入两个String 类型的参数, 返回 String类型的结果
	 * 
	 * @param username
	 *            用户名
	 * @param password
	 *            密码
	 * @return 返回登录结果
	 */
	public String login(String username, String password);
}

 

LoginImpl类, Server 端 RPC协议接口( Login )的实现类

/**
 * Server端 RPC协议接口( Login )的实现类
 */
public class LoginImpl implements Login {
	/**
	 * 实现 login()方法, 模拟用户登录
	 * 
	 * @param username
	 *            用户名
	 * @param password
	 *            密码
	 * @return hello 用户名
	 */
	@Override
	public String login(String username, String password) {
		return "hello " + username;
	}
}

 

ClientTest类, Client端测试类

/**
 * Client端测试类
 */
public class ClientTest {
	public static void main(String[] args) throws UnknownHostException, IOException {
		// 获取一个 Client端的代理对象 proxy
		Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888);
		// 调用 proxy 的 login() 方法, 返回值为 res
		String res = proxy.login("rpc", "password");
		// 输出 res
		System.out.println(res);
	}
}

 

ServerTest类, Server端测试类

/**
 * Server端测试类
 */
public class ServerTest {
	public static void main(String[] args) throws ClassNotFoundException, IOException {
		// 获取 RPC 的 Server 端实例 server
		RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888);
		// 循环监听并接收 Client 端连接, 处理 RPC 请求, 向 Client 端返回结果
		server.start();
	}
}

 

运行 ServerTest, 控制台输出: 

Starting Socket Handler for port 8888

 

运行 ClientTest, 控制台输出: 

hello rpc

 

至此, 实现了基于 Proxy, Socket, IO 的简单版 RPC模型,

对于每一个 RPC请求, Server端都开启一个 Handler线程处理该请求,

在高并发情况下, Server端是扛不住的, 改用 NIO应该表现更好

JDK动态代理的简单实现

Hadoop中RPC机制简介

源码托管在 Git@OSC

© 著作权归作者所有

共有 人打赏支持
w
粉丝 4
博文 31
码字总数 25016
作品 0
东城
程序员
加载中

评论(3)

i
iqilian
已阅,很好!:smile:
w
wall--e

引用来自“紫电清霜”的评论

00Mark
谢谢, 谢谢支持
紫电清霜
紫电清霜
00Mark
轻量级rpc框架--bbossgroups RPC

bbossgroups RPC 是基于bbossaop的轻量级rpc框架,感兴趣的朋友可以用一用。bbossgroups提供的RPC框架是bboss aop子项目中一个子模块,具有以下特点: 1.支持多种通讯协议jms,jgroups,min...

bboss
2010/04/07
1K
0
基于 Swoole 的轻量级高性能框架 swoolefy 1.0.6 发布

swoolefy是基于swoole实现的轻量级高性能框架,框架支持http,websocket,udp服务器,以及基于tcp实现可扩展的rpc服务,同时支持composer包方式安装部署项目。基于实用,swoolefy抽象Event事...

bingcool
06/11
0
0
Java for Web学习笔记(九四):消息和集群(9)RabbitMQ和消息模式(下)

例子:RPC的实现 这是一个通过AMQP使用RPC的例子,RPC是个同步的处理,需要等待响应。在实际应用需要特别消息,server可能性能很慢,server可能关闭。我们是否一定要使用RPC,是否可以用异步...

flowingflying
2017/11/12
0
0
Zookeeper实现简单的分布式RPC框架

在分布式系统中,为了提供系统的可用性和稳定性一般都会将服务部署在多台服务器上,为了实现自动注册自动发现远程服务,通过ZK,和ProtocolBuffe 以及Nettyr实现一个简单的分布式RPC框架。 首...

闪电
2015/05/10
0
0
基于 Netty 自己动手编写 RPC 框架

  今天我们要来做一道小菜,这道菜就是RPC通讯框架。它使用netty作为原料,fastjson序列化工具作为调料,来实现一个极简的多线程RPC服务框架。   我们暂且命名该RPC框架为rpckids。   ...

深度学习
04/16
0
0
Python3简单使用xmlrpc实现RPC

RPC 先说说什么是RPC,RPC(Remote Procedure Call)——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如T...

Cloudox_
2017/12/25
0
0
给自己的Blog程序添加对Windows Live Writer的支持

Windows Live Writer是Microsoft推出的脱离浏览器的Blog编辑和发布的工具, 支持Html和简单的图片编辑上传, 使用它, 可以极大的方便Blog的编辑和发布. 现在Windows Live Writer对国外较大的B...

鉴客
2011/12/27
148
0
基于ActiveMQ的RPC实现

上一篇文章《RPC的简单实现》中提到,可以利用MOM来实现RPC。这里就用实例来讲解如何使用Spring+ActiveMQ实现简易的rpc(这里简易的概念是指基本的远程调用,没有监控等功能) 和传统的RPC框...

chaney
2015/02/28
0
0
simpleRpc解析-服务端

本文主要是对勇哥的simpleRpc进行了简单的剖析,用来学习rpc,加深对rpc的理解! 源码地址:http://git.oschina.net/huangyong/rpc 勇哥博客:https://my.oschina.net/huangyong/blog/36175...

涩谷直子
06/22
0
0
C++的XMLRPC实现--XMLRPC++

XmlRpc++ 是 C++ 语言对 XML-RPC 协议的实现。XML-RPC 协议的目的是使远程过程调用简单:它编码数据的一个简单的XML格式,并使用HTTP进行通信。 XmlRpc + +的设计可以很容易地纳入的XML - R...

匿名
2009/03/31
5.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

about git flow

  昨天元芳做了git分支管理规范的分享,为了拓展大家关于git分支的认知,这里我特意再分享这两个关于git flow的链接,大家可以看一下。 Git 工作流程 Git分支管理策略   git flow本质上是...

qwfys
今天
1
0
Linux系统日志文件

/var/log/messages linux系统总日志 /etc/logrotate.conf 日志切割配置文件 参考https://my.oschina.net/u/2000675/blog/908189 dmesg命令 dmesg’命令显示linux内核的环形缓冲区信息,我们可...

chencheng-linux
今天
1
0
MacOS下给树莓派安装Raspbian系统

下载镜像 前往 树莓派官网 下载镜像。 点击 最新版Raspbian 下载最新版镜像。 下载后请,通过 访达 双击解压,或通过 unzip 命令解压。 检查下载的文件 ls -lh -rw-r--r-- 1 dingdayu s...

dingdayu
今天
0
0
spring boot使用通用mapper(tk.mapper) ,id自增和回显等问题

最近项目使用到tk.mapper设置id自增,数据库是mysql。在使用通用mapper主键生成过程中有一些问题,在总结一下。 1、UUID生成方式-字符串主键 在主键上增加注解 @Id @GeneratedValue...

北岩
今天
2
0
告警系统邮件引擎、运行告警系统

告警系统邮件引擎 cd mail vim mail.py #!/usr/bin/env python#-*- coding: UTF-8 -*-import os,sysreload(sys)sys.setdefaultencoding('utf8')import getoptimport smtplibfr......

Zhouliang6
今天
0
0
Java工具类—随机数

Java中常用的生成随机数有Math.random()方法及java.util.Random类.但他们生成的随机数都是伪随机的. Math.radom()方法 在jdk1.8的Math类中可以看到,Math.random()方法实际上就是调用Random类...

PrivateO2
今天
2
0
关于java内存模型、并发编程的好文

Java并发编程:volatile关键字解析    volatile这个关键字可能很多朋友都听说过,或许也都用过。在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果。在...

DannyCoder
昨天
0
0
dubbo @Reference retries 重试次数 一个坑

在代码一中设置 成retries=0,也就是调用超时不用重试,结果DEBUG的时候总是重试,不是0吗,0就不用重试啊。为什么还是调用了多次呢? 结果在网上看到 这篇文章才明白 https://www.cnblogs....

奋斗的小牛
昨天
2
0
数据结构与算法3

要抓紧喽~~~~~~~放羊的孩纸回来喽 LowArray类和LowArrayApp类 程序将一个普通的Java数组封装在LowArray类中。类中的数组隐藏了起来,它是私有的,所以只有类自己的方法才能访问他。 LowArray...

沉迷于编程的小菜菜
昨天
0
0
spring boot应用测试框架介绍

一、spring boot应用测试存在的问题 官方提供的测试框架spring-boot-test-starter,虽然提供了很多功能(junit、spring test、assertj、hamcrest、mockito、jsonassert、jsonpath),但是在数...

yangjianzhou
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部