springboot配置kafka批量消费,并发消费方式

 更新时间:2024年12月30日 16:50:22   作者:梵法利亚  
文章介绍了如何在Spring Boot中配置Kafka进行批量消费,并发消费,需要注意的是,并发量必须小于等于分区数,否则会导致线程空闲,文章还总结了创建Kafka分区的命令,并鼓励读者分享经验

springboot配置kafka批量消费,并发消费

 @KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})},
            containerFactory = "batchFactory")
    public void listener0(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
          //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})},
            containerFactory = "batchFactory")
    public void listener1(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})},
            containerFactory = "batchFactory")
    public void listener2(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})},
            containerFactory = "batchFactory")
    public void listener3(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersConfig;





    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3");
        log.info("bootstrapServersConfig:自定义配置="+ bootstrapServersConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) {
        //Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //并发数量
        factory.setConcurrency(3);
        //开启批量监听,消费
        factory.setBatchListener(true);
        //factory.set
        return factory;
    }

}

按照以上配置内容即可,可以达到kafka批量消费的能力。

但是,要特别需要注意的一个点是:

  • 并发量根据实际的分区数量决定
  • 必须小于等于分区数
  • 否则会有线程一直处于空闲状态

下面是创建4个分区的命令写法

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic   personnel_card_real_time_recordinfo    --partitions 4 --replication-factor 1

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java如何在不存在文件夹的目录下创建文件

    Java如何在不存在文件夹的目录下创建文件

    这篇文章主要介绍了Java如何在不存在文件夹的目录下创建文件,代码简单易懂,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-08-08
  • Spring项目如何实现带请求链路id的日志记录

    Spring项目如何实现带请求链路id的日志记录

    我们在做项目的时候通常需要通过请求日志来排查定位线上问题,如果日志比较多而我们又需要查找整个请求的全部日志的时候会比较困难,下面我们就来看看如何用java aop实现请求id的日志记录吧
    2024-12-12
  • Java面向对象之继承的概念详解

    Java面向对象之继承的概念详解

    这篇文章主要介绍了Java面向对象之继承的概念详解,Java是一种面向对象的编程语言,继承是实现面向对象编程的基础之一。通过继承,我们可以使代码更具可读性、可重用性和可维护性,从而提高程序的效率和可靠性,需要的朋友可以参考下
    2023-04-04
  • SpringBoot项目整合Redis教程详解

    SpringBoot项目整合Redis教程详解

    这篇文章主要介绍了SpringBoot项目整合Redis教程详解,Redis 是完全开源的,遵守 BSD 协议,是一个高性能的 key-value 数据库。感兴趣的小伙伴可以参考阅读本文
    2023-03-03
  • Java线程取消的三种方式

    Java线程取消的三种方式

    文章介绍了 Java 线程取消的 3 种方式,不推荐使用 stop 方法和 volatile 设标记位停止线程,线程中断机制是协作式的,一个线程请求中断,另一线程响应,线程可检查自身中断状态或捕获 InterruptedException 来合适处理以响应中断,确保安全有序停止,避免资源泄露等问题
    2024-12-12
  • SpringBoot解析自定义yml文件的流程步骤

    SpringBoot解析自定义yml文件的流程步骤

    这篇文章主要介绍了SpringBoot解析自定义yml文件的流程步骤,文章通过代码示例和图文结合的方式给大家介绍的非常详细, 对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-06-06
  • Java字符串驼峰与下换线格式转换如何实现

    Java字符串驼峰与下换线格式转换如何实现

    这篇文章主要介绍了Java字符串驼峰与下换线格式转换如何实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • IntelliJ idea 如何生成动态的JSON字符串(步骤详解)

    IntelliJ idea 如何生成动态的JSON字符串(步骤详解)

    这篇文章主要介绍了IntelliJ idea 如何生成动态的JSON字符串,本文分步骤给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Java随机数的5种获得方法(非常详细!)

    Java随机数的5种获得方法(非常详细!)

    这篇文章主要给大家介绍了关于Java随机数的5种获得方法,在实际开发中产生随机数的使用是很普遍的,所以在程序中进行产生随机数操作很重要,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-10-10
  • SpringBoot根据注解动态执行类中的方法实现

    SpringBoot根据注解动态执行类中的方法实现

    本文主要介绍了SpringBoot根据注解动态执行类中的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-08-08

最新评论