SpringBoot使用Kafka来优化接口请求的并发方式

 更新时间:2024年07月30日 14:58:57   作者:培根芝士  
这篇文章主要介绍了SpringBoot使用Kafka来优化接口请求的并发方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

在Spring Boot中使用 Kafka 来优化接口请求的并发,主要是通过将耗时的任务异步化到Kafka消息队列中来实现。这样,接口可以立即响应客户端,而不需要等待耗时任务完成。

在Spring Boot应用程序中调用Kafka通常涉及使用Spring Kafka库,它提供了与Apache Kafka的高级集成,使得从Spring Boot应用程序中发送和接收消息变得更加简单和直观。

安装Apache Kafka

编写docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

使用docker compose启动容器

docker-compose up -d

添加依赖

首先,需要在pom.xml中添加Spring Kafka的依赖。

<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置Kafka

在application.properties文件中配置Kafka的属性。

# application.properties  
spring.kafka.bootstrap-servers=localhost:9092  
spring.kafka.consumer.group-id=myGroup  
spring.kafka.consumer.auto-offset-reset=earliest  
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer  
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Kafka生产者服务

创建一个服务类来发送消息到Kafka。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String value) {
        // 异步发送消息
        kafkaTemplate.send(topic, key, value).addCallback(success -> {
            System.out.println("Message sent successfully: " + value);
        }, failure -> {
            System.err.println("Failed to send message: " + value);
        });
    }
}

kafkaTemplate.send 方法是 Spring Kafka 提供的一个非常灵活的方法,它允许以不同的方式发送消息到 Kafka 集群。

当调用 kafkaTemplate.send 方法时,可以指定要发送到的 topic、key 和 value,但 key 是可选的。

  • 未指定 key:当不指定 key 时,Kafka 会根据配置的分区器(默认是 DefaultPartitioner)来决定消息应该被发送到哪个分区。在没有 key 的情况下,分区器可能会采用轮询(round-robin)或其他算法来随机选择一个分区进行消息发送。这种方式下,消息的分布可能会比较均匀,但无法控制具有相同逻辑标识的消息被发送到同一个分区。
  • 指定 key:当指定 key 时,Kafka 会根据 key 的哈希值来计算分区号,确保具有相同 key 的消息被发送到同一个分区。这种方式有助于保持消息的顺序性,因为 Kafka 保证同一个分区内的消息是有序的。

Kafka消费者服务

创建一个监听器来接收Kafka中的消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "your-topic-name", groupId = "myGroup")
    public void listen(String message) {
        // 处理消息(可能是耗时的操作)
        System.out.println("Received message in group 'myGroup': " + message);
        // 处理耗时操作
        ...
    }
}

在使用Kafka消费者时,Kafka本身已经设计为支持并发消费,即可以通过配置多个消费者实例(partitions的数量通常决定了并行度的一个上限,因为Kafka会尽量将不同的partitions分配给不同的消费者以提高并行度)来实现并行处理。

但是,如果想要在消费者内部进一步提高处理消息的并发度,可以结合使用Kafka消费者和Java的线程池来实现。

@Service  
public class KafkaConsumerService {  
  
    @KafkaListener(topics = "your-topic-name", groupId = "myGroup")  
    public void listen(String message) {  
        // 将消息发送到线程池处理  
        executorService.submit(() -> processMessage(message));  
    }  
  
    private void processMessage(String message) {  
        // 处理消息的逻辑  
        System.out.println("Processing message: " + message);  
        // 模拟耗时操作  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
        }  
    }  
  
    // 线程池配置  
    private ExecutorService executorService = Executors.newFixedThreadPool(30);  
  
    // 确保优雅关闭线程池  
    @PreDestroy  
    public void shutdown() {  
        executorService.shutdown();  
        try {  
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {  
                executorService.shutdownNow();  
            }  
        } catch (InterruptedException ex) {  
            executorService.shutdownNow();  
            Thread.currentThread().interrupt();  
        }  
    }  
}

控制器

在控制器中调用Kafka生产者服务来发送消息,并立即响应客户端。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
  
@RestController
public class MyController {

    @Autowired  
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        // 发送消息到Kafka,并立即返回响应
        kafkaProducerService.sendMessage("your-topic-name", "key1", message);
        return "Message sent to Kafka";
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot自动配置深入探究实现原理

    SpringBoot自动配置深入探究实现原理

    在springboot的启动类中可以看到@SpringBootApplication注解,它是SpringBoot的核心注解,也是一个组合注解。其中@SpringBootConfiguration、@EnableAutoConfiguration、@ComponentScan三个注解尤为重要。今天我们就来浅析这三个注解的含义
    2022-08-08
  • 排序算法图解之Java希尔排序

    排序算法图解之Java希尔排序

    希尔排序是希尔(Donald Shell)于1959年提出的一种排序算法,其也是一种特殊的插入排序,即将简单的插入排序进行改进后的一个更加高效的版本,也称缩小增量排序。本文通过图片和示例讲解了希尔排序的实现,需要的可以了解一下
    2022-11-11
  • Java中方法使用的深入讲解

    Java中方法使用的深入讲解

    这篇文章主要给大家介绍了关于Java中方法使用的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • springboot项目不输出nohup.out日志的解决

    springboot项目不输出nohup.out日志的解决

    这篇文章主要介绍了springboot项目不输出nohup.out日志的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • 详解使用Java代码读取并比较本地两个txt文件区别

    详解使用Java代码读取并比较本地两个txt文件区别

    这篇文章主要为大家介绍了使用Java代码读取并比较本地两个txt文件区别详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • Java实现添加文字水印&图片水印的方法详解

    Java实现添加文字水印&图片水印的方法详解

    为图片添加水印的主要作用是保护图片版权,防止图片被未经授权的人使用或传播。本文为大家介绍了Java实现添加文字水印&图片水印的具体方法,需要的可以参考一下
    2023-02-02
  • SpringBoot定时任务实现数据同步的方法

    SpringBoot定时任务实现数据同步的方法

    业务需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步,这篇文章主要介绍了SpringBoot定时任务实现数据同步的方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-08-08
  • Java面试之如何获取客户端真实IP

    Java面试之如何获取客户端真实IP

    这篇文章主要给大家介绍了关于Java面试之如何获取客户端真实IP的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-09-09
  • Java实现游戏飞机大战-III的示例代码

    Java实现游戏飞机大战-III的示例代码

    这篇文章主要为大家介绍了如何利用Java实现经典的游戏之飞机大战,文中采用了swing技术进行了界面化处理,感兴趣的小伙伴可以动手试一试
    2022-02-02
  • Java的PriorityBlockingQueue优先级阻塞队列代码实例

    Java的PriorityBlockingQueue优先级阻塞队列代码实例

    这篇文章主要介绍了Java的PriorityBlockingQueue优先级阻塞队列代码实例,PriorityBlockingQueue顾名思义是带有优先级的阻塞队列,为了实现按优先级弹出数据,存入其中的对象必须实现comparable接口自定义排序方法,需要的朋友可以参考下
    2023-12-12

最新评论