文档章节

dubbo源码系列之filter的今世

Mr_Qi
 Mr_Qi
发布于 2017/06/22 13:30
字数 1315
阅读 529
收藏 1

上一篇描述了ExtensionLoader加载spi以及wrapper的过程。

本篇描述一下整个filter执行链。

filter分为两种

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
 
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

默认使用两种group对应服务端和客户端(分别是provider和consumer)

找到对应group的extension

/**
 * Get activate extensions.
 *
 * @see com.alibaba.dubbo.common.extension.Activate
 * @param url url
 * @param values extension point names
 * @param group group
 * @return extension list which are activated
 */
public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<T>();
    List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
        getExtensionClasses();
        for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
            String name = entry.getKey();
            Activate activate = entry.getValue();
            if (isMatchGroup(group, activate.group())) {
                T ext = getExtension(name);
                if (! names.contains(name)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                        && isActive(activate, url)) {
                    exts.add(ext);
                }
            }
        }
        Collections.sort(exts, ActivateComparator.COMPARATOR);
    }
    List<T> usrs = new ArrayList<T>();
    for (int i = 0; i < names.size(); i ++) {
       String name = names.get(i);
        if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
              && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
           if (Constants.DEFAULT_KEY.equals(name)) {
              if (usrs.size() > 0) {
              exts.addAll(0, usrs);
              usrs.clear();
              }
           } else {
           T ext = getExtension(name);
           usrs.add(ext);
           }
        }
    }
    if (usrs.size() > 0) {
       exts.addAll(usrs);
    }
    return exts;
}
 
private boolean isMatchGroup(String group, String[] groups) {
    if (group == null || group.length() == 0) {
        return true;
    }
    if (groups != null && groups.length > 0) {
        for (String g : groups) {
            if (group.equals(g)) {
                return true;
            }
        }
    }
    return false;
}

涉及到了一个新的注解 Activate

/**
 * Activate
 * <p />
 * 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。
 * 比如,过滤扩展,有多个实现,使用Activate Annotation的扩展可以根据条件被自动加载。
 * <ol>
 * <li>{@link Activate#group()}生效的Group。具体的有哪些Group值由框架SPI给出。
 * <li>{@link Activate#value()}在{@link com.alibaba.dubbo.common.URL}中Key集合中有,则生效。
 * </ol>
 *
 * <p />
 * 底层框架SPI提供者通过{@link com.alibaba.dubbo.common.extension.ExtensionLoader}的{@link ExtensionLoader#getActivateExtension}方法
 * 获得条件的扩展。
 *
 * @author william.liangf
 * @author ding.lid
 * @export
 * @see SPI
 * @see ExtensionLoader
 * @see ExtensionLoader#getActivateExtension(com.alibaba.dubbo.common.URL, String[], String)
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    /**
     * Group过滤条件。
     * <br />
     * 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
     * <br />
     * 如没有Group设置,则不过滤。
     */
    String[] group() default {};
 
    /**
     * Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
     * <p />
     * 示例:<br/>
     * 注解的值 <code>@Activate("cache,validatioin")</code>,
     * 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
     * <br/>
     * 如没有设置,则不过滤。
     */
    String[] value() default {};
 
    /**
     * 排序信息,可以不提供。
     */
    String[] before() default {};
 
    /**
     * 排序信息,可以不提供。
     */
    String[] after() default {};
 
    /**
     * 排序信息,可以不提供。
     */
    int order() default 0;
}

在ExtensionLoader做loadFile的时候同时会Activate的注解也放入map中,根据对应Activate的注解来确定是否应用在指定的调用链上

上述代码可以得出结论当Activate的value为空是此时将不会过滤,排序字段有after before 和order字段共通完成,group字段指明出现的调用链(空表示不过滤否则需要和名称匹配)

具体排序使用Comparator进行排序,决定了调用链的顺序

public class ActivateComparator implements Comparator<Object> {
     
    public static final Comparator<Object> COMPARATOR = new ActivateComparator();
 
