springboot使用@KafkaListener监听多个kafka配置实现

 更新时间:2024年04月09日 09:36:45   作者:道不平  
当服务中需要监听多个kafka时, 需要配置多个kafka,本文主要介绍了springboot使用@KafkaListener监听多个kafka配置实现,具有一定的参考价值,感兴趣的可以了解一下

背景

使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用

方案

可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:

1. 导入依赖

        <dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

2. yml配置

kafka:
  # 默认消费者配置
  default-consumer:
    # 自动提交已消费offset
    enable-auto-commit: true
    # 自动提交间隔时间
    auto-commit-interval: 1000
    # 消费的超时时间
    poll-timeout: 1500
    # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
    auto.offset.reset: latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    session.timeout.ms: 120000
    # 消费请求超时时间
    request.timeout.ms: 180000
  # 1号kafka配置
  test1:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx
  # 2号kafka配置
  test2:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx

3. 容器工厂配置

package com.zhdx.modules.backstage.config;

import com.google.common.collect.Maps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;

/**
 * kafka监听容器工厂配置
 * <p>
 * 拓展其他消费者配置只需配置指定的属性和bean即可
 */
@EnableKafka
@Configuration
@RefreshScope
public class KafkaListenerContainerFactoryConfig {

    /**
     *  test1 kafka配置
     */
    @Value("${kafka.test1.bootstrap-servers}")
    private String test1KafkaServerUrls;

    @Value("${kafka.test1.consumer.group-id}")
    private String test1GroupId;

    @Value("${kafka.test1.consumer.sasl.mechanism}")
    private String test1SaslMechanism;

    @Value("${kafka.test1.consumer.security.protocol}")
    private String test1SecurityProtocol;

    @Value("${kafka.test1.consumer.sasl.jaas.config}")
    private String test1SaslJaasConfig;
    /**
     *  test2 kafka配置
     */
    @Value("${kafka.test2.bootstrap-servers}")
    private String test2KafkaServerUrls;

    @Value("${kafka.test2.consumer.group-id}")
    private String test2GroupId;

    @Value("${kafka.test2.consumer.sasl.mechanism}")
    private String test2SaslMechanism;

    @Value("${kafka.test2.consumer.security.protocol}")
    private String test2SecurityProtocol;

    @Value("${kafka.test2.consumer.sasl.jaas.config}")
    private String test2SaslJaasConfig;

    /**
     * 默认消费者配置
     */
    @Value("${kafka.default-consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.default-consumer.poll-timeout}")
    private int pollTimeout;

    @Value("${kafka.default-consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.default-consumer.session.timeout.ms}")
    private int sessionTimeoutMs;

    @Value("${kafka.default-consumer.request.timeout.ms}")
    private int requestTimeoutMs;

    /**
     * test1消费者配置
     */
    public Map<String, Object> test1ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism);
        props.put("security.protocol", test1SecurityProtocol);
        // 账号密码
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig);
        return props;
    }
    
    /**
     * test2消费者配置
     */
    public Map<String, Object> test2ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism);
        props.put("security.protocol", test2SecurityProtocol);
        // 账号密码
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig);
        return props;
    }

    /**
     * 默认消费者配置
     */
    private Map<String, Object> getDefaultConsumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        // 自动提交(按周期)已消费offset 批量消费下设置false
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        // 消费请求超时时间
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
        // 序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    /**
     * 消费者工厂类
     */
    public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(
        Map<String, Object> consumerConfigs) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(consumerConfigs));
        // 是否开启批量消费
        factory.setBatchListener(false);
        // 消费的超时时间
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        return factory;
    }

    /**
     * 创建test1 Kafka 监听容器工厂。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
     */
    @Bean(name = "test1KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test1ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
    

    /**
     * 创建test2 Kafka 监听容器工厂。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
     */
    @Bean(name = "test2KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test2ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
}

4. @KafkaListener使用

package com.zhdx.modules.backstage.kafka;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka监听器
 */
@Slf4j
@Component
public class test1KafkaListener {
    @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")
    public void handleHyPm(ConsumerRecord<String, String> record) {
        log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value()));
    }
}

到此这篇关于springboot使用@KafkaListener监听多个kafka配置实现的文章就介绍到这了,更多相关springboot 监听多个kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • Java在高并发场景下实现点赞计数器

    Java在高并发场景下实现点赞计数器

    点赞计数器的本质就是对某个变量在高并发情况下的修改,这篇文章主要为大家介绍了Java实现点赞计数器的示例代码,感兴趣的小伙伴可以了解一下
    2023-06-06
  • Spring Boot 单元测试和集成测试实现详解

    Spring Boot 单元测试和集成测试实现详解

    这篇文章主要介绍了Spring Boot 单元测试和集成测试实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • Javax Validation自定义注解进行身份证号校验

    Javax Validation自定义注解进行身份证号校验

    这篇文章主要为大家详细介绍了如何通过Javax Validation自定义注解进行身份证号校验,文中的示例代码讲解详细,有需要的小伙伴可以参考一下
    2024-10-10
  • maven查看依赖树的方法实现

    maven查看依赖树的方法实现

    Maven依赖树是以当前项目的POM文件为根节点,列出了所有直接或间接依赖的依赖树结构,本文就详细的来介绍一下如何查看,感兴趣的可以了解一下
    2023-08-08
  • 图解Spring容器中实例化bean的四种方式

    图解Spring容器中实例化bean的四种方式

    这篇文章主要介绍了图解Spring容器中实例化bean的四种方式,传统应用程序可以通过new和反射方式进行实例化Bean,而Spring IOC容器则需要根据 Bean 定义里的配置元数据,使用反射机制来创建Bean,需要的朋友可以参考下
    2023-11-11
  • 如何开发基于Netty的HTTP/HTTPS应用程序

    如何开发基于Netty的HTTP/HTTPS应用程序

    HTTP/HTTPS是最常见的协议套件之一,并且随着智能手机的成功,它的应用也日益广泛,因为对于任何公司来说,拥有一个可以被移动设备访问的网站几乎是必须的。下面就来看看如何开发基于Netty的HTTP/HTTPS应用程序
    2021-06-06
  • SpringBoot跨系统单点登陆的实现方法

    SpringBoot跨系统单点登陆的实现方法

    这篇文章主要介绍了SpringBoot跨系统单点登陆的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • Spring深入讲解实现AOP的三种方式

    Spring深入讲解实现AOP的三种方式

    Spring的AOP就是通过动态代理实现的,使用了两个动态代理,分别是JDK的动态代理和CGLIB动态代理,本文重点给大家介绍下Spring Aop的三种实现,感兴趣的朋友一起看看吧
    2022-05-05
  • Java实现高效PDF文件传输技巧

    Java实现高效PDF文件传输技巧

    你是否曾为PDF文件传输的低效率而苦恼?现在,有了这份Java实现高效PDF文件传输技巧指南,你将能够轻松解决这个问题,我们将分享一些实用的技巧和最佳实践,帮助你优化文件传输过程,不要错过这个提高工作效率的机会,快来阅读这份指南吧!
    2024-03-03
  • java连接mysql数据库学习示例

    java连接mysql数据库学习示例

    这篇文章主要介绍了java连接mysql数据库学习示例,需要的朋友可以参考下
    2014-03-03

最新评论