Spring中的@Scheduled源码解析

 更新时间:2023年09月25日 09:15:17   作者:木棉软糖  
这篇文章主要介绍了Spring中的@Scheduled源码解析,定时任务调度的基础是ScheduledAnnotationBeanPostProcessor类,这是一个实现了BeanPostProcessor接口的后置处理器,需要的朋友可以参考下

@Scheduled源码解析

解析部分

定时任务调度的基础是ScheduledAnnotationBeanPostProcessor类,这是一个实现了BeanPostProcessor接口的后置处理器。

关于BeanPostProcessor,最主要就是看postProcessBeforeInitialization方法和postProcessAfterInitialization方法做了什么逻辑。 postProcessBeforeInitialization方法没有实现逻辑,所以看postProcessAfterInitialization方法的逻辑。

@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		/**
		* 上面省略部分代码,看下面的关键代码
		*/
		if (!this.nonAnnotatedClasses.contains(targetClass) &&
				AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
			/**
			* 这里一长串代码是为了获取被@Scheduled和@Schedules注解的方法
			*/
			Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
						Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
								method, Scheduled.class, Schedules.class);
						return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
					});
			//如果没有被@Scheduled和@Schedules注解的方法,当前bean加入到nonAnnotatedClasses集合中,不进行处理
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(targetClass);
				if (logger.isTraceEnabled()) {
					logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
				}
			}
			else {
				//如果存在被@Scheduled和@Schedules注解的方法,针对每个方法调用processScheduled方法
				annotatedMethods.forEach((method, scheduledMethods) ->
						scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
				if (logger.isTraceEnabled()) {
					logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
							"': " + annotatedMethods);
				}
			}
		}
		return bean;
	}

根据以上代码,总结出ScheduledAnnotationBeanPostProcessor 类做的事情:

(1)获取被@Scheduled和@Schedules注解标记的方法,若没有,将此Bean加入到nonAnnotatedClasses集合中。

(2)存在被@Scheduled和@Schedules注解的方法,针对每个方法调用processScheduled方法

所以,接下来就是分析关键在于processScheduled方法做的逻辑

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
		try {
            //将被注解的方法封装为ScheduledMethodRunnable
			Runnable runnable = createRunnable(bean, method);
			boolean processedSchedule = false;
			String errorMessage =
					"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
			Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
			// 解析initialDelay 的值,字符串和整型值不能同时配置
			long initialDelay = scheduled.initialDelay();
			String initialDelayString = scheduled.initialDelayString();
			if (StringUtils.hasText(initialDelayString)) {
				Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
				if (this.embeddedValueResolver != null) {
					initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
				}
				if (StringUtils.hasLength(initialDelayString)) {
					try {
						initialDelay = parseDelayAsLong(initialDelayString);
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
					}
				}
			}
			// Check cron expression
			String cron = scheduled.cron();
			if (StringUtils.hasText(cron)) {
				String zone = scheduled.zone();
				if (this.embeddedValueResolver != null) {
					cron = this.embeddedValueResolver.resolveStringValue(cron);
					zone = this.embeddedValueResolver.resolveStringValue(zone);
				}
				if (StringUtils.hasLength(cron)) {
					Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
					processedSchedule = true;
					if (!Scheduled.CRON_DISABLED.equals(cron)) {
						TimeZone timeZone;
						if (StringUtils.hasText(zone)) {
							timeZone = StringUtils.parseTimeZoneString(zone);
						}
						else {
							timeZone = TimeZone.getDefault();
						}
						tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
					}
				}
			}
			// At this point we don't need to differentiate between initial delay set or not anymore
			if (initialDelay < 0) {
				initialDelay = 0;
			}
			// Check fixed delay
			long fixedDelay = scheduled.fixedDelay();
			if (fixedDelay >= 0) {
				Assert.isTrue(!processedSchedule, errorMessage);
				processedSchedule = true;
				tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
			}
			String fixedDelayString = scheduled.fixedDelayString();
			if (StringUtils.hasText(fixedDelayString)) {
				if (this.embeddedValueResolver != null) {
					fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
				}
				if (StringUtils.hasLength(fixedDelayString)) {
					Assert.isTrue(!processedSchedule, errorMessage);
					processedSchedule = true;
					try {
						fixedDelay = parseDelayAsLong(fixedDelayString);
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
					}
					tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
				}
			}
			// Check fixed rate
			long fixedRate = scheduled.fixedRate();
			if (fixedRate >= 0) {
				Assert.isTrue(!processedSchedule, errorMessage);
				processedSchedule = true;
				tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
			}
			String fixedRateString = scheduled.fixedRateString();
			if (StringUtils.hasText(fixedRateString)) {
				if (this.embeddedValueResolver != null) {
					fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
				}
				if (StringUtils.hasLength(fixedRateString)) {
					Assert.isTrue(!processedSchedule, errorMessage);
					processedSchedule = true;
					try {
						fixedRate = parseDelayAsLong(fixedRateString);
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
					}
					tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
				}
			}
			// Check whether we had any attribute set
			Assert.isTrue(processedSchedule, errorMessage);
			// Finally register the scheduled tasks
			synchronized (this.scheduledTasks) {
				Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
				regTasks.addAll(tasks);
			}
		}
		catch (IllegalArgumentException ex) {
			throw new IllegalStateException(
					"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
		}
	}

