文档章节

dubbo源码分析-Directory 和 LoadBalance-笔记

Java搬砖工程师
 Java搬砖工程师
发布于 2018/12/03 18:02
字数 1124
阅读 61
收藏 0

Directory

  • 订阅节点的变化,
    1. 当zookeeper上指定节点发生变化以后,会通知到RegistryDirectory的notify方法
    2. 将url转化为invoker对象

调用过程中invokers的使用

  • StaticDirectory: 静态目录服务,
    • 它的所有Invoker通过构造函数传入,
    • 服务消费方引用服务的时候, 服务对多注册中心的引用,将Invokers集合直接传入 StaticDirectory构造器,再由Cluster伪装成一个Invoker
    • StaticDirectory的list方法直接返回所有invoker集合;
  • RegistryDirectory: 注册目录服务,
    • 它的Invoker集合是从注册中心获取的,
    • 它实现了NotifyListener接口实现了回调接口notify(List<Url>)

Directory目录服务的更新过程

  • RegistryProtocol.doRefer方法,也就是消费端在初始化的时候,这里涉及到了RegistryDirectory这个类。然后执行cluster.join(directory)方法。
  • 这些代码在上节课有分析过。
  • cluster.join其实就是将Directory中的多个Invoker伪装成一个Invoker, 对上层透明,包含集群的容错机制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//对多个invoker进行组装
    directory.setRegistry(registry); //ZookeeperRegistry
    directory.setProtocol(protocol); //protocol=Protocol$Adaptive
    //url=consumer://192.168.111....
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    //会把consumer://192...  注册到注册中心
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        //zkClient.create()
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));
    //Cluster$Adaptive
    return cluster.join(directory);
}

directory.subscribe

订阅节点的变化,

  1. 当zookeeper上指定节点发生变化以后,会通知到RegistryDirectory的notify方法
  2. 将url转化为invoker对象

调用过程中invokers的使用

  • 再调用过程中,AbstractClusterInvoker.invoke方法中
public Result invoke(final Invocation invocation) throws RpcException {

    checkWhetherDestroyed();

    LoadBalance loadbalance;
    
    List<Invoker<T>> invokers = list(invocation); 
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

list方法

  • 从directory中获得invokers
protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
   List<Invoker<T>> invokers = directory.list(invocation);
   return invokers;
}

负载均衡LoadBalance

  • LoadBalance负载均衡, 负责从多个 Invokers中选出具体的一个Invoker用于本次调用,调用过程中包含了负载均衡的算法。

负载均衡代码访问入口

  • 在AbstractClusterInvoker.invoke中代码如下,通过名称获得指定的扩展点。RandomLoadBalance
public Result invoke(final Invocation invocation) throws RpcException {

    checkWhetherDestroyed();

    LoadBalance loadbalance;
    
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

AbstractClusterInvoker.doselect

  • 调用LoadBalance.select方法,将 invokers 按照指定算法进行负载
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    // 如果只有两个invoker,退化成轮循
    if (invokers.size() == 2 && selected != null && selected.size() > 0) {
        return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
    //如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.
    if( (selected != null && selected.contains(invoker))
            ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
        try{
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if(rinvoker != null){
                invoker =  rinvoker;
            }else{
                //看下第一次选的位置,如果不是最后,选+1位置.
                int index = invokers.indexOf(invoker);
                try{
                    //最后在避免碰撞
                    invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
                }catch (Exception e) {
                    logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
                }
            }
        }catch (Throwable t){
            logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
        }
    }
    return invoker;
} 
  • 默认情况下,LoadBalance使用的是Random算法,但是这个随机和我们理解上的随机还是不一样的,因为他还有个概念叫weight(权重)

RandomLoadBalance

  • 假设有四个集群节点A,B,C,D,对应的权重分别是1,2,3,4,
  • 那么请求到A节点的概率就为1/(1+2+3+4) = 10%.B,C,D节点依次类推为20%,30%,40%.
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // 总个数
    int totalWeight = 0; // 总权重
    boolean sameWeight = true; // 权重是否都一样
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight; // 累计总权重
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false; // 计算所有权重是否一样
        }
    }
    if (totalWeight > 0 && ! sameWeight) {
        // 如果权重不相同且权重大于0则按总权重数随机
        int offset = random.nextInt(totalWeight);
        // 并确定随机值落在哪个片断上
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // 如果权重相同或权重为0则均等随机
    return invokers.get(random.nextInt(length));
}

