Spring Boot整合Kafka教程详解
正文
本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka。Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量。
在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0。
步骤一:添加依赖项
在 pom.xml 中添加以下依赖项:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.0</version> </dependency>
步骤二:配置 Kafka
在 application.yml
文件中添加以下配置:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
这里我们配置了 Kafka 的服务地址为 localhost:9092
,配置了一个消费者组 ID 为 my-group
,并设置了一个最早的偏移量来读取消息。在生产者方面,我们配置了消息序列化程序为 StringSerializer
。
步骤三:创建一个生产者
现在,我们将创建一个 Kafka 生产者,用于发送消息到 Kafka 服务器。在这里,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。
首先,我们将创建一个 KafkaProducerConfig
类,用于配置 Kafka 生产者:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上面的代码中,我们使用 @Configuration
注解将 KafkaProducerConfig
类声明为配置类。然后,我们使用 @Value
注解注入配置文件中的 bootstrap-servers
属性。
接下来,我们创建了一个 producerConfigs
方法,用于设置 Kafka 生产者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG
、KEY_SERIALIZER_CLASS_CONFIG
和 VALUE_SERIALIZER_CLASS_CONFIG
三个属性。
然后,我们创建了一个 producerFactory
方法,用于创建 Kafka 生产者工厂。在这里,我们使用了 DefaultKafkaProducerFactory
类,并传递了我们的配置。
最后,我们创建了一个 kafkaTemplate
方法,用于创建 KafkaTemplate
实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate
实例。
接下来,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。在这里,我们将使用 @RestController
注解创建一个 RESTful 控制器:
@RestController public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
在上面的代码中,我们使用 @Autowired
注解将 KafkaTemplate
实例注入到 KafkaController
类中。然后,我们创建了一个 sendMessage
方法,用于发送消息到 Kafka。
在这里,我们使用 kafkaTemplate.send
方法发送消息到 my-topic
主题。send 方法返回一个 ListenableFuture
对象,用于异步处理结果。
步骤四:创建一个消费者
现在,我们将创建一个 Kafka 消费者,用于从 Kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从 my-topic
主题读取消息。
首先,我们将创建一个 KafkaConsumerConfig
类,用于配置 Kafka 消费者:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
在上面的代码中,我们使用 @Configuration
注解将 KafkaConsumerConfig
类声明为配置类,并使用 @EnableKafka
注解启用 Kafka。
然后,我们使用 @Value
注解注入配置文件中的 bootstrap-servers
和 consumer.group-id
属性。
接下来,我们创建了一个 consumerConfigs
方法,用于设置 Kafka 消费者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG
、AUTO_OFFSET_RESET_CONFIG
、KEY_DESERIALIZER_CLASS_CONFIG
和 VALUE_DESERIALIZER_CLASS_CONFIG
五个属性。
然后,我们创建了一个 consumerFactory
方法,用于创建 Kafka 消费者工厂。在这里,我们使用了 DefaultKafkaConsumerFactory
类,并传递了我们的配置。
最后,我们创建了一个 kafkaListenerContainerFactory
方法,用于创建一个 ConcurrentKafkaListenerContainerFactory
实例。在这里,我们将消费者工厂注入到 kafkaListenerContainerFactory
实例中。
接下来,我们将创建一个 Kafka 消费者类 KafkaConsumer
,用于监听 my-topic
主题并接收消息:
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
在上面的代码中,我们使用 @KafkaListener
注解声明了一个消费者方法,用于接收从 my-topic
主题中读取的消息。在这里,我们将消费者组 ID 设置为 my-group-id
。
现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run
命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/send
端点,以将消息发送到 Kafka。然后,我们可以在控制台上查看消费者接收到的消息。
这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。
以上就是Spring Boot整合Kafka教程详解的详细内容,更多关于Spring Boot整合Kafka的资料请关注脚本之家其它相关文章!
- Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码
- 基于SpringBoot 使用 Flink 收发Kafka消息的示例详解
- SpringBoot如何获取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不对应问题及解决
- SpringBoot+Nacos+Kafka微服务流编排的简单实现
- SpringBoot集成Kafka的步骤
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka实现producer和consumer的示例代码
- Spring Boot 基于 SCRAM 认证集成 Kafka 的过程详解
相关文章
Spring Cloud Gateway 记录请求应答数据日志操作
这篇文章主要介绍了Spring Cloud Gateway 记录请求应答数据日志操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2020-12-12MyBatis通用Mapper中的通用example(排序)详解
这篇文章主要介绍了MyBatis通用Mapper中的通用example(排序)详解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-12-12
最新评论