根据以上代码,大致逻辑如下:

(1)解析initialDelay的值

(2)根据@Scheduled注解的属性配置,分别将此bean的被注解//方法封装为CronTask,FixedDelayTask,FixedRateTask

(3)this.registrar 根据封装的任务类型使用对应的调度方法scheduleXXX 若此时taskScheduler局部变量还没有初始化完成,那么将会加入到一个临时的集合存起来,不进行调度,这个taskScheduler可以看做是一个调度任务专用的线程池

(4)调度方法返回的结果加入到tasks集合中

(5)然后按照bean分类,放入scheduledTasks集合(以bean为key的Map集合)

其中@Scheduled注解 的限制如下:

(1)cron表达式不能与initialDelay,fixedDelay,fixedRate一起配置

(2)fixedDelay不能与cron同时设置

(3)fixedRate不能与cron 同时配置

(4)fixedDelay 和fixedRate不能同时配置

至此postProcessAfterInitialization方法执行完成。 粗略总结一下,这个方法就是把@Scheduled注解的方法解析出来,然后转化为ScheduledTask,这大概是代表了一个定时任务的对象,然后再按bean分组存放到一个Map集合中。

执行部分

经过验证,其实上面在执行postProcessAfterInitialization方法,taskScheduler还是为null的,也就是说,各个定时任务实际上还是没办法开始调度执行。 举个例子:

@Nullable
	public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
		ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
		boolean newTask = false;
		if (scheduledTask == null) {
			scheduledTask = new ScheduledTask(task);
			newTask = true;
		}
		if (this.taskScheduler != null) {
			if (task.getInitialDelay() > 0) {
				Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
				scheduledTask.future =
						this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
			}
			else {
				scheduledTask.future =
						this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
			}
		}
		else {
			addFixedRateTask(task);
			this.unresolvedTasks.put(task, scheduledTask);
		}
		return (newTask ? scheduledTask : null);
	}

此时由于taskScheduler 为null,因此没有执行this.taskScheduler.scheduleAtFixedRate方法,而是调用了addFixedRateTask(task)。(经过测试,就算自定义了taskScheduler,也不会在这时候赋值的) 那上面的this.taskScheduler.scheduleAtFixedRate方法 在什么执行?带着这个疑问,调试打点,最终发现在onApplicationEvent方法中它才会执行调度,此时taskScheduler不为空。

ScheduledAnnotationBeanPostProcessor 类实现了ApplicationListener接口,监听ContextRefreshedEvent 事件。根据以前学习的Spirng加载流程,ContextRefreshedEvent 事件是Spring容器加载完成之后,执行finishRefesh方法时发布的。 在监听方法里面主要执行了finishRegistration()方法

private void finishRegistration() {
        //片段1
		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, SchedulingConfigurer> beans =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
			List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
			AnnotationAwareOrderComparator.sort(configurers);
			for (SchedulingConfigurer configurer : configurers) {
				configurer.configureTasks(this.registrar);
			}
		}
        //省略若干代码....
		this.registrar.afterPropertiesSet();
	}

有一个地方,个人觉得值得了解的: 片段1:自定义调度线程池时实现了SchedulingConfigurer接口 的configureTasks方法,这个方法就是在片段1执行的。

然后之后比较重要的。主要看this.registrar.afterPropertiesSet方法 this.registrar.afterPropertiesSet方法里面调用了scheduleTasks()方法