    public int compare(Object o1, Object o2) {
        if (o1 == null && o2 == null) {
            return 0;
        }
        if (o1 == null) {
            return -1;
        }
        if (o2 == null) {
            return 1;
        }
        if (o1.equals(o2)) {
            return 0;
        }
        Activate a1 = o1.getClass().getAnnotation(Activate.class);
        Activate a2 = o2.getClass().getAnnotation(Activate.class);
        if ((a1.before().length > 0 || a1.after().length > 0 
                || a2.before().length > 0 || a2.after().length > 0)
                && o1.getClass().getInterfaces().length > 0
                && o1.getClass().getInterfaces()[0].isAnnotationPresent(SPI.class)) {
            ExtensionLoader<?> extensionLoader = ExtensionLoader.getExtensionLoader(o1.getClass().getInterfaces()[0]);
            if (a1.before().length > 0 || a1.after().length > 0) {
                String n2 = extensionLoader.getExtensionName(o2.getClass());
                for (String before : a1.before()) {
                    if (before.equals(n2)) {
                        return -1;
                    }
                }
                for (String after : a1.after()) {
                    if (after.equals(n2)) {
                        return 1;
                    }
                }
            }
            if (a2.before().length > 0 || a2.after().length > 0) {
                String n1 = extensionLoader.getExtensionName(o1.getClass());
                for (String before : a2.before()) {
                    if (before.equals(n1)) {
                        return 1;
                    }
                }
                for (String after : a2.after()) {
                    if (after.equals(n1)) {
                        return -1;
                    }
                }
            }
        }
        int n1 = a1 == null ? 0 : a1.order();
        int n2 = a2 == null ? 0 : a2.order();
        return n1 > n2 ? 1 : -1; // 就算n1 == n2也不能返回0,否则在HashSet等集合中,会被认为是同一值而覆盖
    }
 
}

获取了Filter此时通过层层嵌套调用完成调用链的建立

此处以CacheFilter作为样例进行详述

 
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {
 
    private CacheFactory cacheFactory;
 
    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }
 
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
            Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
            if (cache != null) {
                String key = StringUtils.toArgumentString(invocation.getArguments());
                if (cache != null && key != null) {
                    Object value = cache.get(key);
                    if (value != null) {
                        return new RpcResult(value);
                    }
                    Result result = invoker.invoke(invocation);
                    if (! result.hasException()) {
                        cache.put(key, result.getValue());
                    }
                    return result;
                }
            }
        }
        return invoker.invoke(invocation);
    }
 
}

从上文描述可知,CacheFilter是同时作用在客户端和服务端,并且必须在URL中存在Cache的key才会自动激活。可以参考dubbo缓存代码分析

因此我们自定义Filter时可以根据Activate来定义调用链

先在文件夹META-INF/dubbo定义spi文件

com.alibaba.dubbo.rpc.Filter

clientInfoConsumer=com.air.tqb.dubbo.filter.ClientInfoConsumerFilter

package com.air.tqb.dubbo.filter;
  
import com.air.tqb.rmi.clientInfo.ClientInfo;
import com.air.tqb.rmi.clientInfo.ClientInfoRemoteInvocationFilter;
import com.air.tqb.rmi.clientInfo.RemoteInvocationCallback;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.json.JSON;
import com.alibaba.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
  
import java.io.IOException;
  
/**
 * Created by qixiaobo on 2017/6/16.
 */
@Activate(group = {Constants.CONSUMER})
public class ClientInfoConsumerFilter implements Filter, ClientInfoRemoteInvocationFilter {
   private ThreadLocal<ClientInfo> clientInfoTL = new ThreadLocal<>();
   private RemoteInvocationCallback remoteInvocationCallback;
   private String from;
  
   private static Logger logger = LoggerFactory.getLogger(ClientInfoConsumerFilter.class);
  
