PowerJob的OmsLogHandler工作流程源码解析
序
本文主要研究一下PowerJob的OmsLogHandler
OmsLogHandler
tech/powerjob/worker/background/OmsLogHandler.java
@Slf4j public class OmsLogHandler { private final String workerAddress; private final Transporter transporter; private final ServerDiscoveryService serverDiscoveryService; // 处理线程,需要通过线程池启动 public final Runnable logSubmitter = new LogSubmitter(); // 上报锁,只需要一个线程上报即可 private final Lock reportLock = new ReentrantLock(); // 生产者消费者模式,异步上传日志 private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240); // 每次上报携带的数据条数 private static final int BATCH_SIZE = 20; // 本地囤积阈值 private static final int REPORT_SIZE = 1024; public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) { this.workerAddress = workerAddress; this.transporter = transporter; this.serverDiscoveryService = serverDiscoveryService; } /** * 提交日志 * @param instanceId 任务实例ID * @param logContent 日志内容 */ public void submitLog(long instanceId, LogLevel logLevel, String logContent) { if (logQueue.size() > REPORT_SIZE) { // 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁 new Thread(logSubmitter).start(); } InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent); boolean offerRet = logQueue.offer(tuple); if (!offerRet) { log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId); } } //...... }
OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024
),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue
LogSubmitter
private class LogSubmitter implements Runnable { @Override public void run() { boolean lockResult = reportLock.tryLock(); if (!lockResult) { return; } try { final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress(); // 当前无可用 Server if (StringUtils.isEmpty(currentServerAddress)) { if (!logQueue.isEmpty()) { logQueue.clear(); log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs."); } return; } List<InstanceLogContent> logs = Lists.newLinkedList(); while (!logQueue.isEmpty()) { try { InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS); logs.add(logContent); if (logs.size() >= BATCH_SIZE) { WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs)); // 不可靠请求,WEB日志不追求极致 TransportUtils.reportLogs(req, currentServerAddress, transporter); logs.clear(); } }catch (Exception ignore) { break; } } if (!logs.isEmpty()) { WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs); TransportUtils.reportLogs(req, currentServerAddress, transporter); } }finally { reportLock.unlock(); } } }
LogSubmitter实现了Runnable接口,其run方法先通过reportLock加锁,成功才继续,它通过serverDiscoveryService.getCurrentServerAddress()获取当前server的地址,若获取不到则清空logQueue;否则while循环,每次从logQueue拉取InstanceLogContent,放到linkedList,超过BATCH_SIZE(20)则创建WorkerLogReportReq,通过TransportUtils.reportLogs(req, currentServerAddress, transporter)上报,然后清空linkedList,跳出循环之后再上报剩下的日志,最后释放锁
reportLogs
tech/powerjob/worker/common/utils/TransportUtils.java
public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) { final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address); transporter.tell(url, req); } public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) { HandlerLocation handlerLocation = new HandlerLocation() .setRootPath(rootPath) .setMethodPath(handlerPath); return new URL() .setServerType(serverType) .setAddress(Address.fromIpv4(address)) .setLocation(handlerLocation); }
reportLogs先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog
tell
AkkaTransporter
tech/powerjob/remote/akka/AkkaTransporter.java
public void tell(URL url, PowerSerializable request) { ActorSelection actorSelection = fetchActorSelection(url); actorSelection.tell(request, null); }
AkkaTransporter直接使用actorSelection发送请求
VertxTransporter
tech/powerjob/remote/http/vertx/VertxTransporter.java
public void tell(URL url, PowerSerializable request) { post(url, request, null); } private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) { final String host = url.getAddress().getHost(); final int port = url.getAddress().getPort(); final String path = url.getLocation().toPath(); RequestOptions requestOptions = new RequestOptions() .setMethod(HttpMethod.POST) .setHost(host) .setPort(port) .setURI(path); // 获取远程服务器的HTTP连接 Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions); // 转换 -> 发送请求获取响应 Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest -> httpClientRequest .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) .send(JsonObject.mapFrom(request).toBuffer()) ); return responseFuture.compose(httpClientResponse -> { // throw exception final int statusCode = httpClientResponse.statusCode(); if (statusCode != HttpResponseStatus.OK.code()) { // CompletableFuture.get() 时会传递抛出该异常 throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s", host, port, path, statusCode, httpClientResponse.statusMessage() )); } return httpClientResponse.body().compose(x -> { if (clz == null) { return Future.succeededFuture(null); } if (clz.equals(String.class)) { return Future.succeededFuture((T) x.toString()); } return Future.succeededFuture(x.toJsonObject().mapTo(clz)); }); }).toCompletionStage(); }
VertxTransporter则使用post方法通过vertx的httpClient发送请求
processWorkerLogReport
tech/powerjob/server/core/handler/AbWorkerRequestHandler.java
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING) public void processWorkerLogReport(WorkerLogReportReq req) { WorkerLogReportEvent event = new WorkerLogReportEvent() .setWorkerAddress(req.getWorkerAddress()) .setLogNum(req.getInstanceLogContents().size()); try { processWorkerLogReport0(req, event); event.setStatus(WorkerLogReportEvent.Status.SUCCESS); } catch (RejectedExecutionException re) { event.setStatus(WorkerLogReportEvent.Status.REJECTED); } catch (Throwable t) { event.setStatus(WorkerLogReportEvent.Status.EXCEPTION); log.warn("[WorkerRequestHandler] process worker report failed!", t); } finally { monitorService.monitor(event); } }
processWorkerLogReport通过processWorkerLogReport0进行处理,最后通过monitorService.monitor(event)上报监控
processWorkerLogReport0
tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java
@Override protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) { // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); }
processWorkerLogReport0通过instanceLogService.submitLogs进行上报
submitLogs
tech/powerjob/server/core/instance/InstanceLogService.java
/** * 提交日志记录,持久化到本地数据库中 * @param workerAddress 上报机器地址 * @param logs 任务实例运行时日志 */ @Async(value = PJThreadPool.LOCAL_DB_POOL) public void submitLogs(String workerAddress, List<InstanceLogContent> logs) { List<LocalInstanceLogDO> logList = logs.stream().map(x -> { instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis()); LocalInstanceLogDO y = new LocalInstanceLogDO(); BeanUtils.copyProperties(x, y); y.setWorkerAddress(workerAddress); return y; }).collect(Collectors.toList()); try { CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList)); }catch (Exception e) { log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e); } }
InstanceLogService通过PJThreadPool.LOCAL_DB_POOL线程池进行异步,它通过localInstanceLogRepository.saveAll(logList)保存到本地数据库
monitor
tech/powerjob/server/monitor/PowerJobMonitorService.java
public void monitor(Event event) { monitors.forEach(m -> m.record(event)); }
monitor方法遍历monitors,挨个执行record
LogMonitor
tech/powerjob/server/monitor/monitors/LogMonitor.java
public void record(Event event) { MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId())); LoggerFactory.getLogger(event.type()).info(event.message()); }
LogMonitor的record方法通过日志打印event信息
小结
PowerJob的OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024
),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue;logSubmitter主要是执行reportLogs,它先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog;服务端的processWorkerLogReport通过processWorkerLogReport0进行处理(通过localInstanceLogRepository.saveAll(logList)保存到本地数据库),最后通过monitorService.monitor(event)上报监控。
以上就是PowerJob的OmsLogHandler的详细内容,更多关于PowerJob的OmsLogHandler的资料请关注脚本之家其它相关文章!
- PowerJob的TimingStrategyHandler工作流程源码解读
- PowerJob的IdGenerateService工作流程源码解读
- PowerJob LockService方法工作流程源码解读
- PowerJob的Evaluator方法工作流程源码解读
- PowerJob的DatabaseMonitorAspect源码流程
- PowerJob的AbstractScriptProcessor实现类工作流程源码解读
- PowerJob的WorkerHealthReporter工作流程源码解读
- PowerJob的ServerDiscoveryService工作流程源码解读
- PowerJob的ProcessorLoader工作流程源码解读
- PowerJob的DispatchStrategy方法工作流程源码解读
相关文章
SpringBoot整合SSO(single sign on)单点登录
这篇文章主要介绍了SpringBoot整合SSO(single sign on)单点登录,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-06-06Java中的HashMap弱引用之WeakHashMap详解
这篇文章主要介绍了Java中的HashMap弱引用之WeakHashMap详解,当内存空间不足,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足的问题,需要的朋友可以参考下2023-09-09MyBatis-Plus多表联查的实现方法(动态查询和静态查询)
本文用示例介绍使用MyBatis-Plus进行多表查询的方法,包括静态查询和动态查询,通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧2022-03-03
最新评论