spring的异步执行使用与源码详解

 更新时间:2023年05月12日 09:40:21   作者:morris131  
这篇文章主要介绍了spring的异步执行使用与源码详解,Spring中通过在方法上设置@Async注解,可使得方法被异步调用,需要的朋友可以参考下

在实际的开发过程中,有些业务逻辑使用异步的方式处理更为合理。比如在某个业务逻辑中,需要把一些数据存入到redis缓存中,这个操作只是一个辅助的功能,成功或者失败对主业务并不会产生根本影响,这个过程可以通过异步的方法去进行。

Spring中通过在方法上设置@Async注解,可使得方法被异步调用。也就是说该方法会在调用时立即返回,而这个方法的实际执行交给Spring的TaskExecutor去完成。

异步执行的使用

配置类

使用@EnableAsync注解开启异步功能。

package com.morris.spring.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 开启Async
public class AsyncConfig implements AsyncConfigurer {
	@Override
	public Executor getAsyncExecutor() {
		// 自定义线程池
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(2);
		executor.setMaxPoolSize(4);
		executor.setQueueCapacity(10);
		executor.setThreadNamePrefix("MyExecutor-");
		executor.initialize();
		return executor;
	}
}

service层的使用

在需要异步执行的方法上面加上@Async注解。

package com.morris.spring.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@Slf4j
public class AsyncService {
	@Async
	public void noResult() {
		log.info("execute noResult");
	}
	@Async
	public Future<String> hasResult() throws InterruptedException {
		log.info("execute hasResult");
		TimeUnit.SECONDS.sleep(5);
		return new AsyncResult<>("hasResult success");
	}
	@Async
	public CompletableFuture<String> completableFuture() throws InterruptedException {
		log.info(" execute completableFuture");
		TimeUnit.SECONDS.sleep(5);
		return CompletableFuture.completedFuture("completableFuture success");
	}
}

测试类

package com.morris.spring.demo.async;
import com.morris.spring.config.AsyncConfig;
import com.morris.spring.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * 异步调用的演示
 */
@Slf4j
public class AsyncDemo {
	@Test
	public void test() throws ExecutionException, InterruptedException {
		AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
		applicationContext.register(AsyncService.class);
		applicationContext.register(AsyncConfig.class);
		applicationContext.refresh();
		AsyncService asyncService = applicationContext.getBean(AsyncService.class);
		asyncService.noResult(); // 无结果
		Future<String> future = asyncService.hasResult();
		log.info("hasResult: {}", future.get()); // 有结果
		CompletableFuture<String> completableFuture = asyncService.completableFuture();
		completableFuture.thenAcceptAsync(System.out::println);// 异步回调
		log.info("completableFuture call down");
	}
}

运行结果如下:

INFO  MyExecutor-1 AsyncService:16 - execute noResult
INFO  MyExecutor-2 AsyncService:21 - execute hasResult
INFO  main AsyncDemo:29 - hasResult: hasResult success
INFO  MyExecutor-1 AsyncService:28 -  execute completableFuture
INFO  main AsyncDemo:33 - completableFuture call down

通过日志可以发现AsyncService的方法都是通过线程名为MyExecutor-1的线程执行的,这个名称的前缀是在AsyncConfig中指定的,而不是通过main线程执行的。

两个疑问:

  • 是否可以不配置Executor线程池,Spring会默认创建默认的Executor,还是会报错?
  • Executor线程池中执行任务时如果抛出了异常,可否自定义异常的处理类对异常进行捕获处理?

源码分析

@EnableAsync

@EnableAsync主要是向Spring容器中导入了AsyncConfigurationSelector类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

AsyncConfigurationSelector

AsyncConfigurationSelector的主要方法当然是selectImports(),注意这里会先调用父类的selectImports() org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)

