文档章节

spring cloud(第二部)服务注册与发现实现原理

白中墨
 白中墨
发布于 06/10 13:11
字数 2052
阅读 11
收藏 0
  • eureka注册中心的使用

  1. 启动eureka server

    1.1、引入pom,本示例使用的spring.cloud版本是:Finchley.SR2

                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring.cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
        </dependencies>

    1.2、server集群模式配置,本例,启动三个ereka集群,配置如下:
    node1配置:

    # Eureka 客户端配置
    eureka:
      client:
        service-url:
          defaultZone: http://node2:10002/eureka/,http://node3:10003/eureka/
        #本客户端是否注册至eureka
        register-with-eureka: false
        #本客户端是否从eureka获取服务列表
        fetch-registry: false
      instance:
        # 配置通过主机名方式注册
        hostname: node1
        # 配置实例编号
        instance-id: ${eureka.instance.hostname}:${server.port}:@project.version@
      # 集群节点之间读取超时时间。单位:毫秒
      server:
        peer-node-read-timeout-ms: 1000
    # 服务端口号
    server:
      port: 10001
    node2配置:
    # Eureka 客户端配置
    eureka:
      client:
        service-url:
          defaultZone: http://node1:10001/eureka/,http://node3:10003/eureka/
        #本客户端是否注册至eureka
        register-with-eureka: false
        #本客户端是否从eureka获取服务列表
        fetch-registry: false
      instance:
        # 配置通过主机名方式注册
        hostname: node2
        # 配置实例编号
        instance-id: ${eureka.instance.hostname}:${server.port}:@project.version@
      # 集群节点之间读取超时时间。单位:毫秒
      server:
        peer-node-read-timeout-ms: 1000
    # 服务端口号
    server:
      port: 10002
    node3配置:
    # Eureka 客户端配置
    eureka:
      client:
        service-url:
          defaultZone: http://node1:10001/eureka/,http://node2:10002/eureka/
        #本客户端是否注册至eureka
        register-with-eureka: false
        #本客户端是否从eureka获取服务列表
        fetch-registry: false
      instance:
        # 配置通过主机名方式注册
        hostname: node3
        # 配置实例编号
        instance-id: ${eureka.instance.hostname}:${server.port}:@project.version@
      # 集群节点之间读取超时时间。单位:毫秒
      server:
        peer-node-read-timeout-ms: 1000
    # 服务端口号
    server:
      port: 10003
    @EnableEurekaServer
    @SpringBootApplication
    public class EurekaServiceApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(EurekaServiceApplication.class, args);
    
        }
    
    }
    补充说明:每一个eureak服务,需要指定出其他的服务节点名字,组成集群组,经过以上配置我们分别启动三个服务的main函数,就可以把这组有三个节点node1、node2、node3组成的集群启动起来,对外提供注册服务
  2. eureka client使用
    所有与eureka server进行交互的端都可以成为客户端,通过eureka server,我们可以完成,注册服务,获取服务列表,下面这个例子是演示如何完成服务的注册。
    2.1、引入pom配置
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    2.2、注册中心配置
    spring:
      application:
        name: orderService
    server:
      port: 8761
    
    eureka:
      instance:
        hostname: myapp
      client:
        registerWithEureka: true
        fetchRegistry: true
        serviceUrl:
          defaultZone: http://node2:10002/eureka/,http://node3:10003/eureka/,http://node1:10001/eureka/
    2.3、启动服务注册
    //@EnableDiscoveryClient
    //@EnableEurekaClient
    @SpringBootApplication
    public class OrderApplication {
    
        /**
            * @MethodName: main
            * @Description: TODO(这里用一句话描述这个方法的作用)
        				* @param @param args
            * @return void
            * @throws
            */
        
    
        public static void main(String[] args) {
            //DiscoveryClient tt = null;
            SpringApplication.run(OrderApplication.class, args);
        }
    
    }

    补充下,我们在配置文件中指定了,该客户端需要向server端进行注册,并从server端获取服务列表,所以运行的main函数中,可以不需要做@EnableEurekaClient或@EnableDiscoveryClient的注解!

  • spring cloud注册发现,源码解析
    首先,我们跟踪一下注册功能的入口,做为切入点,观察eureka整个的注册过程是如何完成的!
    1、EurekaClientAutoConfiguration是一个配置初始化的类,通过这个类,在服务启动过程中完成注册
    		@Bean(destroyMethod = "shutdown")
    		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    		@org.springframework.cloud.context.config.annotation.RefreshScope
    		@Lazy
    		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
    			manager.getInfo(); // force initialization
    			return new CloudEurekaClient(manager, config, this.optionalArgs,
    					this.context);
    		}

    2、CloudEurekaClient继承自DiscoveryClient,在DiscoveryClient完成核心的注册流程,如下

        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
    
            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            localRegionApps.set(new Applications());
    
            fetchRegistryGeneration = new AtomicLong(0);
    
            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
            if (config.shouldFetchRegistry()) {
                //如果开启了从eureka获取服务列表,创建列表更新的,健康监控
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            if (config.shouldRegisterWithEureka()) {
                //如果开启了注册功能,创建一个eureka之间心跳监控
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
            //没有开启获取注册列表和服务注册功能,直接返回
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, this.getApplications().size());
    
                return;  // no need to setup up an network tasks and we are done
            }
            //下面是开启了服务注册和列表查询
            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                //这是一个定时任务,分配了两个调度任务,一个是给心跳维持的线程池,一个是给服务列表刷新的线程池
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
                //心跳维持线程池,通过线程池方式实现了隔离
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
                //列表刷新的线程池,通过线程池刷新了隔离
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }
    
            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {//这里完成启动的时候注册,调用远程的eureka server,如:http://node1:10001/eureka/apps/,通过jersey实现rest调用,详细的注册代码,见下方
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            //完成调度任务的初始化,具体就是本地服务列表刷新任务、心跳监测任务的初始化
            initScheduledTasks();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
        }

    3、注册的具体动作:参见AbstractJerseyEurekaHttpClient类的register方法,其实就是发送的一个http请求,如下:

        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }

    4、心跳维持和缓存刷新,这一步就是在initScheduleTasks完成的
    心跳维持的线程池,首次注册完成之后,如何保证这个注册的’有效性‘,所以需要通过心跳维持的线程(HeartbeatThread)做续租,整个过程就是通过前面提到的定时任务,定时去和eureka server进行通信,告诉它服务还在,服务是有效的!具体的’续租‘逻辑如下:

        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }

    上面提到了服务列表刷新线程,如何保证,本地的服务列表是有效的:即如下场景
    4.1、A服务首次启动,注册至eureka的server
    4.2、eureka client通过CacheRefreshThread获取服务列表
    4.3、A服务宕机,未能及时通过HeartbeatThread向server做’续租‘
    4.4、CacheRefreshThread获取最新的服务列表,最新的服务列表不包含A服务
    CacheRefreshThread实现服务列表刷新逻辑如下:

        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }


     

