文档章节

RPC的简单实现

w
 wall--e
发布于 2016/05/14 12:02
字数 1968
阅读 352
收藏 20

RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

 

源码托管在 Git@OSC

 

原理

1. Client端获取一个 RPC 代理对象 proxy

2. 调用 proxy 上的方法, 被 InvocationHandler 实现类 Invoker 的 invoke() 方法捕获

3. invoke() 方法内将 RPC 请求封装成 Invocation 实例, 再向 Server 发送 RPC请求

4. Server端循环接收 RPC请求, 对每一个请求都创建一个 Handler线程处理

5. Handler线程从输入流中反序列化出 Invocation实例, 再调用 Server端的实现方法

6. 调用结束, 向 Client端返回调用结果

 

一. Invoker 类

    InvocationHandler 的实现类

/**
 * InvocationHandler 接口的实现类 <br>
 * Client端代理对象的方法调用都会被 Invoker 的 invoke() 方法捕获
 */
public class Invoker implements InvocationHandler {
	/** RPC协议接口的 Class对象 */
	private Class<?> intface;
	/** Client 端 Socket */
	private Socket client;
	/** 用于向 Server端发送 RPC请求的输出流 */
	private ObjectOutputStream oos;
	/** 用于接收 Server端返回的 RPC请求结果的输入流 */
	private ObjectInputStream ois;

	/**
	 * 构造一个 Socket实例 client, 并连接到指定的 Server端地址, 端口
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param serverAdd
	 *            Server端地址
	 * @param serverPort
	 *            Server端监听的端口
	 */
	public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException {
		this.intface = intface;
		client = new Socket(serverAdd, serverPort);
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

		try {
			// 封装 RPC请求
			Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args);
			// 打开 client 的输出流
			oos = new ObjectOutputStream(client.getOutputStream());
			// 序列化, 将 RPC请求写入到 client 的输出流中
			oos.writeObject(invocation);
			oos.flush();

			// 等待 Server端返回 RPC请求结果 //

			// 打开 client 的输入流
			ois = new ObjectInputStream(client.getInputStream());
			// 反序列化, 从输入流中读取 RPC请求结果
			Object res = ois.readObject();
			// 向 client 返回 RPC请求结果
			return res;
		} finally { // 关闭资源
			CloseUtil.closeAll(ois, oos);
			CloseUtil.closeAll(client);
		}
	}
}

 

二. Invocation 类

    Serializable 的实现类, RPC请求的封装

/**
 * RPC调用的封装, 包括以下字段: <br>
 * methodName: 方法名 <br>
 * parameterTypes: 方法参数列表的 Class 对象数组 <br>
 * params: 方法参数列表
 */
@SuppressWarnings("rawtypes")
public class Invocation implements Serializable {
	private static final long serialVersionUID = -7311316339835834851L;
	/** RPC协议接口的 Class对象 */
	private Class<?> intface;
	/** 方法名 */
	private String methodName;
	/** 方法参数列表的 Class 对象数组 */
	private Class[] parameterTypes;
	/** 方法的参数列表 */
	private Object[] params;

	public Invocation() {
	}

	/**
	 * 构造一个 RPC请求的封装
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param methodName
	 *            方法名
	 * @param parameterTypes
	 *            方法参数列表的 Class 对象数组
	 * @param params
	 *            方法的参数列表
	 */
	public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) {
		this.intface = intface;
		this.methodName = methodName;
		this.parameterTypes = parameterTypes;
		this.params = params;
	}

	public Class getIntface() {
		return intface;
	}

	public String getMethodName() {
		return methodName;
	}

	public Class[] getParameterTypes() {
		return parameterTypes;
	}

	public Object[] getParams() {
		return params;
	}
}

 

三.RPC 类

    构造 Client端代理对象, Server端实例

/**
 * 一个构造 Server 端实例与 Client 端代理对象的类
 */
public class RPC {

	/**
	 * 获取一个 Client 端的代理对象
	 * 
	 * @param intface
	 *            RPC协议接口, Client 与 Server 端共同遵守
	 * @param serverAdd
	 *            Server 端地址
	 * @param serverPort
	 *            Server 端监听的端口
	 * @return Client 端的代理对象
	 */
	public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort)
			throws UnknownHostException, IOException {

		Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface },
				new Invoker(intface, serverAdd, serverPort));
		return proxy;
	}

	/**
	 * 获取 RPC 的 Server 端实例
	 * 
	 * @param intface
	 *            RPC协议接口
	 * @param intfaceImpl
	 *            Server 端 RPC协议接口的实现
	 * @param port
	 *            Server 端监听的端口
	 * @return RPCServer 实例
	 */
	public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
		return new RPCServer(intface, intfaceImpl, port);
	}

}

 

