SpringBoot使用Redis Stream实现轻量消息队列的示例代码
引言
Redis Stream 是 Redis 5.0 引入的一种仅追加(append-only)的日志型数据结构,适合高效、可靠地存储和处理事件、消息等按时间顺序产生的数据。其设计灵感源于 Kafka 等消息队列系统,并且完全集成在 Redis 中,可以直接利用 Redis 的高性能和持久化特性。
依赖
<!-- Spring Boot starter that brings in Spring Data Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
说明:此部分定义了 Redis 相关的依赖,确保项目能够引入并使用 Spring Boot 提供的 Redis 启动器。
RedisTemplate 配置
package com.mjg.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); // 注册 Java 8 日期时间模块 om.registerModule(new JavaTimeModule()); om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); jackson2JsonRedisSerializer.serialize(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key 采用 String 的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash 的 key 也采用 String 的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value 序列化方式采用 jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash 的 value 序列化方式采用 jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); 
return template; } }
说明:此配置类用于设置 RedisTemplate 的序列化方式,以满足不同数据类型的存储和读取需求。
RedisStreamConfig
package com.mjg.config; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisServerCommands; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; import java.net.InetAddress; import java.time.Duration; import java.util.Properties; @Slf4j @RequiredArgsConstructor @Configuration public class RedisStreamConfig implements InitializingBean, DisposableBean { private final RedisTemplate<String, Object> redisTemplate; public static String streamName = "user-event-stream"; public static String userEventGroup = "user-event-group"; private final ThreadPoolTaskExecutor threadPoolTaskExecutor; /** * 消息侦听器容器,用于监听 Redis Stream 中的消息 * * @param connectionFactory Redis 连接工厂,用于创建 Redis 连接 * @param messageConsumer 消息消费者,用于处理接收到的消息 * @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 类型的消息侦听器容器 */ @Bean public 
StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) { StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer); listenerContainer.start(); return listenerContainer; } /** * 创建一个流容器,用于监听 Redis Stream 中的数据 * * @param streamName Redis Stream 的名称 * @param connectionFactory Redis 连接工厂 * @param streamListener 绑定的监听类 * @return 返回 StreamMessageListenerContainer 对象 */ @SneakyThrows private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间 .batchSize(10) // 批量抓取消息 .targetType(String.class) // 传递的数据类型 .executor(threadPoolTaskExecutor) .build(); StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer .create(connectionFactory, options); // 指定消费最新的消息 StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed()); // 创建消费者 StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener); // 指定消费者对象 container.register(streamReadRequest, streamListener); return container; } /** * 生成流读取请求 * * @param offset 偏移量,用于指定从 Redis Stream 中的哪个位置开始读取消息 * @param streamListener 流侦听器,用于处理接收到的消息 * @return 返回一个 StreamReadRequest 对象,表示一个流读取请求 * @throws Exception 当 streamListener 无法识别为 MessageConsumer 类型时,抛出异常 */ private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, 
StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception { Consumer consumer; if (streamListener instanceof MessageConsumer) { consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName()); } else { throw new Exception("无法识别的 stream key"); } // 关闭自动 ack 确认 return StreamMessageListenerContainer.StreamReadRequest.builder(offset) .errorHandler((error) -> { log.error(error.getMessage()); }) .cancelOnError(e -> false) .consumer(consumer) // 关闭自动 ack 确认 .autoAcknowledge(false) .build(); } /** * 检查 Redis 版本是否符合要求 * * @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,抛出该异常 */ private void checkRedisVersion() { // 获得 Redis 版本 Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info); Assert.notNull(info, "Redis info is null"); Object redisVersion = info.get("redis_version"); Integer anInt = Convert.toInt(redisVersion); if (anInt < 5) { throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", redisVersion)); } } @Override public void destroy() throws Exception { } @Override public void afterPropertiesSet() throws Exception { checkRedisVersion(); StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream(); if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) { streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup); } } }
说明:该配置类实现了对 Redis Stream 的相关配置,包括消息监听容器的创建、流读取请求的生成、Redis 版本的检查以及组的创建等功能。
生产者
package com.mjg.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Collections;

/**
 * Publishes messages to a Redis Stream through the injected {@link RedisTemplate}.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class MessageProducer {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Appends {@code message} to the stream as a one-entry map record whose single
     * field is named {@code "data"}, and logs the record id when one is returned.
     *
     * @param streamKey key of the stream to append to
     * @param message   payload stored under the {@code "data"} field
     */
    public void sendMessage(String streamKey, Object message) {
        RecordId assignedId = redisTemplate.opsForStream()
                .add(StreamRecords.newRecord()
                        .ofMap(Collections.singletonMap("data", message))
                        .withStreamKey(streamKey));
        if (assignedId == null) {
            // No id came back, so there is nothing to report.
            return;
        }
        log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, assignedId);
    }
}
说明:MessageProducer 类负责向 Redis Stream 发送消息。
消费者
package com.mjg.config; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; @RequiredArgsConstructor @Component public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> { private final RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(ObjectRecord<String, String> message) { String stream = message.getStream(); String messageId = message.getId().toString(); String messageBody = message.getValue(); System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId); System.out.println("Message body: " + messageBody); // 消息应答 redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId()); } }
说明:MessageConsumer 类实现了 StreamListener 接口,用于处理从 Redis Stream 接收到的消息,并进行相应的应答操作。
测试
/**
 * Test endpoint: each GET request to {@code /send} publishes one message to the stream.
 */
@RequiredArgsConstructor
@Slf4j
@RestController
public class MessageController {

    /** Key of the stream the test messages are sent to. */
    public static String streamName = "user-event-stream";

    private final MessageProducer messageProducer;

    /**
     * Sends a greeting with the current local date-time appended to {@link #streamName}.
     */
    @GetMapping("/send")
    public void send() {
        String payload = "hello 啦啦啦啦" + LocalDateTime.now();
        messageProducer.sendMessage(streamName, payload);
    }
}
说明:MessageController 类中的 send 方法通过调用 MessageProducer 来发送消息到指定的 Redis Stream 中。
以上就是SpringBoot使用Redis Stream实现轻量消息队列的示例代码的详细内容,更多关于SpringBoot Redis Stream轻量消息队列的资料请关注脚本之家其它相关文章!
相关文章
Spring data jpa @Query update的坑及解决
这篇文章主要介绍了Spring data jpa @Query update的坑及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教。(2022-02-02)
SpringBoot实现application.yml文件敏感信息加密
本文主要介绍了SpringBoot实现application.yml文件敏感信息加密,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。(2023-07-07)
最新评论