© 著作权归作者所有

白中墨
粉丝 1
博文 26
码字总数 39237
作品 0
昌平
私信 提问
spring cloud微服务分布式云架构 - Spring Cloud简介

Spring Cloud是一系列框架的有序集合。利用Spring Boot的开发模式简化了分布式系统基础设施的开发,如服务发现、注册、配置中心、消息总线、负载均衡、断路器、数据监控等(这里只简单的列了...

明理萝
2018/11/01
0
0
(一)构建spring cloud微服务分布式云架构 - Spring Cloud简介

Spring Cloud是一系列框架的有序集合。利用Spring Boot的开发模式简化了分布式系统基础设施的开发,如服务发现、注册、配置中心、消息总线、负载均衡、断路器、数据监控等(这里只简单的列了...

SpringCloud关注者
2018/09/26
360
1
(一)spring cloud微服务分布式云架构-Spring Cloud简介

Spring Cloud是一系列框架的有序集合。利用Spring Boot的开发模式简化了分布式系统基础设施的开发,如服务发现、注册、配置中心、消息总线、负载均衡、断路器、数据监控等(这里只简单的列了...

itcloud
2018/11/22
0
0
(一)spring cloud微服务架构b2b2c电子商务 - Spring Cloud简介

电子商务平台源码请加企鹅求求:一零三八七七四六二六。用java实施的电子商务平台太少了,使用spring cloud技术构建的b2b2c电子商务平台更少,大型企业分布式互联网电子商务平台,java 推出P...

明理萝
05/23
0
0
说一下Dubbo 的工作原理?注册中心挂了可以继续通信吗?

面试题 说一下的 dubbo 的工作原理?注册中心挂了可以继续通信吗?说说一次 rpc 请求的流程? 面试官心理分析 MQ、ES、Redis、Dubbo,上来先问你一些思考性的问题、原理,比如 kafka 高可用架...

李红欧巴
04/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Taro ScrollView 组件的 scrollTop 属性是个坑

官方issue:ScrollView设置scrollTop没效果 同样的,设置 scrollTop=0 并不能实现置顶,官方回复早就修复了,我的 Taro 版本已经是最新的,然而并未修复。 万能的评论区,给出了失效的原因。...

dkvirus
19分钟前
1
0
Qt那些事0.0.21

这次还是关于PRO文件中QMAKE_POST_LINK的故事。 平时都是使用VS2015作为编译器,恰巧想用MinGW编一版程序,结果偏偏出现了错误。话说测试的这个项目可是在Linux下(fodera 20)可以正确编译通...

Ev4n
29分钟前
0
0
OSChina 周六乱弹 —— 抖音外放 亲妈下葬。

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @巴拉迪维 :一直没想明白黎明是怎么混进「四大天王」的,直到最近网易云音乐心动模式开启之后 #今日歌曲推荐# 《那有一天不想你》- 黎明 手机...

小小编辑
今天
316
8
Linux使用源码包安装软件

前言: 最近整理一些以前的学习笔记。 过去都是存储在本地,此次传到网络留待备用。 源码包 Linux软件多数免费、开源,是开发人员编写的,具有很强可读性的一组相关代码文本。 源码包 --> 编...

迷失De挣扎
今天
6
0
IPv4如何转换为IPv6?

ipv6已经逐渐在应用,现在已经有很多的运营商支持ipv6,前天我们也发布了如何让电脑使用ipv6地址?有很多朋友在问?ipv6有什么作用,它的表示方式是什么,今天我们来一起来详细了解下ipv6相关计...

xiangyunyan
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部