   @Override
   public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  
       remoteInvocationCallback.beforeCreateRemoteInvocation(this, invocation.getMethodName(), invocation.getArguments());
       if (getClientInfo() != null) {
           ClientInfo info = getClientInfo();
           if (from != null && info.getFrom() == null) {
               info.setFrom(from);
           }
           try {
               RpcContext.getContext()
                       .setInvoker(invoker)
                       .setInvocation(invocation)
                       .setAttachment(CLIENT_INFO, JSON.json(info));
           } catch (IOException e) {
               logger.error(e.getMessage(), e);
           }
           clientInfoTL.remove();
       }
       remoteInvocationCallback.afterCreateRemoteInvocation(this, invocation.getMethodName(), invocation.getArguments());
       return invoker.invoke(invocation);
   }
  
   @Override
   public ClientInfo getClientInfo() {
       return clientInfoTL.get();
   }
  
   @Override
   public void setClientInfo(ClientInfo clientInfo) {
       clientInfoTL.set(clientInfo);
   }
  
   @Override
   public RemoteInvocationCallback getRemoteInvocationCallback() {
       return remoteInvocationCallback;
   }
  
   @Override
   public void setRemoteInvocationCallback(RemoteInvocationCallback remoteInvocationCallback) {
       this.remoteInvocationCallback = remoteInvocationCallback;
   }
  
   @Override
   public String getFrom() {
       return from;
   }
  
   @Override
   public void setFrom(String from) {
       this.from = from;
   }
}

这样只要加载了这个jar那么久自然实现了扩展点的启用,在Filter上修改了调用链

© 著作权归作者所有

共有 人打赏支持
Mr_Qi

Mr_Qi

粉丝 280
博文 359
码字总数 369228
作品 0
南京
程序员
私信 提问
Dubbo的filter按需加载

背景 一天之内 两个小伙伴问我关于filter的按需加载的机制 有必要这边记录一下 关于filter的说明 dubbo源码系列之filter的前生 dubbo源码系列之filter的今世 分析 关于dubbo中Activate的注解...

Mr_Qi
08/17
0
0
源码之下无秘密 ── 做最好的 Netty 源码分析教程

背景 在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学习 Netty 源码的想法. 刚开始看源码的时候, 自然是比较痛苦...

永顺
2017/11/29
0
0
Dubbo之telnet实现

我们可以通过telnet来访问道对应dubbo服务的信息 比如 1737165qEu871390.png 我们可以利用一些指令来访问。 我们知道,默认情况下,dubbo使用netty做transport。 那么dubbo是如何区分开正常业...

波波维奇
2017/11/30
0
0
Dubbo分析之Protocol层

系列文章 Dubbo分析Serialize层 Dubbo分析之Transport层 Dubbo分析之Exchange 层 Dubbo分析之Protocol层 前言 紧接着上文Dubbo分析之Exchange层,继续分析protocol远程调用层,官方介绍:封装...

ksfzhaohui
11/02
0
0
Dubbo源码之客户端并发控制——ActiveLimitFilter

上篇解释了Dubbo源码中降级及容错处理 Dubbo服务调用——Cluster组件(服务降级,容错) 这篇文章主要是关于Dubbo源码中的限流组件,Dubbo限流除了限流(并发限制)的入口ThreadPool 之外,还有...

键走偏锋
08/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

RabbitMQ+PHP 教程三(Publish/Subscribe)用yii2测试通过

介绍 在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都交付给一个工作人员处理。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者发送消息。此模式...

hansonwong
23分钟前
2
0
关于JAVA你必须知道的那些事(四):单例模式和多态

好吧,今天一定要把面向对象的最后一个特性:多态,给说完。不过我们先来聊一聊设计模式,因为它很重要。 设计模式 官方的解释是,设计模式是:一套被反复使用,多数人知晓的,经过分类编目,...

拾光TM
23分钟前
1
0
ES6 系列之 Babel 是如何编译 Class 的(下)

摘要: ## 前言 在上一篇 [《 ES6 系列 Babel 是如何编译 Class 的(上)》](https://github.com/mqyqingfeng/Blog/issues/105),我们知道了 Babel 是如何编译 Class 的,这篇我们学习 Babel ...

阿里云官方博客
24分钟前
1
0
附实例!实现iframe父窗体与子窗体的通信

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文由前端林子发表于云+社区专栏 本文主要会介绍如何基于MessengerJS,实现iframe父窗体与子窗体间的通信,传递数据信息。同时本...

腾讯云加社区
30分钟前
1
0
JSP页面传List集合到Action中

1:JSP页面(前端用的是H-UI框架) <div class="codeView docs-example"> <table class="table table-border table-bordered table-striped"> <thead> ......

uug
33分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部