protected void scheduleTasks() {
     	//片段1
		if (this.taskScheduler == null) {
			this.localExecutor = Executors.newSingleThreadScheduledExecutor();
			this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
		}
		//片段2
		if (this.triggerTasks != null) {
			for (TriggerTask task : this.triggerTasks) {
				addScheduledTask(scheduleTriggerTask(task));
			}
		}
		if (this.cronTasks != null) {
			for (CronTask task : this.cronTasks) {
				addScheduledTask(scheduleCronTask(task));
			}
		}
		if (this.fixedRateTasks != null) {
			for (IntervalTask task : this.fixedRateTasks) {
				addScheduledTask(scheduleFixedRateTask(task));
			}
		}
		if (this.fixedDelayTasks != null) {
			for (IntervalTask task : this.fixedDelayTasks) {
				addScheduledTask(scheduleFixedDelayTask(task));
			}
		}
	}

片段1:this.taskScheduler 如果为null,则使用Executors.newSingleThreadScheduledExecutor()。

如果是自定义线程池,则不会执行,因为此时已经赋值了。

片段2:根据不同的定时任务类型,分别调用不同的调度API

这里的this.cronTasks,this.fixedRateTasks,this.fixedDelayTasks 就是上面执行processScheduled方法时,因为this.taskScheduler 为null而把定时任务临时存放的地方。 因为现在已经有this.taskScheduler ,因此正式将它们加入调度,并放入scheduledTasks 集合中(已经参与调度的不会重复加入)。

小结

(1)从这个源码分析,可以知道通过实现SchedulingConfigurer接口自定义调度线程池的配置

(2)@Scheduled注解 的限制,不能同时配置多种任务类型

到此这篇关于Spring中的@Scheduled源码解析的文章就介绍到这了,更多相关@Scheduled源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java 实现反射 json动态转实体类--fastjson

    java 实现反射 json动态转实体类--fastjson

    这篇文章主要介绍了java 实现反射 json动态转实体类--fastjson,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • 浅谈@RequestParam(required = true)的误区

    浅谈@RequestParam(required = true)的误区

    这篇文章主要介绍了@RequestParam(required = true)的误区,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • 利用微信小程序+JAVA实现微信支付的全过程

    利用微信小程序+JAVA实现微信支付的全过程

    微信支付是一种在线支付解决方案,允许用户通过微信内的支付功能进行付款,下面这篇文章主要给大家介绍了关于利用微信小程序+JAVA实现微信支付的相关资料,需要的朋友可以参考下
    2024-08-08
  • JAVA多线程之JDK中的各种锁详解(看这一篇就够了)

    JAVA多线程之JDK中的各种锁详解(看这一篇就够了)

    多线程编程可以说是在大部分平台和应用上都需要实现的一个基本需求,下面这篇文章主要给大家介绍了关于JAVA多线程之JDK中各种锁的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-07-07
  • 初次体验MyBatis的注意事项

    初次体验MyBatis的注意事项

    今天给大家带来的是关于MyBatis的相关知识,文章围绕着MyBatis的用法展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • servlet实现文件上传、预览、下载、删除功能

    servlet实现文件上传、预览、下载、删除功能

    这篇文章主要为大家详细介绍了servlet实现文件上传、预览、下载、删除功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-09-09
  • RabbitMQ通过延迟插件实现延迟消息

    RabbitMQ通过延迟插件实现延迟消息

    在RabbitMQ中,使用延迟消息插件比死信队列更优化的实现消息的延迟发送,本文介绍了延迟插件的下载、安装、以及如何通过设置消息头x-delay实现消息的延迟投递,特别指出,使用延迟消息可能会损耗性能,适合短时间的延迟场景
    2024-10-10
  • Spring Boot项目Jar包加密实战教程

    Spring Boot项目Jar包加密实战教程

    本文详细介绍了如何在Spring Boot项目中实现Jar包加密,我们首先了解了Jar包加密的基本概念和作用,然后学习了如何使用Spring Boot的Jar工具和第三方库来实现Jar包的加密和解密,感兴趣的朋友一起看看吧
    2024-02-02
  • Spring security实现对账户进行加密

    Spring security实现对账户进行加密

    这篇文章主要介绍了Spring security实现对账户进行加密,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • 使用Spring底层组件实现Aware接口

    使用Spring底层组件实现Aware接口

    这篇文章主要介绍了使用Spring底层组件实现Aware接口,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-07-07

最新评论