SpringBoot Event 事件如何实现异步延迟执行

 更新时间:2023年02月14日 16:39:14   作者:peak_paradise  
这篇文章主要介绍了Spring Boot Event 事件如何实现异步延迟执行问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

SpringBoot Event 事件实现异步延迟执行

Spring的事件(Application Event)非常好用,虽然有一点会出现代码污染,但是在做不使用其他框架来做异步的情况先,还是非常方便的。

使用它只需要三样东西

  • 自定义事件:继承 ApplicationEvent,创建一个你想传的数据的对象,会在监听器那边收到该对象。
  • 定义监听器,实现 ApplicationListener 或者通过 @EventListener 注解到方法上,两种方式都行,但是推荐使用@EventListener,只要参数是你写的继承ApplicationEvent的对象,就会自动找到执行方法。
  • 定义发布者,通过 ApplicationEventPublisher,自带的bean,不需要单独声明,直接@Autowired就能使用,主要只需要publishEvent方法。

但是有时候我需要做延时执行,自带的功能缺不支持,但是我发现ApplicationEvent对象里面有两个成员变量,source和timestamp,构造函数(@since 5.3.8)也提供了同时注入这两个变量数据。

   /**
     * Create a new {@code ApplicationEvent} with its {@link #getTimestamp() timestamp}
     * set to the value returned by {@link Clock#millis()} in the provided {@link Clock}.
     * <p>This constructor is typically used in testing scenarios.
     * @param source the object on which the event initially occurred or with
     * which the event is associated (never {@code null})
     * @param clock a clock which will provide the timestamp
     * @since 5.3.8
     * @see #ApplicationEvent(Object)
     */
    public ApplicationEvent(Object source, Clock clock) {
        super(source);
        this.timestamp = clock.millis();
    }

但是,看了说明timestamp只是标志执行的时间,并不是为了延迟执行,可惜了。

于是查了一些资料,找到java.util.concurrent.DelayQueue对象,JDK自带了延迟的队列对象,我们可以考虑利用自带的timestamp和延迟队列DelayQueue结合一起来实现,具体DelayQueue的使用请自行查询,非常的简单。

首先,继承的ApplicationEvent重新实现一下。

不单单要继承ApplicationEvent,还需要实现Delayed,主要是因为DelayQueue队列中必须是Delayed的实现类

import java.time.Clock;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
 
import org.springframework.context.ApplicationEvent;
 
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
 
@Data
@EqualsAndHashCode(callSuper = false)
public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed {
 
    private static final long serialVersionUID = 1L;
 
    public ApplicationDelayedEvent(Object source) {
        this(source, 0L);
    }
 
    public ApplicationDelayedEvent(Object source, long delaySeconds) {
        super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds)));
    }
 
    @Override
    public int compareTo(Delayed o) {
        // 最好用NANOSECONDS,更精确,但是用处不大
        long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) delta;
    }
 
    @Override
    public long getDelay(TimeUnit unit) {
        // 最好用NANOSECONDS,更精确,但是用处不大,负数也会认为到时间了
        long millis = this.getTimestamp();
        long currentTimeMillis = System.currentTimeMillis();
        long sourceDuration = millis - currentTimeMillis;
        return unit.convert(sourceDuration, unit);
    }
}

多了两个必须实现的方法,compareTo是排序,应该是队列中的顺序。

getDelay是主要的方法,目的是归0的时候会从DelayQueue释放出来,当然那必须是NANOSECONDS级别的,我使用MILLISECONDS,就会出现负数,但也是可以的,也能释放出来。

另一个需要改的就是发布者,所以重新写一个ApplicationDelayEventPublisher

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Component
public class ApplicationDelayEventPublisher implements ApplicationRunner {
 
    // ApplicationDelayedEvent需要import进来
    private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>();
 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
 
    @Autowired
    @Qualifier("watchTaskExecutor")
    private ThreadPoolTaskExecutor poolTaskExecutor;
 
    public void publishEvent(ApplicationDelayedEvent event) {
        boolean result = delayQueue.offer(event);
        log.info("加入延迟队列。。。。{}", result);
    }
 
    @Override
    public void run(ApplicationArguments args) throws Exception {
        poolTaskExecutor.execute(() -> watchThread());
    }
 
