Nacos客户端本地缓存和故障转移方式
在Nacos客户端从Server获得服务的时候,在某些时候出现了一些故障, 这时候为了保证服务正常,Nacos进行了故障转移,原理就是将之前缓存的服务信息拿出来用,防止服务出现问题,涉及到的核心类为ServiceInfoHolder和FailoverReactor。
本地缓存有两方面,第一方面是从注册中心获得实例信息会缓存在内存当中,也就是通过Map的形式承载,这样查询操作都方便。第二方面便是通过磁盘文件的形式定时缓存起来,以备不时之需。
故障转移也分两方面,第一方面是故障转移的开关是通过文件来标记的;第二方面是当开启故障转移之后,当发生故障时,可以从故障转移备份的文件中来获得服务实例信息。
1. ServiceInfoHolder
ServiceInfoHolder类,顾名思义,服务信息的持有者。每次客户端从注册中心获取新的服务信息时都会调用该类,其中processServiceInfo方法来进行本地化处理,包括更新缓存服务、发布事件、更新本地文件等。
ServiceInfoHolder类持有了ServiceInfo,通过一个ConcurrentMap来储存
// ServiceInfoHolder private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
当从服务端拉会服务信息时,就会往这个map中进行存储。
// ServiceInfoHolder public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { .... //缓存服务信息 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); .... }
在创建ServiceInfoHolder时会做如下事情
- 初始化本地缓存目录
- 根据配置从本地缓存初始化服务,默认false
- 创建FailoverReactor并相互持有ServiceInfoHolder
// ServiceInfoHolder public ServiceInfoHolder(String namespace, Properties properties) { initCacheDir(namespace, properties); if (isLoadCacheAtStart(properties)) { this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap<>(16); } this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushEmptyProtection = isPushEmptyProtect(properties); }
本地缓存目录
在processServiceInfo中会进行本地缓存写入,其实就是写入这个目录(这个目录是在创建ServiceInfoHolder时根据配置初始化的),所以目录中的数据正常是最新读取到的服务信息
// ServiceInfoHolder public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { .... // 记录Service本地文件 DiskCache.write(serviceInfo, cacheDir); .... }
本地缓存目录默认路径:${user.home}/nacos/naming/public,也可以自定义,通过System.setProperty("JM.SNAPSHOT.PATH")自定义
2. FailoverReactor
在ServiceInfoHolder的构造方法中,还会初始化一个FailoverReactor类,同样是ServiceInfoHolder的成员变量。FailoverReactor的作用便是用来处理故障转移的。
构造故障转移做了如下事情:
// FailoverReactor public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) { // 持有ServiceInfoHolder的引用 this.serviceInfoHolder = serviceInfoHolder; // 拼接故障目录:${user.home}/nacos/naming/public/failover this.failoverDir = cacheDir + FAILOVER_DIR; // 初始化executorService,支持延时执行 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); // 守护线程模式运行 thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.failover"); return thread; } }); // 其他初始化操作,通过executorService开启多个定时任务执行 this.init(); }
init方法执行
在这个方法中开启了三个定时任务,这三个任务其实都是FailoverReactor的内部类
1. 初始化立即执行,然后每间隔5秒再执行SwitchRefresher
2. 初始化延迟30分钟执行,执行间隔24小时,执行任务DiskFileWriter
3. 初始化立即执行,执行间隔10秒,执行核心操作为DiskFileWriter
// FailoverReactor public void init() { // 初始化立即执行,执行间隔5秒,执行任务SwitchRefresher executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS); // 初始化延迟30分钟执行,执行间隔24小时,执行任务DiskFileWriter executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES); // 10秒后如果故障目录为空,则执行DiskFileWriter任务强制备份 executorService.schedule(new Runnable() { @Override public void run() { try { File cacheDir = new File(failoverDir); ... File[] files = cacheDir.listFiles(); if (files == null || files.length <= 0) { new DiskFileWriter().run(); } } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to backup file on startup.", e); } } }, 10000L, TimeUnit.MILLISECONDS); }
DiskFileWriter刷盘任务
将ServiceInfo写入备份磁盘
// FailoverReactor class DiskFileWriter extends TimerTask { @Override public void run() { Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap(); for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) { ServiceInfo serviceInfo = entry.getValue(); ... // 将缓存写入磁盘 DiskCache.write(serviceInfo, failoverDir); } } }
SwitchRefresher根据标记故障转移文件切换内存标志
- 如果故障转移文件不存在,则直接返回(文件开关)
- 比较文件修改时间,如果已经修改,则获取故障转移文件中的内容。
- 故障转移文件中存储了0和1标识。0表示关闭,1表示开启。
- 当为开启状态时,执行线程FailoverFileReader。
// FailoverReactor class SwitchRefresher implements Runnable { long lastModifiedMillis = 0L; @Override public void run() { try { File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH); // 文件不存在则退出 if (!switchFile.exists()) { ... return; } long modified = switchFile.lastModified(); if (lastModifiedMillis < modified) { lastModifiedMillis = modified; // 获取故障转移文件内容 String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH, Charset.defaultCharset().toString()); if (!StringUtils.isEmpty(failover)) { String[] lines = failover.split(DiskCache.getLineSeparator()); for (String line : lines) { String line1 = line.trim(); // 1 表示开启故障转移模式 if (IS_FAILOVER_MODE.equals(line1)) { switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString()); new FailoverFileReader().run(); // 0 表示关闭故障转移模式 } else if (NO_FAILOVER_MODE.equals(line1)) { switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString()); } } } else { switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString()); } } } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to read failover switch.", e); } } }
FailoverFileReader故障后读取备份文件
该任务是故障转移的核心, 故障转移文件读取,基本操作就是读取failover目录存储的**备份服务信息文件**内容,然后转换成ServiceInfo,并且将所有的ServiceInfo储存在FailoverReactor的ServiceMap属性中。
流程如下:
1. 读取failover目录下的所有文件,进行遍历处理
2. 如果文件不存在跳过
3. 如果文件是故障转移开关标志文件跳过
4. 读取文件中的备份内容,转换为ServiceInfo对象
5. 将ServiceInfo对象放入到domMap中
6. 最后判断domMap不为空,赋值给serviceMap
// FailoverReactor class FailoverFileReader implements Runnable { @Override public void run() { Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16); File cacheDir = new File(failoverDir); File[] files = cacheDir.listFiles(); if (files == null) { return; } for (File file : files) { // 如果是故障转移标志文件,则跳过 if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) { continue; } ServiceInfo dom = new ServiceInfo(file.getName()); BufferedReader reader = null; try { String dataString = ConcurrentDiskUtil .getFileContent(file, Charset.defaultCharset().toString()); reader = new BufferedReader(new StringReader(dataString)); String json = reader.readLine(); dom = JacksonUtils.toObj(json, ServiceInfo.class); } finally { reader.close(); } if (!CollectionUtils.isEmpty(dom.getHosts())) { domMap.put(dom.getKey(), dom); } } // 读入缓存 if (domMap.size() > 0) { serviceMap = domMap; } } }·
FailoverReactor.serviceMap的使用
我们在ServiceInfoHolder中的getServiceInfo方法就会判断,如果当前是故障切换状态,就会从FailoverReactor中获取服务,那么这是就用到了FailoverReactor.serviceMap缓存的服务了
// ServiceInfoHolder public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) { ... if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } return serviceInfoMap.get(key); }
// FailoverReactor public ServiceInfo getService(String key) { ServiceInfo serviceInfo = serviceMap.get(key); if (serviceInfo == null) { serviceInfo = new ServiceInfo(); serviceInfo.setName(key); } return serviceInfo; }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
使用springmvc运行流程分析,手写spring框架尝试
这篇文章主要介绍了使用springmvc运行流程分析,手写spring框架尝试,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-10-10SpringBoot2.0 中 HikariCP 数据库连接池原理解析
这篇文章主要介绍了SpringBoot2.0 中 HikariCP 数据库连接池原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-01-01
最新评论