文档章节

Java RPC实现及原理讲解(附git源码地址)

tantexian
 tantexian
发布于 2016/05/27 11:39
字数 1040
阅读 453
收藏 7

输入图片说明

package com.wish.RPC;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * RPC原理解析:
 * 服务器端:
 * 1、RPCServer#registService:主要作用就是提供了一个服务注册管理中心,
 *    用来保存被注册服务(如果是dubbo则是分布式服务框架,对应了不同机器的地址及端口发布的服务(dubbo还使用了zookeeper))
 * 2、RPCServer#startServer:开启一个ServerSocket连接(new 一个ServiceTask服务,使用线程循环监听等待),
 *    等待客户端的远程socket连接调用
 * 3、RPCServer#registService:定义一个注册服务接口。即将所有需要注册的服务保存起来,后续ServiceTask需要使用该接口对象,
 *    动态代理调用该接口对象方法,并将方法返回值通过socket网络通信方式,传递给该服务的Client客户端。
 *    
 * 客户端:
 * 1、RPCClient#findService:根据serviceInterface接口名,通过动态代理生成被请求对象及通过InvocationHandler调用远程方法。
 *    其中InvocationHandler里面,通过传入的ip和prot地址,开启一个socket连接,远程发送调用远端RPCServer注册的服务方法
 *    然后通过远端RPCServer,的socket连接,讲返回对象通过socket网络通信传递过来,这样即获取到了远端服务的返回结果。
 *    
 * 启动服务端:
 * 1、TestRPCServer#main:启动服务端,通过server.registService(new HelloWorld()) ;
 *    注册HelloWorld服务方法到RPCServer
 * 2、TestRPCServer#main:通过server.startServer(51234);启动RPCServer,监听来自client的socket请求
 * 
 * 启动客户端:
 * 1、TestRPCClient#main:通过RPCClient.findService("127.0.0.1" , 51234 , IHelloWorld.class);
 *    调用客户端findService,获取HelloWorld对象,接下来即可以像使用本地一样使用远程服务方法
 * 
 * PS:更多源码请访问:http://git.oschina.net/tantexian/wishRPC
 * 
 * @author tantexian<tantexian@qq.com>
 * @since 2016年5月27日 上午9:44:46
 */
public class RPCServer {

	private static final ExecutorService taskPool = Executors.newFixedThreadPool(50);

	/**
	 * 服务接口对象库 key:接口名 value:接口实现
	 */
	private static final ConcurrentHashMap<String, Object> serviceTargets = new ConcurrentHashMap<String, Object>();

	private static AtomicBoolean run = new AtomicBoolean(false);

	/**
	 * 注册服务
	 * 
	 * @param service
	 */
	public void registService(Object service) {
		Class<?>[] interfaces = service.getClass().getInterfaces();
		if (interfaces == null) {
			throw new IllegalArgumentException("服务对象必须实现接口");
		}
		Class<?> interfacez = interfaces[0];
		String interfaceName = interfacez.getName();
		serviceTargets.put(interfaceName, service);
	}

