Springboot系列之kafka操作使用详解

 更新时间:2023年08月01日 10:05:13   作者:wotrd  
这篇文章主要为大家介绍了Springboot系列之kafka操作使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

kafka简介

ApacheKafka®是一个分布式流媒体平台。有三个关键功能:

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 以容错的持久方式存储记录流。
  • 记录发生时处理流。

Kafka通常用于两大类应用:

  • 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序

kafka概念

(1)什么是流处理?

所谓流处理,我的理解是流水线处理。例如,电子厂每个人负责一个功能,来了就处
理,不来就等着。

(2)partition和replication和broker有关吗?

partition和replication是分区和备份的概念。即使是单机一个broker也一样支持。

(3)consumer如何设置和存储partition的offset偏移量,有哪几种消费模式,怎么确定消息是否被消费,将偏移量移到前面会立即消费到最后吗?

使用KafkaConsumer设置partition和offset。有自动提交和手动ack模式提交偏移量两种消费方式。将偏移量移到前面需要设置成为消费状态会立即被消费(设置新消费组)。

(4)AckMode模式有哪几种?

RECORD:处理记录后,侦听器返回时提交偏移量

BATCH:在处理poll()返回的所有记录时提交偏移量

TIME:只要已超过自上次提交以来的ackTime,就会在处理poll()返回的所有记录时提交偏移量

COUNT:只要自上次提交以来已收到ackCount记录,就会在处理poll()返回的所有记录时提交偏移量

COUNT_TIME:与TIME和COUNT类似,但如果任一条件为真,则执行提交

MANUAL:消息监听器负责确认()确认。 之后,应用与BATCH相同的语义    

MANUAL_IMMEDIATE:当侦听器调用Acknowledgment.acknowledge()方法时,立即提交偏移量

Springboot使用kafka

(1)注入NewTopic自动在broker中添加topic

@Bean
public NewTopic topic() {
    return new NewTopic("topic1", 2, (short) 1);
}

(2)使用KafkaTemplate发送消息时,topic自动创建,自动创建的partition是0,长度为1

(3)使用KafkaTemplate发送消息

@RequestMapping("sendMsgWithTopic")
public String sendMsgWithTopic(@RequestParam String topic, @RequestParam int partition, @RequestParam String key,
                               @RequestParam String value) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, value);
    return "success";
}

(4)异步发送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(data);
            }
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(data, record, ex);
           }
    });
}

(5)同步发送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
    }catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
    }catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
    }
}

(6)事务

(1)Spring事务支持一起使用(@Transactional,TransactionTemplate等)
(2)使用template执行事务
    boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
    });

(7)消费者

(1)简单使用
 @KafkaListener(id = "myListener", topics = "myTopic",
    autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
 public void listen(String data) {
    ...
 }
(2)配置多个topic和partition,TopicPartition中partitions和PartitionOffset不能同时使用
 @KafkaListener(id = "thing2", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
 public void listen(ConsumerRecord<?, ?> record) {
    ...
 }
(3)使用ack手动确认模式
 @KafkaListener(id = "cat", topics = "myTopic",
      containerFactory = "kafkaManualAckListenerContainerFactory")
 public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
 }
 (4)获取消息的header信息
 @KafkaListener(id = "qux", topicPattern = "myTopic1")
 public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
    ...
 }
(5)批处理
 @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
 public void listen(List<String> list,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
 }
(6)使用@Valid校验数据
 @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
   containerFactory = "kafkaJsonListenerContainerFactory")
 public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
 }
 @Bean
 public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
 }
(7)topic根据参数类型映射不同方法
 @KafkaListener(id = "multi", topics = "myTopic")
 static class MultiListenerBean {
    @KafkaHandler
    public void listen(String cat) {
        ...
    }
    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }
    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }
 }

Springboot使用kafka踩坑

(1)需要修改server.properties的listener主机地址不然Java获取不到消息。

(2)不同服务配置相同groupId只有一个监听者可以收到消息

kafka图形化工具 kafka tool

下载地址 http://www.kafkatool.com/down...

以上就是Springboot系列之kafka操作使用详解的详细内容,更多关于Springboot kafka操作的资料请关注脚本之家其它相关文章!

相关文章

  • 使用RabbitMQ实现延时消息自动取消的案例详解

    使用RabbitMQ实现延时消息自动取消的案例详解

    这篇文章主要介绍了使用RabbitMQ实现延时消息自动取消的简单案例,案例代码包括导包的过程和相关配置文件,本文结合代码给大家讲解的非常详细,需要的朋友可以参考下
    2024-03-03
  • Java IO之序列化与反序列化详解

    Java IO之序列化与反序列化详解

    这篇文章主要为大家介绍了Java IO之序列化与反序列化,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-01-01
  • Java中的字符型文件流FileReader和FileWriter详细解读

    Java中的字符型文件流FileReader和FileWriter详细解读

    这篇文章主要介绍了Java中的字符型文件流FileReader和FileWriter详细解读,与字节型文件流不同,字节型文件流读取和写入的都是一个又一个的字节,而字符型文件流操作的单位是一个又一个的字符,字符型流认为一个字母是一个字符,而一个汉字也是一个字符,需要的朋友可以参考下
    2023-10-10
  • 一文详解Java中流程控制语句

    一文详解Java中流程控制语句

    在一个程序执行的过程中,各条语句的执行顺序对程序的结果是有直接影响的。也就是说,程序的流程对运行结果有直接的影响。所以,我们必须清楚每条语句的执行流程。本文就来通过一些示例带大家详细了解一下
    2022-10-10
  • Java内存泄漏问题处理方法经验总结

    Java内存泄漏问题处理方法经验总结

    今天小编就为大家分享一篇关于Java内存泄漏问题处理方法经验总结,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • SpringBoot集成Nacos实现注册中心与配置中心流程详解

    SpringBoot集成Nacos实现注册中心与配置中心流程详解

    这篇文章主要介绍了SpringBoot集成Nacos实现注册中心与配置中心流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2023-02-02
  • java应用占用内存过高排查的解决方案

    java应用占用内存过高排查的解决方案

    这篇文章主要介绍了java应用占用内存过高排查的解决方案,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • Java 实现滑动时间窗口限流算法的代码

    Java 实现滑动时间窗口限流算法的代码

    这篇文章主要介绍了Java 实现滑动时间窗口限流算法的代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • java高级用法之注解和反射讲义

    java高级用法之注解和反射讲义

    这篇文章主要给大家介绍了关于java高级用法之注解和反射讲义的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • Spring框架依赖注入方法示例

    Spring框架依赖注入方法示例

    这篇文章主要介绍了Spring框架依赖注入方法示例,分享了三种方法示例,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11

最新评论