文档章节

dubbo源码分析(5)

李白吃白菜
 李白吃白菜
发布于 2016/04/21 10:30
字数 1332
阅读 127
收藏 3
点赞 2
评论 0

前几篇已经分析了服务提供者和服务消费者加载以及启动,本篇将大致分析服务提供者和服务消费者是如何向注册中心注册的。

服务注册

对于服务提供方,它需要发布服务,而且由于应用系统的复杂性,服务的数量、类型也不断膨胀;对于服务消费方,它最关心如何获取到它所需要的服务,而面对复杂的应用系统,需要管理大量的服务调用。而且,对于服务提供方和服务消费方来说,他们还有可能兼具这两种角色,即既需要提供服务,有需要消费服务。通过将服务统一管理起来,可以有效地优化内部应用对服务发布/使用的流程和管理。服务注册中心可以通过特定协议来完成服务对外的统一。

dubbo提供的注册中心,查看源码有以下几种类型

服务首先暴露在服务端,然后调用Registry的register方法在注册中心注册服务,然后用户通过配置文件中配置的service的url去subscribe(订阅服务),Registry接收到订阅消息后会往url对应的的List<NotifyListener>中塞入当前NotifyListener,反之从这个list中移除listener就是取消订阅。registry会调用据consumer的订阅情况调用notify方法推送服务列表给Consumer。

以下实例我们以zookeeper注册中心为实例来分析,其他几种也比较类型,没有用到过。暂时忽略。

根据dubbo扩展机制,前篇文章中提到过,这里不再说明

文件中内容:

zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory

首先会执行ZookeeperRegistryFactory类中的createRegistry(URL url),然后调用ZookeeperRegistry中的构造函数

代码如下:

  public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        //如果provider的url是“0.0.0.0”或者在参数中带anyHost=true则抛出异常注册地址不存在
        if (url.isAnyHost()) {
    		throw new IllegalStateException("registry address == null");
    	}
    	//服务分组(默认“dubbo”) 
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (! group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
            	if (state == RECONNECTED) {
	            	try {
			     recover();
			} catch (Exception e) {
			      logger.error(e.getMessage(), e);
			}
            	}
            }
        });
    }

服务提供者(provider)初始化时会调用doRegister方法向注册中心发起注册

 protected void doRegister(URL url) {
        try {
           //连接注册中心
           zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

服务端注册之后,那么客户端又是怎么调用的呢?

服务消费者在初始化ConsumerConfig时会调用RegistryProtocol的refer方法进一步调用RegistryDirectory的subscribe方法最终调用ZookeeperRegistry的subscribe方法向注册中心订阅服务。

RegistryProtocol中refer方法

 @SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
      url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
        	return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        return doRefer(cluster, registry, type, url);
    }

FailBackRegistry的subscribe方法

 @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 向服务器端发送订阅请求
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // 如果开启了启动时检测,则直接抛出异常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // 将失败的订阅请求记录到失败列表,定时重试
            addFailedSubscribed(url, listener);
        }
    }

然后,又调用Zookeeper中的doSubscribe方法

   protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            //如果provider的service接口配置是“*”
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                //获取服务分组根路径
                String root = toRootPath();
                //获取服务的listener
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    //如果没有则创建一个
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                //如果没有子监听器则创建
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
								child = URL.decode(child);
                                if (! anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, 
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //向服务器订阅服务,注册中心会调用NotifyListener的notify函数返回服务列表
                zkClient.create(root, false);
                //获取服务地址列表
                List<String> services = zkClient.addChildListener(root, zkListener);
                //如果存在服务 
                if (services != null && services.size() > 0) {
                    for (String service : services) {
			service = URL.decode(service);
			anyServices.add(service);
			//如果serviceInterface是“*”则从分组根路径遍历service并订阅所有服务  
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, 
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
            //如果serviceInterface不是“*”则创建Zookeeper客户端索取服务列表,并通知(notify)消费者(consumer)这些服务可以用了 
                List<URL> urls = new ArrayList<URL>();
                //获取类似于http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    //获取ChildListener
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                            	ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //创建Zookeeper客户端
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                    	urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //提醒消费者  
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

 至此,Dubbo的源码分析结束。如有不对的地方,敬请指教!

© 著作权归作者所有

共有 人打赏支持
李白吃白菜
粉丝 11
博文 33
码字总数 18863
作品 0
朝阳
Dubbo 实现原理与源码解析系列 —— 精品合集

摘要: 原创出处 http://www.iocoder.cn/Dubbo/good-collection/ 「芋道源码」欢迎转载,保留摘要,谢谢! 1.【芋艿】精尽 Dubbo 原理与源码专栏 2.【老徐】RPC 专栏 3.【肥朝】Dubbo 源码解析...

芋道源码掘金Java群217878901 ⋅ 今天 ⋅ 0

dubbo中的那些“坑"(3)-netty4-rpc网络接口中的高并发的bug

在几个月前改造dubbo时,netty4已经稳定很久了,一时手痒,按照netty3-rpc的源码克隆了一套netty4,在修正了大量的包、类型不同之后,基本保持了netty3的风格,并发量小或者数据包很小时,一...

阿阮 ⋅ 2014/12/02 ⋅ 10

dubbo源码解析-router

前言 估算了一下,里面涉及的东西还是比较多的.比如谈到框架的时候,设计模式都是一个老生常谈的话题,再比如我们开发中我们不常用的一些概念,、,以及和zookeeper相关的一些知识,比如的使用,这些...

肥朝 ⋅ 2017/09/30 ⋅ 0

百度、阿里、腾讯、京东、大型互联网分布式架构必备技能

分布式架构 迎接高并发大数据的挑战,从深度到广度完善知识体系,成为下一个互联网高薪人才。 理论结合实战,透彻理解分布式架构及其解决方案。 面向人群 1、工作1-5年需要突破瓶颈; 2、传统...

Java高级架构 ⋅ 2017/12/21 ⋅ 0

dubbo请求处理线程模型实现分析

问题的由来: 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识, 则直接在 IO 线程上处理更快,因为减少了线程池调度。 但如果事件处理逻辑较慢,或者需...

wannshan ⋅ 03/30 ⋅ 0

dubbo源码解析-spi(二)

前言 上一篇简单的介绍了的基本一些概念,在末尾也提到了,对jdk的spi进行了一些改进,具体改进了什么,来看看文档的描述 JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很...

肥朝 ⋅ 01/06 ⋅ 0

阿里巴巴Dubbo实现的源码分析

Dubbo概述 Dubbo是阿里巴巴开源出来的一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及作为SOA服务治理的方案。它的核心功能包括: #remoting:远程通讯基础,提供对...

candies ⋅ 2014/02/22 ⋅ 0

阿里巴巴Dubbo实现的源码分析[转]

Dubbo概述 Dubbo是阿里巴巴开源出来的一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及作为SOA服务治理的方案。它的核心功能包括: #remoting:远程通讯基础,提供对...

如何让他和 ⋅ 2016/08/09 ⋅ 0

dubbo源码解析-spi(五)

前言 之前对dubbo的进行了四篇的分享.大家对这个概念有了一些初步的了解.谈到编程水平如何进阶,大家可能都会异口同声的说出三个字,.但是我却始终认为,编程光,是永远学不会的.关键还是要多动手...

肥朝 ⋅ 04/15 ⋅ 0

有经验JAVA程序员如何提升自己?

具有一到五年开发经验 需要学习内容很多 JVM/分布式/高并发/性能优化/Spring MVC/Spring Boot/Spring Cloud/MyBatis/Netty源码分析等等等 01、透彻理解Tomcat原理手写动静态资源的实现 02、分...

阿阳啊啊 ⋅ 2017/11/29 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

6.1 压缩打包介绍 6.2 gzip压缩工具 6.3 bzip2压缩工具 6.4 xz压缩工具

压缩打包介绍 使用压缩工具的好处: 使用压缩文件,不仅可以节省磁盘空间,而且在传输时还能节省网络宽带。 我们通常讲的家用宽带和机房宽带100M是有区别的: 机房宽带的上行和下行都是100M,...

Linux_老吴 ⋅ 27分钟前 ⋅ 0

SpringBoot热部署加持

概述 进行SpringBoot的Web开发过程中,我们很多时候经常需要重启Web服务器才能保证修改的 源代码文件、或者一些诸如xml的配置文件、以及一些静态文件生效,这样耗时又低效。所谓的热部署指的...

CodeSheep ⋅ 34分钟前 ⋅ 0

OSChina 周六乱弹 —— 假如你被熊困到树上

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @小小编辑:推荐歌曲《如果写不出好的和弦就该在洒满阳光的钢琴前一起吃布丁》 《如果写不出好的和弦就该在洒满阳光的钢琴前一起吃布丁》- 谢...

小小编辑 ⋅ 今天 ⋅ 5

vbs 取文件大小 字节

dim namedim fs, s'name = Inputbox("姓名")'msgbox(name)set fs = wscript.createobject("scripting.filesystemobject") 'fs为FSO实例if (fs.folderexists("c:\temp"))......

vga ⋅ 今天 ⋅ 1

高并发之Nginx的限流

首先Nginx的版本号有要求,最低为1.11.5 如果低于这个版本,在Nginx的配置中 upstream web_app { server 到达Ip1:端口 max_conns=10; server 到达Ip2:端口 max_conns=10; } server { listen ...

算法之名 ⋅ 今天 ⋅ 0

Spring | IOC AOP 注解 简单使用

写在前面的话 很久没更新笔记了,有人会抱怨:小冯啊,你是不是在偷懒啊,没有学习了。老哥,真的冤枉:我觉得我自己很菜,还在努力学习呢,正在学习Vue.js做管理系统呢。即便这样,我还是不...

Wenyi_Feng ⋅ 今天 ⋅ 0

博客迁移到 https://www.jianshu.com/u/aa501451a235

博客迁移到 https://www.jianshu.com/u/aa501451a235 本博客不再更新

为为02 ⋅ 今天 ⋅ 0

win10怎么彻底关闭自动更新

win10自带的更新每天都很多,每一次下载都要占用大量网络,而且安装要等得时间也蛮久的。 工具/原料 Win10 方法/步骤 单击左下角开始菜单点击设置图标进入设置界面 在设置窗口中输入“服务”...

阿K1225 ⋅ 今天 ⋅ 0

Elasticsearch 6.3.0 SQL功能使用案例分享

The best elasticsearch highlevel java rest api-----bboss Elasticsearch 6.3.0 官方新推出的SQL检索插件非常不错,本文一个实际案例来介绍其使用方法。 1.代码中的sql检索 @Testpu...

bboss ⋅ 今天 ⋅ 0

informix数据库在linux中的安装以及用java/c/c++访问

一、安装前准备 安装JDK(略) 到IBM官网上下载informix软件:iif.12.10.FC9DE.linux-x86_64.tar放在某个大家都可以访问的目录比如:/mypkg,并解压到该目录下。 我也放到了百度云和天翼云上...

wangxuwei ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部