© 著作权归作者所有

Java搬砖工程师
粉丝 37
博文 648
码字总数 343937
作品 0
南京
程序员
私信 提问
dubbo源码分析系列——dubbo-cluster模块源码分析

模块功能介绍 该模块的使用介绍请参考dubbo官方用户手册如下章节内容。 集群容错 负载均衡 路由规则 配置规则 注册中心参考手册 其中注册中心其实是对于目录服务的一种实现方式,本文不会对注...

杨武兵
2016/06/12
4.9K
0
Dubbo分析之Cluster层

系列文章 Dubbo分析Serialize层 Dubbo分析之Transport层 Dubbo分析之Exchange 层 Dubbo分析之Protocol层 Dubbo分析之Cluster层 Dubbo分析之Registry层 前言 紧接上文Dubbo分析之Protocol层,...

ksfzhaohui
2018/11/21
351
0
源码分析Dubbo Invoker概述----服务发现、集群、负载均衡、路由体系

Invoker,负载网络调用组件,底层依懒与网络通信,Invoker主要负责服务调用,自然与路由(比如集群)等功能息息相关,本节先从整体上把控一下Dubbo服务调用体系,服务发现、集群、负载均衡、路...

丁威
10/29
0
0
dubbo源码解析-集群容错架构设计

前言 本来是想把整个dubbo源码解析一次性弄完,再做成一个系列来发布的,但是正巧最近有位好朋友要去杭州面试,就和我交流了一下。本着对dubbo源码略有心得的心态,在交流过程中也发表了个人...

微笑向暖wx
2018/11/16
15
0
dubbo源码分析-集群容错(二)

FailbackClusterInvoker FailbackClusterInvoke是失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。 FailsafeClusterInvoker FailsafeClu...

秦帝国三川郡守
03/07
17
0

没有更多内容

加载失败,请刷新页面

加载更多

使用原生css+js+html实现打印A4纸张的功能页面

有时候我们需要使用html+css实现打印A4纸张的功能页面,以下代码实现 <!DOCTYPE html><html lang="zh-CN"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatibl......

b0cloud
19分钟前
2
0
读组件化之MGJRouter源码第二次的收获与思考

上一次我们写好了一个自定义的 路由类 ,然后我们来制作自己的 库 ,可以用来被 pod 引入 : 库的制作参考:https://www.jianshu.com/p/928d2ab053be 以下是我创建的: 利用上篇提到的 ,组件...

T型人才追梦者
20分钟前
1
0
spring cache、ehcache的使用及集成

项目中需要加缓存,故学习了 1、spring cache、ehcache的使用及集成 2、缓存的命中率等统计数据 一、spring cache 1、概述 Spring 3.1 引入了基于注解(annotation)的缓存(cache)技术 2、...

qkKing
21分钟前
3
0
Windows 10上源码编译Poco并编写httpserver和tcpserver | compile and install poco cpp library on windows

本文首发于个人博客https://kezunlin.me/post/9587bb47/,欢迎阅读! compile and install poco cpp library on windows Series guide to compile and install poco cpp library on windows g......

kezunlin
22分钟前
2
0
if-else-if-else与switch的区别

if-else-if-else: 适合分支较少 判断条件类型不单一 支持取 boolean 类型的所有运算 满足条件即停止对后续分支语句的执行 switch: 适合分支较多 判断条件类型单一,JDK 1.7 之前仅支持 in...

ConstXiong
22分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部