聊聊Spring Boot 如何集成多个 Kafka
更新时间:2023年10月20日 09:33:47 作者://承续缘_纪录片
这篇文章主要介绍了Spring Boot 集成多个 Kafka的相关资料,包括配置文件,生成者和消费者配置过程,本文通过实例代码给大家介绍的非常详细,需要的朋友参考下吧
一、配置文件
application.yml

spring:
  kafka:
    one:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
        enable-auto-commit: true
    two:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
        enable-auto-commit: true
二、生产者、消费者配置
2.1 第一个 Kafka
@EnableKafka @Configuration public class KafkaOneConfig { @Value("${spring.kafka.one.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.one.consumer.group-id}") private String groupId; @Value("${spring.kafka.one.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplate<String, String> kafkaOneTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "1"); // 不能写成 1 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
2.2 第二个 Kafka
@Configuration public class KafkaTwoConfig { @Value("${spring.kafka.two.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.two.consumer.group-id}") private String groupId; @Value("${spring.kafka.two.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplate<String, String> kafkaTwoTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
三、生产者
@Controller public class TestController { @Autowired private KafkaTemplate kafkaOneTemplate; @Autowired private KafkaTemplate kafkaTwoTemplate; @RequestMapping("/send") @ResponseBody public String send() { final String TOPIC = "TOPIC_1"; kafkaOneTemplate.send(TOPIC, "kafka one"); kafkaTwoTemplate.send(TOPIC, "kafka two"); return "success"; } }
四、消费者
@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); final String TOPIC = "TOPIC_1"; // containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同 @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory") public void listenerOne(ConsumerRecord<?, ?> record) { LOGGER.info(" kafka one 接收到消息:{}", record.value()); } @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory") public void listenerTwo(ConsumerRecord<?, ?> record) { LOGGER.info(" kafka two 接收到消息:{}", record.value()); } }
运行结果
c.k.s.consumer.KafkaConsumer : kafka one 接收到消息:kafka one
c.k.s.consumer.KafkaConsumer : kafka two 接收到消息:kafka two
到此这篇关于聊聊Spring Boot 如何集成多个 Kafka的文章就介绍到这了,更多相关Spring Boot集成多个 Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
java的各种集合为什么不安全(List、Set、Map)以及代替方案
这篇文章主要介绍了java的各种集合为什么不安全(List、Set、Map)以及代替方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 2020-10-10

BeanUtils.copyProperties复制属性失败的原因及解决方案
这篇文章主要介绍了BeanUtils.copyProperties复制属性失败的原因及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教 2021-08-08

Spring Boot与Spark、Cassandra系统集成开发示例
本文演示以Spark作为分析引擎,Cassandra作为数据存储,而使用Spring Boot来开发驱动程序的示例。对spring boot 与spark cassandra集成开发示例代码感兴趣的朋友跟着脚本之家小编一起学习吧 2018-02-02
最新评论