public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
	Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
	Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
	AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
	if (attributes == null) {
		throw new IllegalArgumentException(String.format(
				"@%s is not present on importing class '%s' as expected",
				annType.getSimpleName(), importingClassMetadata.getClassName()));
	}
	AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
	// 模板方法模式,回调子类的selectImports
	String[] imports = selectImports(adviceMode);
	if (imports == null) {
		throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
	}
	return imports;
}

org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports

public String[] selectImports(AdviceMode adviceMode) {
	switch (adviceMode) {
		case PROXY:
			// 奇怪???@Transaction、@EnableCaching都是注入两个类,一个config,一个registrar导入aop的入口类
			// 而这里只有一个config类ProxyAsyncConfiguration
			return new String[] {ProxyAsyncConfiguration.class.getName()};
		case ASPECTJ:
			return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
		default:
			return null;
	}
}

AsyncConfigurationSelector又导入了配置类ProxyAsyncConfiguration。

ProxyAsyncConfiguration

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
	/**
	 * 先看父类AbstractAsyncConfiguration
	 * @return
	 */
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		// 实例化AsyncAnnotationBeanPostProcessor
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}

ProxyAsyncConfiguration向容器中注入了一个AsyncAnnotationBeanPostProcessor。

疑问:这里为啥是BeanPostProcessor,不应该像事务切面或者缓存切面一样,注入一个Advisor和XxxxInterceptor(Advice)吗?

AbstractAsyncConfiguration

AbstractAsyncConfiguration是ProxyAsyncConfiguration的父类。

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
	@Nullable
	protected AnnotationAttributes enableAsync;
	@Nullable
	protected Supplier<Executor> executor;
	@Nullable
	protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
	/**
	 * 实现了ImportAware.setImportMetadata
	 * 在ProxyAsyncConfiguration初始化后被调用
	 * @param importMetadata
	 */
	@Override
	public void setImportMetadata(AnnotationMetadata importMetadata) {
		// 取得@EnableAsync注解
		this.enableAsync = AnnotationAttributes.fromMap(
				importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
		if (this.enableAsync == null) {
			throw new IllegalArgumentException(
					"@EnableAsync is not present on importing class " + importMetadata.getClassName());
		}
	}
	/**
	 * Collect any {@link AsyncConfigurer} beans through autowiring.
	 */
	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		// configurers默认为空,除非手动注入AsyncConfigurer
		if (CollectionUtils.isEmpty(configurers)) {
			return;
		}
		if (configurers.size() > 1) {
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}
}

从这里可以看出,可以通过向spring容器中注入AsyncConfigurer来指定执行异步任务的线程池和异常处理器。

AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor的继承结构图:

20220424174516480.png

AsyncAnnotationBeanPostProcessor主要实现了BeanFactoryAware和BeanPostProcessor接口。

org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory

public void setBeanFactory(BeanFactory beanFactory) {
	super.setBeanFactory(beanFactory);
	// 实例化Advisor
	AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
	if (this.asyncAnnotationType != null) {
		advisor.setAsyncAnnotationType(this.asyncAnnotationType);
	}
	advisor.setBeanFactory(beanFactory);
	this.advisor = advisor;
}

在AsyncAnnotationBeanPostProcessor实例化时实例化了切面AsyncAnnotationAdvisor。

每个bean实例化完后都会调用AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization()判断是否要生成代理对象。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
	... ...
	/**
	 * @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String)
	 */
	// isEligible会判断哪些bean要生成代理
	// 就是使用advisor中的pointcut进行匹配
	if (isEligible(bean, beanName)) {
		// 创建代理
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);
		return proxyFactory.getProxy(getProxyClassLoader());
	}
	// No proxy needed.
	return bean;
}

AsyncAnnotationAdvisor

切面AsyncAnnotationAdvisor包括通知AnnotationAsyncExecutionInterceptor和切点ComposablePointcut。

