文档章节

Dubbo超时控制源码分析

Mr_Qi
 Mr_Qi
发布于 2017/07/06 16:44
字数 2061
阅读 467
收藏 1

我们在么dubbo声明超时大约可以分如下几个层次

  • 上图中以timeout为例,显示了配置的查找顺序,其它retries, loadbalance, actives等类似。
    • 方法级优先,接口级次之,全局配置再次之。
    • 如果级别一样,则消费方优先,提供方次之。
  • 其中,服务提供方配置,通过URL经由注册中心传递给消费方。
  • 建议由服务提供方设置超时,因为一个方法需要执行多长时间,服务提供方更清楚,如果一个消费方同时引用多个服务,就不需要关心每个服务的超时设置。
     

如上所述,我们可以获得最终的timeOut。那么实际代码中如何控制的呢?

首先查看AbstractConfig中如何构建url

@SuppressWarnings("unchecked")
protected static void appendParameters(Map<String, String> parameters, Object config, String prefix) {
    if (config == null) {
        return;
    }
    Method[] methods = config.getClass().getMethods();
    for (Method method : methods) {
        try {
            String name = method.getName();
            if ((name.startsWith("get") || name.startsWith("is"))
                    && ! "getClass".equals(name)
                    && Modifier.isPublic(method.getModifiers())
                    && method.getParameterTypes().length == 0
                    && isPrimitive(method.getReturnType())) {
                Parameter parameter = method.getAnnotation(Parameter.class);
                if (method.getReturnType() == Object.class || parameter != null && parameter.excluded()) {
                    continue;
                }
                int i = name.startsWith("get") ? 3 : 2;
                String prop = StringUtils.camelToSplitName(name.substring(i, i + 1).toLowerCase() + name.substring(i + 1), ".");
                String key;
                if (parameter != null && parameter.key() != null && parameter.key().length() > 0) {
                    key = parameter.key();
                } else {
                    key = prop;
                }
                Object value = method.invoke(config, new Object[0]);
                String str = String.valueOf(value).trim();
                if (value != null && str.length() > 0) {
                    if (parameter != null && parameter.escaped()) {
                        str = URL.encode(str);
                    }
                    if (parameter != null && parameter.append()) {
                        String pre = (String)parameters.get(Constants.DEFAULT_KEY + "." + key);
                        if (pre != null && pre.length() > 0) {
                            str = pre + "," + str;
                        }
                        pre = (String)parameters.get(key);
                        if (pre != null && pre.length() > 0) {
                            str = pre + "," + str;
                        }
                    }
                    if (prefix != null && prefix.length() > 0) {
                        key = prefix + "." + key;
                    }
                    parameters.put(key, str);
                } else if (parameter != null && parameter.required()) {
                    throw new IllegalStateException(config.getClass().getSimpleName() + "." + key + " == null");
                }
            } else if ("getParameters".equals(name)
                    && Modifier.isPublic(method.getModifiers())
                    && method.getParameterTypes().length == 0
                    && method.getReturnType() == Map.class) {
                Map<String, String> map = (Map<String, String>) method.invoke(config, new Object[0]);
                if (map != null && map.size() > 0) {
                    String pre = (prefix != null && prefix.length() > 0 ? prefix + "." : "");
                    for (Map.Entry<String, String> entry : map.entrySet()) {
                        parameters.put(pre + entry.getKey().replace('-', '.'), entry.getValue());
                    }
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
  1. 将对应类型的get方法或者is方法打头的方法(public 参数长度为0 返回值是包装类型)取出排除getClass(获取相关属性)
  2. 排除有Parameter中excluded为true或者返回类型是Object的类型
  3. 取出parameter中key的字段(如果不存在或者长度为0使用方法名截取后面部分)
  4. 利用反射获取数据值并根据是否需要escaped来进行转换
  5. 依次检测参数并且将数据放入parameters

在方法暴露成Url时如下

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }
 
    String host = protocolConfig.getHost();
    if (provider != null && (host == null || host.length() == 0)) {
        host = provider.getHost();
    }
    boolean anyhost = false;
    if (NetUtils.isInvalidLocalHost(host)) {
        anyhost = true;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            logger.warn(e.getMessage(), e);
        }
        if (NetUtils.isInvalidLocalHost(host)) {
            if (registryURLs != null && registryURLs.size() > 0) {
                for (URL registryURL : registryURLs) {
                    try {
                        Socket socket = new Socket();
                        try {
                            SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                            socket.connect(addr, 1000);
                            host = socket.getLocalAddress().getHostAddress();
                            break;
                        } finally {
                            try {
                                socket.close();
                            } catch (Throwable e) {}
                        }
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
            if (NetUtils.isInvalidLocalHost(host)) {
                host = NetUtils.getLocalHost();
            }
        }
    }
 
    Integer port = protocolConfig.getPort();
    if (provider != null && (port == null || port == 0)) {
        port = provider.getPort();
    }
    final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
    if (port == null || port == 0) {
        port = defaultPort;
    }
    if (port == null || port <= 0) {
        port = getRandomPort(name);
        if (port == null || port < 0) {
            port = NetUtils.getAvailablePort(defaultPort);
            putRandomPort(name, port);
        }
        logger.warn("Use random available port(" + port + ") for protocol " + name);
    }
 
    Map<String, String> map = new HashMap<String, String>();
    if (anyhost) {
        map.put(Constants.ANYHOST_KEY, "true");
    }
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    if (methods != null && methods.size() > 0) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            List<ArgumentConfig> arguments = method.getArguments();
            if (arguments != null && arguments.size() > 0) {
                for (ArgumentConfig argument : arguments) {
                    //类型自动转换.
                    if(argument.getType() != null && argument.getType().length() >0){
                        Method[] methods = interfaceClass.getMethods();
                        //遍历所有方法
                        if(methods != null && methods.length > 0){
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                //匹配方法名称,获取方法签名.
                                if(methodName.equals(method.getName())){
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    //一个方法中单个callback
                                    if (argument.getIndex() != -1 ){
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        }else {
                                            throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        //一个方法中多个callback
                                        for (int j = 0 ;j<argtypes.length ;j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())){
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j){
                                                    throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }else if(argument.getIndex() != -1){
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    }else {
                        throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }
 
                }
            }
        } // end of methods for
    }
 
    if (generic) {
        map.put("generic", String.valueOf(true));
        map.put("methods", Constants.ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
 
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if(methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        }
        else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    if (! ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put("token", UUID.randomUUID().toString());
        } else {
            map.put("token", token);
        }
    }
    if ("injvm".equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // 导出服务
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
 
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
 
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置为none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
 
        //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
        if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && registryURLs.size() > 0
                    && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
 
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
 
                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}

核心在于appendParameters的顺序

服务提供者由上到下依次是

appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);

最终并且将method的参数也append进去。

所以可以得出结论最终服务端暴露出的顺序最终以method为准,否则是service 再上是provider

消费者端

在invoker中查看可知(默认超时时间为1000ms)

boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
   boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    currentClient.send(inv, isSent);
    RpcContext.getContext().setFuture(null);
    return new RpcResult();
} else if (isAsync) {
   ResponseFuture future = currentClient.request(inv, timeout) ;
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
} else {
   RpcContext.getContext().setFuture(null);
    return (Result) currentClient.request(inv, timeout).get();
}

重点查看该方法

public String getMethodParameter(String method, String key) {
    String value = parameters.get(method + "." + key);
    if (value == null || value.length() == 0) {
        return getParameter(key);
    }
    return value;
}
public String getParameter(String key) {
    String value = parameters.get(key);
    if (value == null || value.length() == 0) {
        value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key);
    }
    return value;
}
  1. 根据method的对应字段来获取timeout
  2. 根据default获取timeout

从上文中service的构建可以看出timeout如果不是method上定义的话优先使用(default是在provider作为前缀使用),当针对method调价超时是由methodname作为前缀使用==》dubbo中请不要使用方法名称相同

因此在客户端调用如下

appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prifix = StringUtils.getServiceKey(map);
if (methods != null && methods.size() > 0) {
    for (MethodConfig method : methods) {
        appendParameters(map, method, method.getName());
        String retryKey = method.getName() + ".retry";
        if (map.containsKey(retryKey)) {
            String retryValue = map.remove(retryKey);
            if ("false".equals(retryValue)) {
                map.put(method.getName() + ".retries", "0");
            }
        }
        appendAttributes(attributes, method, prifix + "." + method.getName());
        checkAndConvertImplicitConfig(method, map, attributes);
    }
}

同样道理将consumer作为default放入,依次放入reference的参数以及method对应名称为prefix的参数

因此从上述表达中可知

  1. 根据refrenece的method获取timeout   ===》<method>.timeout method级别
  2. 如果没有就获取timeout ===》timeout reference级别
  3. 如果没有获取默认timeout===》default.timeout consumer级别
  4. 没有直接设置为1000ms 

那么看到核心合并方法如下 RegistryDirectory

URL url = mergeUrl(providerUrl);
/**
 * 合并url参数 顺序为override > -D >Consumer > Provider
 * @param providerUrl
 * @param overrides
 * @return
 */
private URL mergeUrl(URL providerUrl){
    providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 合并消费端参数
     
    List<Configurator> localConfigurators = this.configurators; // local reference
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            providerUrl = configurator.configure(providerUrl);
        }
    }
     
    providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // 不检查连接是否成功,总是创建Invoker!
     
    //directoryUrl 与 override 合并是在notify的最后,这里不能够处理
    this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // 合并提供者参数       
     
    if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0)
            && "dubbo".equals(providerUrl.getProtocol())) { // 兼容1.0
        //fix by tony.chenl DUBBO-44
        String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);
        if (path != null) {
            int i = path.indexOf('/');
            if (i >= 0) {
                path = path.substring(i + 1);
            }
            i = path.lastIndexOf(':');
            if (i >= 0) {
                path = path.substring(0, i);
            }
            providerUrl = providerUrl.setPath(path);
        }
    }
    return providerUrl;
}
由上述代码可知客户端默认生成超时时间会用消费者timeout覆盖提供者timeout
依然分三种区分timeout:
default.timeout consumer 覆盖 provider
timeout refer覆盖service
<method>.timeout 消费者方法覆盖生产者方法
而获取timeout的参数整体就是从3—》的顺序 如果仍然没有则设置时间为1000ms也就是1s
整体rpc的调用通过DefaultFuture来实现
public DefaultFuture(Channel channel, Request request, int timeout){
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

当同步调用时,执行future.get获取结果。

因此完成了整个超时的设置。

© 著作权归作者所有

共有 人打赏支持
Mr_Qi

Mr_Qi

粉丝 280
博文 359
码字总数 369228
作品 0
南京
程序员
私信 提问
使用Dubbo中需要注意的事项

一、前言 Dubbo作为高性能RPC框架,已经进入Apache卵化器项目,虽然官方给出了dubbo使用的用户手册,但是大多是一概而过,使用dubbo时候要尽量了解源码,不然会很容易入坑。 二 、服务消费端...

加多
2018/01/02
0
0
dubbo 超时设置和源码分析

本文 dubbo 2.6.2 在工作中碰到一个业务接口时间比较长,需要修改超时时间,不知道原理,在网上搜索,看到有人说如果你觉得自己了解了dubbo的超时机制,那么问问自己以下问题: 超时是针对消...

我叫董先森
2018/09/13
0
0
dubbo超时异常

dubbo超时异常 在调用dubbo服务时经常看到如下错误:Caused by: com.alibaba.dubbo.remoting.TimeoutException: Waiting server-side response timeout by scan timer. 源码分析 客户端调用远......

lipengHeke
2018/02/05
13
0
Dubbo源码之服务端并发控制——ExecuteLimitFilter

上一篇关于《Dubbo客户端并发控制——ActiveLimitFilter》 作用,设计原理,及配置方式。 这篇是关于Dubbo服务端Filter组件扩展 ExecuteLimitFilter ,它可以限制服务端的方法级别的并发处理...

键走偏锋
2018/08/25
0
0
dubbo源码分析-服务端发布流程-笔记

Spring对外留出的扩展 dubbo是基于spring 配置来实现服务的发布的,那么一定是基于spring的扩展来写了一套自己的标签,那么spring是如何解析这些配置呢?具体细节就不在这里讲解,大家之前在...

Java搬砖工程师
2018/12/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

外教比较

确定收费的模式 确定授课的模式 确定教学的方式-用什么样的方式能让人更快更好的学会 确定核心竞争力-比如我们的师资是牛津大学的 英语流利说 收费的模式-报特色课程,比如训练营之类的,其实...

V字仇杀
15分钟前
1
0
上下文无关文法介绍

上下文无关文法 上下文无关文法是用来描述程序语言的一种表达方式,通过简单的符号描述语言的集合。正如我们所知道,一个程序即为一个句子(字符串),语言就是所有句子的集合。上下文无关文...

陶小陶
25分钟前
3
0
eggjs与sequelize简单demo

参考 egg 官方文档 安装 // 依赖npm install --save egg-sequelize mysql2// ts 类型npm install --save @types/sequelize 插件,config/plugin.ts import { EggPlugin } from 'egg';......

Geeyu
今天
1
0
看过上百部片子的这个人教你视频标签算法解析

本文由云+社区发表 随着内容时代的来临,多媒体信息,特别是视频信息的分析和理解需求,如图像分类、图像打标签、视频处理等等,变得越发迫切。目前图像分类已经发展了多年,在一定条件下已经...

腾讯云加社区
今天
4
0
2. 红黑树

定义:红黑树(Red-Black Tree,简称R-B Tree),它一种特殊的二叉查找树(Binary Search Tree)。 要理解红黑树,先要了解什么是二叉查找树。在上一章中,我们学习了什么是二叉树,以及二叉树...

火拳-艾斯
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部