文档章节

基于akka实现RPC

tantexian
 tantexian
发布于 2016/05/27 11:45
字数 1349
阅读 81
收藏 3
目前的工作在基于akka实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用),这篇文章将会介绍一种实现方式。
akka rpc java
目录[-]
akka-rpc(基于akka的rpc的实现)
RPC
实现原理
Server端核心代码
Client端核心代码 
Demo
akka-rpc(基于akka的rpc的实现)

代码:http://git.oschina.net/for-1988/Simples

目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。

RPC

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。

实现原理

整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Class<T> clz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。

Server端核心代码

public class RpcServer extends UntypedActor {
         private Map<String, Object> proxyBeans;

    public RpcServer(Map<Class<?>, Object> beans) {
        proxyBeans = new HashMap<String, Object>();
        for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
                .hasNext();) {
            Class<?> inface = iterator.next();
            proxyBeans.put(inface.getName(), beans.get(inface));
        }
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口实现类的实例
            CallBean event = (CallBean) message;
            ReturnBean bean = new ReturnBean(
                    proxyBeans.get(event.getBeanName()), getSelf());
            getSender().tell(bean, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {
            CallMethod event = (CallMethod) message;
            Object bean = proxyBeans.get(event.getBeanName());
            Object[] params = event.getParams();
            List<Class<?>> paraTypes = new ArrayList<Class<?>>();
            Class<?>[] paramerTypes = new Class<?>[] {};
            if (params != null) {
                for (Object param : params) {
                    paraTypes.add(param.getClass());
                }
            }
            Method method = bean.getClass().getMethod(event.getMethodName(),
                    paraTypes.toArray(paramerTypes));
            Object o = method.invoke(bean, params);
            getSender().tell(o, getSelf());
        }
    }

}
启动Server

public static void main(String[] args) {
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2551)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcServer]"))
                .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("EsbSystem", config);
        
        // Server 加入发布的服务
        Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
        beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
        system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
    }
Client端核心代码 

RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。

public class RpcClient extends Thread {

    private ActorSystem system;

    private ActorRef rpc;

    private ActorRef clientServer;

    private static RpcClient instance = null;

