dubbo SPI

原创
2018/11/22 13:58
阅读数 686

dubbo SPI扩展框架

  • dubbo采用微内核+插件式扩展体系,获得极佳的扩展性。
  • dubbo没有借助spring,guice等IOC框架来管理,而是运用JDK SPI思路自己实现了一个IOC框架,减少了对外部框架的依赖
  • 在Dubbo中,SPI是一个非常核心的机制,贯穿在几乎所有的流程中。搞懂这块内容,是接下来了解Dubbo更多源码的关键因素。
  • Dubbo是基于Java原生SPI机制思想的一个改进,所以,先从JAVA SPI机制开始了解什么是SPI以后再去学习Dubbo的SPI,就比较容易了

关于JAVA 的SPI机制

  • SPI全称(service provider interface),是JDK内置的一种服务提供发现机制,
  • 目前市面上有很多框架都是用它来做服务的扩展发现,大家耳熟能详的如JDBC、日志框架都有用到;
  • 简单来说,它是一种动态替换发现的机制。
  • 举个简单的例子,
    • 如果我们定义了一个规范,需要第三方厂商去实现,
    • 那么对于我们应用方来说,只需要集成对应厂商的插件,既可以完成对应规范的实现机制。
    • 形成一种插拔式的扩展手段。
  • SPI规范总结
    • 实现SPI,就需要按照SPI本身定义的规范来进行配置,SPI规范如下:
      • 需要在classpath下创建一个目录,该目录命名必须是:META-INF/services
      • 在该目录下创建一个properties文件,该文件需要满足以下几个条件
        1. 文件名必须是扩展的接口的全路径名称
        2. 文件内部描述的是该扩展接口的所有实现类
        3. 文件的编码格式是UTF-8
      • 通过java.util.ServiceLoader的加载机制来发现
    • SPI的实际应用
      • SPI在很多地方有应用,大家可以看看最常用的java.sql.Driver驱动。
      • JDK官方提供了java.sql.Driver这个驱动扩展点,但是你们并没有看到JDK中有对应的Driver实现。
      • 那在哪里实现呢?
        • 以连接Mysql为例,我们需要添加mysql-connector-java依赖。
        • 你们可以在这个jar包中找到SPI的配置信息。
        • 所以java.sql.Driver由各个数据库厂商自行实现。
      • https://oscimg.oschina.net/oscnet/c549fbdd2a5ae4db14239b180e3efca170b.jpg
    • SPI的缺点
      • JDK标准的SPI会一次性加载实例化扩展点的所有实现
        • 就是如果你在META-INF/service下的文件里面加了N个实现类,那么JDK启动的时候都会一次性全部加载
        • 那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源
      • 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因

dubbo扩展框架特性

  • 1). 内嵌在dubbo中
  • 2). 支持通过SPI文件声明扩展实现(interfce必须有@SPI注解),
    • 格式为extensionName=extensionClassName,extensionName类似于spring的beanName
  • 3). 支持通过配置指定extensionName来从SPI文件中选出对应实现
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("defineProtocol");

Dubbo SPI机制源码阅读

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();
  • 把上面这段代码分成两段,一段是getExtensionLoader、 另一段是getAdaptiveExtension。
    1. 第一段是通过一个Class参数去获得一个ExtensionLoader对象,有点类似一个工厂模式。
    2. 第二段getAdaptiveExtension,去获得一个自适应的扩展点

Extension源码的结构

Protocol 源码

  • 一个是在类级别上的@SPI(“dubbo”)
    • @SPI 表示当前这个接口是一个扩展点,可以实现自己的扩展实现,默认的扩展点是DubboProtocol。
  • 另一个是@Adaptive
    • @Adaptive  表示一个自适应扩展点,在方法级别上,会动态生成一个适配器类
/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 *
 * @author william.liangf
 */
@SPI("dubbo")
public interface Protocol {

    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     *
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:<br>
     * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
     * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
     *
     * @param <T>     服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:<br>
     * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
     * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
     * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
     *
     * @param <T>  服务的类型
     * @param type 服务的类型
     * @param url  远程服务的URL地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议:<br>
     * 1. 取消该协议所有已经暴露和引用的服务。<br>
     * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
     * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
     */
    void destroy();

}

