PowerJob的ServerDiscoveryService工作流程源码解读
序
本文主要研究一下PowerJob的ServerDiscoveryService
ServerDiscoveryService
tech/powerjob/worker/background/ServerDiscoveryService.java
@Slf4j public class ServerDiscoveryService { private final Long appId; private final PowerJobWorkerConfig config; private String currentServerAddress; private final Map<String, String> ip2Address = Maps.newHashMap(); /** * 服务发现地址 */ private static final String DISCOVERY_URL = "http://%s/server/acquire?%s"; /** * 失败次数 */ private static int FAILED_COUNT = 0; /** * 最大失败次数 */ private static final int MAX_FAILED_COUNT = 3; public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) { this.appId = appId; this.config = config; } //...... }
ServerDiscoveryService定义了currentServerAddress、ip2Address、服务发现url模版,失败次数,最大失败次数
start
public void start(ScheduledExecutorService timingPool) { this.currentServerAddress = discovery(); if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) { throw new PowerJobException("can't find any available server, this worker has been quarantined."); } // 这里必须保证成功 timingPool.scheduleAtFixedRate(() -> { try { this.currentServerAddress = discovery(); } catch (Exception e) { log.error("[PowerDiscovery] fail to discovery server!", e); } } , 10, 10, TimeUnit.SECONDS); }
其start方法先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress
discovery
private String discovery() { if (ip2Address.isEmpty()) { config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x)); } String result = null; // 先对当前机器发起请求 String currentServer = currentServerAddress; if (!StringUtils.isEmpty(currentServer)) { String ip = currentServer.split(":")[0]; // 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担 String firstServerAddress = ip2Address.get(ip); if (firstServerAddress != null) { result = acquire(firstServerAddress); } } for (String httpServerAddress : config.getServerAddress()) { if (StringUtils.isEmpty(result)) { result = acquire(httpServerAddress); }else { break; } } if (StringUtils.isEmpty(result)) { log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined."); // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务 if (FAILED_COUNT++ > MAX_FAILED_COUNT) { log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker."); List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys(); if (!CollectionUtils.isEmpty(frequentInstanceIds)) { frequentInstanceIds.forEach(instanceId -> { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId); taskTracker.destroy(); log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId); }); } FAILED_COUNT = 0; } return null; } else { // 重置失败次数 FAILED_COUNT = 0; log.debug("[PowerDiscovery] current server is {}.", result); return result; } }
discovery方法从config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值则acquire,否则遍历config.getServerAddress()执行acquire;若还没有获取到则判断FAILED_COUNT是否超出MAX_FAILED_COUNT,超出则遍历HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨个执行remove及destory
acquire
private String acquire(String httpServerAddress) { String result = null; String url = buildServerDiscoveryUrl(httpServerAddress); try { result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); }catch (Exception ignore) { } if (!StringUtils.isEmpty(result)) { try { ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class); if (resultDTO.isSuccess()) { return resultDTO.getData().toString(); } }catch (Exception ignore) { } } return null; } private String buildServerDiscoveryUrl(String address) { ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest() .setAppId(appId) .setCurrentServer(currentServerAddress) .setProtocol(config.getProtocol().name().toUpperCase()); String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap()); return String.format(DISCOVERY_URL, address, query); }
acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取地址
ServerController
tech/powerjob/server/web/controller/ServerController.java
@RestController @RequestMapping("/server") @RequiredArgsConstructor public class ServerController implements ServerInfoAware { private ServerInfo serverInfo; private final TransportService transportService; private final ServerElectionService serverElectionService; private final AppInfoRepository appInfoRepository; private final WorkerClusterQueryService workerClusterQueryService; //...... @GetMapping("/acquire") public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) { return ResultDTO.success(serverElectionService.elect(request)); } //...... }
acquireServer方法执行serverElectionService.elect(request)返回server地址
elect
tech/powerjob/server/remote/server/election/ServerElectionService.java
public String elect(ServerDiscoveryRequest request) { if (!accurate()) { final String currentServer = request.getCurrentServer(); // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol())); if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) { log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer); return currentServer; } } return getServer0(request); }
elect方法判断如果是本机就直接返回,否则执行getServer0
getServer0
private String getServer0(ServerDiscoveryRequest discoveryRequest) { final Long appId = discoveryRequest.getAppId(); final String protocol = discoveryRequest.getProtocol(); Set<String> downServerCache = Sets.newHashSet(); for (int i = 0; i < RETRY_TIMES; i++) { // 无锁获取当前数据库中的Server Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { throw new PowerJobException(appId + " is not registered!"); } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); String activeAddress = activeAddress(originServer, downServerCache, protocol); if (StringUtils.isNotEmpty(activeAddress)) { return activeAddress; } // 无可用Server,重新进行Server选举,需要加锁 String lockName = String.format(SERVER_ELECT_LOCK, appId); boolean lockStatus = lockService.tryLock(lockName, 30000); if (!lockStatus) { try { Thread.sleep(500); }catch (Exception ignore) { } continue; } try { // 可能上一台机器已经完成了Server选举,需要再次判断 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol); if (StringUtils.isNotEmpty(address)) { return address; } // 篡位,如果本机存在协议,则作为Server调度该 worker final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol); if (targetProtocolInfo != null) { // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址 appInfo.setCurrentServer(transportService.defaultProtocol().getAddress()); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); return targetProtocolInfo.getAddress(); } }catch (Exception e) { log.error("[ServerElection] write new server to db failed for app {}.", appName, e); } finally { lockService.unlock(lockName); } } throw new PowerJobException("server elect failed for app " + appId); }
getServer0方法先判断appInfoRepository中当前appId的originServer是否存活,是则直接返回,否则加锁将transportService.defaultProtocol().getAddress()写入到appInfo的currentServer
小结
PowerJob的ServerDiscoveryService定义了start方法,它先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress;
discovery方法主要是遍历config.getServerAddress()执行acquire;
acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取该appId的server地址。
以上就是PowerJob的ServerDiscoveryService工作流程源码解读的详细内容,更多关于PowerJob ServerDiscoveryService的资料请关注脚本之家其它相关文章!
- PowerJob的TimingStrategyHandler工作流程源码解读
- PowerJob的IdGenerateService工作流程源码解读
- PowerJob LockService方法工作流程源码解读
- PowerJob的Evaluator方法工作流程源码解读
- PowerJob的DatabaseMonitorAspect源码流程
- PowerJob的AbstractScriptProcessor实现类工作流程源码解读
- PowerJob的WorkerHealthReporter工作流程源码解读
- PowerJob的OmsLogHandler工作流程源码解析
- PowerJob的ProcessorLoader工作流程源码解读
- PowerJob的DispatchStrategy方法工作流程源码解读
相关文章
springcloud client指定注册到eureka的ip与端口号方式
这篇文章主要介绍了springcloud client指定注册到eureka的ip与端口号方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-03-03java.net.SocketException: Connection reset 解决方法
最近纠结致死的一个java报错java.net.SocketException: Connection reset 终于得到解决2013-03-03Spring Boot2深入分析解决java.lang.ArrayStoreException异常
这篇文章介绍了Spring Boot2深入分析解决java.lang.ArrayStoreException异常的方法,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-12-12
最新评论