Async的线程池使用选择解析

 更新时间:2023年06月15日 16:59:31   作者:我是一颗小虎牙_  
这篇文章主要为大家介绍了Async的线程池使用选择解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

在Spring中我们经常会用到异步操作,注解中使用 @EnableAsync@Async 就可以使用它了。但是最近发现在异步中线程号使用的是我们项目中自定义的线程池 ThreadPoolTaskExecutor 而不是之前熟悉的 SimpleAsyncTaskExecutor

那么来看一下他的执行过程吧。

正文

  • 首先要使异步生效,我们得在启动类中加入 @EnableAsync 那么就点开它看看。它会使用 @Import 注入一个 AsyncConfigurationSelector 类,启动是通过父类可以决定它使用的是配置类 ProxyAsyncConfiguration
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    public AsyncConfigurationSelector() {
    }
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch(adviceMode) {
        case PROXY:
            return new String[]{ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
        default:
            return null;
        }
    }
}
  • 点开能够看到注入一个 AsyncAnnotationBeanPostProcessor 。它实现了 BeanPostProcessor 接口,因此它是一个后处理器,用于将 Spring AOPAdvisor 应用于给定的 bean 。从而该bean上通过异步注解所定义的方法在调用时会被真正地异步调用起来。
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    public ProxyAsyncConfiguration() {
    }
    @Bean(
        name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
    )
    @Role(2)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
        return bpp;
    }
}
  • AsyncAnnotationBeanPostProcessor 的父类实现了 BeanFactoryAware ,那么会在 AsyncAnnotationBeanPostProcessor 实例化之后回调 setBeanFactory() 来实例化切面 AsyncAnnotationAdvisor
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    //定义一个切面
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
  • AsyncAnnotationAdvisor 构造和声明切入的目标(切点)和代码增强(通知)。
    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        //通知
        this.advice = buildAdvice(executor, exceptionHandler);
        //切入点
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }
  • 通知就是最终要执行的。buildAdvice 用于构建通知,主要是创建一个 AnnotationAsyncExecutionInterceptor 类型的拦截器,并且配置好使用的执行器和异常处理器。真正的异步执行的代码在 AsyncExecutionAspectSupport 中!
protected Advice buildAdvice(
            @Nullable Supplier&lt;Executor&gt; executor, @Nullable Supplier&lt;AsyncUncaughtExceptionHandler&gt; exceptionHandler) {
        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        //配置拦截器
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }
  • 配置拦截器,通过参数配置自定义的执行器和异常处理器或者使用默认的执行器和异常处理器。