四. RPCServer 类

    Server端接收 RPC请求, 处理请求

/**
 * RPC 的 Server端
 */
public class RPCServer {
	/** Server端的 ServerSocket实例 */
	private ServerSocket server;
	/** Server端 RPC协议接口的实现缓存, 一个接口对应一个实现类的实例 */
	private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>();

	/**
	 * 构造一个 RPC 的 Server端实例
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param intfaceImpl
	 *            Server端 RPC协议接口的实现
	 * @param port
	 *            Server端监听的端口
	 */
	public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
		server = new ServerSocket(port);
		RPCServer.intfaceImpls.put(intface, intfaceImpl);
	}

	/**
	 * 循环监听并接收 Client端连接, 处理 RPC请求, 向 Client端返回结果
	 */
	public void start() {
		try {
			while (true) {
				// 接收 Client端连接, 创建一个 Handler线程, 处理 RPC请求
				new Handler(server.accept()).start();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally { // 关闭资源
			CloseUtil.closeAll(server);
		}
	}

	/**
	 * 向 RPC协议接口的实现缓存中添加缓存
	 * 
	 * @param intface
	 *            RPC协议接口的 Class对象
	 * @param intfaceImpl
	 *            Server端 RPC协议接口的实现
	 */
	public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) {
		RPCServer.intfaceImpls.put(intface, intfaceImpl);
	}

	/**
	 * 处理 RPC请求的线程类
	 */
	private static class Handler extends Thread {
		/** Server端接收到的 Client端连接 */
		private Socket client;
		/** 用于接收 client 的 RPC请求的输入流 */
		private ObjectInputStream ois;
		/** 用于向 client 返回 RPC请求结果的输出流 */
		private ObjectOutputStream oos;
		/** RPC请求的封装 */
		private Invocation invocation;

		/**
		 * 用 Client端连接构造 Handler线程
		 * 
		 * @param client
		 */
		public Handler(Socket client) {
			this.client = client;
		}

		@Override
		public void run() {
			try {
				// 打开 client 的输入流
				ois = new ObjectInputStream(client.getInputStream());
				// 反序列化, 从输入流中读取 RPC请求的封装
				invocation = (Invocation) ois.readObject();
				// 从 RPC协议接口的实现缓存中获取实现
				Object intfaceImpl = intfaceImpls.get(invocation.getIntface());
				// 获取 Server端 RPC协议接口的方法实现
				Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(),
						invocation.getParameterTypes());
				// 跳过安全检查
				method.setAccessible(true);
				// 调用具体的实现方法, 用 res 接收方法返回结果
				Object res = method.invoke(intfaceImpl, invocation.getParams());
				// 打开 client 的输出流
				oos = new ObjectOutputStream(client.getOutputStream());
				// 序列化, 向输出流中写入 RPC请求的结果
				oos.writeObject(res);
				oos.flush();
			} catch (Exception e) {
				e.printStackTrace();
			} finally { // 关闭资源
				CloseUtil.closeAll(ois, oos);
				CloseUtil.closeAll(client);
			}
		}
	}
}

 

五. 测试类

    Login类, RPC协议接口

/**
 * RPC协议接口, Client 与 Server端共同遵守
 */
public interface Login {
	/**
	 * 抽象方法 login(), 模拟用户登录传入两个String 类型的参数, 返回 String类型的结果
	 * 
	 * @param username
	 *            用户名
	 * @param password
	 *            密码
	 * @return 返回登录结果
	 */
	public String login(String username, String password);
}

 

LoginImpl类, Server 端 RPC协议接口( Login )的实现类

/**
 * Server端 RPC协议接口( Login )的实现类
 */
public class LoginImpl implements Login {
	/**
	 * 实现 login()方法, 模拟用户登录
	 * 
	 * @param username
	 *            用户名
	 * @param password
	 *            密码
	 * @return hello 用户名
	 */
	@Override
	public String login(String username, String password) {
		return "hello " + username;
	}
}

 

ClientTest类, Client端测试类

/**
 * Client端测试类
 */
public class ClientTest {
	public static void main(String[] args) throws UnknownHostException, IOException {
		// 获取一个 Client端的代理对象 proxy
		Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888);
		// 调用 proxy 的 login() 方法, 返回值为 res
		String res = proxy.login("rpc", "password");
		// 输出 res
		System.out.println(res);
	}
}

 

