springboot项目配置多个kafka的示例代码

 更新时间:2023年04月24日 14:44:41   作者:CccccDi  
这篇文章主要介绍了springboot项目配置多个kafka,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1.spring-kafka

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.3.5.RELEASE</version>
</dependency>

2.配置文件相关信息

kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并发消费的线程数 (通常与partition数量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
        
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false

3.kafka配置类

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${kafka.consumer.enable.auto.commit}")
    private String autoCommit;

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;


    @Value("${kafka.consumer.group.id.pic}")
    private String groupIdPic;

    @Value("${kafka.consumer.concurrency.pic}")
    private int concurrencyPic;

    @Value("${kafka.consumer.enable.auto.commit.pic}")
    private String autoCommitPic;

    @Value("${kafka.bootstrap-servers.pic}")
    private String bootstrapServerPic;


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        String bootstrapServers = bootstrapServer;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }




    @Bean
    public ConsumerFactory<String, String> consumerFactoryPic() {
        String bootstrapServers = bootstrapServerPic;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryPic());
        factory.setConcurrency(concurrencyPic);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

4.消费主题消息

@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            String jsonString = message.value();
            if (StringUtils.isNoneBlank(jsonString)) {
                log.info("消费:{}",jsonString);
                //TODO ....
            }
        } catch (Exception e) {
            log.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            if (StringUtils.isNoneBlank(message.value())) {
                  //TODO ....
            }
        } catch (Exception e) {
            logger.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

到此这篇关于springboot项目配置多个kafka的文章就介绍到这了,更多相关springboot配置多个kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Netty网络编程零基础入门

    Netty网络编程零基础入门

    Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端,如果你还不了解它的使用,就赶快继续往下看吧
    2022-08-08
  • MybatisPlus3.3.0没有MybatisPlusInterceptor类问题的解决方法

    MybatisPlus3.3.0没有MybatisPlusInterceptor类问题的解决方法

    项目使用的是mybatis-plus-extension3.3.0依赖,然后在我使用分页插件的时候,发现无法导入MybatisPlusInterceptor类所以本文给大家介绍了MybatisPlus3.3.0没有MybatisPlusInterceptor类问题的解决方法,需要的朋友可以参考下
    2023-12-12
  • Maven创建项目过慢的4种解决办法

    Maven创建项目过慢的4种解决办法

    最近经常会遇到一个困扰,那就是用idea创建maven项目时,速度很慢,本文就来介绍一下Maven创建项目过慢的4种解决办法,感兴趣的可以了解一下
    2021-12-12
  • springMVC实现前台带进度条文件上传的示例代码

    springMVC实现前台带进度条文件上传的示例代码

    本篇文章主要介绍了springMVC实现前台带进度条文件上传的示例代码,具有一定的参考价值,有兴趣的可以了解一下。
    2017-01-01
  • 通过面试题解析 Java 类加载机制

    通过面试题解析 Java 类加载机制

    类加载是 Java 语言的一个创新,也是 Java 语言流行的重要原因之一。它使得 Java 类可以被动态加载到 Java 虚拟机中并执行。下面小编和大家来一起学习一下吧
    2019-05-05
  • 配置IDEA中java项目配置swagger全过程

    配置IDEA中java项目配置swagger全过程

    这篇文章主要介绍了配置IDEA中java项目配置swagger全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-05-05
  • javax.validation.constraints注解使用

    javax.validation.constraints注解使用

    这篇文章主要介绍了javax.validation.constraints注解使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • Kotlin 和 Java 混合开发入门教程

    Kotlin 和 Java 混合开发入门教程

    这篇文章主要介绍了入门 Kotlin 和 Java 混合开发,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09
  • JAVA异常和自定义异常处理方式

    JAVA异常和自定义异常处理方式

    这篇文章主要介绍了JAVA异常和自定义异常处理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • spring boot实战教程之shiro session过期时间详解

    spring boot实战教程之shiro session过期时间详解

    这篇文章主要给大家介绍了关于spring boot实战教程之shiro session过期时间的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起看看吧。
    2017-10-10

最新评论