文档章节

为什么说dubbo的声明式缓存不好用!!!

Mr_Qi
 Mr_Qi
发布于 2017/06/22 22:03
字数 3275
阅读 533
收藏 3

前几篇我们分析了dubbo的缓存以及缓存依赖的机制(filter)

那么通常提供缓存的目的是什么呢?

关于两级缓存的说明

通常为了更快的速度(以及一定的稳定性)

那么dubbo中的实现是通过filter机制基本上来缓存了我们需要结果。

但是默认提供的缓存并没有清空机制(不能提供供用户直接刷新缓存的机制)

我们系统中较为简单的是提供jmx来共用户刷新mac环境下ehcache 广播rmi异常解决&JMX相关

其次缓存提供了更好的稳定性,通常第三方服务挂了时候对于已经缓存好的数据可以直接获取缓存数据。

 

那么dubbo中的实现如何呢?下面来分析一下

对于稳定性来说我们一本是站在客户端的角度的。那么我们先从ReferneceBean开始分析

抛开spring对于RefenenceBean的初始化过程,直接进入RefenenceBean的init方法。

private void init() {
  if (initialized) {
      return;
  }
  initialized = true;
   if (interfaceName == null || interfaceName.length() == 0) {
       throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
   }
   // 获取消费者全局配置
   checkDefault();
     appendProperties(this);
     if (getGeneric() == null && getConsumer() != null) {
         setGeneric(getConsumer().getGeneric());
     }
     if (ProtocolUtils.isGeneric(getGeneric())) {
         interfaceClass = GenericService.class;
     } else {
         try {
   interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
           .getContextClassLoader());
} catch (ClassNotFoundException e) {
   throw new IllegalStateException(e.getMessage(), e);
}
         checkInterfaceAndMethods(interfaceClass, methods);
     }
     String resolve = System.getProperty(interfaceName);
     String resolveFile = null;
     if (resolve == null || resolve.length() == 0) {
      resolveFile = System.getProperty("dubbo.resolve.file");
      if (resolveFile == null || resolveFile.length() == 0) {
       File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
       if (userResolveFile.exists()) {
          resolveFile = userResolveFile.getAbsolutePath();
       }
      }
      if (resolveFile != null && resolveFile.length() > 0) {
       Properties properties = new Properties();
       FileInputStream fis = null;
       try {
           fis = new FileInputStream(new File(resolveFile));
      properties.load(fis);
   } catch (IOException e) {
      throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
   } finally {
       try {
                     if(null != fis) fis.close();
                 } catch (IOException e) {
                     logger.warn(e.getMessage(), e);
                 }
   }
       resolve = properties.getProperty(interfaceName);
      }
     }
     if (resolve != null && resolve.length() > 0) {
       url = resolve;
       if (logger.isWarnEnabled()) {
          if (resolveFile != null && resolveFile.length() > 0) {
             logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
          } else {
             logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
          }
      }
     }
     if (consumer != null) {
         if (application == null) {
             application = consumer.getApplication();
         }
         if (module == null) {
             module = consumer.getModule();
         }
         if (registries == null) {
             registries = consumer.getRegistries();
         }
         if (monitor == null) {
             monitor = consumer.getMonitor();
         }
     }
     if (module != null) {
         if (registries == null) {
             registries = module.getRegistries();
         }
         if (monitor == null) {
             monitor = module.getMonitor();
         }
     }
     if (application != null) {
         if (registries == null) {
             registries = application.getRegistries();
         }
         if (monitor == null) {
             monitor = application.getMonitor();
         }
     }
     checkApplication();
     checkStubAndMock(interfaceClass);
     Map<String, String> map = new HashMap<String, String>();
     Map<Object, Object> attributes = new HashMap<Object, Object>();
     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
     map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
     if (ConfigUtils.getPid() > 0) {
         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
     }
     if (! isGeneric()) {
         String revision = Version.getVersion(interfaceClass, version);
         if (revision != null && revision.length() > 0) {
             map.put("revision", revision);
         }
 
         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
         if(methods.length == 0) {
             logger.warn("NO method found in service interface " + interfaceClass.getName());
             map.put("methods", Constants.ANY_VALUE);
         }
         else {
             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
         }
     }
     map.put(Constants.INTERFACE_KEY, interfaceName);
     appendParameters(map, application);
     appendParameters(map, module);
     appendParameters(map, consumer, Constants.DEFAULT_KEY);
     appendParameters(map, this);
     String prifix = StringUtils.getServiceKey(map);
     if (methods != null && methods.size() > 0) {
         for (MethodConfig method : methods) {
             appendParameters(map, method, method.getName());
             String retryKey = method.getName() + ".retry";
             if (map.containsKey(retryKey)) {
                 String retryValue = map.remove(retryKey);
                 if ("false".equals(retryValue)) {
                     map.put(method.getName() + ".retries", "0");
                 }
             }
             appendAttributes(attributes, method, prifix + "." + method.getName());
             checkAndConvertImplicitConfig(method, map, attributes);
         }
     }
     //attributes通过系统context进行存储.
     StaticContext.getSystemContext().putAll(attributes);
     ref = createProxy(map);
 }

