文档章节

dubbo源码分析系列——dubbo-register-api模块源码分析

杨武兵
 杨武兵
发布于 2016/06/13 11:41
字数 2312
阅读 2K
收藏 5

「深度学习福利」大神带你进阶工程师,立即查看>>>

核心类图

该图是包含了dubbo-registry-api和dubbo-registry-default两个模块整合的简化类图,只描述了核心的类与类的关系,为了清晰明了,去除了方法和属性的描述,也忽略了类所处的包,将其放在一个类图中。

注册中心核心的职责就是注册和注销URL,订阅和取消订阅URL,还有包括查询符合条件的已注册的URL列表等职责,类图中的接口和类就是围绕核心职责的实现。

核心源码分析

RegistryProtocol

该类是注册中心的协议,即协议名称为registry的协议。这是整个注册中心启用的入口,通过该类整合了Protocol、Cluster、Directory 和Registry这几个组件。因此它作为入口我们非常有必有学习它的源码。

当我们在配置发布服务和引用服务的时候,若配置使用了注册中心,则会将原来的URL替换为一个protocol名称为registry的URL,并且会保留原始的URL在新的URL参数中。则通过dubbo的SPI机制获取到的Protocol接口的实现类便是RegistryProtocol,因此变回进入到该类的核心方法中,我们一起来看看这些核心方法的源码实现。

发布服务方法export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
            	try {
            		exporter.unexport();
            	} catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
                try {
                	registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
                try {
                	overrideListeners.remove(overrideSubscribeUrl);
                	registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
            }
        };
    }

@SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

    /**
     * 根据invoker的地址获取registry实例
     * @param originInvoker
     * @return
     */
    private Registry getRegistry(final Invoker<?> originInvoker){
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryFactory.getRegistry(registryUrl);
    }

该方法的实现逻辑是:

  1. 会先调用private方法protocol实现本地服务发布。该方法是调用了protocol.export()方法实现真正的服务发布,而这个protocol属性是SPI机制中自动注入进来的,它注入的是原始的网络协议的实现类,比如默认的DubboProtocol。并且支持缓存,避免发布多次。
  2. 根据url获得Registry对象。会调用registryFactory.getRegistry(registryUrl)获得对象。而registryFactory属性也是通过SPI机制自动注入进来的。比如注入默认的DubboRegistryFactory对象。
  3. 注册提供者URL到registry中。调用registry.register(registedProviderUrl)实现提供者URL的注册。原始服务的URL被编码并作为名为export的参数加入到RegistryURL中。
  4. 订阅协议替换为provider的URL。调用方法registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)订阅URL,并添加监听器OverrideListener。该监听器主要监听变化,当检测到URL变化的时候会调用方法doChangeLocalExport更新服务发布。
  5. 最后返回一个代理原始exporter对象的代理Exporter对象。该对象在unexport方法同时会注销订阅,删除监听器等操作。

服务引用方法refer

 @SuppressWarnings("unchecked")
	public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
        	return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        return doRefer(cluster, registry, type, url);
    }


 private Cluster getMergeableCluster() {
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
    }
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }

发布服务流程

  1. 将原始URL的协议转为registry协议。
  2. 获得Registry对象。
  3. 调用doRefer()方法引用服务。
  4. 创建RegistryDirectory对象,并设置相关值。
  5. 构造消费者subscribeUrl。协议名称设置为consumer。
  6. 注册中心注册URL。代码为registry.register(subscribeUrl)。
  7. 目录服务注册URL。RegistryDirectory.subscribe();
  8. 调用cluster.join()方法返回一个支持集群功能的invoker。cluster.join(directory);

这里用到了RegistryDirectory,因此我们要继续探究一下该类的源码。

RegistryDirectory

Registry

该接口注册中心,它继承自接口Node和RegistryService,它本身没有定义任何方法。

public interface Registry extends Node, RegistryService {
}

RegistryService

该接口是注册中心服务,定义了注册中心的核心行为。

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.registry;

import java.util.List;

import com.alibaba.dubbo.common.URL;

/**
 * RegistryService. (SPI, Prototype, ThreadSafe)
 * 
 * @see com.alibaba.dubbo.registry.Registry
 * @see com.alibaba.dubbo.registry.RegistryFactory#getRegistry(URL)
 * @author william.liangf
 */
public interface RegistryService {

    /**
     * 注册数据,比如:提供者地址,消费者地址,路由规则,覆盖规则,等数据。
     * 
     * 注册需处理契约:<br>
     * 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>
     * 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>
     * 3. 当URL设置了category=routers时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>
     * 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>
     * 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
     * 
     * @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     */
    void register(URL url);

    /**
     * 取消注册.
     * 
     * 取消注册需处理契约:<br>
     * 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>
     * 2. 按全URL匹配取消注册。<br>
     * 
     * @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     */
    void unregister(URL url);

    /**
     * 订阅符合条件的已注册数据,当有注册数据变更时自动推送.
     * 
     * 订阅需处理契约:<br>
     * 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>
     * 2. 当URL设置了category=routers,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>
     * 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
     * 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
     * 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br>
     * 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
     * 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>
     * 
     * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @param listener 变更事件监听器,不允许为空
     */
    void subscribe(URL url, NotifyListener listener);