    private void watchThread() {
        while (true) {
            try {
                log.info("启动延时任务的监听线程。。。。");
                ApplicationDelayedEvent event = this.delayQueue.take();
                log.info("接收到延时任务执行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
                eventPublisher.publishEvent(event);
            } catch (InterruptedException e) {
                log.info("启动延时任务的监听线程关闭");
                this.delayQueue.clear();
                break;
            }
        }
    }
}

需要实现ApplicationRunner作为Spring boot的启动时候运行的bean,目的就是开启监听线程,有事件到了执行时间take方法会得到数据,然后调用Spring原生的事件发布。

另外特别说明的就是监听线程不能随便创建,脱离了Spring容器的线程池会造成关闭服务的时候造成无法关闭的现象,所以建议还是自定义一个ThreadPoolTaskExecutor

    @Bean
    public ThreadPoolTaskExecutor watchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("watch_task_");
 
        // 线程池对拒绝任务的处理策略
//        ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
//        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
//        ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。
//        ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }

最后就是接收事件,跟传统的接收是一样的,异步只需要在配置类上加上@EnableAsync注解就行了,然后在监听的方法上加@Async

import java.util.concurrent.ThreadPoolExecutor;
 
import javax.annotation.PostConstruct;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Configuration
@EnableAsync
@ConditionalOnClass(ApplicationDelayEventPublisher.class)
public class DelayEventConfiguration {
 
    @PostConstruct
    public void init() {
        log.info("延迟Spring事件模块启动中。。。");
    }
    
    // 不能和监听线程放到一个线程池,不然无法执行
    @Bean
    public ThreadPoolTaskExecutor poolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(10000);
        executor.setKeepAliveSeconds(30);
        executor.setThreadNamePrefix("my_task_");
 
        // 线程池对拒绝任务的处理策略
//        ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
//        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
//        ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。
//        ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
    
    @Bean
    public ThreadPoolTaskExecutor watchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("watch_task_");
 
        // 线程池对拒绝任务的处理策略
//        ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
//        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
//        ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。
//        ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}

    @Async("poolTaskExecutor")
    @EventListener
    public void listenDelayEvent(ApplicationDelayedEvent event) {
        log.info("收到执行事件:{}", event.getSource());
    }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • mybatis 传入null值的解决方案

    mybatis 传入null值的解决方案

    这篇文章主要介绍了mybatis 传入null值的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • 深入理解Mybatis中的resultType和resultMap

    深入理解Mybatis中的resultType和resultMap

    这篇文章给大家介绍了mybatis中的resultType和resultMap的用法实例讲解,MyBatis中在查询进行select映射的时候,返回类型可以用resultType,也可以用resultMap,至于两种用法区别,通过本文一起学习吧
    2016-09-09
  • Spring的IOC容器实例化bean的方式总结

    Spring的IOC容器实例化bean的方式总结

    IOC容器实例化bean的三种方式:构造方法、静态工厂、实例工厂,本文将通过代码示例给大家详细讲解一下这三种方式,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-01-01
  • java 使用简单的demo实例告诉你优化算法的强大

    java 使用简单的demo实例告诉你优化算法的强大

    本篇文章介绍了,在java中使用简单的demo实例告诉你优化算法的强大。需要的朋友参考下
    2013-05-05
  • Simple JSON开发指南

    Simple JSON开发指南

    注意:JSONPauser不是线程安全的,需要的朋友可以参考下
    2016-04-04
  • java中的数学计算函数的总结

    java中的数学计算函数的总结

    这篇文章主要介绍了java中的数学计算函数的总结的相关资料,需要的朋友可以参考下
    2017-07-07
  • MyBatis动态SQL实现配置过程解析

    MyBatis动态SQL实现配置过程解析

    这篇文章主要介绍了MyBatis动态SQL实现配置过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • Jmerte分布式压测及分布式压测配置教程

    Jmerte分布式压测及分布式压测配置教程

    这篇文章主要介绍了Jmerte分布式压测及分布式压测配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-04-04
  • SpringBoot RESTful 应用中的异常处理梳理小结

    SpringBoot RESTful 应用中的异常处理梳理小结

    这篇文章主要介绍了SpringBoot RESTful 应用中的异常处理梳理小结,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-05-05
  • 简单阐述一下Java集合的概要

    简单阐述一下Java集合的概要

    今天给大家带来的文章是关于Java的相关知识,文章围绕着Java集合的概要展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06

最新评论