springboot配置kafka批量消费,并发消费方式
更新时间:2024年12月30日 16:50:22 作者:梵法利亚
文章介绍了如何在Spring Boot中配置Kafka进行批量消费,并发消费,需要注意的是,并发量必须小于等于分区数,否则会导致线程空闲,文章还总结了创建Kafka分区的命令,并鼓励读者分享经验
springboot配置kafka批量消费,并发消费
@KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})}, containerFactory = "batchFactory") public void listener0(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //业务处理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})}, containerFactory = "batchFactory") public void listener1(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //业务处理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})}, containerFactory = "batchFactory") public void listener2(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //业务处理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})}, containerFactory = "batchFactory") public void listener3(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //业务处理 } catch (Exception e) { log.error(e.toString()); } }
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Slf4j @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServersConfig; public Map<String,Object> consumerConfigs(){ Map<String,Object> props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3"); log.info("bootstrapServersConfig:自定义配置="+ bootstrapServersConfig); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) { //Map<String, Object> consumerProperties = properties.buildConsumerProperties(); ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); //并发数量 factory.setConcurrency(3); //开启批量监听,消费 factory.setBatchListener(true); //factory.set return factory; } }
按照以上配置内容即可,可以达到kafka批量消费的能力。
但是,要特别需要注意的一个点是:
- 并发量根据实际的分区数量决定
- 必须小于等于分区数
- 否则会有线程一直处于空闲状态
下面是创建4个分区的命令写法
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic personnel_card_real_time_recordinfo --partitions 4 --replication-factor 1
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
IntelliJ idea 如何生成动态的JSON字符串(步骤详解)
这篇文章主要介绍了IntelliJ idea 如何生成动态的JSON字符串,本文分步骤给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-08-08
最新评论