    public RpcClient() {
        this.start();
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2552)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcClient]"))
                .withFallback(ConfigFactory.load());
        system = ActorSystem.create("EsbSystem", config);

        int totalInstances = 100;
        Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
        boolean allowLocalRoutees = false;
        ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
                new AdaptiveLoadBalancingGroup(
                        HeapMetricsSelector.getInstance(),
                        Collections.<String> emptyList()),
                new ClusterRouterGroupSettings(totalInstances, routeesPaths,
                        allowLocalRoutees, "RpcServer"));
        rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
        clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
                "client");
        Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回调事件,恢复当前线程的中断
            @Override
            public void run() {
                synchronized (instance) {
                    System.out.println("notify");
                    instance.notify();
                }
            }
        });

    }

    public static RpcClient getInstance() {
        if (instance == null) {
            instance = new RpcClient();
            synchronized (instance) {
                try {   //中断当前线程,等待加入集群成功后,恢复
                    System.out.println("wait");
                    instance.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return instance;
    }

    public <T> T getBean(Class<T> clz) {
        Future<Object> future = Patterns.ask(clientServer,
                new RpcEvent.CallBean(clz.getName(), clientServer),
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        try {
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            if (o != null) {
                ReturnBean returnBean = (ReturnBean) o;
                return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
                        clientServer, clz);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
RpcClientServer

public class RpcClientServer extends UntypedActor {

    private ActorRef rpc;

    public RpcClientServer(ActorRef rpc) {
        this.rpc = rpc;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {  //向Server发送CallBean请求
            CallBean event = (CallBean) message;
            Future<Object> future = Patterns.ask(rpc, event, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {  //向Server发送方法调用请求
            Future<Object> future = Patterns.ask(rpc, message, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        }
    }
}
RpcBeanProxy,客户端的动态代理类

public class RpcBeanProxy implements InvocationHandler {

    private ActorRef rpcClientServer;

    private Class<?> clz;

    public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
        this.rpcClientServer = rpcClientServer;
        this.clz = clz;
        return Proxy.newProxyInstance(target.getClass().getClassLoader(),
                target.getClass().getInterfaces(), this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        Object result = null;
        RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
                method.getName(), args, clz.getName());
        Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
        result = o;
        return result;
    }

}
Demo

Interface,Client和Server都需要这个类,必须实现序列化

public interface ExampleInterface extends Serializable{
    public String sayHello(String name);
}
实现类,只需要Server端存在这个类。

public class ExampleInterfaceImpl implements ExampleInterface {
    @Override
    public String sayHello(String name) {
        System.out.println("Be Called !");
        return "Hello " + name;
    }
}
Client调用

public static void main(String[] args) {
        RpcClient client = RpcClient.getInstance();
        long start = System.currentTimeMillis();
        
        ExampleInterface example = client.getBean(ExampleInterface.class);
        System.out.println(example.sayHello("rpc"));
        
        long time = System.currentTimeMillis() - start;
        System.out.println("time :" + time);
    }
 


这里第一次调用耗时比较长需要46毫秒,akka会对消息进行优化,调用多次以后时间为 1~2毫秒。

目前还没来得及做性能测试,后面会补充。

本文转载自:

tantexian
粉丝 221
博文 527
码字总数 746616
作品 0
成都
架构师
私信 提问
akka-rpc(基于akka的rpc实现)

akka-rpc(基于akka的rpc的实现) 代码:http://git.oschina.net/for-1988/Simples 目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persisten...

ForEleven
2014/05/23
0
9
Spark2.1.0之内置RPC框架

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80799622 在Spark中很多地方都涉及网络通信,比如Spark各个组件间的消息互通、用户...

泰山不老生
2018/06/27
0
0
Play! Akka Flume实现的完整数据收集

前言 现如今,大数据如火如荼。针对用户行为,用户喜好等后续大数据分析也是十分火热。这个小项目实现了后台数据收集的一系列完整流程。 项目总体流程以及用到的技术 Play ! 作为web服务器,...

blue1110
2015/01/27
0
2
Spring与Akka的集成

概述 近年来随着Spark的火热,Spark本身使用的开发语言Scala、用到的分布式内存文件系统Tachyon(现已更名为Alluxio)以及基于Actor并发编程模型的Akka都引起了大家的注意。了解过Akka或者A...

beliefer
2017/01/04
0
0
Zeroc ICE或者说rpc 其实说是上个时代的东西也不为过 否则dubbo actor的akka为什么会流行

Zeroc ICE或者说rpc 其实说是上个时代的东西也不为过 否则dubbo actor的akka为什么会流行 dubbo最快的模式 dubbo中心节点只是提供服务节点信息 你自己去连接那个节点 直连 有可能比rpc桥接多...

雷兽
2016/10/19
2K
1

没有更多内容

加载失败,请刷新页面

加载更多

Protocol Buffers 简介

文档编辑和持续集成状态: 本文档的 Protocol Buffer 的中文文档使用的是 Asciidoctor 进行编排的 http://docs.ossez.com/protocol-buffers-docs/index.html(本 WIKI 中的内容将会与在线发布...

honeymoose
今天
4
0
uniapp + bootstrapvue 移动/PC 一套搞定 (一)配置bootstrapvue

1.准备文件 自己到DCloud官网: http://dcloud.io/ 去下载官方的IDE Hbuilder,新建一个空的uniapp项目即可。 uniapp框架自带优化的vue,我们仅仅需要准备以下三个文件: bootstrap.min.css ...

panyunxing
今天
12
0
Android Camera原理之camera service类与接口关系

camera service主要是指 frameworks/av/services/camera/下面的代码,最近在看这一块的代码,为了更好地理清这一块的代码,也为了后续学习camera方便一些,我觉得很有必要理一下这一块的整体...

天王盖地虎626
今天
6
0
Golang学习笔记

[TOC] Golang学习笔记 这个学习笔记是最早在1.初,版本左右的时候写的,和当前最新的版本可能会有较大的差异. 因为成文比较早,文章里面又有很多自己的见解,有些东西当时理解的不太透彻可能写错...

我爱吃炒鸡
今天
21
0
科技赋能成效显著!金融壹账通两大赋能项目荣获IDC大奖

7月19日,2019IDC中国未来金融论坛曁颁奖典礼于北京举办。由金融壹账通赋能的长春农商银行多人视频面审智能风控系统、包头农商银行互联网银行SaaS服务两大项目因在项目的创新性、技术领先性、...

IFTNews
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部