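PowerJob Alarmable Workflow: Source Code Analysis
Preface
This article walks through the source code of PowerJob's Alarmable extension point and its built-in implementations.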
Alarmable
tech/powerjob/server/extension/Alarmable.java
public interface Alarmable {

    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
The Alarmable interface defines a single method, onFailed, which takes the alarm to deliver and targetUserList, the users to notify.
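To make the contract concrete, here is a minimal sketch of a custom channel. The Alarm, UserInfoDO and Alarmable types below are simplified stand-ins written for this example (the real PowerJob classes carry more fields and methods), and InMemoryAlarmService is a hypothetical implementation that records messages instead of sending them.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the PowerJob types (assumption: reduced to what the sketch needs).
interface Alarm {
    String fetchTitle();
    String fetchContent();
}

class UserInfoDO {
    private final String username;

    UserInfoDO(String username) {
        this.username = username;
    }

    String getUsername() {
        return username;
    }
}

interface Alarmable {
    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}

// Hypothetical channel: keeps every notification in memory instead of sending it anywhere.
class InMemoryAlarmService implements Alarmable {
    final List<String> sent = new ArrayList<>();

    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (targetUserList == null || targetUserList.isEmpty()) {
            return; // nobody to notify
        }
        for (UserInfoDO user : targetUserList) {
            sent.add(user.getUsername() + " <- " + alarm.fetchTitle());
        }
    }
}
```

A real channel would replace the in-memory list with a call to its delivery mechanism and, like the built-in services, would typically swallow and log delivery failures so that one broken channel does not affect the others.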
Alarm
public interface Alarm extends PowerSerializable {

    String fetchTitle();

    default String fetchContent() {
        StringBuilder sb = new StringBuilder();
        JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
        content.forEach((key, originWord) -> {
            sb.append(key).append(": ");
            String word = String.valueOf(originWord);
            if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
                try {
                    if (originWord instanceof Long) {
                        word = CommonUtils.formatTime((Long) originWord);
                    }
                } catch (Exception ignore) {
                }
            }
            sb.append(word).append(OmsConstant.LINE_SEPARATOR);
        });
        return sb.toString();
    }
}
Alarm declares the fetchTitle method and provides a default fetchContent method. fetchContent serializes the alarm to JSON and emits one "key: value" line per field; a Long value whose key ends with "time" or "date" (case-insensitive) is formatted through CommonUtils.formatTime. Alarm has two implementations: JobInstanceAlarm and WorkflowInstanceAlarm.
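The rendering rule in fetchContent can be sketched without fastjson by iterating over a plain Map. This is an illustration only: AlarmContentSketch and render are hypothetical names, and the "yyyy-MM-dd HH:mm:ss" pattern, the UTC time zone and the "\n" separator are assumptions, since the source does not show what CommonUtils.formatTime or OmsConstant.LINE_SEPARATOR actually produce.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

class AlarmContentSketch {

    // Renders each entry as "key: value" on its own line. Long values whose key ends
    // with "time" or "date" (case-insensitive) are rendered as formatted timestamps.
    static String render(Map<String, Object> fields) {
        // Assumed pattern and zone; fixed to UTC so the output is reproducible.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            String word = String.valueOf(value);

            String lowerKey = key.toLowerCase(Locale.ROOT);
            boolean looksLikeTimestamp = lowerKey.endsWith("time") || lowerKey.endsWith("date");
            if (looksLikeTimestamp && value instanceof Long) {
                word = format.format(new Date((Long) value));
            }
            sb.append(key).append(": ").append(word).append('\n'); // assumed separator
        }
        return sb.toString();
    }
}
```

Note that a Long field such as jobId is left untouched because its key does not match the suffix rule, while a String value under a key like finishedTime is also left as-is because only Long values are converted.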
DingTalkAlarmService
tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java
@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {

    private final Environment environment;

    private Long agentId;
    private DingTalkUtils dingTalkUtils;
    private Cache<String, String> mobile2UserIdCache;

    private static final int CACHE_SIZE = 8192;
    /**
     * Prevents cache breakdown
     */
    private static final String EMPTY_TAG = "EMPTY";

    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (dingTalkUtils == null) {
            return;
        }
        Set<String> userIds = Sets.newHashSet();
        targetUserList.forEach(user -> {
            String phone = user.getPhone();
            if (StringUtils.isEmpty(phone)) {
                return;
            }
            try {
                String userId = mobile2UserIdCache.get(phone, () -> {
                    try {
                        return dingTalkUtils.fetchUserIdByMobile(phone);
                    } catch (PowerJobException ignore) {
                        return EMPTY_TAG;
                    } catch (Exception ignore) {
                        return null;
                    }
                });
                if (!EMPTY_TAG.equals(userId)) {
                    userIds.add(userId);
                }
            } catch (Exception ignore) {
            }
        });
        userIds.remove(null);

        if (!userIds.isEmpty()) {
            String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
            List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
            String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));

            try {
                dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
            } catch (Exception e) {
                log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
            }
        }
    }

    @PostConstruct
    public void init() {
        String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
        String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
        String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);

        log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);

        if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
            log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
            return;
        }
        if (!StringUtils.isNumeric(agentId)) {
            log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
            return;
        }
        this.agentId = Long.valueOf(agentId);
        dingTalkUtils = new DingTalkUtils(appKey, appSecret);
        mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
        log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
    }
}
DingTalkAlarmService implements the Alarmable interface. Its onFailed method iterates over targetUserList, skips users without a phone number, and resolves each remaining phone number to a DingTalk userId through mobile2UserIdCache. A lookup that fails with a PowerJobException is cached as EMPTY_TAG so the same number is not queried again. The collected userIds are then joined with commas and the alarm is sent via dingTalkUtils.sendMarkdownAsync. If init does not find a valid agentId, appKey and appSecret, dingTalkUtils stays null and onFailed returns immediately.
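The EMPTY_TAG placeholder is a form of negative caching. The sketch below illustrates the idea with a ConcurrentHashMap in place of the Guava cache. UserIdResolverSketch, the lookup function and the remoteCalls counter are all hypothetical, and the sketch simplifies the original: it caches every miss, whereas the source only caches the placeholder when a PowerJobException is thrown.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class UserIdResolverSketch {
    // Placeholder stored for phone numbers that have no matching user.
    private static final String EMPTY_TAG = "EMPTY";

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Hypothetical remote lookup; returns null when the phone number is unknown.
    private final Function<String, String> lookup;
    // Counts remote lookups, only to make the effect of the cache visible (not thread-safe).
    int remoteCalls = 0;

    UserIdResolverSketch(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    Set<String> resolve(List<String> phones) {
        Set<String> userIds = new LinkedHashSet<>();
        for (String phone : phones) {
            if (phone == null || phone.isEmpty()) {
                continue; // users without a phone number are skipped
            }
            String userId = cache.computeIfAbsent(phone, p -> {
                remoteCalls++;
                String found = lookup.apply(p);
                // Cache the placeholder so a known miss is not looked up again.
                return found == null ? EMPTY_TAG : found;
            });
            if (!EMPTY_TAG.equals(userId)) {
                userIds.add(userId);
            }
        }
        return userIds;
    }
}
```

Without the placeholder, every alarm for a user whose phone number is not registered would trigger another remote call; with it, the miss costs one lookup until the entry is evicted.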
MailAlarmService
tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java
@Slf4j
@Service
public class MailAlarmService implements Alarmable {

    @Resource
    private Environment environment;

    private JavaMailSender javaMailSender;

    @Value("${spring.mail.username:''}")
    private String from;

    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
            return;
        }

        SimpleMailMessage sm = new SimpleMailMessage();
        try {
            sm.setFrom(from);
            sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
            sm.setSubject(alarm.fetchTitle());
            sm.setText(alarm.fetchContent());

            javaMailSender.send(sm);
        } catch (Exception e) {
            log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
        }
    }

    @Autowired(required = false)
    public void setJavaMailSender(JavaMailSender javaMailSender) {
        this.javaMailSender = javaMailSender;
    }
}
MailAlarmService implements the Alarmable interface. Its onFailed method returns early when there are no target users, no JavaMailSender has been injected, or the sender address is empty. Otherwise it builds a SimpleMailMessage addressed to every user with a non-null email, using fetchTitle as the subject and fetchContent as the body, and sends it through Spring's javaMailSender.send. Failures are logged as warnings rather than rethrown.
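The recipient list is built with a stream that drops null addresses. A minimal sketch of that step follows. MailRecipientsSketch is a hypothetical helper that works on plain strings, and it additionally drops blank addresses, which the source does not do.

```java
import java.util.List;
import java.util.Objects;

class MailRecipientsSketch {

    // Collects usable addresses into the String[] shape that SimpleMailMessage.setTo accepts.
    // Assumption: blank addresses are filtered too; the source only filters nulls.
    static String[] recipients(List<String> emails) {
        return emails.stream()
                .filter(Objects::nonNull)
                .filter(email -> !email.trim().isEmpty())
                .toArray(String[]::new);
    }
}
```

A caller could check whether the resulting array is empty before building the message, which avoids handing a message without recipients to the mail sender.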
WebHookAlarmService
tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java
@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {

    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";

    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList)) {
            return;
        }
        targetUserList.forEach(user -> {
            String webHook = user.getWebHook();
            if (StringUtils.isEmpty(webHook)) {
                return;
            }

            // Automatically prepend the protocol prefix
            if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
                webHook = HTTP_PROTOCOL_PREFIX + webHook;
            }

            MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
            RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));

            try {
                String response = HttpUtils.post(webHook, requestBody);
                log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
            } catch (Exception e) {
                log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
            }
        });
    }
}
WebHookAlarmService implements the Alarmable interface. Its onFailed method iterates over targetUserList and, for each user with a webHook configured, prepends http:// when the URL has no protocol prefix, serializes the alarm to JSON, and posts it with HttpUtils.post(webHook, requestBody). The HTTP call is made with okhttp3, and a failure for one user is logged without stopping the remaining callbacks.
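The protocol-prefix handling can be isolated into a small pure function, which makes the rule easy to test. This is a sketch: WebHookUrlSketch and normalize are hypothetical names, and returning null for an empty URL stands in for the early return in the original lambda.

```java
class WebHookUrlSketch {
    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";

    // Returns the URL to call, or null when the user has no webhook configured.
    static String normalize(String webHook) {
        if (webHook == null || webHook.isEmpty()) {
            return null;
        }
        // Same rule as the source: default to plain HTTP when no protocol is given.
        if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
            return HTTP_PROTOCOL_PREFIX + webHook;
        }
        return webHook;
    }
}
```

Because the default is http://, a user who wants the callback sent over TLS has to store the full https:// URL.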
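Summary
PowerJob's Alarmable interface defines the onFailed method, which takes an alarm and a targetUserList. It has three implementations: DingTalkAlarmService (which uses DingTalkClient), MailAlarmService (which uses Spring's JavaMailSender), and WebHookAlarmService (which uses okhttp3's OkHttpClient).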