ServerTest类, Server端测试类

/**
 * Server端测试类
 */
public class ServerTest {
	public static void main(String[] args) throws ClassNotFoundException, IOException {
		// 获取 RPC 的 Server 端实例 server
		RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888);
		// 循环监听并接收 Client 端连接, 处理 RPC 请求, 向 Client 端返回结果
		server.start();
	}
}

 

运行 ServerTest, 控制台输出: 

Starting Socket Handler for port 8888

 

运行 ClientTest, 控制台输出: 

hello rpc

 

至此, 实现了基于 Proxy, Socket, IO 的简单版 RPC模型,

对于每一个 RPC请求, Server端都开启一个 Handler线程处理该请求,

在高并发情况下, Server端是扛不住的, 改用 NIO应该表现更好

JDK动态代理的简单实现

Hadoop中RPC机制简介

源码托管在 Git@OSC

© 著作权归作者所有

共有 人打赏支持
上一篇: GC中的对象自救
下一篇: volatile与可见性
w
粉丝 6
博文 31
码字总数 25016
作品 0
东城
程序员
私信 提问
加载中

评论(3)

i
iqilian
已阅,很好!:smile:
w
wall--e

引用来自“紫电清霜”的评论

00Mark
谢谢, 谢谢支持
紫电清霜
紫电清霜
00Mark
轻量级rpc框架--bbossgroups RPC

bbossgroups RPC 是基于bbossaop的轻量级rpc框架,感兴趣的朋友可以用一用。bbossgroups提供的RPC框架是bboss aop子项目中一个子模块,具有以下特点: 1.支持多种通讯协议jms,jgroups,min...

bboss
2010/04/07
1K
0
Java for Web学习笔记(九四):消息和集群(9)RabbitMQ和消息模式(下)

例子:RPC的实现 这是一个通过AMQP使用RPC的例子,RPC是个同步的处理,需要等待响应。在实际应用需要特别消息,server可能性能很慢,server可能关闭。我们是否一定要使用RPC,是否可以用异步...

flowingflying
2017/11/12
0
0
基于 Swoole 的轻量级高性能框架 swoolefy 1.0.6 发布

swoolefy是基于swoole实现的轻量级高性能框架,框架支持http,websocket,udp服务器,以及基于tcp实现可扩展的rpc服务,同时支持composer包方式安装部署项目。基于实用,swoolefy抽象Event事...

bingcool
06/11
0
0
Zookeeper实现简单的分布式RPC框架

在分布式系统中,为了提供系统的可用性和稳定性一般都会将服务部署在多台服务器上,为了实现自动注册自动发现远程服务,通过ZK,和ProtocolBuffe 以及Nettyr实现一个简单的分布式RPC框架。 首...

闪电
2015/05/10
0
0
基于 Netty 自己动手编写 RPC 框架

  今天我们要来做一道小菜,这道菜就是RPC通讯框架。它使用netty作为原料,fastjson序列化工具作为调料,来实现一个极简的多线程RPC服务框架。   我们暂且命名该RPC框架为rpckids。   ...

深度学习
04/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MySQL Replication 梳理详解

MySQL Replication 1 MySQL5.5以前的复制 异步、SQL线程串行化回放 MySQL内建的复制功能是构建大型,高性能应用程序的基础。主服务器将更新写入二进制日志文件,从服务器重新执行一遍来实现的...

PeakFang-BOK
27分钟前
1
0
.NET Core & ConsoleApp & appsettings.json

准备 Visual Studio 2017 .NET Core 2.1 新建控制台应用(.NET Core) 默认的 Program.cs // Program.csusing System;namespace ConsoleApp1{ class Program { static voi......

taadis
37分钟前
1
0
结合lucene谈谈日期的压缩问题

说起日期值的压缩,一般容易想到的办法是将日期转化成long类型,然后再通过变长整形进行压缩,我算了一下按照毫秒来算最多占用5个字节(可以通过“谈谈变长整型”中的表查看),确实节省了部...

FAT_mt
今天
1
0
导出私有函数与私有变量

在Go语言中, package中包含函数与变量通过identifier的首字母是否大写来决定它是否可以被其它package所访问。当一个函数或变量名称为小写字母时,默认是无法被其他package引用的. 有没有办法...

xtof
今天
2
0
new Date() 在Safari下的 Invalid Date问题

问题复现 var timeStr = '2018-11-11 00:00:00';var time = new Date(timeStr);// error: Invalid Date... 在safari浏览器下,time为Invalid Date, 导致后面代码执行错误; 其他浏览器诸...

会写代码的husky
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部