public AsyncAnnotationAdvisor(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
	asyncAnnotationTypes.add(Async.class);
	try {
		asyncAnnotationTypes.add((Class<? extends Annotation>)
				ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
	}
	catch (ClassNotFoundException ex) {
		// If EJB 3.1 API not present, simply ignore.
	}
	this.advice = buildAdvice(executor, exceptionHandler); // 创建AnnotationAsyncExecutionInterceptor
	this.pointcut = buildPointcut(asyncAnnotationTypes); // 创建ComposablePointcut
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
	interceptor.configure(executor, exceptionHandler);
	return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
	ComposablePointcut result = null;
	for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
		Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // 类
		Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // 方法
		if (result == null) {
			result = new ComposablePointcut(cpc);
		}
		else {
			result.union(cpc); // 类和方法的组合切点
		}
		result = result.union(mpc);
	}
	return (result != null ? result : Pointcut.TRUE);
}

AnnotationMatchingPointcut切面其实就是查看类或者方法上面有没有@Async注解。

AnnotationAsyncExecutionInterceptor

AnnotationAsyncExecutionInterceptor类主要负责增强逻辑的实现。

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

public Object invoke(final MethodInvocation invocation) throws Throwable {
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
	Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
	final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
	// 获得线程池
	AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
	if (executor == null) {
		throw new IllegalStateException(
				"No executor specified and no default executor set on AsyncExecutionInterceptor either");
	}
	// 将目标方法的执行封装为Callable,方便提交到线程池
	Callable<Object> task = () -> {
		try {
			// 执行目标方法
			Object result = invocation.proceed();
			if (result instanceof Future) {
				return ((Future<?>) result).get();
			}
		}
		catch (ExecutionException ex) {
			handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
		}
		catch (Throwable ex) {
			handleError(ex, userDeclaredMethod, invocation.getArguments());
		}
		return null;
	};
	// 提交任务
	return oSubmit(task, executor, invocation.getMethod().getReturnType());
}

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
	AsyncTaskExecutor executor = this.executors.get(method);
	if (executor == null) {
		Executor targetExecutor;
		/**
		 * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier(java.lang.reflect.Method)
		 */
		// 获得@Async注解中的value属性中指定的taskExecutor名称
		String qualifier = getExecutorQualifier(method);
		if (StringUtils.hasLength(qualifier)) {
			targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
		}
		else {
			// 获取默认的taskExecutor
			targetExecutor = this.defaultExecutor.get();
		}
		if (targetExecutor == null) {
			return null;
		}
		executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
				(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
		this.executors.put(method, executor);
	}
	return executor;
}

determineAsyncExecutor()负责获取异步任务执行的线程池,线程池的查找步骤如下:

  1. 从spring容器中寻找@Async注解中的value属性中指定的taskExecutor
  2. 寻找默认的defaultExecutor

默认的defaultExecutor是怎么来的?

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure

public void configure(@Nullable Supplier<Executor> defaultExecutor,
		@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	// defaultExecutor默认为从beanFactory获取TaskExecutor或者bean名字为taskExecutor的Executor,beanFactory.getBean(TaskExecutor.class)
	this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
	// exceptionHandler默认为SimpleAsyncUncaughtExceptionHandler
	this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}

defaultExecutor首先取参数传入的defaultExecutor,这个参数来自接口AsyncConfigurer.getAsyncExecutor(),如果参数为null,那么就调用getDefaultExecutor(),注意这个方法子类AsyncExecutionInterceptor重写了:

org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

如果找不到defaultExecutor就会创建一个SimpleAsyncTaskExecutor。

再来看看父类的AsyncExecutionAspectSupport#getDefaultExecutor: org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	if (beanFactory != null) {
		try {
			// Search for TaskExecutor bean... not plain Executor since that would
			// match with ScheduledExecutorService as well, which is unusable for
			// our purposes here. TaskExecutor is more clearly designed for it.
			return beanFactory.getBean(TaskExecutor.class);
		}
		catch (NoUniqueBeanDefinitionException ex) {
			logger.debug("Could not find unique TaskExecutor bean", ex);
			try {
				// 找名为taskExecutor的Executor
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				if (logger.isInfoEnabled()) {
					logger.info("More than one TaskExecutor bean found within the context, and none is named " +
							"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
							"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
				}
			}
		}
		catch (NoSuchBeanDefinitionException ex) {
			logger.debug("Could not find default TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				logger.info("No task executor bean found for async processing: " +
						"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
			}
			// Giving up -> either using local default executor or none at all...
		}
	}
	return null;
}

先从beanFactory中获取TaskExecutor类型的对象,然后再找名为taskExecutor的Executor对象。

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
	// 执行任务
	if (CompletableFuture.class.isAssignableFrom(returnType)) {
		return CompletableFuture.supplyAsync(() -> {
			try {
				return task.call();
			}
			catch (Throwable ex) {
				throw new CompletionException(ex);
			}
		}, executor);
	}
	else if (ListenableFuture.class.isAssignableFrom(returnType)) {
		return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
	}
	else if (Future.class.isAssignableFrom(returnType)) {
		return executor.submit(task);
	}
	else {
		executor.submit(task);
		return null;
	}
}

doSubmit()负责将任务提交至线程池中,并对各种方法的返回值进行处理。

到此这篇关于spring的异步执行使用与源码详解的文章就介绍到这了,更多相关spring的异步执行内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java中的static{}块的实例详解

    java中的static{}块的实例详解

    这篇文章主要介绍了java中的static{}块的实例详解的相关资料,这里提供实例来帮助大家理解该如何使用static块,需要的朋友可以参考下
    2017-08-08
  • Java使用Sharding-JDBC分库分表进行操作

    Java使用Sharding-JDBC分库分表进行操作

    Sharding-JDBC 是无侵入式的 MySQL 分库分表操作工具,本文主要介绍了Java使用Sharding-JDBC分库分表进行操作,感兴趣的可以了解一下
    2021-08-08
  • SpringBoot配置的加载流程详细分析

    SpringBoot配置的加载流程详细分析

    了解内部原理是为了帮助我们做扩展,同时也是验证了一个人的学习能力,如果你想让自己的职业道路更上一层楼,这些底层的东西你是必须要会的,这篇文章主要介绍了SpringBoot配置的加载流程
    2023-01-01
  • 深入Java7的一些新特性以及对脚本语言支持API的介绍

    深入Java7的一些新特性以及对脚本语言支持API的介绍

    本篇文章是对Java7的一些新特性以及对脚本语言支持API的概述,需要的朋友参考下
    2013-05-05
  • 生成8位随机不重复的数字编号的方法

    生成8位随机不重复的数字编号的方法

    生成随机不重复的数字编号在某些情况下也会用到,本文以生成8位随机不重复的数字编号为例与大家分享下具体的实现过程,感兴趣的朋友可以参考下
    2013-09-09
  • 使用maven打包生成doc文档和打包源码

    使用maven打包生成doc文档和打包源码

    这篇文章主要介绍了使用maven打包生成doc文档和打包源码的实现,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • 基于字符串常用API(详解)

    基于字符串常用API(详解)

    下面小编就为大家带来一篇基于字符串常用API(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • Sping中如何处理@Bean注解bean同名的问题

    Sping中如何处理@Bean注解bean同名的问题

    这篇文章主要介绍了Sping中如何处理@Bean注解bean同名的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • Java求余%操作引发的一连串故事

    Java求余%操作引发的一连串故事

    取模运算与取余运算两个概念有重叠的部分但又不完全一致。主要的区别在于对负整数进行除法运算时操作不同。本文重点给大家介绍Java求余%操作引发的一连串故事,感兴趣的朋友跟随小编一起看看吧
    2021-05-05
  • 详细分析Java并发集合ArrayBlockingQueue的用法

    详细分析Java并发集合ArrayBlockingQueue的用法

    这篇文章主要介绍了详细分析Java并发集合ArrayBlockingQueue的用法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04

最新评论