PowerJob Alarmable工作流程源码剖析

 更新时间:2024年01月15日 08:42:24   作者:codecraft  
这篇文章主要为大家介绍了PowerJob Alarmable工作流程源码剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的Alarmable

Alarmable

tech/powerjob/server/extension/Alarmable.java

public interface Alarmable {
    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList

Alarm

public interface Alarm extends PowerSerializable {
    String fetchTitle();
    default String fetchContent() {
        StringBuilder sb = new StringBuilder();
        JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
        content.forEach((key, originWord) -> {
            sb.append(key).append(": ");
            String word = String.valueOf(originWord);
            if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
                try {
                    if (originWord instanceof Long) {
                        word = CommonUtils.formatTime((Long) originWord);
                    }
                }catch (Exception ignore) {
                }
            }
            sb.append(word).append(OmsConstant.LINE_SEPARATOR);
        });
        return sb.toString();
    }
}
Alarm定义了fetchTitle方法,提供了fetchContent默认方法,它有两个实现类分别是JobInstanceAlarm、WorkflowInstanceAlarm

DingTalkAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {
    private final Environment environment;
    private Long agentId;
    private DingTalkUtils dingTalkUtils;
    private Cache<String, String> mobile2UserIdCache;
    private static final int CACHE_SIZE = 8192;
    /**
     * 防止缓存击穿
     */
    private static final String EMPTY_TAG = "EMPTY";
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (dingTalkUtils == null) {
            return;
        }
        Set<String> userIds = Sets.newHashSet();
        targetUserList.forEach(user -> {
            String phone = user.getPhone();
            if (StringUtils.isEmpty(phone)) {
                return;
            }
            try {
                String userId = mobile2UserIdCache.get(phone, () -> {
                    try {
                        return dingTalkUtils.fetchUserIdByMobile(phone);
                    } catch (PowerJobException ignore) {
                        return EMPTY_TAG;
                    } catch (Exception ignore) {
                        return null;
                    }
                });
                if (!EMPTY_TAG.equals(userId)) {
                    userIds .add(userId);
                }
            }catch (Exception ignore) {
            }
        });
        userIds.remove(null);
        if (!userIds.isEmpty()) {
            String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
            List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
            String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
            try {
                dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
            }catch (Exception e) {
                log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
            }
        }
    }
    @PostConstruct
    public void init() {
        String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
        String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
        String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
        log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
        if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
            log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
            return;
        }
        if (!StringUtils.isNumeric(agentId)) {
            log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
            return;
        }
        this.agentId = Long.valueOf(agentId);
        dingTalkUtils = new DingTalkUtils(appKey, appSecret);
        mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
        log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
    }
}
DingTalkAlarmService实现了Alarmable接口,其onFailed遍历targetUserList获取userId,最后通过dingTalkUtils.sendMarkdownAsync发送

MailAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java

@Slf4j
@Service
public class MailAlarmService implements Alarmable {
    @Resource
    private Environment environment;
    private JavaMailSender javaMailSender;
    @Value("${spring.mail.username:''}")
    private String from;
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
            return;
        }
        SimpleMailMessage sm = new SimpleMailMessage();
        try {
            sm.setFrom(from);
            sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
            sm.setSubject(alarm.fetchTitle());
            sm.setText(alarm.fetchContent());
            javaMailSender.send(sm);
        }catch (Exception e) {
            log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
        }
    }
    @Autowired(required = false)
    public void setJavaMailSender(JavaMailSender javaMailSender) {
        this.javaMailSender = javaMailSender;
    }
}
MailAlarmService实现了Alarmable接口,其onFailed方法构建SimpleMailMessage,然后通过spring的javaMailSender.send发送

WebHookAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java