public void configure(@Nullable Supplier&lt;Executor&gt; defaultExecutor,
            @Nullable Supplier&lt;AsyncUncaughtExceptionHandler&gt; exceptionHandler) {
        //默认执行器
        this.defaultExecutor = new SingletonSupplier&lt;&gt;(defaultExecutor, () -&gt; getDefaultExecutor(this.beanFactory));
        this.exceptionHandler = new SingletonSupplier&lt;&gt;(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
    }
  • getDefaultExecutor() 方法,用来查找默认的执行器,父类 AsyncExecutionAspectSupport 首先寻找唯一一个类型为 TaskExecutor 的执行器并返回,若存在多个则寻找默认的执行器 taskExecutor ,若无法找到则直接返回null。子类 AsyncExecutionInterceptor 重写 getDefaultExecutor 方法,首先调用父类逻辑,返回null则配置一个名为 SimpleAsyncTaskExecutor 的执行器
/**
 * 父类
 * 获取或构建此通知实例的默认执行器
 * 这里返回的执行器将被缓存以供后续使用
 * 默认实现搜索唯一的TaskExecutor的bean
 * 在上下文中,用于名为“taskExecutor”的Executor bean。
 * 如果两者都不是可解析的,这个实现将返回 null
 */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // 搜索唯一的一个TaskExecutor类型的bean并返回
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            //找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            //未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -&gt; either using local default executor or none at all...
        }
    }
    return null;
}
/**
 * 子类
 * 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

所以,到了这一步就可以理解为什么异步线程名默认叫 SimpleAsyncTaskExecutor-xx ,为什么有了自己的线程池有可能异步用到了自己的线程池配置。

我们有这个切入点之后,每次请求接口执行异步方法前都会执行 AsyncExecutionInterceptor#invoke()determineAsyncExecutor 用来决策使用哪个执行器

@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    //在缓存的执行器中选择一个对应方法的执行器
    AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
    if (executor == null) {
        //获取@Async注解中的value(指定的执行器)
        String qualifier = this.getExecutorQualifier(method);
        Executor targetExecutor;
        if (StringUtils.hasLength(qualifier)) {
            //获取指定执行器的bean
            targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
        } else {
            //选择默认的执行器
            targetExecutor = (Executor)this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
        //将执行器进行缓存
        this.executors.put(method, executor);
    }
    return (AsyncTaskExecutor)executor;
}

当有了执行器调用 doSubmit 方法将任务加入到执行器中。

异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:

不复用线程,也就是说为每个任务新起一个线程。但是可以通过 concurrencyLimit 属性来控制并发线程数量,但是默认情况下不做限制( concurrencyLimit 取值为-1)。
因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!

总结

本文主要以看源码的方式来了解异步注解 @Async 是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响。

以上就是Async的线程池使用选择解析的详细内容,更多关于Async线程池使用的资料请关注脚本之家其它相关文章!

相关文章

  • Java阻塞队列中的BlockingQueue接口详解

    Java阻塞队列中的BlockingQueue接口详解

    这篇文章主要介绍了Java阻塞队列中的BlockingQueue接口详解,对于Queue而言,BlockingQueue是主要的线程安全的版本,具有阻塞功能,可以允许添加、删除元素被阻塞,直到成功为止,BlockingQueue相对于Queue而言增加了两个方法put、take元素,需要的朋友可以参考下
    2023-09-09
  • SpringBoot3.2.2整合MyBatis-Plus3.5.5依赖不兼容的问题解决

    SpringBoot3.2.2整合MyBatis-Plus3.5.5依赖不兼容的问题解决

    这篇文章给大家介绍了Spring Boot 3.2.2整合MyBatis-Plus 3.5.5依赖不兼容问题,文中通过代码示例和图文介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-01-01
  • Java Process中waitFor()的问题详解

    Java Process中waitFor()的问题详解

    这篇文章主要给大家介绍了关于Java Process中waitFor()问题的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-12-12
  • 解析Java异常的栈轨迹及其相关方法

    解析Java异常的栈轨迹及其相关方法

    这篇文章主要介绍了解析Java异常的栈轨迹及其相关方法,包括栈轨迹的打印和fillInStackTrace方法等,需要的朋友可以参考下
    2015-11-11
  • 基于mybatis-plus timestamp返回为null问题的排除

    基于mybatis-plus timestamp返回为null问题的排除

    这篇文章主要介绍了mybatis-plus timestamp返回为null问题的排除,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Eclipse操作SVN时中断锁定,文件的解锁方法

    Eclipse操作SVN时中断锁定,文件的解锁方法

    这篇文章主要介绍了Eclipse操作SVN时中断锁定,文件的解锁方法,需要的朋友可以参考下
    2014-08-08
  • 浅析Java中静态代理和动态代理的应用与区别

    浅析Java中静态代理和动态代理的应用与区别

    代理模式在我们生活中很常见,而Java中常用的两个的代理模式就是动态代理与静态代理,这篇文章主要为大家介绍了二者的应用与区别,需要的可以参考下
    2023-08-08
  • Java实现文件监控器FileMonitor的实例代码

    Java实现文件监控器FileMonitor的实例代码

    这篇文章主要介绍了Java实现文件监控器FileMonitor的实例代码,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-12-12
  • 在eclipse中使用SVN的方法(图文)

    在eclipse中使用SVN的方法(图文)

    这篇文章主要介绍了在eclipse中使用SVN的方法(图文),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • 关于maven依赖 ${xxx.version}报错问题

    关于maven依赖 ${xxx.version}报错问题

    这篇文章主要介绍了关于maven依赖 ${xxx.version}报错问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01

最新评论