spring.factories in nacos-discovery:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
The core class here is NacosDiscoveryAutoConfiguration, whose main job is to register the NacosAutoServiceRegistration bean:
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
NacosAutoServiceRegistration
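It extends AbstractAutoServiceRegistration, which implements ApplicationListener<WebServerInitializedEvent>, so the onApplicationEvent method implemented there runs in the final stage of container startup, once the web server has been initialized.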
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
bind:
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort()); // CAS: set the port only if it is still 0
this.start(); // call start, which performs the registration
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
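As you can see, start first checks whether registration is enabled and not already running, then calls register(), which delegates to the register method of NacosServiceRegistry: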
@Override
public void register(Registration registration) {
String serviceId = registration.getServiceId(); // get the service id
Instance instance = getNacosInstanceFromRegistration(registration); // build the Instance object
try {
namingService.registerInstance(serviceId, instance); // register the service
log.info("nacos registry, {} {}:{} register finished", serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
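The chain so far (event listener, bind, start, register) boils down to "register exactly once, when the real web server port is known". Below is a minimal sketch of that guard logic. AutoRegistrationSketch, onWebServerReady and the "demo-service" name are hypothetical stand-ins for illustration, not Spring Cloud classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the bind/start logic; not the Spring Cloud class. */
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Records what would have been sent to the registry. */
    final List<String> registered = new ArrayList<>();

    /** Called once per "web server ready" event. */
    void onWebServerReady(String serverNamespace, int actualPort) {
        if ("management".equals(serverNamespace)) {
            return; // the management context must not trigger registration
        }
        port.compareAndSet(0, actualPort); // only the first reported port wins
        start();
    }

    private void start() {
        if (!running.get()) {
            registered.add("demo-service:" + port.get()); // stands in for register()
            running.compareAndSet(false, true);
        }
    }
}
```

Feeding it a "management" event does nothing, and a second ordinary event after the first neither changes the port nor registers again.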
registerInstance:
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) { // ephemeral instance? defaults to true
// for an ephemeral instance, set up the heartbeat task
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
// DEFAULT_HEART_BEAT_INTERVAL defaults to 5s
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// BeatReactor maintains a ScheduledExecutorService and schedules the beat task on it
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// the actual service registration
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
First, addBeatInfo, which sets up the heartbeat:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
// schedule a BeatTask to run immediately (delay 0)
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
// send a heartbeat to the server; this is a PUT request
long result = serverProxy.sendBeat(beatInfo);
// compute the next run time and reschedule on the executor
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
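Note that BeatTask is not a fixed-rate job: each run schedules the next one, which lets the server's reply change the interval. Here is a minimal sketch of that self-rescheduling pattern. BeatSketch, nextDelay and the LongSupplier standing in for serverProxy.sendBeat are all hypothetical names of mine.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a self-rescheduling heartbeat; not the Nacos BeatReactor. */
class BeatSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-sketch");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });
    private final LongSupplier sendBeat; // returns the server-suggested interval in ms, or <= 0
    private final long defaultPeriodMs;
    private volatile boolean stopped;

    BeatSketch(LongSupplier sendBeat, long defaultPeriodMs) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMs = defaultPeriodMs;
    }

    /** Same rule as in BeatTask.run: prefer the server's interval when it is positive. */
    static long nextDelay(long serverInterval, long defaultPeriodMs) {
        return serverInterval > 0 ? serverInterval : defaultPeriodMs;
    }

    void start() {
        executor.schedule(this::beatOnce, 0, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        long result = sendBeat.getAsLong();
        try {
            // one-shot schedule: the task re-arms itself after every beat
            executor.schedule(this::beatOnce, nextDelay(result, defaultPeriodMs), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // stop() raced with this beat; nothing more to schedule
        }
    }
}
```

A caller would do something like new BeatSketch(() -> 0L, 5000).start() and call stop() on shutdown.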
serverProxy.registerService:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
As you can see, this also sends a request to the server, only this time it is a POST.
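To make the request concrete, here is a rough sketch of how those parameters end up as a form-encoded body. RegisterRequestSketch is a hypothetical helper, it covers only a subset of the parameters, and the sample values are made up. The keys are written as string literals on the assumption that they match the CommonParams constants, and the "group@@service" shape of the service name is what NamingUtils.getGroupedName is assumed to produce.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper showing the shape of the registration request body. */
class RegisterRequestSketch {
    /** Builds a subset of the parameters that registerService puts into its map. */
    static Map<String, String> params(String namespaceId, String groupedServiceName,
                                      String groupName, String ip, int port, boolean ephemeral) {
        Map<String, String> p = new LinkedHashMap<>(); // keeps insertion order for readable output
        p.put("namespaceId", namespaceId);
        p.put("serviceName", groupedServiceName);
        p.put("groupName", groupName);
        p.put("ip", ip);
        p.put("port", String.valueOf(port));
        p.put("ephemeral", String.valueOf(ephemeral));
        return p;
    }

    /** Encodes the parameters as application/x-www-form-urlencoded. */
    static String formBody(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> enc(e.getKey()) + "=" + enc(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

The sketch stops at building the body; sending it is what reqAPI does in the real client.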
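That covers the rough client-side flow. Next, the server-side logic:
InstanceController
It is RESTful: each entity maps to one path, and the HTTP method distinguishes create, delete, update and query.
Service registration: com.alibaba.nacos.naming.controllers.InstanceController#register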
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// convert the request into an Instance object
final Instance instance = parseInstance(request);
// register the instance
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
registerInstance:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// create the service for this namespace and service name if it does not exist yet
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// fetch the service
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// main logic for adding the instance
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
createEmptyService:
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// look up the serviceMap entry for namespaceId (a ConcurrentHashMap),
// then fetch the service from it by serviceName
Service service = getService(namespaceId, serviceName);
if (service == null) {
// no such service yet, so build one
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// store the service, start its scheduled task, and register it with the consistency service
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
putServiceAndInit:
private void putServiceAndInit(Service service) throws NacosException {
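// put the service into serviceMap
putService(service);
// set up the heartbeat check task
service.init();
// register the service as a listener on both the ephemeral and the persistent instance-list key;
// an asynchronous thread notifies it when those keys change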
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
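The lookup structure behind getService and createServiceIfAbsent is a two-level map: namespace first, then service name. Here is a minimal sketch of that shape. ServiceRegistrySketch and its nested Service class are hypothetical, and I use computeIfAbsent to get the "create only if missing" behaviour atomically, which is a substitution of mine rather than what the Nacos code above does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a namespace -> serviceName -> Service registry. */
class ServiceRegistrySketch {
    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    // namespaceId -> (serviceName -> Service)
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    /** Returns the service, or null when either level is missing. */
    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> byName = serviceMap.get(namespaceId);
        return byName == null ? null : byName.get(serviceName);
    }

    /** Creates the service only when it is absent; concurrent callers get the same object. */
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, k -> new Service(namespaceId, serviceName));
    }
}
```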
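The heartbeat check task is a ClientBeatCheckTask. Its main logic:
// get all instances
List<Instance> instances = service.allIPs(true);
// iterate over the instances:
// if now - last heartbeat time > timeout (15s by default),
// set the health state to false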
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
// deleteIp calls this node's own endpoint to remove the expired instance
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
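The two loops apply two different thresholds: a shorter one that only flips the health flag, and a longer one that removes the instance. Here is a minimal sketch of that two-stage check. BeatCheckSketch is hypothetical; the 15s value comes from the comment above, while the 30s delete timeout is my assumption about the default and is not shown in the code here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of a two-stage heartbeat check: mark unhealthy first, delete later. */
class BeatCheckSketch {
    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // from the text above
    static final long DELETE_TIMEOUT_MS = 30_000;    // assumed default, not shown in the source

    static final class Instance {
        final String ip;
        final long lastBeat;
        boolean healthy = true;

        Instance(String ip, long lastBeat) {
            this.ip = ip;
            this.lastBeat = lastBeat;
        }
    }

    /** Runs one check pass at time {@code now}; returns the IPs that were removed. */
    static List<String> check(List<Instance> instances, long now) {
        for (Instance i : instances) {
            if (now - i.lastBeat > HEARTBEAT_TIMEOUT_MS) {
                i.healthy = false; // stage 1: keep the instance but stop routing to it
            }
        }
        List<String> deleted = new ArrayList<>();
        for (Iterator<Instance> it = instances.iterator(); it.hasNext();) {
            Instance i = it.next();
            if (now - i.lastBeat > DELETE_TIMEOUT_MS) {
                deleted.add(i.ip); // stage 2: silent for too long, drop it
                it.remove();
            }
        }
        return deleted;
    }
}
```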
That completes the createEmptyService part of registerInstance. Next, the logic of addInstance:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// build the key; when ephemeral is true the key contains "ephemeral."
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// apply the registration, copy-on-write style
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// the returned instanceList is the updated list
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// when the key contains "ephemeral.", consistencyService ends up in DistroConsistencyServiceImpl
consistencyService.put(key, instances);
}
}
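The copy-on-write idea mentioned in the comments is worth spelling out: writers build a new list and swap the reference, so readers never see a half-updated list and never need a lock. Here is a minimal sketch of that idea. CopyOnWriteSketch is a hypothetical class; the real Nacos code works on Instance objects and cluster maps, not on plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of copy-on-write for an instance list. */
class CopyOnWriteSketch {
    // readers only ever see a fully built, immutable list
    private volatile List<String> instances = Collections.emptyList();

    /** Lock-free read: returns the current immutable snapshot. */
    List<String> snapshot() {
        return instances;
    }

    /** Writers serialize among themselves, copy, modify the copy, then swap the reference. */
    synchronized void addInstance(String address) {
        List<String> copy = new ArrayList<>(instances);
        if (!copy.contains(address)) {
            copy.add(address);
        }
        instances = Collections.unmodifiableList(copy);
    }
}
```

The cost is one list copy per write, which is the space-for-time trade.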
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put:
public void put(String key, Record value) throws NacosException {
// enqueue the change
onPut(key, value);
// in cluster mode, put a sync task for each peer node into a ConcurrentHashMap
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
onPut:
notifier.addTask(key, DataOperation.CHANGE);
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
The tasks field here is:
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
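One thing I have not figured out here: why ArrayBlockingQueue? I would have expected LinkedBlockingQueue to perform better.
The run method loops, taking entries from the queue and handling each one, which is where the registration is applied. Because this is a blocking queue, the thread blocks while the queue is empty, so no CPU is wasted.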
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
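The interesting part of addTask is the services map in front of the queue: while a CHANGE for some key is still waiting, further CHANGE notifications for the same key are dropped. Here is a minimal sketch of that dedup-then-enqueue idea. NotifierSketch and its members are hypothetical. I use putIfAbsent to make the check and the put one atomic step, poll() instead of take() so the example needs no thread, and I assume the consumer clears the pending marker when it handles the task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a notifier that coalesces pending CHANGE tasks per key. */
class NotifierSketch {
    enum Op { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Op op;

        Task(String key, Op op) {
            this.key = key;
            this.op = op;
        }
    }

    private final ConcurrentHashMap<String, String> pending = new ConcurrentHashMap<>();
    final BlockingQueue<Task> tasks = new ArrayBlockingQueue<>(1024);

    /** Returns false when the task was coalesced with a pending CHANGE or the queue is full. */
    boolean addTask(String key, Op op) {
        if (op == Op.CHANGE && pending.putIfAbsent(key, "") != null) {
            return false; // a CHANGE for this key is already queued
        }
        return tasks.offer(new Task(key, op));
    }

    /** Consumer side; a real worker would loop on take() in its own thread. */
    Task pollAndHandle() {
        Task t = tasks.poll();
        if (t != null && t.op == Op.CHANGE) {
            pending.remove(t.key); // allow the next CHANGE for this key to be queued
        }
        return t;
    }
}
```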
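distroProtocol.sync: used in cluster mode. It puts the change into a Map; an asynchronous thread later picks it up and calls the remote endpoint to synchronize the data.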
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// iterate over all other cluster members
for (Member each : memberManager.allMembersWithoutSelf()) {
// wrap the resource key and the target node address in a DistroKey
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// put the task into the engine's ConcurrentHashMap
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder:
@Component
public class DistroTaskEngineHolder {
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
}
The field is a DistroDelayTaskExecuteEngine. Its parent class constructor:
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
// schedule ProcessRunnable on the scheduled executor
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
ProcessRunnable logic:
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
Eventually it executes com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor#process, which takes the entry out of the Map above and puts it on a queue.
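The point of going through a keyed Map before the queue is that repeated changes to the same key and target collapse into one pending task, which only becomes runnable after its delay. Here is a minimal sketch of that idea. DelayEngineSketch is hypothetical: time is passed in explicitly to keep it deterministic, and a newer task simply replaces the older one, which is a simplification of whatever merging the real engine does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a delay-task engine keyed by "resource@target". */
class DelayEngineSketch {
    static final class DelayTask {
        final String key;
        final long dueAtMs;

        DelayTask(String key, long dueAtMs) {
            this.key = key;
            this.dueAtMs = dueAtMs;
        }
    }

    private final Map<String, DelayTask> tasks = new ConcurrentHashMap<>();

    /** A later task for the same key replaces the earlier one (simplified merge). */
    void addTask(String key, long nowMs, long delayMs) {
        tasks.put(key, new DelayTask(key, nowMs + delayMs));
    }

    /** What a periodic ProcessRunnable would do: hand over every task that is due. */
    List<String> processTasks(long nowMs) {
        List<String> ready = new ArrayList<>();
        for (String key : new ArrayList<>(tasks.keySet())) {
            DelayTask t = tasks.get(key);
            // remove(key, t) only succeeds if nobody replaced the task in the meantime
            if (t != null && t.dueAtMs <= nowMs && tasks.remove(key, t)) {
                ready.add(key);
            }
        }
        return ready;
    }

    int size() {
        return tasks.size();
    }
}
```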
com.alibaba.nacos.common.task.engine.TaskExecuteWorker.InnerWorker:
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (!closed.get()) {
try {
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("distro task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[DISTRO-FAILED] " + e.toString(), e);
}
}
}
}
The worker takes a task off the queue and calls its run method directly. Here that means the logic in DistroSyncChangeTask:
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
Here syncData calls the remote endpoint /distro/datum to synchronize the data.
DistroProtocol
Constructor
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startDistroTask();
}
The constructor calls startDistroTask(), which in turn calls startLoadTask():
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
This submits a DistroLoadDataTask to the thread pool; the task resubmits itself with a delay until loading completes:
public void run() {
try {
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
private void load() throws Exception {
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
// pull the data from a remote node via loadAllDataSnapshotFromRemote
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// process the snapshot that was pulled
boolean result = dataProcessor.processSnapshot(distroData);
// return as soon as one pull succeeds, so data is loaded from only one server
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
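The loop above is a "first healthy peer wins" pattern: try each member in turn, swallow individual failures, and stop at the first success. Here is a minimal sketch of just that control flow. SnapshotLoadSketch and loadFromAnyPeer are hypothetical names, and the Predicate stands in for fetching and processing a snapshot.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch: load a snapshot from the first peer that answers successfully. */
class SnapshotLoadSketch {
    /**
     * Tries each peer in order. Returns the address of the peer that succeeded,
     * or null when every peer failed, in which case the caller is expected to retry later.
     */
    static String loadFromAnyPeer(List<String> peers, Predicate<String> tryLoad) {
        for (String peer : peers) {
            try {
                if (tryLoad.test(peer)) {
                    return peer; // one full snapshot is enough
                }
            } catch (RuntimeException e) {
                // this peer is unreachable or returned bad data; move on to the next one
            }
        }
        return null;
    }
}
```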
On the receiving node, the /distro/datum sync request is handled by com.alibaba.nacos.naming.controllers.DistroController#onSyncDatum,
which ultimately calls dataProcessor.processData:
public boolean processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
onPut(datum.key, datum.value);
return true;
}
This calls onPut to register the instance.
Service discovery
Client:
com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String):
The core logic is in getServiceInfo0, called from getServiceInfo:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// look the service up in the local Map
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// not cached: call updateServiceNow to fetch the remote instance list and put it into the Map
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// schedule a recurring task that pulls the list and refreshes the Map
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
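Stripped of the failover and waiting logic, the client side is a local cache that is filled on the first miss and refreshed in the background afterwards. Here is a minimal sketch of that shape. ServiceCacheSketch is hypothetical, the Function stands in for the remote query, and refresh is what a scheduled update task would call periodically.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch of a client-side instance cache with load-on-miss and refresh. */
class ServiceCacheSketch {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Function<String, List<String>> remote; // stands in for the server query
    final AtomicInteger remoteCalls = new AtomicInteger();

    ServiceCacheSketch(Function<String, List<String>> remote) {
        this.remote = remote;
    }

    /** Serves from the local map; only a cache miss goes to the server. */
    List<String> getInstances(String serviceName) {
        List<String> cached = serviceInfoMap.get(serviceName);
        return cached != null ? cached : refresh(serviceName);
    }

    /** Pulls the current list from the server and replaces the cached entry. */
    List<String> refresh(String serviceName) {
        remoteCalls.incrementAndGet();
        List<String> fresh = remote.apply(serviceName);
        serviceInfoMap.put(serviceName, fresh);
        return fresh;
    }
}
```

Between refreshes a caller can see a slightly stale list, which is the usual price of this design.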
Server:
The server receives the request in com.alibaba.nacos.naming.controllers.InstanceController#list:
This calls doSrvIpxt, whose core logic is a call to srvIPs:
public List<Instance> srvIPs(List<String> clusters) {
if (CollectionUtils.isEmpty(clusters)) {
clusters = new ArrayList<>();
clusters.addAll(clusterMap.keySet());
}
// collect the instances and return them
return allIPs(clusters);
}
public List<Instance> allIPs(List<String> clusters) {
List<Instance> result = new ArrayList<>();
for (String cluster : clusters) {
// this data was put here at registration time
Cluster clusterObj = clusterMap.get(cluster);
if (clusterObj == null) {
continue;
}
result.addAll(clusterObj.allIPs());
}
return result;
}
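Comparison with Eureka
First, credit to Alibaba's coding style: it is pleasant to read. Eureka piles a large chunk of logic into one place, and by comparison Nacos looks much cleaner.
Part of Eureka's registration code: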
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
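// take the read lock
read.lock();
// get the map for this application (the microservice group)
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// look up the existing lease by instance id
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// if one exists and is newer, use it as registrant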
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
// not present: update the expected renewal counts
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
}
// create a Lease from registrant;
// it records registrationTimestamp (registration time),
// lastUpdateTimestamp (last operation time) and duration (lease duration)
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// carry over the timestamp at which the service came up
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// put it into the application's map
gMap.put(registrant.getId(), lease);
} finally {
read.unlock();
}
}
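Eureka registers synchronously and takes the read lock while doing so. Nacos puts the registration on a queue and lets an asynchronous thread apply it; the update uses copy-on-write, trading space for time to improve registration concurrency.
Service discovery comparison
Nacos reads service instances directly from ephemeralInstances. Eureka takes locks during both registration and discovery, so to reduce lock contention it uses a three-level cache:
// level 1: no expiry; holds the service data served to clients, refreshed periodically from the level-2 cache
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// level 2: reduces read/write lock contention on the registry by lowering how often it is read; essentially a Guava cache with timed expiry
private final LoadingCache<Key, Value> readWriteCacheMap;
// level 3: the actual registry
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();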
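Read order: read-only cache -> read-write cache -> actual data
The read-only cache is populated only from the read-write cache, and it exposes no API for updating it directly.
The read-write cache is implemented with Guava and relies on Guava's expiry mechanism, so entries expire on their own after a period of time.
When the level-1 cache is refreshed on a timer, it reads the level-2 cache; if the level-2 cache has no data, that also triggers a load that pulls the registration data from registry.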
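The read path described above can be sketched in a few lines. ThreeLevelCacheSketch is a hypothetical toy: plain ConcurrentHashMaps stand in for Guava's LoadingCache, values are strings, there is no expiry timer, and dropping the level-2 entry on registration is my assumption about how a write reaches the caches. It only shows the lookup order and why a reader may briefly see stale data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical toy model of a read-only -> read-write -> registry lookup chain. */
class ThreeLevelCacheSketch {
    private final Map<String, String> registry = new ConcurrentHashMap<>();       // level 3
    private final Map<String, String> readWriteCache = new ConcurrentHashMap<>(); // level 2
    private final Map<String, String> readOnlyCache = new ConcurrentHashMap<>();  // level 1

    /** A write goes to the registry and drops the level-2 entry (assumed behaviour). */
    void register(String key, String value) {
        registry.put(key, value);
        readWriteCache.remove(key);
    }

    /** Reads try level 1 first and fall through to level 2, which loads from the registry. */
    String get(String key) {
        String value = readOnlyCache.get(key);
        if (value == null) {
            value = loadThroughReadWrite(key);
            if (value != null) {
                readOnlyCache.put(key, value);
            }
        }
        return value;
    }

    /** What the periodic level-1 refresh would do: copy changed entries up from level 2. */
    void syncReadOnly() {
        for (String key : readOnlyCache.keySet()) {
            String latest = loadThroughReadWrite(key);
            if (latest != null && !latest.equals(readOnlyCache.get(key))) {
                readOnlyCache.put(key, latest);
            }
        }
    }

    private String loadThroughReadWrite(String key) {
        // a miss in level 2 triggers a load from the registry
        return readWriteCache.computeIfAbsent(key, registry::get);
    }
}
```

In this model a reader keeps seeing the old value after a re-registration until syncReadOnly runs, which is the staleness window that the level-1 cache buys its lock-free reads with.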