SpringBoot使用Kafka来优化接口请求的并发方式
在Spring Boot中使用 Kafka 来优化接口请求的并发,主要是通过将耗时的任务异步化到Kafka消息队列中来实现。这样,接口可以立即响应客户端,而不需要等待耗时任务完成。
在Spring Boot应用程序中调用Kafka通常涉及使用Spring Kafka库,它提供了与Apache Kafka的高级集成,使得从Spring Boot应用程序中发送和接收消息变得更加简单和直观。
安装Apache Kafka
编写docker-compose.yml
version: '3' services: zookeeper: image: wurstmeister/zookeeper container_name: zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka container_name: kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
使用docker compose启动容器
docker-compose up -d
添加依赖
首先,需要在pom.xml中添加Spring Kafka的依赖。
<!-- Spring Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置Kafka
在application.properties文件中配置Kafka的属性。
# application.properties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka生产者服务
创建一个服务类来发送消息到Kafka。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String key, String value) { // 异步发送消息 kafkaTemplate.send(topic, key, value).addCallback(success -> { System.out.println("Message sent successfully: " + value); }, failure -> { System.err.println("Failed to send message: " + value); }); } }
kafkaTemplate.send 方法是 Spring Kafka 提供的一个非常灵活的方法,它允许以不同的方式发送消息到 Kafka 集群。
当调用 kafkaTemplate.send 方法时,可以指定要发送到的 topic、key 和 value,但 key 是可选的。
- 未指定 key:当不指定 key 时,Kafka 会根据配置的分区器(默认是 DefaultPartitioner)来决定消息应该被发送到哪个分区。在没有 key 的情况下,分区器可能会采用轮询(round-robin)或其他算法来随机选择一个分区进行消息发送。这种方式下,消息的分布可能会比较均匀,但无法控制具有相同逻辑标识的消息被发送到同一个分区。
- 指定 key:当指定 key 时,Kafka 会根据 key 的哈希值来计算分区号,确保具有相同 key 的消息被发送到同一个分区。这种方式有助于保持消息的顺序性,因为 Kafka 保证同一个分区内的消息是有序的。
Kafka消费者服务
创建一个监听器来接收Kafka中的消息。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumerService { @KafkaListener(topics = "your-topic-name", groupId = "myGroup") public void listen(String message) { // 处理消息(可能是耗时的操作) System.out.println("Received message in group 'myGroup': " + message); // 处理耗时操作 ... } }
在使用Kafka消费者时,Kafka本身已经设计为支持并发消费,即可以通过配置多个消费者实例(partitions的数量通常决定了并行度的一个上限,因为Kafka会尽量将不同的partitions分配给不同的消费者以提高并行度)来实现并行处理。
但是,如果想要在消费者内部进一步提高处理消息的并发度,可以结合使用Kafka消费者和Java的线程池来实现。
@Service public class KafkaConsumerService { @KafkaListener(topics = "your-topic-name", groupId = "myGroup") public void listen(String message) { // 将消息发送到线程池处理 executorService.submit(() -> processMessage(message)); } private void processMessage(String message) { // 处理消息的逻辑 System.out.println("Processing message: " + message); // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 线程池配置 private ExecutorService executorService = Executors.newFixedThreadPool(30); // 确保优雅关闭线程池 @PreDestroy public void shutdown() { executorService.shutdown(); try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException ex) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } } }
控制器
在控制器中调用Kafka生产者服务来发送消息,并立即响应客户端。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class MyController { @Autowired private KafkaProducerService kafkaProducerService; @PostMapping("/send") public String sendMessage(@RequestParam String message) { // 发送消息到Kafka,并立即返回响应 kafkaProducerService.sendMessage("your-topic-name", "key1", message); return "Message sent to Kafka"; } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Java的PriorityBlockingQueue优先级阻塞队列代码实例
这篇文章主要介绍了Java的PriorityBlockingQueue优先级阻塞队列代码实例,PriorityBlockingQueue顾名思义是带有优先级的阻塞队列,为了实现按优先级弹出数据,存入其中的对象必须实现comparable接口自定义排序方法,需要的朋友可以参考下2023-12-12
最新评论