	/**
	 * 启动Server
	 * 
	 * @param port
	 */
	public void startServer(final int port) {
		Runnable lifeThread = new Runnable() {
			@Override
			public void run() {
				ServerSocket lifeSocket = null;
				Socket client = null;
				ServiceTask serviceTask = null;
				try {
					lifeSocket = new ServerSocket(port);
					run.set(true);
					while (run.get()) {
						client = lifeSocket.accept();
						serviceTask = new ServiceTask(client);
						serviceTask.accept();
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		};
		taskPool.execute(lifeThread);
		System.out.println("服务启动成功...");
	}

	public void stopServer() {
		run.set(false);
		taskPool.shutdown();
	}

	public static final class ServiceTask implements Runnable {

		private Socket client;

		public ServiceTask(Socket client) {
			this.client = client;
		}

		public void accept() {
			taskPool.execute(this);
		}

		@Override
		public void run() {
			InputStream is = null;
			ObjectInput oi = null;
			OutputStream os = null;
			ObjectOutput oo = null;
			try {
				is = client.getInputStream();
				os = client.getOutputStream();
				oi = new ObjectInputStream(is);
				String serviceName = oi.readUTF();
				String methodName = oi.readUTF();
				Class<?>[] paramTypes = (Class[]) oi.readObject();
				Object[] arguments = (Object[]) oi.readObject();
				System.out.println("serviceName:" + serviceName + " methodName:" + methodName);
				Object targetService = serviceTargets.get(serviceName);
				if (targetService == null) {
					throw new ClassNotFoundException(serviceName + "服务未找到!");
				}

				Method targetMethod = targetService.getClass().getMethod(methodName, paramTypes);
				Object result = targetMethod.invoke(targetService, arguments);

				oo = new ObjectOutputStream(os);
				oo.writeObject(result);
			} catch (IOException e) {
				e.printStackTrace();
			} catch (ClassNotFoundException e) {
				e.printStackTrace();
			} catch (SecurityException e) {
				e.printStackTrace();
			} catch (NoSuchMethodException e) {
				e.printStackTrace();
			} catch (IllegalArgumentException e) {
				e.printStackTrace();
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			} catch (InvocationTargetException e) {
				e.printStackTrace();
			} finally {
				try {
					if (oo != null) {
						oo.close();
					}
					if (os != null) {
						os.close();
					}
					if (is != null) {
						is.close();
					}
					if (oi != null) {
						oi.close();
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}

}

package com.wish.RPC;

import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class RPCClient {

	/** 
     * 根据接口类型得到代理的接口实现 
     * @param <T> 
     * @param host  RPC服务器IP 
     * @param port  RPC服务端口 
     * @param serviceInterface  接口类型 
     * @return  被代理的接口实现 
     */  
    @SuppressWarnings("unchecked")  
    public static <T> T findService(final String host , final int port ,final Class<T> serviceInterface){  
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {  
            @SuppressWarnings("resource")
			@Override  
            public Object invoke(final Object proxy, final Method method, final Object[] args)  
            throws Throwable {  
                Socket socket = null ;  
                InputStream is = null ;  
                OutputStream os = null ;  
                ObjectInput oi = null ;  
                ObjectOutput oo = null ;  
                try {  
                    socket = new Socket(host, port) ;  
                    os = socket.getOutputStream() ;  
                    oo = new ObjectOutputStream(os);  
                    oo.writeUTF(serviceInterface.getName()) ;  
                    oo.writeUTF(method.getName()) ;  
                    oo.writeObject(method.getParameterTypes()) ;  
                    oo.writeObject(args);  

                    is = socket.getInputStream() ;  
                    oi = new ObjectInputStream(is) ;  
                    return oi.readObject() ;  
                } catch (Exception e) {  
                    System.out.println("调用服务异常...");  
                    return null ;  
                }finally{  
                    if(is != null){  
                        is.close() ;  
                    }  
                    if(os != null){  
                        is.close() ;  
                    }  
                    if(oi != null){  
                        is.close() ;  
                    }  
                    if(oo != null){  
                        is.close() ;  
                    }  
                    if(socket != null){  
                        is.close() ;  
                    }  
                }  
            }  
        });   
    }  

}

package com.wish.RPC;

public class HelloWorld implements IHelloWorld {

	@Override
	public String sayHello(String name) {
		return "Hello, " + name;
	}

}

package com.wish.RPC;

public interface IHelloWorld {

	String sayHello(String name);
}

package com.wish.RPC;

public class TestRPCServer {

	public static void main(String[] args) {  

        RPCServer server = new RPCServer() ;  
        server.registService(new HelloWorld()) ;  
        server.startServer(51234) ;  
    }  
}

package com.wish.RPC;

public class TestRPCClient {

	public static void main(String[] args) {  

        IHelloWorld helloWorld =   
            RPCClient.findService("127.0.0.1" , 51234 , IHelloWorld.class) ;  
        String  result = helloWorld.sayHello("tantexian, My blog address is: http://my.oschina.net/tantexian/");  
        System.out.println(result );  

    }  
}

© 著作权归作者所有

tantexian
粉丝 218
博文 527
码字总数 746616
作品 0
成都
架构师
私信 提问
Java NIO原理 图文分析及代码实现

Java NIO原理图文分析及代码实现 前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术...

囚兔
2015/04/29
0
0
Java开发者不会这些永远都只能是三流程序员,细数一下你是不是?

源码系列 手写spring mvc框架 基于Spring JDBC手写ORM框架 实现自己的MyBatis Spring AOP实战之源码分析 Spring IOC高级特性应用分析 ORM框架底层实现原理剖析 手写Spring MVC框架实现 手把手...

美的让人心动
2018/04/16
0
0
结合JVM源码谈Java类加载器

一、前言 之前文章 Java 类加载器揭秘 从Java层面讲解了Java类加载器的原理,这里我们结合JVM源码在稍微深入讲解下。 二、Java类加载器的委托机制 Java 类加载器使用的是委托机制,也就是一个...

阿里加多
2018/04/29
0
0
Spring Cloud OpenFeign集成Protocol Buffer

本文作者张天,著有《Spring Cloud 微服务架构进阶》一书。 背景  在之前的文章中,我们介绍过基于Spring Cloud微服务架构,其中,微服务实例之间的交互方式一般为RESTful HTTP请求或RPC调用...

aoho
2018/10/07
0
0
java如何通过web3j开发以太坊dapp?

如何使用web3j为Java应用或Android App增加以太坊区块链支持,教程内容即涉及以太坊中的核心概念,例如账户管理包括账户的创建、钱包创建、交易转账,交易与状态、智能合约开发与交互、过滤器...

geek12345
2018/08/28
416
0

没有更多内容

加载失败,请刷新页面

加载更多

枚举 创建/获取key,name,list

创建枚举 public enum MessageTypeEnum { // 类型:0.一般消息,1.公告消息,2交易消息,3.活动消息,4.其他消息 type_general("一般消息", "0"), type_ann("公告消息", "1")......

龘游戏人生龘
7分钟前
0
0
Linus 本尊来了!为什么 KubeCon 越来越火?

阿里妹导读: 从200人的小会议到3500 多位云原生和开源领域工程师齐聚一堂的大会,KubeCon 只用了四年,昨天,在KubeCon China 2019 上阿里巴巴宣布开源 OpenKruise,今天,Linus 本尊竟然现...

阿里云云栖社区
43分钟前
2
0
五小时构建云原生电商平台 | KubeCon SOFAStack Workshop 详解

本文根据 KubeCon China 2019 同场活动 SOFAStack Cloud Native Workshop 内容整理, 文末包含文档、PPT 地址,欢迎试用和提出建议。 2019 年 6 月 25 日,在 KubeCon China 2019,全球知名开...

SOFAStack
44分钟前
6
0
跨平台开发框架DevExtreme v19.1.4正式发布|附下载

DevExtreme Complete Subscription是性能最优的 HTML5,CSS 和 JavaScript 移动、Web开发框架,可以直接在Visual Studio集成开发环境,构建iOS,Android,Tizen和Windows Phone 8应用程序。D...

FILA6666
45分钟前
2
0
数据库链接断开 Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

报错信息如下: Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failureThe last packet successfully received from the server was 97,130 mill......

为了美好的明天
52分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部