文档章节

dubbo源码分析系列——dubbo-rpc-api模块源码分析

杨武兵
 杨武兵
发布于 2016/05/29 20:47
字数 2398
阅读 1472
收藏 19

简化的类图

该图是经过简化后的rpc-api模块的类图,去除了一些非关键的属性和方法定义,也去除了一些非核心的类和接口,只是一个简化了的的示意图,这样大家能够去除干扰看清楚该模块的核心接口极其关系,请点击看大图更清晰一些

核心类说明

Protocol

服务协议。这是rpc模块中最核心的一个类,它定义了rpc的最主要的两个行为即:1、provider暴露远程服务,即将调用信息发布到服务器上的某个URL上去,可以供消费者连接调用,一般是将某个service类的全部方法整体发布到服务器上。2、consumer引用远程服务,即根据service的服务类和provider发布服务的URL转化为一个Invoker对象,消费者可以通过该对象调用provider发布的远程服务。这其实概括了rpc的最为核心的职责,提供了多级抽象的实现、包装器实现等。

AbstractProtocol

Protocol的顶层抽象实现类,它定义了这些属性:1、exporterMap表示发布过的serviceKey和Exporter(远程服务发布的引用)的映射表;2、invokers是一个Invoker对象的集合,表示层级暴露过远程服务的服务执行体对象集合。还提供了一个通用的服务发布销毁方法destroy,该方法是一个通用方法,它清空了两个集合属性,调用了所有invoker的destroy方法,也调用所有exporter对象的unexport方法。

AbstractProxyProtocol

继承自AbstractProtoco的一个抽象代理协议类。它聚合了代理工厂ProxyFactory对象来实现服务的暴露和引用。它的源码如下。

/*
 * Copyright 1999-2012 Alibaba Group.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.dubbo.rpc.protocol;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;

/**
 * AbstractProxyProtocol
 * 
 * @author william.liangf
 */
public abstract class AbstractProxyProtocol extends AbstractProtocol {

    private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();;

    private ProxyFactory proxyFactory;

    public AbstractProxyProtocol() {
    }

    public AbstractProxyProtocol(Class<?>... exceptions) {
        for (Class<?> exception : exceptions) {
            addRpcException(exception);
        }
    }

    public void addRpcException(Class<?> exception) {
        this.rpcExceptions.add(exception);
    }

    public void setProxyFactory(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    public ProxyFactory getProxyFactory() {
        return proxyFactory;
    }

    @SuppressWarnings("unchecked")
	public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
        final String uri = serviceKey(invoker.getUrl());//获得Url对应的serviceKey值。
        Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);//根据url获取对应的exporter。
        if (exporter != null) {//如果已经存在,则直接返回,实现接口支持幂等调用。该处难道无须考虑线程安全问题吗?
        	return exporter;
        }
        //执行抽放方法暴露服务。runnable方法的行为有什么约束没有?该处不明确。
        final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
        //调用proxyFactory.getProxy(invoker)来获得invoker的代理对象。
        exporter = new AbstractExporter<T>(invoker) {
            public void unexport() {
                super.unexport();
                exporterMap.remove(uri);
                if (runnable != null) {
                    try {
                        runnable.run();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            }
        };
        exporterMap.put(uri, exporter);
        return exporter;
    }

    public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
        //先调用doRefer获得服务服务对象,再调用proxyFactory.getInvoker获得invoker对象。
        final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url);
        Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    Result result = tagert.invoke(invocation);
                    Throwable e = result.getException();
                    if (e != null) {
                        for (Class<?> rpcException : rpcExceptions) {
                            if (rpcException.isAssignableFrom(e.getClass())) {
                                throw getRpcException(type, url, invocation, e);
                            }
                        }
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                        e.setCode(getErrorCode(e.getCause()));
                    }
                    throw e;
                } catch (Throwable e) {
                    throw getRpcException(type, url, invocation, e);
                }
            }
        };
        invokers.add(invoker);
        return invoker;
    }

    protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
        RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: "
                + invocation.getMethodName() + ", cause: " + e.getMessage(), e);
        re.setCode(getErrorCode(e));
        return re;
    }

    protected int getErrorCode(Throwable e) {
        return RpcException.UNKNOWN_EXCEPTION;
    }

    /**
     **留给子类实现的真正将类发布到URL上的抽象方法定义,由具体的协议来实现。 
    **/
    protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

    /**
     **留给子类实现的引用远程服务的抽象方法定义,该方法是将URL和type接口类应用到一个可以远程调用代理对象。
     **/
    protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

}

