SpringCloud轮询拉取注册表与服务发现流程详解
一、前言
上一篇我们讨论了关于周期性任务的一些应用等,本篇文章我们来探究一下这些内容:周期性刷新注册表?全量拉取注册表还是增量拉取注册表、更新本地缓存?服务发现的入口、获取本地服务列表?
二、轮询拉取注册表
1、构造初始化
同样是在Spring容器初始化的过程中初始化的,基于SpringBoot自动装配集成。上一节也讲了一部分,这里补充:
@Singleton public class DiscoveryClient implements EurekaClient { ....省略n行代码...... private final ScheduledExecutorService scheduler; // additional executors for supervised subtasks监督子任务的附加执行器 private final ThreadPoolExecutor heartbeatExecutor; private final ThreadPoolExecutor cacheRefreshExecutor; private TimedSupervisorTask cacheRefreshTask; private TimedSupervisorTask heartbeatTask; ....省略n行代码...... // Spring容器初始化时候调用 public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) { // 调用下面重载方法 this(applicationInfoManager, config, args, ResolverUtils::randomize); } public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) { this(applicationInfoManager, config, args, new Provider<BackupRegistry>() { ....省略n行代码...... } @Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { try { // default size of 2 - 1 each for heartbeat and cacheRefresh心跳和缓存刷新的默认大小分别为2-1 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 心跳执行者 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 缓存刷新执行者 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 初始化通信封装类 eurekaTransport = new EurekaTransport(); ....省略n行代码...... } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 默认true,可更改配置不建议 if (clientConfig.shouldFetchRegistry()) { try {// 初始化注册表 boolean primaryFetchRegistryResult = fetchRegistry(false); // 下面主要打印失败日志,初始化时控制台可见是处理成功的 if (!primaryFetchRegistryResult) { // 从主服务器初始注册表提取失败 logger.info("Initial registry fetch from primary servers failed"); } boolean backupFetchRegistryResult = true; if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { // 如果所有的eureka服务器网址都无法访问,从备份注册表中获取注册表信息也失败。 backupFetchRegistryResult = false; // 从备份服务器初始注册表提取失败 logger.info("Initial registry fetch from backup servers failed"); } if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { // 在启动时获取注册表错误。初始获取失败。 throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); } } catch (Throwable th) { logger.error("Fetch registry error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } ....省略n行代码...... // 最后,初始化调度任务(例如,集群解析器、 heartbeat、 instanceInfo replicator、 fetch initScheduledTasks(); ....省略n行代码...... }
主要逻辑:
- 初始化缓存刷新执行器,用于周期性执行任务,下面继续分析
- 默认需要刷新注册表,默认不使用全量拉取,但是初始化时使用下面2分析:会调用注册中心完成注册表初始化,返回是否刷新成功;如果从主服务器初始注册表提取失败打印日志;如果所有的eureka服务器网址都无法访问,从备份注册表中获取注册表信息也失败打印日志;在启动时获取注册表错误,抛异常。
- 可见在2中没有特殊原因的话,一般是使用全量拉取注册表初始化成功了,否则的话抛异常
缓存刷新
大部分逻辑在前面的章节已经分析,TimedSupervisorTask跟发送心跳服务续约逻辑是一样的,这里补充刷新本地服务列表任务。
private void initScheduledTasks() { // 默认true,可更改配置不建议 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer注册表缓存刷新计时器 // 默认30 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { /* LeaseInfo: public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30; // Client settings private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL; */ // 默认30 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer心跳任务 heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); // 默认的情况下会每隔30秒向注册中心 (eureka.instance.lease-renewal-interval-in-seconds)发送一次心跳来进行服务续约 scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator实例信息复制任务 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 状态变更监听者 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { // Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING] logger.info("Saw local status change event {}", statusChangeEvent); instanceInfoReplicator.onDemandUpdate(); } }; // 初始化状态变更监听者 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 3.2 定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
主要逻辑:
默认需要刷新注册表,要达到服务高可用。
1)clientConfig.getRegistryFetchIntervalSeconds()获取注册表刷新时间,默认30秒,可在配置文件更改;
2)初始化cacheRefreshTask为TimedSupervisorTask类型,跟心跳任务一样处理逻辑,我们这节就只分析CacheRefreshThread刷新缓存逻辑,见3
2、刷新注册表
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications如果 delta 被禁用或者是第一次,那么获取所有的应用程序 Applications applications = getApplications(); // shouldDisableDelta默认false if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { // 第一次 logger.info("Disable delta property false: {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property null: {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch false: {}", forceFullRegistryFetch); logger.info("Application is null false: {}", (applications == null)); logger.info("Registered Applications size is zero true: {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1true: {}", (applications.getVersion() == -1)); // 全量拉取 getAndStoreFullRegistry(); } else { // 增量拉取 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}", appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e)); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 在更新实例远程状态之前通知缓存刷新 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 根据缓存中保存的刷新数据更新远程状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
主要逻辑:
- 由上面的1.1可见状态变更监听者还没有初始化,从前面的文章也知道它的作用完成服务注册,故这里从本地获取应用就为空。所以先打印下日志,调用全量拉取注册表方法,下面分析。轨迹跟踪对象非空,关闭。
- 在更新实例远程状态之前通知缓存刷新
- 根据缓存中保存的刷新数据更新远程状态
全量拉取注册表
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); // 从 eureka 服务器上获取所有实例注册信息 logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; // RegistryRefreshSingleVipAddress默认空 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); // 响应成功获取应用 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } // 200 logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 缓存到本地 localRegionApps.set(this.filterAndShuffle(apps)); // 如:UP_1_ logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
主要逻辑:
- RegistryRefreshSingleVipAddress默认空,故调用eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())请求注册中心
- 响应成功获取应用
- 如果应用apps空则打印下日志;一般CAS成功,在筛选仅具有 UP 状态的实例的应用程序并对它们进行洗牌之后获取应用程序,缓存到本地localRegionApps(AtomicReference<Applications>类型);否则打印下日志
3、缓存刷新任务
class CacheRefreshThread implements Runnable { @Override public void run() { refreshRegistry(); } } @VisibleForTesting void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. // 这可以确保对要获取的远程区域的动态更改得到遵守。 // 默认null String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync // RemoteRegionsToFetch 和 AzToRegionMapper.regionsToFetch 都需要同步 synchronized (instanceRegionChecker.getAzToRegionMapper()) { // CAS if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { // 并发获取修改的远程区域,忽略从{}到{}的更改 logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change // 只需刷新映射以反映任何 DNS/属性更改 instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } // 刷新注册表 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
CacheRefreshThread 实现了Runnable接口,但是run()中任务逻辑封装了出去,在refreshRegistry()中处理。在2中分析了初始化时已经使用全量拉取注册表并缓存应用到本地localRegionApps,那么这里使用延迟任务处理的话就会执行增量拉取逻辑了,在下面4分析
4、增量拉取注册表
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; // 增量查询获取 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); // 响应成功 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { // 服务器不允许应用delta修订,因为它不安全。因此得到了完整的登记表,即转换为全量拉取 logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // CAS成功 logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { // 加锁成功 try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason出于某种原因,数量有所不同 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall这个可以远程呼叫 } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
主要逻辑:
- 增量查询获取,响应成功获取数据
- CAS成功并且加锁成功,将响应结果更新到本地,然后释放锁
增量更新到本地缓存
private void updateDelta(Applications delta) { int deltaCount = 0; for (Application app : delta.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { // 从本地获取,以便更新 Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; if (ActionType.ADDED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } // 将实例{}添加到区域{}中的现有应用程序 // ceam-config:8888,ceam-auth:8005,region null logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); // 即添加到Application的instancesMap applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.MODIFIED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } // 修改现有应用程序的实例{} logger.debug("Modified instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.DELETED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { // 删除现有应用程序的实例{} logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); /* * We find all instance list from application(The status of instance status is not only the status is UP but also other status) * if instance list is empty, we remove the application. * 我们从应用程序中找到所有的实例列表(实例状态的状态不仅是状态是 UP, * 还有其他状态)如果实例列表为空,我们删除应用程序。 */ if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount); getApplications().setVersion(delta.getVersion()); // 对提供的实例进行洗牌,以便它们不总是以相同的顺序返回。 getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); // 对提供的实例进行洗牌,以便它们不总是以相同的顺序返回。 applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }
主要逻辑:
- 遍历增量的应用数据,遍历应用中的实例
- 从本地获取应用数据,以便更新处理
- 如果是ADDED,则将实例添加到区域中的现有应用程序;如果是MODIFIED,则修改现有应用程序的实例;如果是DELETED,则删除现有应用程序的实例,并且从应用程序中找到所有的实例列表(实例状态的状态不仅是状态是 UP,还有其他状态)如果实例列表为空,删除应用程序。
- 对提供的实例进行洗牌,以便它们不总是以相同的顺序返回。
三、服务发现
1、客户端获取服务实例
@Override public List<ServiceInstance> getInstances(String serviceId) { // 委托eurekaClient处理 List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false); List<ServiceInstance> instances = new ArrayList<>(); for (InstanceInfo info : infos) { instances.add(new EurekaServiceInstance(info)); } return instances; }
跟Nacos的入口是类似的,需要实现spring-cloud-commons的DiscoveryClient接口。这里EurekaDiscoveryClient会委托Eureka项目里面的EurekaClient处理,见下面2分析。然后将Instances列表转换为spring-cloud-commons里面的ServiceInstance类型列表。
2、从本地列表获取
@Override public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure) { return getInstancesByVipAddress(vipAddress, secure, instanceRegionChecker.getLocalRegion()); } @Override public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, @Nullable String region) { if (vipAddress == null) { throw new IllegalArgumentException( "Supplied VIP Address cannot be null"); } Applications applications; // 如果eureka:region:没有指定,则使用默认值且非空,即默认使用默认localRegion if (instanceRegionChecker.isLocalRegion(region)) { // 本地获取 applications = this.localRegionApps.get(); } else { applications = remoteRegionVsApps.get(region); if (null == applications) { logger.debug("No applications are defined for region {}, so returning an empty instance list for vip " + "address {}.", region, vipAddress); return Collections.emptyList(); } } // secure默认false if (!secure) { return applications.getInstancesByVirtualHostName(vipAddress); } else { return applications.getInstancesBySecureVirtualHostName(vipAddress); } }
主要逻辑:
- 调用重载方法,vipAddress即serviceId,secure为false
- 如果eureka:region:没有指定,则使用默认值且非空,即默认使用默认localRegion。从本地应用获取列表。
- secure为false,获取与虚拟主机名关联的instance列表
到此这篇关于SpringCloud轮询拉取注册表与服务发现流程详解的文章就介绍到这了,更多相关SpringCloud注册表与服务发现内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java System.currentTimeMillis()时间的单位转换与计算方式案例详解
这篇文章主要介绍了Java System.currentTimeMillis()时间的单位转换与计算方式案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下2021-08-08
最新评论