SpringBoot使用异步线程池实现生产环境批量数据推送

 更新时间:2022年02月04日 11:23:24   作者:福隆苑居士  
本文主要介绍了SpringBoot使用异步线程池实现生产环境批量数据推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言

SpringBoot使用异步线程池:

1、编写线程池配置类,自定义一个线程池;

2、定义一个异步服务;

3、使用@Async注解指向定义的线程池;

这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为。但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用线程池上报,极大提高了速度。

编写线程池配置类

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * 类名称:ExecutorConfig
 * ********************************
 * <p>
 * 类描述:线程池配置
 *
 * @author guoj
 * @date 2021-09-07 09:00
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
    /**
     * 定义数据上报线程池
     * @return
     */
    @Bean("dataCollectionExecutor")
    public Executor dataCollectionExecutor() {
 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 
        // 核心线程数量:当前机器的核心数
        executor.setCorePoolSize(
                Runtime.getRuntime().availableProcessors());
 
        // 最大线程数
        executor.setMaxPoolSize(
                Runtime.getRuntime().availableProcessors() * 2);
 
        // 队列大小
        executor.setQueueCapacity(Integer.MAX_VALUE);
 
        // 线程池中的线程名前缀
        executor.setThreadNamePrefix("sjsb-");
 
        // 拒绝策略:直接拒绝
        executor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.AbortPolicy());
 
        // 执行初始化
        executor.initialize();
 
        return executor;
    }
}

 PS:

1)、需要注意,这里一定要自己定义ThreadPoolTaskExecutor线程池,否则springboot的异步注解会执行默认线程池,存在线程阻塞导致CPU飙高及内存溢出的风险。这一点可以参考阿里开发手册,线程池定义这块明确提到了这一点;

2)、在@Bean注解中定义线程池名称,后面异步注解会用到。

编写异步服务

/**
 * 异步方法的服务, 不影响主程序运行。
 */
@Service
public class AsyncService {
 
    private final Logger log = LoggerFactory.getLogger(AsyncService.class);
 
    /**
     * 发送短信
     */
    @Async("sendMsgExecutor")
    public void sendMsg(String access_token, Consult item, Map<String, String> configMap) {
        // 此处编写发送短信业务
        // 1、buildConsultData();
        // 2、sendMsg();
    }
 
    /**
     * 发送微信订阅消息
     */
    @Async
    public void sendSubscribeMsg(String access_token, Consult item, Map<String, String> configMap) {
        // 此处编写发送微信订阅消息业务
        // 1、buildConsultData();
        // 2、sendSubscribeMsg();
    }
 
    /**
     * 数据并上报
     */
    @Async("dataCollectionExecutor")
    public void buildAndPostData(String access_token, Consult item, Map<String, String> configMap) {
        // 此处编写上报业务,如拼接数据,然后执行上报。
        // 1、buildConsultData();
        // 2、postData();
    }
}

PS:
1)、以上是代码片段,个人经验认为专门定义一个异步service存放各个异步方法最佳,这样可以避免编码时一些误操作比如异步方法不是void或者是private修饰,导致@Async注解失效的情况,同时可以安排每个注解指向不同的自定义线程池更加灵活;
2)、@Async注解中的名称就是上面定义的自定义线程池名称,这样业务执行时就会从指定线程池中获取异步线程。

异步批量上报数据

@Autowired
private AsyncService asyncService;
 
/**
 * 手动上报问诊记录,线程池方式。
 */
public void manualUploadConsultRecordsAsync(String channel, Date startTime, Date endTime) {
 
    // 查询指定时间内的问诊记录
   List<Consult> consultList = consultService
       .findPaidListByChannelAndTime(channel, startTime, endTime, configMap.get("serviceId"));
 
   if (!CollectionUtils.isEmpty(consultList)) {
 
       log.debug("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录, 一共[{}]条", consultList.size());
 
       consultList.forEach((item) -> {
           try {
               // 异步调用,使用线程池。
               asyncService.buildAndPostData(access_token, item, configMap);
           } catch (Exception ex) {
               log.error("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录发生异常: ", ex);
           }
       });
   }
}

总结

以上方式已经在生产环境运行,在工作时间内执行过很多次,一次数万条记录基本是几分钟内就全部上报完毕,而正常循环遍历时一次大概需要半个小时左右。

线程池的使用方式往往来源于业务场景,如果类似的业务不存在紧急处理的情况,大体还是以任务调度执行为主,因为更安全。如果存在紧急处理的情况,那么使用SpringBoot+线程池的方式不仅能节省非常多的时间,且不占用主线程的执行空间。

到此这篇关于SpringBoot使用异步线程池实现生产环境批量数据推送的文章就介绍到这了,更多相关SpringBoot 生产环境批量数据推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Mybatis使用useGeneratedKeys获取自增主键

    Mybatis使用useGeneratedKeys获取自增主键

    这篇文章主要为大家介绍了Mybatis使用useGeneratedKeys获取自增主键示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • 带你入门java雪花算法原理

    带你入门java雪花算法原理

    SnowFlake 算法,是 Twitter 开源的分布式 id 生成算法。其核心思想就是:使用一个 64 bit 的 long 型的数字作为全局唯一 id。在分布式系统中的应用十分广泛,且ID 引入了时间戳,基本上保持自增的
    2021-06-06
  • springboot的java配置方式(实例讲解)

    springboot的java配置方式(实例讲解)

    下面小编就为大家分享一篇实例讲解springboot的java配置方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2017-11-11
  • java中雪花算法时钟回拨问题解决

    java中雪花算法时钟回拨问题解决

    本文介绍了分布式系统中使用雪花算法生成唯一ID时可能遇到的时钟回拨问题,以及解决这个问题的几种方法,包括等待机制、扩展位、预留时间戳或逻辑时钟等,感兴趣的可以了解一下
    2024-10-10
  • java设计模式之实现对象池模式示例分享

    java设计模式之实现对象池模式示例分享

    对象池模式经常用在频繁创建、销毁对象(并且对象创建、销毁开销很大)的场景,比如数据库连接池、线程池、任务队列池等。本代码简单,没有限制对象池大小
    2014-02-02
  • Java windows环境构建图文教程

    Java windows环境构建图文教程

    这篇文章主要为大家介绍了Java windows环境构建图文教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪<BR>
    2023-12-12
  • springboot+shiro+jwtsession和token进行身份验证和授权

    springboot+shiro+jwtsession和token进行身份验证和授权

    最近和别的软件集成项目,需要提供给别人接口来进行数据传输,发现给他token后并不能访问我的接口,拿postman试了下还真是不行,检查代码发现项目的shiro配置是通过session会话来校验信息的,修改代码兼容token和session
    2024-06-06
  • springboot应用中使用过滤器的过程详解

    springboot应用中使用过滤器的过程详解

    过滤器通常用于实现跨切面的功能,例如身份验证、日志记录、请求和响应的修改、性能监控等,这篇文章主要介绍了springboot应用中使用过滤器,需要的朋友可以参考下
    2023-06-06
  • 浅谈自定义注解在Spring中的应用

    浅谈自定义注解在Spring中的应用

    这篇文章主要介绍了浅谈自定义注解在Spring中的应用,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • Java如何使用字符流读写非文本文件

    Java如何使用字符流读写非文本文件

    这篇文章主要介绍了Java如何使用字符流读写非文本文件,以Java的字符流读取文件为例:它只能读取0-65535之间的字符,可以看出来字符都是正数,但是二进制的byte是可以为负数的,需要的朋友可以参考下
    2023-04-04

最新评论