ProtocolFilterWrapper

是一个Protocol的支持过滤器的装饰器。通过该装饰器的对原始对象的包装使得Protocol支持可扩展的过滤器链,已经支持的包括ExceptionFilter、ExecuteLimitFilter和TimeoutFilter等多种支持不同特性的过滤器。

  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        //通过该句获得扩展配置的过滤器列表,具体机制需要研究该类的实现。
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            //循环将过滤器列表组装成为过滤器链,目标invoker是最后一个执行的。
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

ProtocolListenerWrapper

一个支持监听器特性的Protocal的包装器。支持两种监听器的功能扩展,分别是:ExporterListener是远程服务发布监听器,可以监听服务发布和取消发布两个事件点;InvokerListener是服务消费者引用调用器的监听器,可以监听引用和销毁两个事件方法。支持可扩展的事件监听模型,目前只提供了一些适配器InvokerListenerAdapter、ExporterListenerAdapter以及简单的过期服务调用监听器DeprecatedInvokerListener。开发者可自行扩展自己的监听器。该类源码如下。

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.rpc.protocol;

import java.util.Collections;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.ExporterListener;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.listener.ListenerExporterWrapper;
import com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper;

/**
 * ListenerProtocol
 * 
 * @author william.liangf
 */
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol){
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //特殊协议,跳过监听器触发。
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //调用原始协议的发布方法,触发监听器链事件。
        return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                        .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

    public void destroy() {
        protocol.destroy();
    }

}

ProxyFactory

dubbo的代理工厂。定义了两个接口分别是:getProxy根据invoker目标接口的代理对象,一般是消费者获得代理对象触发远程调用;getInvoker方法将代理对象proxy、接口类type和远程服务的URL获取执行对象Invoker,往往是提供者获得目标执行对象执行目标实现调用。AbstractProxyFactory是其抽象实现,提供了getProxy的模版方法实现,使得可以支持多接口的映射。dubbo最终内置了两种动态代理的实现,分别是jdkproxy和javassist。默认的实现使用javassist。为什么选择javassist,梁飞选型的时候做过性能测试对比分析,参考:http://javatar.iteye.com/blog/814426/

Invoker

该接口是服务的执行体。它有获取服务发布的URL,服务的接口类等关键属性的行为;还有核心的服务执行方法invoke,执行该方法后返回执行结果Result,而传递的参数是调用信息Invocation。该接口有大量的抽象和具体实现类。AbstractProxyInvoker是基于代理的执行器抽象实现,AbstractInvoker是通用的抽象实现。

服务发布流程


首先ServiceConfig类拿到对外提供服务的实际类ref(如:HelloWorldImpl),然后通过ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成具体服务到Invoker的转化。接下来就是Invoker转换到Exporter的过程。
Dubbo处理服务暴露的关键就在Invoker转换到Exporter的过程(如上图中的红色部分),下面我们以Dubbo和RMI这两种典型协议的实现来进行说明:

服务引用流程

上图是服务消费的主过程:
首先ReferenceConfig类的init方法调用Protocol的refer方法生成Invoker实例(如上图中的红色部分),这是服务消费的关键。接下来把Invoker转换为客户端需要的接口(如:HelloWorld)。
关于每种协议如RMI/Dubbo/Web service等它们在调用refer方法生成Invoker实例的细节和上一章节所描述的类似。

总结

该模块下设计较为复杂,在设计中可以看出来应用了大量的设计模式,包括模版方法、职责链、装饰器和动态代理等设计模式。掌握该模块下的核心概念对于后续阅读其它部分代码至关重要,后面的其它模块要么是它的实现,要么是由它衍生出来的,要么与它的关系非常紧密。

