Springboot集成Kafka进行批量消费及踩坑点

 更新时间:2021年12月14日 11:44:53   作者:秀强  
本文主要介绍了Springboot集成Kafka进行批量消费及踩坑点,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

引入依赖

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.3.11.RELEASE</version>
            </dependency>

因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。

在这里插入图片描述

注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

创建配置类

    /**
     * kafka 配置类
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

        @Value("${kafka.bootstrap.servers}")
        private String kafkaBootstrapServers;

        @Value("${kafka.group.id}")
        private String kafkaGroupId;

        @Value("${kafka.topic}")
        private String kafkaTopic;

        public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";

        public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";


        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // 设置并发量,小于或者等于 Topic 的分区数
            factory.setConcurrency(5);
            // 设置为批量监听
            factory.setBatchListener(Boolean.TRUE);
            factory.getContainerProperties().setPollTimeout(30000);
            return factory;
        }

        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            //设置接入点,请通过控制台获取对应Topic的接入点。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
            //设置SSL根证书的路径,请记得将XXX修改为自己的路径。
            //与SASL路径类似,该文件也不能被打包到jar中。
            System.setProperty("java.security.auth.login.config", CONFIG_PATH);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);

            //根证书存储的密码,保持不变。
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入协议,目前支持使用SASL_SSL协议接入。
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鉴权方式,保持不变。
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // 自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
            //两次Poll之间的最大允许间隔。
            //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
            //设置单次拉取的量,走公网访问时,该参数会有较大影响。
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
            //每次Poll的最大数量。
            //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
            //消息的反序列化方式。
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            //当前消费实例所属的消费组,请在控制台申请之后填写。
            //属于同一个组的消费实例,会负载消费消息。
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            //Hostname校验改成空。
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
            return props;
        }
    }

注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。

Kafka 消费者

    /**
     * kafka 消息消费类
     */
    @Component
    public class KafkaMessageListener {

        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

        @KafkaListener(topics = {"${kafka.topic}"})
        public void listen(List<ConsumerRecord<String, String>> recordList) {
            for (ConsumerRecord<String,String> record : recordList) {
                // 打印消息的分区以及偏移量
                LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
                String value = record.value();
                System.out.println("value = " + value);
                // 处理业务逻辑 ...
            }
        }
    }

因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。

到此这篇关于Springboot集成Kafka进行批量消费及踩坑点的文章就介绍到这了,更多相关Springboot Kafka批量消费内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring监听器及定时任务实现方法详解

    Spring监听器及定时任务实现方法详解

    这篇文章主要介绍了Spring监听器及定时任务实现方法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Java程序中使用JavaMail发送带图片和附件的邮件

    Java程序中使用JavaMail发送带图片和附件的邮件

    这篇文章主要介绍了Java程序中使用JavaMail发送带图片和附件的邮件,JavaMail是专门用来处理邮件的Java API,需要的朋友可以参考下
    2015-11-11
  • Java实现Word/Pdf/TXT转html的示例

    Java实现Word/Pdf/TXT转html的示例

    这篇文章主要介绍了Java实现Word/Pdf/TXT转html的示例,帮助大家方便的进行文件格式转换,完成需求,感兴趣的朋友可以了解下
    2020-11-11
  • SpringBoot如何通过webjars管理静态资源文件夹

    SpringBoot如何通过webjars管理静态资源文件夹

    这篇文章主要介绍了SpringBoot如何通过webjars管理静态资源文件夹,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10
  • Java数据结构之稀疏矩阵定义与用法示例

    Java数据结构之稀疏矩阵定义与用法示例

    这篇文章主要介绍了Java数据结构之稀疏矩阵定义与用法,结合实例形式分析了java稀疏矩阵的定义、运算、转换等相关操作技巧,需要的朋友可以参考下
    2018-01-01
  • hibernate 三种状态的转换

    hibernate 三种状态的转换

    本文主要介绍了hibernate三种状态的转换。具有很好的参考价值。下面跟着小编一起来看下吧
    2017-03-03
  • 详解spring security安全防护

    详解spring security安全防护

    这篇文章主要介绍了详解spring security安全防护,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • Java过滤器模式原理及用法实例

    Java过滤器模式原理及用法实例

    这篇文章主要介绍了Java过滤器模式原理及用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • java同步开篇入门简单介绍

    java同步开篇入门简单介绍

    java中的CountDownLatch、Semaphore、CyclicBarrier这些类又不属于锁,它们和锁又有很多共同点,都是为了协同多线程的执行,都是一种同步器,所以这里就借用同步来取名字了,也就是“同步系列”的来源。下面小编来简单介绍下
    2019-05-05
  • Intellij IDEA插件开发入门详解

    Intellij IDEA插件开发入门详解

    这篇文章主要介绍了Intellij IDEA插件开发入门详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-02-02

最新评论