SpringBoot实现kafka多源配置的示例代码
更新时间:2024年06月06日 10:42:53 作者:it噩梦
实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置,本文介绍了SpringBoot实现kafka多源配置,需要的朋友可以参考下
背景
实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。
核心配置
自动化配置类
import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister; import com.example.kafka.autoconfig.kafkaConsumerConfig; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; @EnableKafka @Configuration( proxyBeanMethods = false ) @ConditionalOnWebApplication @EnableConfigurationProperties({kafkaConsumerConfig.class}) @Import({CustomKafkaDataSourceRegister.class}) public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor { public MyKafkaAutoConfiguration() { } public void setBeanFactory(BeanFactory beanFactory) throws BeansException { beanFactory.getBean(CustomKafkaDataSourceRegister.class); } }
注册生产者、消费者核心bean到spring
public void afterPropertiesSet() { Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories(); if (factories != null && !factories.isEmpty()) { factories.forEach((factoryName, consumerConfig) -> { KafkaProperties.Listener listener = consumerConfig.getListener(); Integer concurrency = consumerConfig.getConcurrency(); // 创建监听容器工厂 ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency); // 注册到容器 if (!beanFactory.containsBean(factoryName)) { beanFactory.registerSingleton(factoryName, containerFactory); } }); } Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates(); if (!ObjectUtils.isEmpty(templates)) { templates.forEach((templateName, producerConfig) -> { //registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues); //注册spring bean的两种方式 registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties())); }); } }
配置spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.example.kafka.MyKafkaAutoConfiguration
yml配置
spring: kafka: multiple: consumer: factories: test-factory: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer bootstrap-servers: 192.168.56.112:9092 group-id: group_a concurrency: 25 fetch-min-size: 1048576 fetch-max-wait: 3000 listener: type: batch properties: spring-json-trusted-packages: '*' key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: latest producer: templates: test-template: bootstrap-servers: 192.168.56.112:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
使用
到此这篇关于SpringBoot实现kafka多源配置的示例代码的文章就介绍到这了,更多相关SpringBoot kafka多源配置内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
浅析javax.servlet.Servlet,ServletContext接口
本篇文章是对javax.servlet.Servlet,ServletContext接口进行了纤细的分析介绍,需要的朋友参考下2013-07-07Spring注解@Configuration和@Component区别详解
@Component和@Configuration都可以作为配置类,之前一直都没觉得这两个用起来有什么差别,可能有时程序跑的和自己想的有所区别也没注意到,下面这篇文章主要给大家介绍了关于Spring注解@Configuration和@Component区别的相关资料,需要的朋友可以参考下2023-04-04
最新评论