springboot如何开启和关闭kafka消费
更新时间:2024年12月16日 09:44:17 作者:阿拉的梦想
在Kafka消费者中,通过关闭自动消费配置,使用自定义容器工厂,并在消费监听器上设置id,可以手动控制消费的开启和关闭,这是根据个人经验总结的方法,旨在帮助其他开发者
springboot开启和关闭kafka消费
关闭kafka自动消费
配置自定义容器工厂
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Component; @Component @Configuration public class kafkaConfig { @Autowired private ConsumerFactory<String, String> consumerFactory; @Bean("pingKafkaFactory") public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>(); container.setConsumerFactory(consumerFactory); //禁止自动启动 container.setAutoStartup(false); return container; } }
在消费监听器上使用工厂,并设置id
@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")
这样,启动项目后,就不会自动消费了。
手动开启和关闭消费
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Service; /** * Kafka消费监听服务实现类. */ @Service @Slf4j public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService { /** * registry. */ @Autowired private KafkaListenerEndpointRegistry registry; /** * 开启监听. * * @param listenerId 监听ID */ @Override public void startListener(String listenerId) { //判断监听容器是否启动,未启动则将其启动 if (!registry.getListenerContainer(listenerId).isRunning()) { registry.getListenerContainer(listenerId).start(); } //项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思 //registry.getListenerContainer(listenerId).stop(); log.info(listenerId + "开启监听成功。"); } /** * 停止监听. * * @param listenerId 监听ID */ @Override public void stopListener(String listenerId) { registry.getListenerContainer(listenerId).stop(); log.info(listenerId + "停止监听成功。"); } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
springboot整合swagger3报Unable to infer base&nbs
这篇文章主要介绍了springboot整合swagger3报Unable to infer base url错误问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-05-05
最新评论