Dubbo扩展点机制详解
Dubbo扩展点机制详解
学渣的第六感 发表于9个月前
Dubbo扩展点机制详解
  • 发表于 9个月前
  • 阅读 140
  • 收藏 0
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

摘要: Dubbo扩展点机制详解

一 简述

Dubbo的架构是一个微内核型的,很多组件都是可插拔的。为了解耦,很多模块都是面向接口编程,比如常见的Procotol,在dubbo中默认是dubbo协议,也可以是其他rmi协议等。如何能够使得dubbo灵活的使用接口的不同实现类,这就是本节内容讲述的重点。

二 dubbo 动态加载基本原理

如何动态选择接口的实现类,dubbo采用了一种叫扩展点加载的机制。Dubbo采用了类似java spi(service provider interface),因为java的spi有一些缺点(就是一个list),而dubbo对此进行了改进(改成了map),而且还添加了一些动态的加载方法。对于一个接口的实现类,dubbo不是加载一个class,而是通过拼接java代码,再对java进行编译生成一个class,在dubbo中称之为Adaptive,也就是后面我们会经常提到的自适应类。

三 Protocol接口例子

1 Protocol接口加载实现类

废话少说,先举一个栗子

public class ServiceConfig<T> extends AbstractServiceConfig {

    private static final long   serialVersionUID = 3033787999037024738L;

    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

// 略.......
}

这个是服务发布类的部分代码,这个protocol就是以后我们经常会用用到的协议,而Protocol只是一个接口,代码如下。

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 * 
 * @author william.liangf
 */
@SPI("dubbo")
public interface Protocol {
    
    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:<br>
     * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
     * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
     * 
     * @param <T> 服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:<br>
     * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
     * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
     * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
     * 
     * @param <T> 服务的类型
     * @param type 服务的类型
     * @param url 远程服务的URL地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议:<br>
     * 1. 取消该协议所有已经暴露和引用的服务。<br>
     * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
     * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
     */
    void destroy();

}

可以看出,这个Protocol不仅仅是个接口,还是由@SPI标注的接口,这个@SPI是dubbo定义的一个标注,具体作用在后面会说到,提前说一句,这个@SPI的内容,就是默认的实现类的选择,在这里也就是”dubbo”,其实这个dubbo仅是一个key,真正的protocol是写在配置文件中的,当然这是后话,后面会详细说明。

2 如何选择实现类

在dubbo内实现了几种不同的协议,比如dubbo,http等,如图所示, 而Protocol有这么多的实现,具体选择哪个协议呢?自然是选择我们在配置文件里写的那个,如果不写,默认就是dubbo协议喽。Protocol的实现类如下所示:

从前面Protocol的例子中看出

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

Protocol的实现类是用ExtensionLoader来载入的,我们就分析一下这个类。

由于ExtensionLoader是个很大的类,我们这里还是以Protocol的加载为线索进行

 

四 扩展点解析

这里就是正式开始解决如何加载实现类了。

ExtensionLoader 简介

Dubbo中所有的实现类都是用ExtensionLoader 类进行加载的,大概的思想就是先把某个接口的所有实现类都加载进来,放在一个map中,而对于一个接口的实现类并不是采用这其中的某一个类,而是生成一个新的类,即Adaptive类,这个类会根据URL等参数,自动从刚才的map中寻找对应的类,达到充分解耦,即根据URL动态选择实现类。

getExtensionLoader

ExtensionLoader 类中有一个static域,EXTENSION_LOADERS,这是一个map,以Class为key,ExtensionLoader为value,即dubbo为每一个Class(接口)都提供一个ExtensionLoader,而这个ExtensionLoader则为各个Class提供具体的加载类的行为。 

3 getAdaptiveExtension