@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {
    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList)) {
            return;
        }
        targetUserList.forEach(user -> {
            String webHook = user.getWebHook();
            if (StringUtils.isEmpty(webHook)) {
                return;
            }
            // 自动添加协议头
            if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
                webHook = HTTP_PROTOCOL_PREFIX + webHook;
            }
            MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
            RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));
            try {
                String response = HttpUtils.post(webHook, requestBody);
                log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
            }catch (Exception e) {
                log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
            }
        });
    }
}
WebHookAlarmService实现了Alarmable接口,其onFailed方法遍历targetUserList,挨个执行HttpUtils.post(webHook, requestBody),用的是okhttp3来实现http请求回调

小结

PowerJob的Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList;它有三个实现类,分别是DingTalkAlarmService(用的是DingTalkClient)、MailAlarmService(用的是spring的JavaMailSender)、WebHookAlarmService(用的是okhttp3的OkHttpClient)。

以上就是PowerJob Alarmable工作流程源码剖析的详细内容,更多关于PowerJob Alarmable的资料请关注脚本之家其它相关文章!

相关文章

  • Eclipse中如何显示explorer过程解析

    Eclipse中如何显示explorer过程解析

    这篇文章主要介绍了Eclipse中如何显示explorer过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09
  • SpringBoot 中html的页面间跳转问题小结

    SpringBoot 中html的页面间跳转问题小结

    这篇文章主要介绍了SpringBoot 中html的页面间跳转问题小结,本文给大家分享两种方法,结合实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-10-10
  • Java对线程池做监控的实现方法

    Java对线程池做监控的实现方法

    本文主要介绍了Java对线程池做监控的实现方法,监控线程池可以帮助我们了解线程池的状态,如当前活跃线程数、任务队列长度、已完成任务数等,下面就一起来了解一下
    2024-07-07
  • java的JsonObject对象提取值方法

    java的JsonObject对象提取值方法

    下面小编就为大家分享一篇java的JsonObject对象提取值方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-03-03
  • Springboot事件监听与@Async注解详解

    Springboot事件监听与@Async注解详解

    这篇文章主要介绍了Springboot事件监听与@Async注解详解,在开发中经常可以利用Spring事件监听来实现观察者模式,进行一些非事务性的操作,如记录日志之类的,需要的朋友可以参考下
    2024-01-01
  • Java 时间格式转换之impleDateFormat与Data API解析与使用

    Java 时间格式转换之impleDateFormat与Data API解析与使用

    想必大家对 SimpleDateFormat 并不陌生。SimpleDateFormat 是 Java 中一个非常常用的类,他是以区域敏感的方式格式化和解析日期的具体类。 它允许格式化 (date -> text)、语法分析 (text -> date)和标准化
    2021-11-11
  • java算法导论之FloydWarshall算法实现代码

    java算法导论之FloydWarshall算法实现代码

    这篇文章主要介绍了算法导论之FloydWarshall算法实现代码的相关资料,需要的朋友可以参考下
    2017-05-05
  • IDEA中sout快捷键无效问题的解决方法

    IDEA中sout快捷键无效问题的解决方法

    这篇文章主要介绍了IDEA中sout快捷键无效问题,在类文件中进行操作会造成sout快捷命令无法自动生成,比如操作了import引入其它包之后,本文给大家分享解决方法,感兴趣的朋友一起看看吧
    2022-07-07
  • Java读取properties配置文件的8种方式汇总

    Java读取properties配置文件的8种方式汇总

    读取.properties配置文件在实际的开发中使用的很多,总结了一下,下面这篇文章主要给大家介绍了关于Java读取properties配置文件的8种方式,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-11-11
  • Java并发编程深入理解之Synchronized的使用及底层原理详解 上

    Java并发编程深入理解之Synchronized的使用及底层原理详解 上

    在并发编程中存在线程安全问题,主要原因有:1.存在共享数据 2.多线程共同操作共享数据。关键字synchronized可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块,同时synchronized可以保证一个线程的变化可见(可见性),即可以代替volatile
    2021-09-09

最新评论