文档章节

Eureka应用注册与集群数据同步源码解析

Java学习录
 Java学习录
发布于 10/21 22:51
字数 1761
阅读 21
收藏 0

在之前的EurekaClient自动装配及启动流程解析一文中我们提到过,在构造DiscoveryClient类时,会把自身注册到服务端,本文就来分析一下这个注册流程

客户端发起注册
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

这个方法中包含的registrationClientinstanceInfo两个对象在之前的文章中都已经单独拿出来了:Eureka中重要的对象

服务端接受注册

服务端接受注册的Controller在ApplicationResource类中,这里需要注意的是普通客户端注册时其中参数isReplication为false,这个参数就是控制集群是否同步的表示

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // 一系列的参数校验
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // AWS的一些东西,不用细看
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
	//注册
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

接着往下看注册的处理逻辑

 public void register(final InstanceInfo info, final boolean isReplication) {
   // 租约过期时间
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
   // 注册
        super.register(info, leaseDuration, isReplication);
   //集群复制
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

Lease租约这个类之前也分析过了,不再展开了

服务端保存注册信息

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
		  //获取锁
            read.lock();
		  //获取该实例的注册信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
              //如果不存在则添加  
			  gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // 当存在这个应用的注册信息时
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                //使用注册时间长的一方的应用信息
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {

                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
		  //创建租约
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
		  //如果存在租约则更新开始时间
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
			  //添加到最近注册队列
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

           // 获得应用实例最终状态
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
  1. 实例信息是由一个map对象保存的,在这个map中,key是应用的appName,value是另外一个map,在这个map中key是应用的id,而value则是应用的租约信息,map对象如下:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 
registry        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  1. 大体流程为,先查看是否存在该appName的注册信息,如不存在则创建。
  2. 接着查看是否存在该id的注册信息,如果存在判断客户端的最后修改时间,记录最后修改的实例,如果不存在则设置自我保护模式的几个参数
  3. 根据实例创建对应的租约信息,然后添加到map对象中
  4. 添加到最近注册队列
  5. 添加到应用实例覆盖状态映射
  6. 设置应用实例最终状态,添加最近租约变更队列,设置缓存等
集群数据同步

isReplication属性为true的时候,就会牵扯到集群信息同步了

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info ,
                                  InstanceStatus newStatus , boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

关于PeerEurekaNode在之前的文章也提到了,保存了集群节点信息,这里可以看到是循环所有的集群节点,然后排除本身的信息之后调用了replicateInstanceActionsToPeers方法

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

这里重点关注Register分支

public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

在不关心Eureka自有的调度处理相关的前提下,核心代码是replicationClient.register(info),也就是说,如果当客户端注册时指定需要同步集群信息时,Eureka会把这个客户端再注册到它所在的集群其它的节点上

服务端发起集群数据同步

在之前得EurekaServer自动装配及启动流程解析一文中,我们提到过在初始化服务端的时候会从EurekaServer集群中同步数据,也就是下面这段代码:

public class EurekaServerBootstrap {   
    protected void initEurekaServerContext() throws Exception {
  //xxxx
    
                EurekaServerContextHolder.initialize(this.serverContext);
    
                log.info("Initialized server context");
    
                // 从其他 Eureka-Server 拉取注册信息
                int registryCount = this.registry.syncUp();
                this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
                // Register all monitoring statistics.
                EurekaMonitors.registerAllStats();
        }

数据同步的代码在syncUp

 public int syncUp() {
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
			  //未读取到注册信息则开始等待
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
		   // 获取注册信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
					  //判断是否可以注册,这里基于AWS环境的判断,如果不是AWS环境直接返回true,故不需要关心这个问题
                        if (isRegisterable(instance)) {
						  //注册
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

这里的处理流程就是当前EurekaServer获取到客户端的注册信息之后,就会再次调用上边咱们提到的服务端Controller调用的register方法,当然这次调用时isReplication参数就变为true了

1

© 著作权归作者所有

Java学习录

Java学习录

粉丝 1
博文 39
码字总数 47002
作品 2
朝阳
私信 提问
Eureka源码解析系列文章汇总

先看一张图0这个图是Eureka官方提供的架构图,整张图基本上把整个Eureka的核心功能给列出来了,当你要阅读Eureka的源码时可以参考着这个图和下方这些文章 EurekaServer EurekaServer就是我们...

Java学习录
10/30
15
0
深入理解Eureka之源码解析

转载请标明出处: http://blog.csdn.net/forezp/article/details/73017664 本文出自方志朋的博客 Eureka的一些概念 Register:服务注册 当Eureka客户端向Eureka Server注册时,它提供自身的元...

forezp
2017/06/11
0
0
白话SpringCloud | 第三章:服务注册与发现-高可用配置(Eureka)-下

前言 上一章节,讲解了在单机模式下的服务注册与发现的相关知识点及简单示例。而在实际生产或者在这种微服务架构的分布式环境中,需要考虑发生故障时,各组件的高可用。而其实高可用,我的简...

oKong
2018/09/09
629
0
微服务Springcloud超详细教程+实战(九)

本人正在找深圳Java实习工作,求大佬带飞 QQ:1172796094 如在文档中遇到什么问题请联系作者 —————————————————————————————————————— 八在审核中,请...

Java小表弟
2018/12/13
0
0
注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 Eureka 1.8.X 版本 1. 概述 2. Eureka-Client 发起全量获...

Java公众号_芋道源码_每日更新
2018/10/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

BigDecimal 去后面无用的0的方法

BigDecimal a=new BigDecimal("0.1000"); System.out.println(a.stripTrailingZeros().toPlainString());...

xiaodong16
18分钟前
5
0
JAVA--高级基础开发

[集合版双色球] 十二、双色球规则:双色球每注投注号码由6个红色球号码和1个蓝色球号码组成。红色球号码从1—33中选择;蓝色球号码从1—16中选择;请随机生成一注双色球号码。(要求同色号码...

李文杰-yaya
昨天
16
0
聊聊rocketmq broker的CONSUMER_SEND_MSG_BACK

序 本文主要研究一下rocketmq broker的CONSUMER_SEND_MSG_BACK CONSUMER_SEND_MSG_BACK rocketmq/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java public class......

go4it
昨天
3
0
API常见接口(下)

system类 StringBuilder和StringBuffer 包装类 1.System类 (java.lang包中) 提供了大量的静态方法,可以获取与系统相关的信息或系统级操作。 常用方法: public static long currentTimeMi...

Firefly-
昨天
4
0
MySQL系列:一句SQL,MySQL是怎么工作的?

对于MySQL而言,其实分为客户端与服务端。 服务端,就是MySQL应用,当我们使用net start mysql命令启动的服务,其实就是启动了MySQL的服务端。 客户端,负责发送请求到服务端并从服务端获取数...

杨小格子
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部