PowerJob的ServerDiscoveryService工作流程源码解读

 更新时间:2023年12月24日 15:26:55   作者:codecraft  
这篇文章主要为大家介绍了PowerJob的ServerDiscoveryService工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的ServerDiscoveryService

ServerDiscoveryService

tech/powerjob/worker/background/ServerDiscoveryService.java

@Slf4j
public class ServerDiscoveryService {
    private final Long appId;
    private final PowerJobWorkerConfig config;
    private String currentServerAddress;
    private final Map<String, String> ip2Address = Maps.newHashMap();
    /**
     *  服务发现地址
     */
    private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
    /**
     * 失败次数
     */
    private static int FAILED_COUNT = 0;
    /**
     * 最大失败次数
     */
    private static final int MAX_FAILED_COUNT = 3;
    public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
        this.appId = appId;
        this.config = config;
    }
    //......
}
ServerDiscoveryService定义了currentServerAddress、ip2Address、服务发现url模版,失败次数,最大失败次数

start

public void start(ScheduledExecutorService timingPool) {
        this.currentServerAddress = discovery();
        if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
            throw new PowerJobException("can't find any available server, this worker has been quarantined.");
        }
        // 这里必须保证成功
        timingPool.scheduleAtFixedRate(() -> {
                    try {
                        this.currentServerAddress = discovery();
                    } catch (Exception e) {
                        log.error("[PowerDiscovery] fail to discovery server!", e);
                    }
                }
                , 10, 10, TimeUnit.SECONDS);
    }
其start方法先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress

discovery

private String discovery() {
        if (ip2Address.isEmpty()) {
            config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
        // 先对当前机器发起请求
        String currentServer = currentServerAddress;
        if (!StringUtils.isEmpty(currentServer)) {
            String ip = currentServer.split(":")[0];
            // 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担
            String firstServerAddress = ip2Address.get(ip);
            if (firstServerAddress != null) {
                result = acquire(firstServerAddress);
            }
        }
        for (String httpServerAddress : config.getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            }else {
                break;
            }
        }
        if (StringUtils.isEmpty(result)) {
            log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
            // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务
            if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
                log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
                List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
                if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                    frequentInstanceIds.forEach(instanceId -> {
                        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                        taskTracker.destroy();
                        log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                    });
                }
                FAILED_COUNT = 0;
            }
            return null;
        } else {
            // 重置失败次数
            FAILED_COUNT = 0;
            log.debug("[PowerDiscovery] current server is {}.", result);
            return result;
        }
    }
discovery方法从config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值则acquire,否则遍历config.getServerAddress()执行acquire;若还没有获取到则判断FAILED_COUNT是否超出MAX_FAILED_COUNT,超出则遍历HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨个执行remove及destory

acquire

private String acquire(String httpServerAddress) {
        String result = null;
        String url = buildServerDiscoveryUrl(httpServerAddress);
        try {
            result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
        }catch (Exception ignore) {
        }
        if (!StringUtils.isEmpty(result)) {
            try {
                ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    return resultDTO.getData().toString();
                }
            }catch (Exception ignore) {
            }
        }
        return null;
    }
    private String buildServerDiscoveryUrl(String address) {
        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                .setProtocol(config.getProtocol().name().toUpperCase());
        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
        return String.format(DISCOVERY_URL, address, query);
    }
acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取地址

ServerController

tech/powerjob/server/web/controller/ServerController.java

@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {
    private ServerInfo serverInfo;
    private final TransportService transportService;
    private final ServerElectionService serverElectionService;
    private final AppInfoRepository appInfoRepository;
    private final WorkerClusterQueryService workerClusterQueryService;
    //......
    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
        return ResultDTO.success(serverElectionService.elect(request));
    }
    //......    
}
acquireServer方法执行serverElectionService.elect(request)返回server地址

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

public String elect(ServerDiscoveryRequest request) {
        if (!accurate()) {
            final String currentServer = request.getCurrentServer();
            // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
            Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
            if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
                return currentServer;
            }
        }
        return getServer0(request);
    }
