Spring Cloud Stream实现数据流处理
1.什么是Spring Cloud Stream?
我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。
我们很容易总结出每个模块的流程:
- 从上一个模块拉取数据
- 处理数据
- 将处理完成的数据发给下一个模块
其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:**提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。**但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。
2.环境准备
采用docker-compose搭建kafaka环境
version: '3' networks: kafka: ipam: driver: default config: - subnet: "172.22.6.0/24" services: zookepper: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest container_name: zookeeper-server restart: unless-stopped volumes: - "/etc/localtime:/etc/localtime" environment: ALLOW_ANONYMOUS_LOGIN: yes ports: - "2181:2181" networks: kafka: ipv4_address: 172.22.6.11 kafka: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1 container_name: kafka restart: unless-stopped volumes: - "/etc/localtime:/etc/localtime" environment: ALLOW_PLAINTEXT_LISTENER: yes KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181 KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092 ports: - "9092:9092" depends_on: - zookepper networks: kafka: ipv4_address: 172.22.6.12 kafka-map: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map container_name: kafka-map restart: unless-stopped volumes: - "./kafka/kafka-map/data:/usr/local/kafka-map/data" environment: DEFAULT_USERNAME: admin DEFAULT_PASSWORD: 123456 ports: - "9080:8080" depends_on: - kafka networks: kafka: ipv4_address: 172.22.6.13
run
docker-compose -f docker-compose-kafka.yml -p kafka up -d
kafka-map
https://github.com/dushixiang/kafka-map
- 访问:http://127.0.0.1:9080
- 账号密码:admin/123456
3.代码工程
实验目标
- 生成UUID并将其发送到Kafka主题
batch-in
。 - 从
batch-in
主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out
主题。 - 监听
batch-out
主题并打印接收到的消息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloud-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>spring-cloud-stream-kafaka</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <!-- Spring Boot Starter Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Boot Starter Test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
处理流
/* * Copyright 2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.et; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import java.util.List; import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; /** * @author Steven Gantz */ @SpringBootApplication public class CloudStreamsFunctionBatch { public static void main(String[] args) { SpringApplication.run(CloudStreamsFunctionBatch.class, args); } @Bean public Supplier<UUID> stringSupplier() { return () -> { var uuid = UUID.randomUUID(); System.out.println(uuid + " -> batch-in"); return uuid; }; } @Bean public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() { return idBatch -> { System.out.println("Removed digits from batch of " + idBatch.size()); return idBatch.stream() .map(UUID::toString) // Remove all digits from the UUID .map(uuid -> uuid.replaceAll("\\d","")) .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build()) .toList(); }; } @KafkaListener(id = "batch-out", topics = "batch-out") public void listen(String in) { System.out.println("batch-out -> " + in); } }
定义一个名为
stringSupplier
的Bean,它实现了Supplier<UUID>
接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in
主题。定义一个名为
digitRemovingConsumer
的Bean,它实现了Function<List<UUID>, List<Message<String>>>
接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。使用
@KafkaListener
注解定义一个Kafka监听器,监听batch-out
主题。当接收到消息时,调用listen
方法并打印接收到的消息内容。
配置文件
spring: cloud: function: definition: stringSupplier;digitRemovingConsumer stream: bindings: stringSupplier-out-0: destination: batch-in digitRemovingConsumer-in-0: destination: batch-in group: batch-in consumer: batch-mode: true digitRemovingConsumer-out-0: destination: batch-out kafka: binder: brokers: localhost:9092 bindings: digitRemovingConsumer-in-0: consumer: configuration: # Forces consumer to wait 5 seconds before polling for messages fetch.max.wait.ms: 5000 fetch.min.bytes: 1000000000 max.poll.records: 10000000
参数解释
spring: cloud: function: definition: stringSupplier;digitRemovingConsumer
spring.cloud.function.definition
:定义了两个函数,stringSupplier
和digitRemovingConsumer
。这两个函数将在应用程序中被使用。
stream: bindings: stringSupplier-out-0: destination: batch-in
stream.bindings.stringSupplier-out-0.destination
:将stringSupplier
函数的输出绑定到Kafka主题batch-in
。
digitRemovingConsumer-in-0: destination: batch-in group: batch-in consumer: batch-mode: true
stream.bindings.digitRemovingConsumer-in-0.destination
:将digitRemovingConsumer
函数的输入绑定到Kafka主题batch-in
。group: batch-in
:指定消费者组为batch-in
,这意味着多个实例可以共享这个组来处理消息。consumer.batch-mode: true
:启用批处理模式,允许消费者一次处理多条消息。
digitRemovingConsumer-out-0: destination: batch-out
stream.bindings.digitRemovingConsumer-out-0.destination
:将digitRemovingConsumer
函数的输出绑定到Kafka主题batch-out
。
以上只是一些关键代码
4.测试
启动弄Spring Boot应用,可以看到控制台输出日志如下:
291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in Removed digits from batch of 5 batch-out -> eacc-ee-dfb-b-dead batch-out -> cbae-e-f-c-acfb batch-out -> ab-dd---adade batch-out -> db-fb-f-bbb-bfdec batch-out -> bdb--d-ad-bbbb
以上就是Spring Cloud Stream实现数据流处理的详细内容,更多关于Spring Cloud Stream数据流处理的资料请关注脚本之家其它相关文章!
相关文章
Springboot项目升级2.2.x升至2.7.x的示例代码
本文主要介绍了Springboot项目升级2.2.x升至2.7.x的示例代码,会有很多的坑,具有一定的参考价值,感兴趣的可以了解一下2023-09-09SpringBoot整合Gson 整合Fastjson的实例详解
这篇文章主要介绍了SpringBoot整合Gson 整合Fastjson的实例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-11-11SpringBoot使用Sa-Token实现账号封禁、分类封禁、阶梯封禁的示例代码
本文主要介绍了SpringBoot使用Sa-Token实现账号封禁、分类封禁、阶梯封禁的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2023-07-07
最新评论