除开一些必要的设置之外,主要进入createProxy方法

@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
private T createProxy(Map<String, String> map) {
   URL tmpUrl = new URL("temp", "localhost", 0, map);
   final boolean isJvmRefer;
       if (isInjvm() == null) {
           if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
               isJvmRefer = false;
           } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
               //默认情况下如果本地有服务暴露,则引用本地服务.
               isJvmRefer = true;
           } else {
               isJvmRefer = false;
           }
       } else {
           isJvmRefer = isInjvm().booleanValue();
       }
    
   if (isJvmRefer) {
      URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
      invoker = refprotocol.refer(interfaceClass, url);
           if (logger.isInfoEnabled()) {
               logger.info("Using injvm service " + interfaceClass.getName());
           }
   } else {
           if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
               String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
               if (us != null && us.length > 0) {
                   for (String u : us) {
                       URL url = URL.valueOf(u);
                       if (url.getPath() == null || url.getPath().length() == 0) {
                           url = url.setPath(interfaceName);
                       }
                       if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                           urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                       } else {
                           urls.add(ClusterUtils.mergeUrl(url, map));
                       }
                   }
               }
           } else { // 通过注册中心配置拼装URL
               List<URL> us = loadRegistries(false);
               if (us != null && us.size() > 0) {
                   for (URL u : us) {
                       URL monitorUrl = loadMonitor(u);
                       if (monitorUrl != null) {
                           map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                       }
                       urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                   }
               }
               if (urls == null || urls.size() == 0) {
                   throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
               }
           }
 
           if (urls.size() == 1) {
               invoker = refprotocol.refer(interfaceClass, urls.get(0));
           } else {
               List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
               URL registryURL = null;
               for (URL url : urls) {
                   invokers.add(refprotocol.refer(interfaceClass, url));
                   if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                       registryURL = url; // 用了最后一个registry url
                   }
               }
               if (registryURL != null) { // 有 注册中心协议的URL
                   // 对有注册中心的Cluster 只用 AvailableCluster
                   URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                   invoker = cluster.join(new StaticDirectory(u, invokers));
               }  else { // 不是 注册中心的URL
                   invoker = cluster.join(new StaticDirectory(invokers));
               }
           }
       }
 
       Boolean c = check;
       if (c == null && consumer != null) {
           c = consumer.isCheck();
       }
       if (c == null) {
           c = true; // default true
       }
       if (c && ! invoker.isAvailable()) {
           throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
       }
       if (logger.isInfoEnabled()) {
           logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
       }
       // 创建服务代理
       return (T) proxyFactory.getProxy(invoker);
   }

便于说明我们此处使用zookeeper作为协议 可以看到此处我们走到的业务逻辑为

