springboot如何开启和关闭kafka消费

 更新时间:2024年12月16日 09:44:17   作者:阿拉的梦想  
在Kafka消费者中,通过关闭自动消费配置,使用自定义容器工厂,并在消费监听器上设置id,可以手动控制消费的开启和关闭,这是根据个人经验总结的方法,旨在帮助其他开发者

springboot开启和关闭kafka消费

关闭kafka自动消费

配置自定义容器工厂

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class kafkaConfig {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Bean("pingKafkaFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }
}

在消费监听器上使用工厂,并设置id

@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")

这样,启动项目后,就不会自动消费了。

手动开启和关闭消费

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;

/**
 * Kafka消费监听服务实现类.
 */
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {


    /**
     * registry.
     */
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开启监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void startListener(String listenerId) {
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer(listenerId).isRunning()) {
            registry.getListenerContainer(listenerId).start();
        }
        //项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思
        //registry.getListenerContainer(listenerId).stop();
        log.info(listenerId + "开启监听成功。");
    }

    /**
     * 停止监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void stopListener(String listenerId) {
        registry.getListenerContainer(listenerId).stop();
        log.info(listenerId + "停止监听成功。");
    }

}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • springboot整合swagger3报Unable to infer base url错误问题

    springboot整合swagger3报Unable to infer base&nbs

    这篇文章主要介绍了springboot整合swagger3报Unable to infer base url错误问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • 浅谈Java方法的重载

    浅谈Java方法的重载

    方法重载是指在一个类中定义多个同名的方法,但要求每个方法具有不同的参数的类型或参数的个数。调用重载方法时,Java编译器能通过检查调用的方法的参数类型和个数选择一个恰当的方法。方法重载通常用于创建完成一组任务相似但参数的类型或参数的个数不同的方法。
    2016-04-04
  • SpringBoot整合MyBatis的代码详解

    SpringBoot整合MyBatis的代码详解

    这篇文章主要介绍了SpringBoot整合MyBatis笔记记录,大家需要注意在整合mybatis之前我们需要相对应的导入相关依赖,首先需要在java的目录和resources下创建mapper文件夹,对SpringBoot整合MyBatis的详细过程感兴趣的朋友一起看看吧
    2022-05-05
  • SpringMVC 域对象共享数据的实现示例

    SpringMVC 域对象共享数据的实现示例

    本文主要介绍了SpringMVC 域对象共享数据的实现示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • Java spring定时任务详解

    Java spring定时任务详解

    这篇文章主要为大家详细介绍了Spring定时任务,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2021-10-10
  • Java多线程优化方法及使用方式

    Java多线程优化方法及使用方式

    这篇文章主要介绍了Java多线程优化方法及使用方式,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2018-02-02
  • SpringBoot整合tkMapper的方法

    SpringBoot整合tkMapper的方法

    项目使用SpringBoot2.0,H2数据库,使用了 Lombok 简化代码,下面是本人使用SpringBoot整合tkMapper的一个小demo,记录下来本人在此处踩得坑
    2022-11-11
  • Java编写的24点纸牌游戏

    Java编写的24点纸牌游戏

    这篇文章主要介绍了Java编写的24点纸牌游戏的相关资料,需要的朋友可以参考下
    2015-03-03
  • springboot序列化和反序列化器配置方法

    springboot序列化和反序列化器配置方法

    这篇文章主要介绍了springboot序列化和反序列化器配置方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • @feignclient名字冲突的解决方案

    @feignclient名字冲突的解决方案

    这篇文章主要介绍了@feignclient名字冲突的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10

最新评论