elect方法判断如果是本机就直接返回,否则执行getServer0

getServer0

private String getServer0(ServerDiscoveryRequest discoveryRequest) {
        final Long appId = discoveryRequest.getAppId();
        final String protocol = discoveryRequest.getProtocol();
        Set<String> downServerCache = Sets.newHashSet();
        for (int i = 0; i < RETRY_TIMES; i++) {
            // 无锁获取当前数据库中的Server
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if (!appInfoOpt.isPresent()) {
                throw new PowerJobException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            String activeAddress = activeAddress(originServer, downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }
            // 无可用Server,重新进行Server选举,需要加锁
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.tryLock(lockName, 30000);
            if (!lockStatus) {
                try {
                    Thread.sleep(500);
                }catch (Exception ignore) {
                }
                continue;
            }
            try {
                // 可能上一台机器已经完成了Server选举,需要再次判断
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }
                // 篡位,如果本机存在协议,则作为Server调度该 worker
                final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
                if (targetProtocolInfo != null) {
                    // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址
                    appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                    appInfo.setGmtModified(new Date());
                    appInfoRepository.saveAndFlush(appInfo);
                    log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                    return targetProtocolInfo.getAddress();
                }
            }catch (Exception e) {
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new PowerJobException("server elect failed for app " + appId);
    }
getServer0方法先判断appInfoRepository中当前appId的originServer是否存活,是则直接返回,否则加锁将transportService.defaultProtocol().getAddress()写入到appInfo的currentServer

小结

PowerJob的ServerDiscoveryService定义了start方法,它先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress;

discovery方法主要是遍历config.getServerAddress()执行acquire;

acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取该appId的server地址。

以上就是PowerJob的ServerDiscoveryService工作流程源码解读的详细内容,更多关于PowerJob ServerDiscoveryService的资料请关注脚本之家其它相关文章!

相关文章

  • java bootclasspath的具体用法

    java bootclasspath的具体用法

    本文主要介绍了java bootclasspath的具体用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • python和java哪个学起来更简单

    python和java哪个学起来更简单

    在本篇内容里小编给大家分享的是一篇关于python和java哪个学起来更简单的相关内容,有兴趣的朋友们参考下。
    2020-06-06
  • Maven dependency plugin使用心得总结

    Maven dependency plugin使用心得总结

    这篇文章主要给大家介绍了关于Maven dependency plugin使用心得的相关资料,Maven是一个常用的Java build Manager,使用Maven可以很好的对Java Project的dependency进行管理,需要的朋友可以参考下
    2023-10-10
  • springcloud client指定注册到eureka的ip与端口号方式

    springcloud client指定注册到eureka的ip与端口号方式

    这篇文章主要介绍了springcloud client指定注册到eureka的ip与端口号方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Spring Data JPA 映射VO/DTO对象方式

    Spring Data JPA 映射VO/DTO对象方式

    这篇文章主要介绍了Spring Data JPA 映射VO/DTO对象方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • java.net.SocketException: Connection reset 解决方法

    java.net.SocketException: Connection reset 解决方法

    最近纠结致死的一个java报错java.net.SocketException: Connection reset 终于得到解决
    2013-03-03
  • Spring Boot2深入分析解决java.lang.ArrayStoreException异常

    Spring Boot2深入分析解决java.lang.ArrayStoreException异常

    这篇文章介绍了Spring Boot2深入分析解决java.lang.ArrayStoreException异常的方法,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-12-12
  • java版实现2048游戏功能

    java版实现2048游戏功能

    这篇文章主要为大家详细介绍了java版实现2048游戏功能,相加数字出现2048即可,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-07-07
  • java多线程处理执行solr创建索引示例

    java多线程处理执行solr创建索引示例

    这篇文章主要介绍了java多线程处理执行solr创建索引示例,需要的朋友可以参考下
    2014-02-02
  • JWT Token实现方法及步骤详解

    JWT Token实现方法及步骤详解

    这篇文章主要介绍了JWT Token实现方法及步骤详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09

最新评论