getExtensionLoader

  • 该方法需要一个Class类型的参数,该参数表示希望加载的扩展点类型,该参数必须是接口,且该接口必须被@SPI注解注释,否则拒绝处理。
  • 检查通过之后首先会检查ExtensionLoader缓存中是否已经存在该扩展对应的ExtensionLoader,
    • 如果有则直接返回,否则创建一个新的ExtensionLoader负责加载该扩展实现,同时将其缓存起来。
    @SuppressWarnings("unchecked")
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type +
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }

        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
  • ExtensionLoader提供了一个私有的构造函数,并且在这里面对两个成员变量type/objectFactory进行赋值。而objectFactory赋值的意义是什么呢?先留个悬念
        private ExtensionLoader(Class < ? > type){
            this.type = type;
            objectFactory = (type == ExtensionFactory.class ? null : 
                                        ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
        }

getAdaptiveExtension

  • 通过getExtensionLoader获得了对应的ExtensionLoader实例以后,
  • 再调用getAdaptiveExtension()方法来获得一个自适应扩展点。
  • ps:简单对自适应扩展点做一个解释,大家一定了解过适配器设计模式,而这个自适应扩展点实际上就是一个适配器。

  • 主要做几个事情:

    • 从cacheAdaptiveInstance 这个内存缓存中获得一个对象实例
    • 如果实例为空,说明是第一次加载,则通过双重检查锁的方式去创建一个适配器扩展点
    • createAdaptiveExtension
      • 这段代码里面有两个结构,一个是injectExtension.  另一个是getAdaptiveExtensionClass()

    • ​​​​​
  • getAdaptiveExtensionClass()

    • 从类名来看,是获得一个适配器扩展点的类。

    • 在这段代码中,做了两个事情:

      • getExtensionClasses() 加载所有路径下的扩展点
        •  
      • createAdaptiveExtensionClass() 动态创建一个扩展点
      • cachedAdaptiveClass这里有个判断,用来判断当前Protocol这个扩展点是否存在一个自定义的适配器,
      • 如果有,则直接返回自定义适配器,否则,就动态创建,这个值是在getExtensionClasses中赋值的,这块代码我们稍后再看
      • createAdaptiveExtensionClass

        • 动态生成适配器代码,以及动态编译:

          • createAdaptiveExtensionClassCode,  动态创建一个字节码文件。返回code这个字符串
          • 通过compiler.compile进行编译(默认情况下使用的是javassist
          • 通过ClassLoader加载到jvm中
      • //创建一个适配器扩展点。(创建一个动态的字节码文件)
        private Class<?> createAdaptiveExtensionClass() {
            //生成字节码代码
            String code = createAdaptiveExtensionClassCode();
            //获得类加载器
            ClassLoader classLoader = findClassLoader();
            com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader
                       .getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class)
                       .getAdaptiveExtension();
            //动态编译字节码
            return compiler.compile(code, classLoader);
        }
        
        code的字节码内容:
        •  public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
                      public void destroy() {
                          throw new UnsupportedOperationException("method " +
                                  "public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() " +
                                  "of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
                      }
          
                      public int getDefaultPort() {
                          throw new UnsupportedOperationException("method " +
                                  "public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() " +
                                  "of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
                      }
          
                      public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1)
                              throws com.alibaba.dubbo.rpc.RpcException {
                          if (arg1 == null) throw new IllegalArgumentException("url == null");
                          com.alibaba.dubbo.common.URL url = arg1;
                          String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
                          if (extName == null)
                              throw new IllegalStateException("Fail to " +
                                      "get extension(com.alibaba.dubbo.rpc.Protocol) name " +
                                      "from url(" + url.toString() + ") use keys([protocol])");
                          com.alibaba.dubbo.rpc.Protocol extension =
                                  (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                                          .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
                          return extension.refer(arg0, arg1);
                      }
          
                      public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0)
                              throws com.alibaba.dubbo.rpc.RpcException {
                          if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
                          if (arg0.getUrl() == null)
                              throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
                          com.alibaba.dubbo.common.URL url = arg0.getUrl();
                          String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
                          if (extName == null)
                              throw new IllegalStateException("Fail to " +
                                      "get extension(com.alibaba.dubbo.rpc.Protocol) name " +
                                      "from url(" + url.toString() + ") use keys([protocol])");
                          com.alibaba.dubbo.rpc.Protocol extension =
                                  (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                                          .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                                          .getExtension(extName);
                          return extension.export(arg0);
                      }
                  }

          Protocol$Adaptive的主要功能:

          • 1. 从url或扩展接口获取扩展接口实现类的名称;

          • 2.根据名称,获取实现类ExtensionLoader.getExtensionLoader(扩展接口类).getExtension(扩展接口实现类名称),

            • 然后调用实现类的方法。

    • 需要明白一点dubbo的内部传参基本上都是基于Url来实现的,也就是说Dubbo是基于URL驱动的技术

      • 所以,适配器类的目的是在运行期获取扩展的真正实现来调用,解耦接口和实现,

      • 这样的话要不我们自己实现适配器类,要不dubbo帮我们生成,而这些都是通过Adpative来实现。

  • getExtensionClasses

    • 就是加载扩展点实现类了。
    • 这段代码主要做如下几个事情:
      • 从cachedClasses中获得一个结果,
        • 这个结果实际上就是所有的扩展点类,key对应name,value对应class
      • 通过双重检查锁进行判断
      • 调用loadExtensionClasses,去加载左右扩展点的实现
    • loadExtensionClasses
      • 从不同目录去加载扩展点的实现,在最开始的时候讲到过的。

      • META-INF/dubbo ;META-INF/internal ; META-INF/services

      • 主要逻辑:

        • 获得当前扩展点的注解,也就是Protocol.class这个类的注解,@SPI

        • 判断这个注解不为空,则再次获得@SPI中的value值
        • 如果value有值,也就是@SPI(“dubbo”),则讲这个dubbo的值赋给cachedDefaultName。
          • 这就是为什么我们能够通过ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension() ,
          • 能够获得DubboProtocol这个扩展点的原因
        • 最后,通过loadFile去加载指定路径下的所有扩展点。
          • 也就是META-INF/dubbo;META-INF/internal;META-INF/services
    • // 此方法已经getExtensionClasses方法同步过。
      private Map<String, Class<?>> loadExtensionClasses() {
          //type->Protocol.class
          //得到SPI的注解
          final SPI defaultAnnotation = type.getAnnotation(SPI.class);
          if(defaultAnnotation != null) { //如果不等于空.
              String value = defaultAnnotation.value();
              if(value != null && (value = value.trim()).length() > 0) {
                  String[] names = NAME_SEPARATOR.split(value);
                  if(names.length > 1) {
                      throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                              + ": " + Arrays.toString(names));
                  }
                  if(names.length == 1) cachedDefaultName = names[0];
              }
          }
          Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
          loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
          loadFile(extensionClasses, DUBBO_DIRECTORY);
          loadFile(extensionClasses, SERVICES_DIRECTORY);
          return extensionClasses;
      }
      

      loadFile

      • 解析指定路径下的文件,获取对应的扩展点,通过反射的方式进行实例化以后,put到extensionClasses这个Map集合中
      • private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
            String fileName = dir + type.getName();
            try {
                Enumeration<java.net.URL> urls;
                ClassLoader classLoader = findClassLoader();
                if (classLoader != null) {
                    urls = classLoader.getResources(fileName);
                } else {
                    urls = ClassLoader.getSystemResources(fileName);
                }
                if (urls != null) {
                    while (urls.hasMoreElements()) {
                        java.net.URL url = urls.nextElement();
                        try {
                            BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                            try {
                                String line = null;
                                while ((line = reader.readLine()) != null) {
                                    final int ci = line.indexOf('#');
                                    if (ci >= 0) line = line.substring(0, ci);
                                    line = line.trim();
                                    if (line.length() > 0) {
                                        try {
                                            String name = null;
                                            int i = line.indexOf('=');
                                            if (i > 0) {//文件采用name=value方式,通过i进行分割
                                                name = line.substring(0, i).trim();
                                                line = line.substring(i + 1).trim();
                                            }
                                            if (line.length() > 0) {
                                                Class<?> clazz = Class.forName(line, true, classLoader);
                                                //加载对应的实现类,并且判断实现类必须是当前的加载的扩展点的实现
                                                if (! type.isAssignableFrom(clazz)) {
                                                    throw new IllegalStateException("Error when load extension class(interface: " +
                                                            type + ", class line: " + clazz.getName() + "), class " 
                                                            + clazz.getName() + "is not subtype of interface.");
                                                }
        
                                                //判断是否有自定义适配类,如果有,则在前面讲过的获取适配类的时候,直接返回当前的自定义适配类,不需要再动态创建
        // 还记得在前面讲过的getAdaptiveExtensionClass中有一个判断吗?是用来判断cachedAdaptiveClass是不是为空的。如果不为空,表示存在自定义扩展点。也就不会去动态生成字节码了。这个地方可以得到一个简单的结论;
        // @Adaptive如果是加在类上, 表示当前类是一个自定义的自适应扩展点
        //如果是加在方法级别上,表示需要动态创建一个自适应扩展点,也就是Protocol$Adaptive
                                                if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                    if(cachedAdaptiveClass == null) {
                                                        cachedAdaptiveClass = clazz;
                                                    } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                        throw new IllegalStateException("More than 1 adaptive class found: "
                                                                + cachedAdaptiveClass.getClass().getName()
                                                                + ", " + clazz.getClass().getName());
                                                    }
                                                } else {
                                                    try {
                                                        //如果没有Adaptive注解,则判断当前类是否带有参数是type类型的构造函数,如果有,则认为是
                                                        //wrapper类。这个wrapper实际上就是对扩展类进行装饰.
                                                        //可以在dubbo-rpc-api/internal下找到Protocol文件,发现Protocol配置了3个装饰
                                                        //分别是,filter/listener/mock. 所以Protocol这个实例来说,会增加对应的装饰器
                                                        clazz.getConstructor(type);//
                                                        //得到带有public DubboProtocol(Protocol protocol)的扩展点。进行包装
                                                        Set<Class<?>> wrappers = cachedWrapperClasses;
                                                        if (wrappers == null) {
                                                            cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                            wrappers = cachedWrapperClasses;
                                                        }
                                                        wrappers.add(clazz);//包装类 ProtocolFilterWrapper(ProtocolListenerWrapper(Protocol))
                                                    } catch (NoSuchMethodException e) {
                                                        clazz.getConstructor();
                                                        if (name == null || name.length() == 0) {
                                                            name = findAnnotationName(clazz);
                                                            if (name == null || name.length() == 0) {
                                                                if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                        && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                    name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                                } else {
                                                                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                                }
                                                            }
                                                        }
                                                        String[] names = NAME_SEPARATOR.split(name);
                                                        if (names != null && names.length > 0) {
                                                            Activate activate = clazz.getAnnotation(Activate.class);
                                                            if (activate != null) {
                                                                cachedActivates.put(names[0], activate);
                                                            }
                                                            for (String n : names) {
                                                                if (! cachedNames.containsKey(clazz)) {
                                                                    cachedNames.put(clazz, n);
                                                                }
                                                                Class<?> c = extensionClasses.get(n);
                                                                if (c == null) {
                                                                    extensionClasses.put(n, clazz);
                                                                } else if (c != clazz) {
                                                                    throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                                }
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        } catch (Throwable t) {
                                            IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                            exceptions.put(line, e);
                                        }
                                    }
                                } // end of while read lines
                            } finally {
                                reader.close();
                            }
                        } catch (Throwable t) {
                            logger.error("Exception when load extension class(interface: " +
                                                type + ", class file: " + url + ") in " + url, t);
                        }
                    } // end of while urls
                }
            } catch (Throwable t) {
                logger.error("Exception when load extension class(interface: " +
                        type + ", description file: " + fileName + ").", t);
            }
        }
        

阶段性小结

 

  • 截止到目前,我们已经把基于Protocol的自适应扩展点看完了。

  • 也明白最终这句话应该返回的对象是什么了.

    Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();

    也就是,这段代码中,最终的protocol应该等于= Protocol$Adaptive

injectExtension

  • 简单来说,这个方法的作用,是为这个自适应扩展点进行依赖注入。类似于spring里面的依赖注入功能。
  • 为适配器类的setter方法插入其他扩展点或实现。
  • 前面所有的相关类都加载完了,这里会把成员变量,进行赋值(所谓依赖注入
  • private T createAdaptiveExtension() {
        try {
            //可以实现扩展点的注入
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
        }
    }
    

getExtensionLoader这个方法中,会调用ExtensionLoader的私有构造方法进行初始化,其中有一个objectFactory.

  • 这个是干嘛的呢?
    • 构建bean的工厂,方便Extension 托管给 Spring等容器
    • objectFactory是一个 ExtensionFactory,用来获取所有的托管bean
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部