    /**
     * 取消订阅.
     * 
     * 取消订阅需处理契约:<br>
     * 1. 如果没有订阅,直接忽略。<br>
     * 2. 按全URL匹配取消订阅。<br>
     * 
     * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @param listener 变更事件监听器,不允许为空
     */
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 查询符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
     * 
     * @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
     * @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @return 已注册信息列表,可能为空,含义同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。
     */
    List<URL> lookup(URL url);

}

RegistryFactory

注册中心工厂类,用于获得注册中心对象。

@SPI("dubbo")
public interface RegistryFactory {

    /**
     * 连接注册中心.
     * 
     * 连接注册中心需处理契约:<br>
     * 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。<br>
     * 2. 支持URL上的username:password权限认证。<br>
     * 3. 支持backup=10.20.153.10备选注册中心集群地址。<br>
     * 4. 支持file=registry.cache本地磁盘文件缓存。<br>
     * 5. 支持timeout=1000请求超时设置。<br>
     * 6. 支持session=60000会话超时或过期设置。<br>
     * 
     * @param url 注册中心地址,不允许为空
     * @return 注册中心引用,总不返回空
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

该类是一个支持SPI扩展的类,默认的扩展实现是名称为dubbo的扩展实现类DubboRegistryFactory。

AbstractRegistry

该类是Registry接口的抽象实现。

杨武兵

杨武兵

粉丝 295
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
加载中
请先登录后再评论。
Netty那点事(三)Channel与Pipeline

Channel是理解和使用Netty的核心。Channel的涉及内容较多,这里我使用由浅入深的介绍方法。在这篇文章中,我们主要介绍Channel部分中Pipeline实现机制。为了避免枯燥,借用一下《盗梦空间》的...

黄亿华
2013/11/24
2W
22
访问安全控制解决方案

本文是《轻量级 Java Web 框架架构设计》的系列博文。 今天想和大家简单的分享一下,在 Smart 中是如何做到访问安全控制的。也就是说,当没有登录或 Session 过期时所做的操作,会自动退回到...

黄勇
2013/11/03
3.5K
6
Flappy Bird(安卓版)逆向分析(一)

更改每过一关的增长分数 反编译的步骤就不介绍了,我们直接来看反编译得到的文件夹 方法1:在smali目录下,我们看到org/andengine/,可以知晓游戏是由andengine引擎开发的。打开/res/raw/at...

enimey
2014/03/04
6K
18
beego API开发以及自动化文档

beego API开发以及自动化文档 beego1.3版本已经在上个星期发布了,但是还是有很多人不了解如何来进行开发,也是在一步一步的测试中开发,期间QQ群里面很多人都问我如何开发,我的业余时间实在...

astaxie
2014/06/25
2.7W
22
Nutch学习笔记4-Nutch 1.7 的 索引篇 ElasticSearch

上一篇讲解了爬取和分析的流程,很重要的收获就是: 解析过程中,会根据页面的ContentType获得一系列的注册解析器, 依次调用每个解析器,当其中一个解析成功后就返回,否则继续执行下一个解...

强子哥哥
2014/06/26
712
0

没有更多内容

加载失败,请刷新页面

加载更多

HTML5 视频和音频的常用方法

HTML5 中为视频 video 和音频 audio 元素,提供了属性、方法和事件。这两个元素的常用属性上一节我们已经讲过了,本节我们来讲一下这两个元素的方法。 视频和音频的常用方法 HTML5 中为 vide...

凌兮洛
13分钟前
5
0
Git应用详解第一讲:Git分区、配置与日志

Git应用详解第一讲:Git分区、配置与日志 前言 曾经听到过这样一句话:不会「git」就不要敲代码了。细细品味确实有其中的道理,可能是当事人代码被强行覆盖后的叹息吧! 因此,为了避免这种情...

osc_jhlfbvu7
13分钟前
13
0
HashMap、HashSet、HashTable比较

1.HashMap和HashTable区别 线程安全:HashMap线程不安全。而HashTable通过让get/put上锁达到线程安全,不过代价很大。 HashMap允许key/value为null(但只能有一个null键),而HashTable不允许...

曦鱼violet
14分钟前
10
0
SHELL脚本编程练习答案(多版本)

练习: 1、编写脚本 systeminfo.sh,显示当前主机系统信息,包括:主机名,IPv4地址,操作系统版本,内核 版本,CPU型号,内存大小,硬盘大小 #!/bin/bashYELLOW='\e[1;33m'RED='\e[1;31m'...

osc_0cugk2ks
15分钟前
9
0
盘点 35 个 Apache 顶级项目,我拜服了…

Apache 软件基金会 Apache 软件基金会,全称:Apache Software Foundation,简称:ASF,成立于 1999 年 7 月,是目前世界上最大的最受欢迎的开源软件基金会,也是一个专门为支持开源项目而生...

Java技术栈
16分钟前
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部