next

接下来我们看看rpc的默认实现模块——dubbo-rpc-default。该模块提供了默认的dubbo协议的实现,也是默认使用的协议。

© 著作权归作者所有

共有 人打赏支持
杨武兵

杨武兵

粉丝 255
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
加载中

评论(3)

杨武兵
杨武兵

引用来自“erpaoshouling”的评论

擦,没下文了?

已经增加了不少内容了。
杨武兵
杨武兵

引用来自“erpaoshouling”的评论

擦,没下文了?

未完,还在继续写,我只是先发布出来一部分,或者列一个提纲,你可以持续关注。
他好像条狗啊
他好像条狗啊
擦,没下文了?
dubbo源码分析系列——项目工程结构介绍

项目源码地址 本系列文章是基于当当网维护的dubbox版本进行分析的,源码地址参考:https://github.com/dangdangdotcom/dubbox 项目源码结构 我们下载源码后导入到ide中可以看到如此之多的项目...

杨武兵
2016/05/29
745
0
dubbo源码分析系列——dubbo-rpc-default模块源码分析

简化类图 从图中可以看出该模块下的类主要是实现了dubbo-rpc-api和dubbo-remoting-api两个模块中定义的一些接口和抽象类。扩展了一种duubo框架自定义的dubbo协议,包括编解码和方法调用处理等...

杨武兵
2016/05/31
337
0
Dubbo (二) ——- 项目结构解析

本文主要说明点 概述 背景 需求 架构 Dubbo源代码项目结构 概述 分享 Dubbo 的项目结构 ,通过本文可以大致了解到Dubbo整个项目的结构 背景 将一个项目进行拆分, 进行分布式架构。 需要解决...

小刀爱编程
2018/10/16
0
0
dubbo源码分析系列——dubbo的SPI机制源码分析

dubbo的SPI机制 关于dubbo的SPI机制请参阅dubbo开发者文档 -> http://dubbo.io/Developer+Guide-zh.htm#DeveloperGuide-zh-%E6%89%A9%E5%B1%95%E7%82%B9%E5%8A%A0%E8%BD%BD 我只把我自己理解......

杨武兵
2016/06/05
462
4
dubbo源码分析系列(2)服务的发布

1 系列目录 - dubbo源码分析系列(1)扩展机制的实现- dubbo源码分析系列(2)服务的发布- dubbo源码分析系列(3)服务的引用- dubbo源码分析系列(4)dubbo通信设计 2 dubbo与spring接入 du...

乒乓狂魔
2015/09/28
6.3K
4

没有更多内容

加载失败,请刷新页面

加载更多

eggjs与sequelize简单demo

参考 egg 官方文档 安装 // 依赖npm install --save egg-sequelize mysql2// ts 类型npm install --save @types/sequelize 插件,config/plugin.ts import { EggPlugin } from 'egg';......

Geeyu
52分钟前
1
0
看过上百部片子的这个人教你视频标签算法解析

本文由云+社区发表 随着内容时代的来临,多媒体信息,特别是视频信息的分析和理解需求,如图像分类、图像打标签、视频处理等等,变得越发迫切。目前图像分类已经发展了多年,在一定条件下已经...

腾讯云加社区
今天
4
0
2. 红黑树

定义:红黑树(Red-Black Tree,简称R-B Tree),它一种特殊的二叉查找树(Binary Search Tree)。 要理解红黑树,先要了解什么是二叉查找树。在上一章中,我们学习了什么是二叉树,以及二叉树...

火拳-艾斯
今天
3
0
input的button类型,点击页面跳转

一、input type=button 不做任何操作 例如: <input type="button" class="btn btn-primary" style="width: 30%" value="返回" onclick="window.location.href='/users/list'"></input> onc......

Sunki
今天
1
0
踩坑:js 小数运算出现精度问题

背景 在学习小程序商城源码时发现了这个问题,单价可能出现小数,小数之间运算结果会莫名其妙多出一大串数字,比如下面这样👇。 在此之前我是知道 js 中著名的 0.1 + 0.2 != 0.3 的问题的,...

dkvirus
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部