文档章节

Nacos配置中心-源码解析

o
 osc_ogi0qclx
发布于 2019/08/24 14:58
字数 1453
阅读 22
收藏 0

「深度学习福利」大神带你进阶工程师,立即查看>>>

一:关于Nacos的思考

     首先思考一个问题,Nacos作为配置中心,Nacos 客户端是怎么实时获取到 Nacos 服务端的最新数据?

     其实客户端和服务端之间的数据交互,无外乎两种情况:

     1.服务端推数据给客户端

     2.客户端从服务端拉数据

     zk作为配置中心,基于zk的watcher机制,配置发生变化通知客户端,Nacos也是同样的原理吗?

二:Nacos的源码解析

      看看Nacos是如何获取服务端的最新数据

       

      createConfigService的作用:通过反射创建NacosConfigService

    

   public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }
View Code

     NacosConfigService的构造函数

     

    new MetricsHttpAgent(new ServerHttpAgent(properties))作用:MetricsHttpAgent包装ServerHttpAgent(装饰器模式)为了做一些统计等功能。
agent.start()作用 : 监听服务器列表是否变化

      

public class MetricsHttpAgent implements HttpAgent {
    private HttpAgent httpAgent;

    public MetricsHttpAgent(HttpAgent httpAgent) {
        this.httpAgent = httpAgent;
    }

    @Override
    public void start() throws NacosException {
        httpAgent.start();
    }

    @Override
    public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
        Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("GET", path, "NA");
        HttpResult result = null;
        try {
            result = httpAgent.httpGet(path, headers, paramValues, encoding, readTimeoutMs);
        } catch (IOException e) {
            throw e;
        } finally {
            timer.observeDuration();
            timer.close();
        }

        return result;
    }

    @Override
    public HttpResult httpPost(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
        Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("POST", path, "NA");
        HttpResult result = null;
        try {
            result = httpAgent.httpPost(path, headers, paramValues, encoding, readTimeoutMs);
        } catch (IOException e) {
            throw e;
        } finally {
            timer.observeDuration();
            timer.close();
        }

        return result;
    }

    @Override
    public HttpResult httpDelete(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
        Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("DELETE", path, "NA");
        HttpResult result = null;
        try {
            result = httpAgent.httpDelete(path, headers, paramValues, encoding, readTimeoutMs);
        } catch (IOException e) {

            throw e;
        } finally {
            timer.observeDuration();
            timer.close();
        }

        return result;
    }

    @Override
    public String getName() {
        return httpAgent.getName();
    }

    @Override
    public String getNamespace() {
        return httpAgent.getNamespace();
    }

    @Override
    public String getTenant() {
        return httpAgent.getTenant();
    }

    @Override
    public String getEncode() {
        return httpAgent.getEncode();
    }
}
View Code
      new ClientWorker(agent, configFilterChainManager, properties)的作用:线程池轮询服务器

       

    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter

        init(properties);

        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
View Code
    checkConfigInfo作用:检查配置信息

     

    public void checkConfigInfo() {
        // 分任务
        int listenerSize = cacheMap.get().size();
        // 向上取整为批数
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
View Code
     LongPollingRunnable:run方法是一个长轮询服务端过程,大致分为四块部分
第一部分:检查本地配置相关

       

        checkLocalConfig作用:判断本地配置是否存在,是否有变更,dataId=“example”和group=“DEFAULT_GROUP”在Windows环境下默认配置路径(C:\Users\Administrator\nacos\config\fixed-localhost_8848_nacos\data\config-data\DEFAULT_GROUP\example)。

       

private void checkLocalConfig(CacheData cacheData) {
        final String dataId = cacheData.dataId;
        final String group = cacheData.group;
        final String tenant = cacheData.tenant;
        File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

        // 没有 -> 有
        if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            String md5 = MD5.getInstance().getMD5String(content);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);

            LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
            return;
        }

        // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
        if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
            cacheData.setUseLocalConfigInfo(false);
            LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
            return;
        }

        // 有变更
        if (cacheData.isUseLocalConfigInfo() && path.exists()
            && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            String md5 = MD5.getInstance().getMD5String(content);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        }
    }
View Code

      第二部分:检查server端的变更

          

     第三部分:如果server端有变更,更新CacheData的content

         

     第四部分:检查md5的值是否改变,即配置是否改变,选择唤醒listener回调

          

    checkListenerMd5作用:检查CacheData的md5和wrap(listener的包装器)的md5是否一致,如果不一致唤醒回调

          

        sageNotifyListener作用 : 触发listener的回调函数

       

          配置中心的完整流程已经分析完毕了,可以发现,Nacos 并不是通过推的方式将服务端最新的配置信息发送给客户端的,而是客户端维护了一个长轮询的任务,定时去拉取发生变更的配置信息,然后将最新的数据推送给 Listener 的持有者。