Dubbo中各个Extension利用getAdaptiveExtension()方法真正加载实现类,代码如下所示。首先这是一个标准的单例模式。cachedAdaptiveInstance中没有时,通过createAdaptiveExtension()方法来动态的创建自适应类。在dubbo中,这个真正的实现类,不是一个从classpath中加载的类,而是一个利用java代码拼接出来的自适应类,这个自适应类,很重要,后面会经常提到。

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if(createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
        else {
            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

4 createAdaptiveExtension

在创建自适应类对象时,首先获取自适应类Class

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
    }
}

5 getAdaptiveExtensionClass 

在获取自适应类Class时,先通过getExtensionClasses()方法将所有的实现类都进行加载,放在一个map中,也就是cachedClasses中,将来通过URL来具体决定用哪个实现类后,直接从这个map中获取类就可以;同时创建一个自适应类,cachedAdaptiveClass 这个自适应类通过拼接java代码,后编译而成。

private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

private Map<String, Class<?>> getExtensionClasses() {
       Map<String, Class<?>> classes = cachedClasses.get();
       if (classes == null) {
           synchronized (cachedClasses) {
               classes = cachedClasses.get();
               if (classes == null) {
                   classes = loadExtensionClasses();
                   cachedClasses.set(classes);
               }
           }
       }
       return classes;
}

   // 此方法已经getExtensionClasses方法同步过。
   private Map<String, Class<?>> loadExtensionClasses() {
       final SPI defaultAnnotation = type.getAnnotation(SPI.class);
       if(defaultAnnotation != null) {
           String value = defaultAnnotation.value();
           if(value != null && (value = value.trim()).length() > 0) {
               String[] names = NAME_SEPARATOR.split(value);
               if(names.length > 1) {
                   throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                           + ": " + Arrays.toString(names));
               }
               if(names.length == 1) cachedDefaultName = names[0];
           }
       }
       
       Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
       loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
       loadFile(extensionClasses, DUBBO_DIRECTORY);
       loadFile(extensionClasses, SERVICES_DIRECTORY);
       return extensionClasses;
   }

这个getExtensionClasses()方法其实调用了loadExtensionClasses()方法,这就很类似java的SPI了,通过从约定的路径中读取class文件,放在cachedClasses,将来可以利用URL配置好的key直接读取Class。

6 创建自适应类

Dubbo通过拼接java代码写出一个类的java文件,之后对其进行编译,获取到这个自适应类。

private Class<?> createAdaptiveExtensionClass() {
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

以Protocol为例,生成的自适应类如下(来源于这篇博客https://my.oschina.net/pingpangkuangmo/blog/508963,如有侵权,请联系我)

class Protocol$Adpative implements Protocol{
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException{
        if (arg0 == null)  { 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 
        }
        if (arg0.getUrl() == null) { 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); 
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 
        }
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)com.alibaba.dubbo.common.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException{
        if (arg1 == null)  { 
            throw new IllegalArgumentException("url == null"); 
        }
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 
        }
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)com.alibaba.dubbo.common.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public void destroy(){
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
}

从代码中看出,这个自适应类是通过读取URL来动态决定选择哪个实现类的,也就是从getExtension(extName)这行代码决定的,我们再看一眼这个方法。这个方法其实就是从cachedClasses中读取对应的实现类。

public T getExtension(String name) {
   if (name == null || name.length() == 0)
       throw new IllegalArgumentException("Extension name == null");
   if ("true".equals(name)) {
       return getDefaultExtension();
   }
   Holder<Object> holder = cachedInstances.get(name);
   if (holder == null) {
       cachedInstances.putIfAbsent(name, new Holder<Object>());
       holder = cachedInstances.get(name);
   }
   Object instance = holder.get();
   if (instance == null) {
       synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name);
                holder.set(instance);
            }
        }
   }
   return (T) instance;
}

五结束语

好了,dubbo的扩展点机制就介绍到这里,由于dubbo还有active这一块没有涉及到,但是对于目前阅读服务发布的代码已经有很大的帮助了。

  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 0
博文 3
码字总数 6804
×
学渣的第六感
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: