Spring的定时任务@Scheduled源码详解
Spring的定时任务@Scheduled源码详解
EnableScheduling
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Import({SchedulingConfiguration.class}) //@Import注解将SchedulingConfiguration导入到IOC中 @Documented public @interface EnableScheduling { }
SchedulingConfiguration
将ScheduledAnnotationBeanPostProcessor加入到Spring容器中 @Configuration //当结合@Bean注解时,@Bean注解的类可以类比在spring.xml中定义
@Role(2) public class SchedulingConfiguration { public SchedulingConfiguration() { } @Bean( name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"} ) @Role(2) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }
ScheduledAnnotationBeanPostProcessor
1、先看下ScheduledAnnotationBeanPostProcessor有哪些属性
//string属性解析器,用来解析${}对应的配置文件的属性,aware接口注入 @Nullable private StringValueResolver embeddedValueResolver; @Nullable private String beanName; @Nullable private BeanFactory beanFactory; //aware接口注入 @Nullable private ApplicationContext applicationContext; //aware接口注入 @Nullable //定时任务线程池,如果不为空使用这个scheduler当作ScheduledTaskRegistrar的线程池 private Object scheduler; //定时任务的注册器,通过这个类将定时任务委托给定时任务线程池 private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar(); //已检测的没有scheduled注解的类的集合 private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap(64)); //保存class与scheduled方法的映射 private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap(16);
2、根据上面的定时任务流程,在每个Bean的属性填充完之后,调用postProcessAfterInitialization方法,将带有@Scheduled 注解的方法,在拿到@Scheduled注解的方法后,调用processScheduled
public Object postProcessAfterInitialization(Object bean, String beanName) { Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class); return !scheduledMethods.isEmpty() ? scheduledMethods : null; }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (this.logger.isTraceEnabled()) { this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass()); } } else { annotatedMethods.forEach((method, scheduledMethods) -> { scheduledMethods.forEach((scheduled) -> { //这里调用 this.processScheduled(scheduled, method, bean); }); }); if (this.logger.isDebugEnabled()) { this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
3、要了解processScheduled方法做了什么,可以先看下@Scheduled 注解的定义
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Repeatable(Schedules.class) public @interface Scheduled { String cron() default ""; String zone() default ""; long fixedDelay() default -1L; String fixedDelayString() default ""; long fixedRate() default -1L; String fixedRateString() default ""; long initialDelay() default -1L; String initialDelayString() default ""; }
可以看到,processScheduled方法主要是使用embeddedValueResolver对带String后缀的属性进行从配置文件读取的操作,根据每个方法上使用的注解判断定时任务的类型是CronTask还是FixedRateTask,将这些任务添加到ScheduledTaskRegistrar中的unresolvedTasks
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet(4); long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0L, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException var25) { throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1L, "'initialDelay' not supported for cron triggers"); processedSchedule = true; TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } //省略了一部分解析过程,和解析cron是一样的 ... ... Assert.isTrue(processedSchedule, errorMessage); synchronized(this.scheduledTasks) { Set<ScheduledTask> registeredTasks = (Set)this.scheduledTasks.get(bean); if (registeredTasks == null) { registeredTasks = new LinkedHashSet(4); this.scheduledTasks.put(bean, registeredTasks); } ((Set)registeredTasks).addAll(tasks); } } catch (IllegalArgumentException var26) { throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + var26.getMessage()); } }
对于3中,比较关键的代码
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
跟踪到this.registrar.scheduleCronTask(),这里跳转到ScheduledTaskRegistrar类的scheduleCronTask()
public ScheduledTask scheduleCronTask(CronTask task) { ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { //创建ScheduledTask scheduledTask = new ScheduledTask(task); newTask = true; } //可以看到ScheduledTaskRegistrar的初始化方法中没有对taskScheduler赋值 //所以此时this.taskScheduler = null if (this.taskScheduler != null) { scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { //进入这里 this.addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return newTask ? scheduledTask : null; }
4、在所有单例的Bean实例化完成后,调用afterSingletonsInstantiated() ,在Spring容器初始化完成后,触发ContextRefreshedEvent 事件,调用onApplicationEvent方法,执行finishRegistration()
private void finishRegistration() { //对应a if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } //对应b if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory)this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList(beans.values()); AnnotationAwareOrderComparator.sort(configurers); Iterator var3 = configurers.iterator(); while(var3.hasNext()) { SchedulingConfigurer configurer = (SchedulingConfigurer)var3.next(); configurer.configureTasks(this.registrar); } } //对应c if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException var9) { this.logger.debug("Could not find unique TaskScheduler bean", var9); try { this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException var8) { if (this.logger.isInfoEnabled()) { this.logger.info("More than one TaskScheduler bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var9.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException var10) { this.logger.debug("Could not find default TaskScheduler bean", var10); try { this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException var6) { this.logger.debug("Could not find unique ScheduledExecutorService bean", var6); try { this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException var5) { if (this.logger.isInfoEnabled()) { this.logger.info("More than one ScheduledExecutorService bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var6.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException var7) { this.logger.debug("Could not find default ScheduledExecutorService bean", var7); this.logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } this.registrar.afterPropertiesSet(); }
这个方法主要实现的内容是:
- 用容器中的SchedulingConfigurer配置ScheduledTaskRegistrar,这里是根据ScheduledTaskRegistrar的引用,调用其set方法设置一些属性
public interface SchedulingConfigurer { void configureTasks(ScheduledTaskRegistrar var1); }
- 如果此时ScheduledTaskRegistrar的scheduler还是空,就从容器中取TaskScheduler(byName和byType),如果没有取到就根据容器中的ScheduledExecutorService实例化TaskScheduler
this.registrar.afterPropertiesSet();
所以在容器中注入TaskScheduler或ScheduledExecutorService的类或者实现SchedulingConfigurer接口都可以配置定时任务的线程池
5、afterPropertiesSet
public void afterPropertiesSet() { this.scheduleTasks(); } protected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } //省略了内容 ... ... if (this.cronTasks != null) { var1 = this.cronTasks.iterator(); while(var1.hasNext()) { CronTask task = (CronTask)var1.next(); this.addScheduledTask(this.scheduleCronTask(task)); } } //省略了内容 ... ... }
又进入了熟悉的方法scheduleCronTask,在这里将任务提交给taskScheduler
public ScheduledTask scheduleCronTask(CronTask task) { ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } if (this.taskScheduler != null) { //走到了这里 scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { this.addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return newTask ? scheduledTask : null; }
6、如果想看taskScheduler是怎么执行定时任务的,可以看taskScheduler的一个默认实现ConcurrentTaskScheduler,大体是有一个任务队列WorkerQueue,这个队列是按小顶堆排序的,排序规则是任务执行的时间,每次取出任务时,将任务提交给线程池执行,在执行任务的时候,计算下一次执行的时间,提交队列…
到此这篇关于Spring的定时任务@Scheduled源码详解的文章就介绍到这了,更多相关Spring定时任务@Scheduled内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java的BigDecimal在math包中提供的API类场景使用详解
这篇文章主要介绍了Java的BigDecimal在math包中提供的API类场景使用详解,BigDecimal,用来对超过16位有效位的数进行精确的运算,双精度浮点型变量double可以处理16位有效数,在实际应用中,需要对更大或者更小的数进行运算和处理,需要的朋友可以参考下2023-12-12Java复制(拷贝)数组的4种方法:arraycopy()方法、clone() 方法、copyOf()和copyOfRa
这篇文章主要介绍了Java复制(拷贝)数组的4种方法:arraycopy()方法、clone() 方法、copyOf()和copyOfRan,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-01-01SpringCloud全面解析@FeignClient标识接口的过程
这篇文章主要介绍了SpringCloud全面解析@FeignClient标识接口的过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-03-03
最新评论