三:Nacos配置中心总结归纳

      1. 关闭Nacos服务端,删除本地配置,启动测试类,依然能获取配置信息,为什么呢?

          跟踪代码 String config = configService.getConfig("example", "DEFAULT_GROUP", 10)

          

            当前环境下快照地址为:C:\Users\Administrator\nacos\config\fixed-localhost_8848_nacos\snapshot\DEFAULT_GROUP\example

        2.客户端拉取服务端的数据与服务端推送数据给客户端相比,优势在哪呢,为什么 Nacos 不设计成主动推送数据,而是要客户端去拉取呢?

          如果用推的方式,服务端需要维持与客户端的长连接,这样的话需要耗费大量的资源,并且还需要考虑连接的有效性,例如需要通过心跳来维持两者之间的连接。而用拉的方式,客户端只需要通过一个无状态的 http 请求即可获取到服务端的数据。       

        3. Nacos实现配置中心的原理?      

           客户端是通过一个定时任务来检查自己监听的配置项的数据,一旦本地配置或者服务端的数据发生变化时,客户端将会获取到最新的数据(跟踪checkUpdateDataIds方法可以明白),优先本地配置,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调。考虑到服务端故障的问题,客户端将最新数据获取后会保存在本地的 snapshot 文件中。

        上述为本人阅读源码的理解,可能存在误差。

 


     



o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
记一次失败的Perl + Nginx + FastCGI 配置过程

这两天心血来潮,不知道为什么和 Perl + Nginx + FastCGI 配置 耗上了。但是失败了,记录如下: 1)安装Nginx 1.4.3 ,我的是WINDOWS 7 系统,修改配置文件如下: location ~ .(pl|cgi|perl)?...

通吃岛-低手哥
2013/10/27
1.7K
7
CDH5: 使用parcels配置lzo

一、Parcel 部署步骤 1 下载: 首先需要下载 Parcel。下载完成后,Parcel 将驻留在 Cloudera Manager 主机的本地目录中。 2 分配: Parcel 下载后,将分配到群集中的所有主机上并解压缩。 3 激...

cloud-coder
2014/07/01
6.8K
1
beego API开发以及自动化文档

beego API开发以及自动化文档 beego1.3版本已经在上个星期发布了,但是还是有很多人不了解如何来进行开发,也是在一步一步的测试中开发,期间QQ群里面很多人都问我如何开发,我的业余时间实在...

astaxie
2014/06/25
2.7W
22
Nutch学习笔记4-Nutch 1.7 的 索引篇 ElasticSearch

上一篇讲解了爬取和分析的流程,很重要的收获就是: 解析过程中,会根据页面的ContentType获得一系列的注册解析器, 依次调用每个解析器,当其中一个解析成功后就返回,否则继续执行下一个解...

强子哥哥
2014/06/26
712
0
程序猿媛一:Android滑动翻页+区域点击事件

滑动翻页+区域点击事件 ViewPager+GrideView 声明:博文为原创,文章内容为,效果展示,思路阐述,及代码片段。文尾附注源码获取途径。 转载请保留原文出处“http://my.oschina.net/gluoyer...

花佟林雨月
2013/11/09
4.2K
1

没有更多内容

加载失败,请刷新页面

加载更多

SQL 语句大全

点击上方“掌上编程”,选择“置顶或者星标” 优质文章第一时间送达! 一、基础 「1、说明:创建数据库」 CREATE DATABASE database-name    「2、说明:删除数据库」 drop database ...

GeneralMa
昨天
0
0
山东创睦网络科技有限公司:使用Python爬取全球新冠肺炎疫情数据

使用Python爬取全球新冠肺炎疫情数据 导入所需库包 获取实时数据的url 正式编写程序 查看输出结果 导入所需库包 在获取数据之前,我们需要先安装好所需的包requests和pandas: 1.如果是使用p...

osc_qv1fwke0
46分钟前
14
0
如何1年获得别人3年的工作经验(深度好文)

最近有同学问我,为什么你的工作年限不长,技术却这么厉害,我笑了笑,啥也没说。 我不是不想回答,是不知道怎么回答。在他们的定位可能就是,每方面都懂一点,遇到问题能够快速解决,就是比...

zhang_rick
今天
1
0
新基建带动行业

什么是“新基建”? 什么是“新基建”? 根据央视发布的信息来看,其涵盖了5G基站建设、新能源汽车充电桩、大数据中心、人工智能、工业互联网,特高压,城际以及城轨交通,涉及了七大领域和相...

osc_anefoz50
47分钟前
16
0
怕入错行?这群技术人写了本“择业指南”

计算机专业好找工作吗?哪些方向是当前的主流和热门方向呢? 计算机专业的你是不是还在为职业发展纠结犹豫呢? 刚经历完高考选专业的你是不是还在迷茫徘徊呢? 那么福利来啦! 《软件技术职业...

阿里云云栖号
47分钟前
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部