spring kafka @KafkaListener详解与使用过程

 更新时间:2023年02月20日 09:20:06   作者:石臻臻的杂货铺  
这篇文章主要介绍了spring-kafka @KafkaListener详解与使用,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

说明

  • 从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;
  • 可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。

比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3

@KafkaListener详解

id 监听器的id

①. 消费者线程命名规则

填写:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的监听器ID不能重复

否则会报错

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③.会覆盖消费者工厂的消费组GroupId

假如配置文件属性配置了消费组kafka.consumer.group-id=BASE-DEMO
正常情况它是该容器中的默认消费组
但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么当前消费者的消费组就是consumer-id7 ;

当然如果你不想要他作为groupId的话 可以设置属性idIsGroup = false;那么还是会使用默认的GroupId;

④. 如果配置了属性groupId,则其优先级最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代码中最终这个消费者的消费组GroupId是 “groupId-test”

该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。

groupId 消费组名

指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id

如何获取消费者 group.id

在监听器中调用KafkaUtils.getConsumerGroupId()可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;

topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

可以同时监听多个
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一) topicPartitions 显式分区分配

可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

errorHandler 异常处理

实现KafkaListenerErrorHandler; 然后做一些异常处理;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}

调用的时候 填写beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 监听器工厂

指定生成监听器的工厂类;

例如我写一个 批量消费的工厂类

    /**
     * 监听器工厂 批量消费
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客户端前缀

会覆盖消费者工厂的kafka.consumer.client-id属性; 最为前缀后面接 -n n是数字

concurrency并发数

会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看Java concurrency之集合

    /**
     * 监听器工厂 
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

虽然使用的工厂是concurrencyFactory(concurrency配置了6); 但是他最终生成的监听器数量 是1;

properties 配置其他属性

kafka中的属性看org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 获取所有注册的监听器
        registry.getAllListenerContainers();

设置入参验证器

当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

spring-kafka官方文档

扩展:Spring for Apache Kafka @KafkaListener使用及注意事项

官方文档:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo类:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}</code>

配置类及注解:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

到此这篇关于spring-kafka @KafkaListener详解与使用的文章就介绍到这了,更多相关spring-kafka使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot @FixMethodOrder 如何调整单元测试顺序

    SpringBoot @FixMethodOrder 如何调整单元测试顺序

    这篇文章主要介绍了SpringBoot @FixMethodOrder 调整单元测试顺序方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • java web实现自动登录功能

    java web实现自动登录功能

    这篇文章主要为大家详细介绍了java web实现自动登录功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-10-10
  • SpringBoot项目中连接SQL Server的三种方式

    SpringBoot项目中连接SQL Server的三种方式

    连接SQL Server是许多Spring Boot项目中常见的需求之一,本文主要介绍了SpringBoot项目中连接SQL Server的三种方式,具有一定的参考价值 ,感兴趣的可以了解一下
    2023-09-09
  • Java基于Javafaker生成测试数据

    Java基于Javafaker生成测试数据

    这篇文章主要介绍了Java基于Javafaker生成测试数据的方法,帮助大家更好的理解和使用Java,感兴趣的朋友可以了解下
    2020-12-12
  • Hibernate核心类和接口的详细介绍

    Hibernate核心类和接口的详细介绍

    今天小编就为大家分享一篇关于Hibernate核心类和接口的详细介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • java long转String +Codeforces110A案例

    java long转String +Codeforces110A案例

    这篇文章主要介绍了java long转String +Codeforces110A案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • XFire构建web service客户端的五种方式

    XFire构建web service客户端的五种方式

    本篇文章主要介绍了XFire构建web service客户端的五种方式。具有很好的参考价值,下面跟着小编一起来看下吧
    2017-01-01
  • Java MyBatis实战之QueryWrapper中and和or拼接技巧大全

    Java MyBatis实战之QueryWrapper中and和or拼接技巧大全

    在Java中QueryWrapper是MyBatis-Plus框架中的一个查询构造器,它提供了丰富的查询方法,其中包括and和or方法,可以用于构建复杂的查询条件,这篇文章主要给大家介绍了关于Java MyBatis实战之QueryWrapper中and和or拼接技巧的相关资料,需要的朋友可以参考下
    2024-07-07
  • Lombok在idea中的使用教程

    Lombok在idea中的使用教程

    Lombok是一个可以通过简单的注解形式,来帮助我们简化消除一些必须有但显得很臃肿(如果getter、setter方法)的Java代码的工具,通过使用对应的注解,可以在编译源码的时候生成对应的方法,这篇文章主要介绍了Lombok在idea中的使用,需要的朋友可以参考下
    2023-03-03
  • Java应用服务器之tomcat会话复制集群配置的示例详解

    Java应用服务器之tomcat会话复制集群配置的示例详解

    这篇文章主要介绍了Java应用服务器之tomcat会话复制集群配置的相关知识,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07

最新评论