Nacos Source Code Analysis (Registration and Discovery, Cluster Synchronization, Heartbeat, Comparison with Eureka)

Original
04/18 17:35

spring.factories in nacos-discovery:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

The core class here is NacosDiscoveryAutoConfiguration, which mainly registers the NacosAutoServiceRegistration bean:

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

 

NacosAutoServiceRegistration

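It extends AbstractAutoServiceRegistration, which implements ApplicationListener<WebServerInitializedEvent>, so the onApplicationEvent method implemented here runs in the final stage of container startup, once the web server has been initialized.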

	@Override
	@SuppressWarnings("deprecation")
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

bind:

	@Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort()); // set the port via CAS, only if it is still 0
		this.start(); // call start() to perform the registration
	}

	public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get()) {
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}
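The combination of a CAS on the port and the running flag makes bind effectively one-shot. A minimal sketch of that pattern follows; OneShotRegistration and its members are hypothetical names used for illustration, not Spring Cloud classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the bind()/start() pair shown above.
class OneShotRegistration {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registerCalls = new AtomicInteger(0);

    // Like bind(): record the port only if none has been set, then start.
    void bind(int webServerPort) {
        port.compareAndSet(0, webServerPort);
        start();
    }

    // Like start(): register only while the running flag is still false.
    void start() {
        if (!running.get()) {
            registerCalls.incrementAndGet(); // stands in for register()
            running.compareAndSet(false, true);
        }
    }

    int port() {
        return port.get();
    }

    int registerCalls() {
        return registerCalls.get();
    }
}
```

Calling bind a second time with a different port changes nothing: the first port is kept and no second registration is attempted.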

As shown, start() checks whether it is already running and then calls register(), which delegates to NacosServiceRegistry#register:

	@Override
	public void register(Registration registration) {

		String serviceId = registration.getServiceId(); // get the service id

		Instance instance = getNacosInstanceFromRegistration(registration); // build the Instance object

		try {
			namingService.registerInstance(serviceId, instance); // register the service
			log.info("nacos registry, {} {}:{} register finished", serviceId,
					instance.getIp(), instance.getPort());
		}
		catch (Exception e) {
			log.error("nacos registry, {} register failed...{},", serviceId,
					registration.toString(), e);
		}
	}

registerInstance:

    @Override
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) { // ephemeral instance? defaults to true
            // for an ephemeral instance, set up the heartbeat task
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            long instanceInterval = instance.getInstanceHeartBeatInterval();
            // DEFAULT_HEART_BEAT_INTERVAL defaults to 5s
            beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
            // BeatReactor maintains a ScheduledExecutorService and uses it to schedule the heartbeat task
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        // the actual service registration
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

First, look at addBeatInfo, which sets up the heartbeat:

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
        // schedule a BeatTask to run immediately
        executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            // send a heartbeat to the remote server (a PUT request)
            long result = serverProxy.sendBeat(beatInfo);
            // compute the next execution time and reschedule on the thread pool
            long nextTime = result > 0 ? result : beatInfo.getPeriod();
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
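BeatTask does not use a fixed-rate schedule; each run schedules the next one, so the server's reply can change the interval. A minimal sketch of that self-rescheduling pattern is shown here. HeartbeatLoop is a hypothetical class, and the LongSupplier stands in for serverProxy.sendBeat.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a heartbeat that reschedules itself after every beat.
class HeartbeatLoop {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier sendBeat;   // returns a server-suggested interval in ms, or 0
    private final long defaultPeriodMs;
    private volatile boolean stopped;

    HeartbeatLoop(LongSupplier sendBeat, long defaultPeriodMs) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMs = defaultPeriodMs;
    }

    void start() {
        Runnable task = this::beat;
        executor.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    private void beat() {
        if (stopped) {
            return;
        }
        long suggested = sendBeat.getAsLong();
        // Prefer the interval returned by the server, else fall back to the default.
        long next = suggested > 0 ? suggested : defaultPeriodMs;
        Runnable task = this::beat;
        try {
            executor.schedule(task, next, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the stopped check and this call.
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }
}
```

One consequence of this design is that the interval can be tuned per beat without cancelling and recreating a periodic task.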

serverProxy.registerService:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));

        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }

As shown, this also sends a request to the remote server, but as a POST.
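To make the shape of that request concrete, here is a small sketch that turns a parameter map into a form-encoded body. RegisterRequestSketch is a hypothetical helper; the literal keys such as "serviceName" and the "group@@service" naming format are assumptions for illustration, and the real client goes through reqAPI rather than this code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds a form-encoded body from registration parameters.
class RegisterRequestSketch {

    static Map<String, String> instanceParams(String serviceName, String ip, int port, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("serviceName", serviceName);             // assumed key name
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    static String encodeForm(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return joiner.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```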

 

That covers the overall client-side flow. Next, the server-side logic:

 

InstanceController

It is RESTful in style: one resource, with the HTTP method distinguishing create, delete, update, and query.

Service registration: com.alibaba.nacos.naming.controllers.InstanceController#register

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // convert the request into an Instance object
        final Instance instance = parseInstance(request);
        // register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

registerInstance:

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // create the service for this namespace and service name if it does not exist
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // fetch the service
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // main logic for adding the instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

createEmptyService:

    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // get the serviceMap for namespaceId (a ConcurrentHashMap),
        // then look up the service by serviceName
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            // the service does not exist yet, so build it
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            // store the service, initialize its scheduled task, and register its listeners
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }
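The registry behind getService is a two-level map: namespace to service name to Service. The sketch below shows that layout with a simplified, hypothetical Service type. Note that it uses computeIfAbsent for the create step, which is a swapped-in technique and not what the method above does (it checks for null and then puts).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a namespace -> serviceName -> Service registry.
class ServiceRegistrySketch {

    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Returns null when either the namespace or the service is unknown.
    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Atomically creates the service on first use and returns the stored one afterwards.
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> new Service(namespaceId, name));
    }
}
```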

putServiceAndInit:

    private void putServiceAndInit(Service service) throws NacosException {
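        // put the service into serviceMap
        putService(service);
        // set up the heartbeat check task
        service.init();
        // register the service as a listener on both the ephemeral and the persistent instance-list keys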
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

The heartbeat check task is a ClientBeatCheckTask. Its main logic:

            // get all ephemeral instances
            List<Instance> instances = service.allIPs(true);
            // iterate over the instances:
            // if current time - last heartbeat time > timeout (15s by default),
            // mark the instance as unhealthy
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            // deleteIp calls this node's own delete endpoint to remove expired instances

            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
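The task applies two thresholds to the same "time since last beat" value: a shorter one that marks an instance unhealthy and a longer one that removes it. A compact sketch of that two-stage check follows. BeatCheckSketch and its Instance type are hypothetical, and the 30s delete timeout is an assumed value for illustration (the text above only states the 15s heartbeat timeout).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a two-stage heartbeat check.
class BeatCheckSketch {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // default mentioned in the text
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // assumed value for illustration

    static final class Instance {
        final String ip;
        final long lastBeat;
        boolean healthy = true;

        Instance(String ip, long lastBeat) {
            this.ip = ip;
            this.lastBeat = lastBeat;
        }
    }

    // Marks silent instances unhealthy, removes long-silent ones, and returns the removed ones.
    static List<Instance> check(List<Instance> instances, long now) {
        List<Instance> removed = new ArrayList<>();
        for (Iterator<Instance> it = instances.iterator(); it.hasNext();) {
            Instance instance = it.next();
            long silentFor = now - instance.lastBeat;
            if (silentFor > HEART_BEAT_TIMEOUT_MS) {
                instance.healthy = false;
            }
            if (silentFor > IP_DELETE_TIMEOUT_MS) {
                it.remove();
                removed.add(instance);
            }
        }
        return removed;
    }
}
```

The clock value is passed in as a parameter so the check can be exercised without waiting for real time to elapse.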

That completes the createEmptyService part of registerInstance. Next, the addInstance logic:

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        // build the key; when ephemeral is true the key contains "ephemeral."
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            // register the instance, using a copy-on-write approach
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            // the returned instanceList is the updated list
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // if the key contains "ephemeral.", consistencyService delegates to DistroConsistencyServiceImpl
            consistencyService.put(key, instances);
        }
    }
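The copy-on-write idea mentioned in the comments can be sketched in a few lines: writers build a new list from the old one and publish it with a single reference swap, so readers never see a half-updated list and never take a lock. CopyOnWriteInstances is a hypothetical class using plain address strings instead of Instance objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a copy-on-write instance list.
class CopyOnWriteInstances {
    // Readers only ever see a fully built, immutable list.
    private volatile List<String> instances = Collections.emptyList();

    // Writers serialize on this object, copy the current list, modify the copy, then swap.
    synchronized void add(String address) {
        List<String> copy = new ArrayList<>(instances);
        if (!copy.contains(address)) {
            copy.add(address);
        }
        instances = Collections.unmodifiableList(copy);
    }

    // Lock-free read: returns the list that was current at the time of the call.
    List<String> snapshot() {
        return instances;
    }
}
```

The cost is one list copy per write, which is the space-for-time trade that suits a read-heavy registry.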

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put:

    public void put(String key, Record value) throws NacosException {
        // put the change into the notifier queue
        onPut(key, value);
        // in a cluster, schedule a delayed sync task for each of the other nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

onPut:

        notifier.addTask(key, DataOperation.CHANGE);


        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }

Here tasks is:

        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

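One thing I have not figured out here: why use ArrayBlockingQueue? I would have expected LinkedBlockingQueue to perform better, since it uses separate locks for put and take while ArrayBlockingQueue shares a single lock (although ArrayBlockingQueue preallocates its storage and avoids allocating a node per element).

The run method loops, taking items from the queue and handling (registering) each one. Because this is a blocking queue, the thread blocks while the queue is empty, so no CPU is wasted.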

        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
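The pairing of a "pending keys" map with a blocking queue means a key that is already waiting is not queued a second time. A minimal sketch of that de-duplication follows. NotifierSketch is a hypothetical class, and clearing the pending mark when the task is taken is an assumption about where the real handler clears it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a de-duplicating change queue.
class NotifierSketch {
    private final ConcurrentMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);

    // Returns true if the key was queued, false if it was already waiting or the queue is full.
    boolean addTask(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // a change for this key is already queued
        }
        if (!tasks.offer(key)) {
            pending.remove(key); // queue full: undo the pending mark
            return false;
        }
        return true;
    }

    // Blocks until a key is available, then clears its pending mark so it can be queued again.
    String takeAndHandle() throws InterruptedException {
        String key = tasks.take();
        pending.remove(key);
        return key;
    }

    int queued() {
        return tasks.size();
    }
}
```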

 

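distroProtocol.sync: used in cluster mode. It puts the change into a Map; an asynchronous thread picks it up and calls the remote endpoint to synchronize the data.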

    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // iterate over all other cluster members
        for (Member each : memberManager.allMembersWithoutSelf()) {
            // wrap the key together with the target node's address in a DistroKey
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            // add the delayed task to the engine's ConcurrentHashMap
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }

 

com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder

@Component
public class DistroTaskEngineHolder {
    
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
}

It holds a DistroDelayTaskExecuteEngine field, whose parent-class constructor is:

    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        // schedule a ProcessRunnable on the scheduled thread pool
        processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }

ProcessRunnable logic:

    private class ProcessRunnable implements Runnable {

        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
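The delay-task engine described above can be pictured as a map keyed by task key plus a periodic sweep that dispatches whatever is due. The sketch below is a simplified model with hypothetical names; in particular, a newer task for the same key simply replaces the older one here, which is an assumption and not necessarily how the real engine combines tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a keyed delay-task map with a periodic sweep.
class DelayTaskEngineSketch {

    static final class DelayTask {
        final long createdAt;
        final long delayMs;

        DelayTask(long createdAt, long delayMs) {
            this.createdAt = createdAt;
            this.delayMs = delayMs;
        }
    }

    private final Map<String, DelayTask> tasks = new ConcurrentHashMap<>();

    // A later task for the same key replaces the earlier one (simplification).
    void addTask(String key, DelayTask task) {
        tasks.put(key, task);
    }

    // What the scheduled sweep would do: remove and return the keys whose delay has elapsed.
    List<String> processTasks(long now) {
        List<String> dispatched = new ArrayList<>();
        for (String key : tasks.keySet()) {
            DelayTask task = tasks.get(key);
            boolean due = task != null && now - task.createdAt >= task.delayMs;
            // remove(key, value) only succeeds if the task was not replaced in the meantime
            if (due && tasks.remove(key, task)) {
                dispatched.add(key);
            }
        }
        return dispatched;
    }
}
```

Keying by target means repeated changes to the same data within the delay window lead to one sync instead of many.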

Eventually it executes com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor#process, which takes the entries out of the Map above and puts them into a queue.

com.alibaba.nacos.common.task.engine.TaskExecuteWorker.InnerWorker

    private class InnerWorker extends Thread {
        
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("distro task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[DISTRO-FAILED] " + e.toString(), e);
                }
            }
        }
    }

The worker takes a task from the queue and calls its run method directly. Here that is the logic in DistroSyncChangeTask:

    public void run() {
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        try {
            String type = getDistroKey().getResourceType();
            DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
            distroData.setType(DataOperation.CHANGE);
            boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
            handleFailedTask();
        }
    }

Here syncData calls the remote endpoint /distro/datum to synchronize the data.

 

DistroProtocol

Constructor:

    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroConfig = distroConfig;
        startDistroTask();
    }

This calls startDistroTask(), which in turn calls startLoadTask():

    private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
    }

This submits a DistroLoadDataTask to the thread pool; the task resubmits itself after a delay until loading completes:

    public void run() {
        try {
            load();
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

    private void load() throws Exception {
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                // pull the data from a remote node via loadAllDataSnapshotFromRemote
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }

    private boolean loadAllDataSnapshotFromRemote(String resourceType) {
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);

        for (Member each : memberManager.allMembersWithoutSelf()) {
            try {
                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                // process the snapshot that was pulled
                boolean result = dataProcessor.processSnapshot(distroData);
                // return as soon as one pull succeeds, so data is loaded from only one server
                if (result) {
                    return true;
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
            }
        }
        return false;
    }
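The loop above is a "first successful peer wins" strategy: a failure on one node is logged and the next node is tried. A minimal generic sketch, with hypothetical names and a Predicate standing in for the pull-and-process step:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of loading a snapshot from the first peer that succeeds.
class SnapshotLoaderSketch {

    // tryLoad returns true on success; it may also throw, which counts as a failure for that peer.
    static boolean loadFromFirstAvailable(List<String> peers, Predicate<String> tryLoad) {
        for (String peer : peers) {
            try {
                if (tryLoad.test(peer)) {
                    return true; // stop at the first success
                }
            } catch (RuntimeException e) {
                // A real implementation would log this; move on to the next peer.
            }
        }
        return false; // every peer failed, so the caller should retry later
    }
}
```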

 

On the receiving node, the /distro/datum request sent by syncData is handled by com.alibaba.nacos.naming.controllers.DistroController#onSyncDatum

which eventually calls dataProcessor.processData:

    public boolean processData(DistroData distroData) {
        DistroHttpData distroHttpData = (DistroHttpData) distroData;
        Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
        onPut(datum.key, datum.value);
        return true;
    }

This calls onPut to register the instance.

 

Service discovery

Client side:

com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String):

The core logic is in getServiceInfo, which calls getServiceInfo0:

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // look up the service in the local Map
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            updatingMap.put(serviceName, new Object());
            // not cached yet: call updateServiceNow to fetch the remote instance list and put it into the Map
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) {
            
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // schedule a periodic task that pulls the instance list and updates the Map
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }
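Stripped of failover and wait handling, the client-side lookup is a cache-aside read: serve from the local map, fetch from the server only on a miss, and let a background task refresh the entry. A minimal sketch, where ServiceInfoCacheSketch is hypothetical and the Function stands in for the remote query:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a client-side service cache.
class ServiceInfoCacheSketch {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Function<String, List<String>> remoteLookup;

    ServiceInfoCacheSketch(Function<String, List<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Serve from the local map; go to the server only on a miss.
    List<String> getInstances(String serviceName) {
        List<String> cached = serviceInfoMap.get(serviceName);
        if (cached == null) {
            cached = remoteLookup.apply(serviceName);
            serviceInfoMap.put(serviceName, cached);
        }
        return cached;
    }

    // What a periodic update task would call to keep the entry fresh.
    void refresh(String serviceName) {
        serviceInfoMap.put(serviceName, remoteLookup.apply(serviceName));
    }
}
```

Between refreshes, callers may see a slightly stale list, which is the usual trade-off for not hitting the server on every lookup.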

Server side:

When the server receives the request, it enters com.alibaba.nacos.naming.controllers.InstanceController#list:

This calls doSrvIpxt, whose core logic is a call to srvIPs:

    public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        // collect the instances and return them
        return allIPs(clusters);
    }

    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        for (String cluster : clusters) {
            // this data was stored during registration
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }

 

 

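Comparison with Eureka

First, credit to Alibaba's coding style, which is pleasant to read. Eureka packs large chunks of logic into single methods; by comparison, Nacos looks much cleaner and more concise.

Eureka registration code (excerpt):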

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
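            // acquire the read lock
            read.lock();
            // get the lease map for this application
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // look up the existing lease by instance id
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // if a lease exists and its copy is newer, use the existing holder as registrant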
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // no existing lease: this is a new instance, so raise the expected renewal count
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
            }
            // create a Lease from registrant; it records
            // registrationTimestamp (registration time),
            // lastUpdateTimestamp (time of the last operation), and duration (lease length)
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // carry over the time at which the service became UP
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // put the lease into the application's map
            gMap.put(registrant.getId(), lease);
        } finally {
            read.unlock();
        }

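Eureka registers synchronously and holds the read lock while doing so. Nacos puts the registration into a queue and lets an asynchronous thread complete it; the update itself uses copy-on-write, trading space for time to improve registration concurrency.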

 

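Service discovery comparison

Nacos reads service instances directly from ephemeralInstances. Eureka takes locks during both registration and discovery, so to reduce lock contention it uses three levels of caching: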

    // No expiration time; this is the structure served to clients. It is refreshed periodically from the second-level cache
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

    // Reduces read/write lock contention on the registry by lowering how often it is read; essentially a Guava cache with timed expiration
    private final LoadingCache<Key, Value> readWriteCacheMap;

    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

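Read order: read-only cache -> read-write cache -> actual data (registry)

The read-only cache is populated only from the read-write cache, and there is no API for updating it directly.

The read-write cache is implemented with Guava and uses Guava's expiration mechanism, so entries expire automatically after a period of time.

When the first-level cache is refreshed on schedule, it reads from the second-level cache; if the second-level cache has no data, that triggers a load that pulls the registration data from registry.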
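The read path and refresh rule can be modeled with two maps and a loader. This is a simplified sketch with hypothetical names: a plain ConcurrentHashMap stands in for the Guava LoadingCache (so expiration is modeled by an explicit invalidate call), and the loader is assumed never to return null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a read-only cache in front of a read-write cache in front of a registry.
class TwoLevelCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>(); // stands in for the Guava cache
    private final Function<String, String> registryLoader;                  // assumed to return non-null

    TwoLevelCacheSketch(Function<String, String> registryLoader) {
        this.registryLoader = registryLoader;
    }

    // Read order: read-only cache, then read-write cache, then the registry via the loader.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, registryLoader);
            readOnly.put(key, value);
        }
        return value;
    }

    // Models expiry or invalidation of a read-write entry after the registry changed.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // Periodic sync: the read-only cache is only ever updated from the read-write cache.
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            readOnly.put(key, readWrite.computeIfAbsent(key, registryLoader));
        }
    }
}
```

The sketch makes the staleness window visible: after the registry changes, readers keep seeing the old value until the read-write entry is invalidated and the next sync runs.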