List<URL> us = loadRegistries(false);
protected List<URL> loadRegistries(boolean provider) {
    checkRegistry();
    List<URL> registryList = new ArrayList<URL>();
    if (registries != null && registries.size() > 0) {
        for (RegistryConfig config : registries) {
            String address = config.getAddress();
            if (address == null || address.length() == 0) {
               address = Constants.ANYHOST_VALUE;
            }
            String sysaddress = System.getProperty("dubbo.registry.address");
            if (sysaddress != null && sysaddress.length() > 0) {
                address = sysaddress;
            }
            if (address != null && address.length() > 0
                    && ! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                appendParameters(map, application);
                appendParameters(map, config);
                map.put("path", RegistryService.class.getName());
                map.put("dubbo", Version.getVersion());
                map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                if (ConfigUtils.getPid() > 0) {
                    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                }
                if (! map.containsKey("protocol")) {
                    if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                        map.put("protocol", "remote");
                    } else {
                        map.put("protocol", "dubbo");
                    }
                }
                List<URL> urls = UrlUtils.parseURLs(address, map);
                for (URL url : urls) {
                    url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                    url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                    if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                            || (! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}

通篇最重要的就是设置了Protocol为Constants.REGISTRY_PROTOCOL 因此我们可以认为此处使用的是RegistryProtocol 。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
            + "," + Constants.CONFIGURATORS_CATEGORY
            + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}

RegistryProtocol在生成Invoker时使用了Cluster,那么这个Cluster是啥呢?

通常来说指的是(使用*或者复杂分组)

private Cluster getMergeableCluster() {
    return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}

而cluster对于来说spi如下

mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster
failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster
failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster
failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster
forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster
available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster
mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster
broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster

其中MockClusterWrapper是对应的wrapper 参考 dubbo源码系列之filter的前生

*/
public class MockClusterWrapper implements Cluster {
 
   private Cluster cluster;
 
   public MockClusterWrapper(Cluster cluster) {
      this.cluster = cluster;
   }
 
   public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
      return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
   }
 
}

那么可以确定生成的Invoker就是MockClusterInvoker

 

这个Mock中注入的为cluster接口spi默认为failover

@SPI(FailoverCluster.NAME)
public interface Cluster {
 
    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
 
}

参考说明

Feature

Maturity

Strength

Problem

Advise

User

Failover Cluster Stable 失败自动切换,当出现失败,重试其它服务器,通常用于读操作(推荐使用) 重试会带来更长延迟 可用于生产环境 Alibaba
Failfast Cluster Stable 快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作 如果有机器正在重启,可能会出现调用失败 可用于生产环境 Alibaba
Failsafe Cluster Stable 失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作 调用信息丢失 可用于生产环境 Monitor
Failback Cluster Tested 失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作 不可靠,重启丢失 可用于生产环境 Registry
Forking Cluster Tested 并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作 需要浪费更多服务资源 可用于生产环境  
Broadcast Cluster Tested 广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态 速度慢,任意一台报错则报错 可用于生产环境  

 

我们获取到了一个FailoverClusterInvoker

注意此处是由FailoverCluster直接new的,换言之没有任何包装和代理

public class FailoverCluster implements Cluster {
 
    public final static String NAME = "failover";
 
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
 
}

 

 

那么我们在实际调用的时候(上述情景)使用的是由MockClusterInvoker委托的FailoverClusterInvoker

出现MockClusterInvoker的主要原因应该是可以直接使用dubbo的 mock功能可以在服务调用结束时收尾(做其他处理)===》此处先不提stub

那么每次调用均会执行到

 

 

@SuppressWarnings({ "unchecked", "rawtypes" })
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
   List<Invoker<T>> copyinvokers = invokers;
   checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
       //重试时,进行重新选择,避免重试时invoker列表已发生变化.
       //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
       if (i > 0) {
          checkWheatherDestoried();
          copyinvokers = list(invocation);
          //重新检查一下
          checkInvokers(copyinvokers, invocation);
       }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
            + invocation.getMethodName() + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
   List<Invoker<T>> invokers = directory.list(invocation);
   return invokers;
}
public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +  NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");
        }
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if(args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
            }
            if(invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if(invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if(invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

那么即使此刻有缓存的时候我们也无法使用该缓存,因为缓存的CacheFilter根本没有机会执行,反而直接报错。那么使用声明式缓存完成dubbo稳定性的美梦破灭了。

 

接着分析,上述代码中的字段forbidden是何时被指为true的呢?

/**
 * 根据invokerURL列表转换为invoker列表。转换规则如下:
 * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
 * 2.如果传入的invoker列表不为空,则表示最新的invoker列表
 * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
 * @param invokerUrls 传入的参数不能为null
 */
private void refreshInvoker(List<URL> invokerUrls){
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // 禁止访问
        this.methodInvokerMap = null; // 置空列表
        destroyAllInvokers(); // 关闭所有Invoker
    } else {
        this.forbidden = false; // 允许访问
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
        }
        if (invokerUrls.size() ==0 ){
           return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
        // state change
        //如果计算错误,则不进行处理.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
            return ;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try{
            destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
        }catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

 

此段代码当invokerUrls中有且仅有一个empty的protocol的URL时将会强行置为true。

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && configuratorUrls.size() >0 ){
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() >0 ){
        List<Router> routers = toRouters(routerUrls);
        if(routers != null){ // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override参数
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}

 

通常来说我们使用zookeeper作为注册。

通常provider在注册一个临时节点(当服务被摘掉时将会自动消失)

那么客户端会对对应path进行监听,如果出现childremove

if (zkListener == null) {
    listeners.putIfAbsent(listener, new ChildListener() {
        public void childChanged(String parentPath, List<String> currentChilds) {
           ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
        }
    });
    zkListener = listeners.get(listener);
}

此时会对currentChilds进行处理

private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
    List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
    if (urls == null || urls.isEmpty()) {
       int i = path.lastIndexOf('/');
       String category = i < 0 ? path : path.substring(i + 1);
       URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
        urls.add(empty);
    }
    return urls;
}

一旦发现当前孩子节点为空那么新建一条protocol为empty的URL放入,此时由于上述代码将会将forbidden置为true,因此出现消费者不能正常获取缓存。

当然对于结果的缓存还是有用处的,但是一般如果单机服务节点出现故障即使存有缓存也无法消费略有些遗憾。

下一篇可以介绍使用Stub或者mock来进行dubbo缓存前置

 

© 著作权归作者所有

共有 人打赏支持
Mr_Qi

Mr_Qi

粉丝 280
博文 359
码字总数 369228
作品 0
南京
程序员
私信 提问
推荐几个自己写的Java后端相关的范例项目(转载)

http://wosyingjun.iteye.com/blog/2312553 这里推荐几个自己写的范例项目,主要采用SSM(Spring+SpringMVC+Mybatis)框架,分布式架构采用的是(dubbo+zookeeper)。范例项目的好处是简单易...

指尖的舞者
2016/09/27
147
0
dubbo源码解析-zookeeper订阅

前言 上周写完了服务暴露总结之后发现遗漏了一个很重要的点,在dubbo源码解析-zookeeper连接中我们对面试高频题 dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信...

肥朝
2017/12/03
0
0
从线程池理论浅析为什么要看源码

前言 很多时候,我都想向大家传输一个思想,那就是只有懂了原理,才能随心随心所欲写代码.而看源码,又是了解原理的一个非常重要的途径. 然而,肥朝之前的文章,大致分为三类 源码解析,穿插怎么看源...

肥朝
2018/10/13
0
0
dubbo专题-深入浅出zookeeper订阅原理

  点击上方“java进阶架构师”,选择右上角“置顶公众号”   20大进阶架构专题每日送达      上周写完了服务暴露总结之后发现遗漏了一个很重要的点,在dubbo源码解析-zookeeper连接中...

java进阶架构师
2018/10/24
0
0
深入 Spring Boot : 快速集成 Dubbo + Hystrix

背景 Hystrix 旨在通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。Hystrix具备拥有回退机制和断路器功能的线程和信号隔离,请求缓存和请求打包,以...

小致dad
2018/07/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

zookeeper和HBASE总结

zookeeper快速上手 zookeeper的基本功能和应用场景 zookeeper的整体运行机制 zookeeper的数据存储机制 数据存储形式 zookeeper中对用户的数据采用kv形式存储 只是zk有点特别: key:是以路径...

瑞查德-Jack
47分钟前
1
0
Oracle 查询时间在当天的数据

要实现这个功能需要用到trunc这个函数对时间的操作select trunc(sysdate) from dual --2014-12-27 今天的日期为2014-12-27select trunc(sysdate, 'mm') from dual --2014-12-1 ......

覃光林
48分钟前
1
0
阿里技术专家详解 Dubbo 实践,演进及未来规划

作者:曹胜利 链接:https://www.infoq.cn/article/IwZCAp3jo_H5fJFbWOZu?utm_source=tuicool&utm_medium=referral Dubbo 整体介绍 Dubbo 是一款高性能,轻量级的 Java RPC 框架。虽然它是以...

Java干货分享
51分钟前
1
0
深入解读阿里云数据库POLARDB核心功能物理复制技术

日志是数据库的重要组成部份,按顺序以增量的方式记录了数据库上所有的操作,日志模块的设计对于数据库的可靠性、稳定性和性能都非常重要。 可靠性方面,在有一个数据文件的基础全量备份后,...

阿里云官方博客
55分钟前
1
0
Python数据科学环境:Anaconda 了解一下

几乎所有的 Python 学习者都遇到过“安装”方面的问题。这些安装问题包括 Python 自身环境的安装、第三方模块的安装、不同版本的切换,以及不同平台、版本间的兼容问题等。当你因为这些问题而...

crossin
56分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部