SpringBoot使用Redis实现消息队列的方法小结

 更新时间:2024年04月12日 11:53:02   作者:springdoc.cn  
在应用中把Redis当成消息队列来使用已经屡见不鲜了,我想主要原因是当代应用十有八九都会用到 Redis,因此不用再引入其他消息队列系统,而且Redis提供了好几种实现消息队列的方法,用起来也简单,本文给大家介绍了SpringBoot使用Redis实现消息队列的方法小结

使用 Redis 实现消息队列的几种方式

Redis 提供了多种方式来实现消息队列。

Pub/Sub

订阅发布模式,发布者把消息发布到某个 Channel,该 Channel 的所有订阅者都会收到消息。但是这种方式最大的问题是 「发布出去的消息,如果没有被监听消费,或者消费过程中宕机,那么消息就会永远丢失」。适合用于临时通知之类的场景,对于需要保证数据不丢失的场景不能使用这种方式。

List

List 是 Redis 提供的一种数据类型,底层是链表,可以用来实现队列、栈。

Stream

Stream 是一个由 Redis 5 引入的,功能完善的消息队列。想必也是 Redis 官方团队看到太多人拿 Redis 当消息队列使,于是干脆就在 Redis 上设计出一个类似于 Kafka 的消息队列。

Steam 支持消费组消费,一条消息只能被消费组中的其中一个消费者消费。支持 「消息确认」、支持 「回溯消费」 还支持把未 ACK(确认)的消息转移给其他消费者进行重新消费,在进行转移的时候还会累计消息的转移次数,当次数达到一定阈值还没消费成功,就可以放入死信队列。

这也是 Redis 种最复杂的一种数据类型。如果你真的到了需要使用 Redis Steam 作为消息队列的地步,那不如直接使用 RabbitMQ 等更加成熟且稳定的消息队列系统。

使用 List 实现可靠的消息队列

目前来说,这是用得最多的一种方式,适用于大多数简单的消息队列应用场景。List 类型有很多指令,但是作为消息队列来说用到的只有几个个:

LPUSH key element [element ...]

把元素插入到 List 的首部,如果 List 不存在,会自动创建。

BRPOPLPUSH source destination timeout

移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。

当 source List 中没有元素时,Redis 会阻塞连接,直到有其他客户端向其推送元素或超时。超时时间(秒)为 0 表示永远不超时。

注意,这个命令是 「原子性」 的,也就是说只要客户端获取到了返回的元素,那么这个元素一定就会在 destination List 有备份。这是实现可靠消息队列的关键!

RPOPLPUSH source destination

同上,它是 BRPOPLPUSH 命令的 「非阻塞」 版,如果 List 中没有元素就会立即返回 null

LREM key count element

从 List 中删除元素,count 的值不同,删除的方式也不同:

  • count > 0:从头到尾开始搜索,删除与 element 相等的元素,最多删除 count 个。

  • count < 0:从尾到头开始搜索,删除与 element 相等的元素,最多删除 count (绝对值)个。

  • count = 0:删除所有与元素相等的元素。

BLMOVE 和 LMOVE 命令」

  • LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>

  • BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout

从 Redis 6.2.0 开始,BRPOPLPUSH 和 RPOPLPUSH 命令就被声明为废弃了,取而代之的是语义更加明确的 BLMOVE 和 LMOVE 命令。

BLMOVE 和 LMOVE 可以通过参数指定元素出队列(source)的方向,和入队列(destination)的方向,除此以外并无其他区别。

实现思路

了解了上述几个命令后,一个简单易用且可靠的消息队列就呼之欲出了。

  • 生产者使用 LPUSH 命令往消息队列生产消息

  • 消费者使用 BRPOPLPUSH 命令从队列消费消息,并且还会在获取并返回消息的时候把该消息推送到另一个消息队列,也就是 Pending 队列,这个队列中存储的就是未被消费者 ACK 的消息

  • 消费者成功消费完毕后,使用 LREM 命令从 Pending 队列中删除这条消息,整个消费过程结束

  • 如果消费者在消费过程中出现异常、宕机,那么需要在恢复后从 Pending 队列中获取到这条消息,再进行重新消费,从而保证了消息队列的可靠性,不会丢失消息(可能存在重复消费,需要做好幂等处理)

在 Spring Boot 中实现

首先,创建 Spring Boot 项目,并整合 Redis。

创建一个 OrderConsumer Bean 模拟从队列中消费订单 ID。

package cn.springdoc.demo.consumer;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer implements ApplicationRunner, Runnable {

    static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    // 消息队列
    final String queue = "queue_orders";

    // pending 队列,即待确认消息的队列
    final String pendingQueue = "pending_queue_orders";

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 应用启动后,创建新的线程来执行消费任务
        Thread thread = new Thread(this);
        thread.setName("order-consumer-thread");
        thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的
                // 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null
                String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS);
                
                if (item == null) {
                    log.info("等待消息 ...");
                    continue ;
                }
                
                try {
                    
                    // 2:解析为 Long
                    Long orderId = Long.parseLong(item);
                    
                    // 模拟消息消费
                    log.info("消费消息: {}", orderId);
                    
                } catch (Exception e) {
                    log.error("消费异常:{}", e.getMessage());
                    continue;
                }
                
                // 3:消费成功,从 pending 队列删除记录,相当于确认消费
                stringRedisTemplate.opsForList().remove(pendingQueue, 0, item);
            } catch (Exception e) {
                log.error("队列监听异常:{}", e.getMessage());
                break;
            }
        }
        log.info("退出消费");
    }
}

OrderConsumer 实现了 ApplicationRunner 接口,在应用就绪后创建新的消费线程进行消费。

stringRedisTemplate.opsForList().rightPopAndLeftPush 方法从 queue 队列消费一条消息,同时把消息添加到  pendingQueue 队列。该方法底层调用的正是 brpoplpush 命令,最多阻塞 5 秒,超时后返回 null

得到消息后解析为 Long 类型,模拟消费,即输出到日志。如果消费成功,则调用 stringRedisTemplate.opsForList().remove 方法(底层正是 LREM 命令)从 pendingQueue 队列中删除消息。如果消费失败,失败的消息会在 pendingQueue 队列中继续存在,不会丢失,可以重新投递消费或者是人工处理。

测试

启动应用后,通过 Redis 客户端往 queue_orders 队列推送消息:

> lpush queue_orders 10000
"1"
> lpush queue_orders 10010
"1"
> lpush queue_orders 10011
"1"
> lpush queue_orders Nan
"1"

往 queue_orders 队列推送了四条订单的 ID。注意最后一条消息值是 Nan,这会导致 Long.parseLong 异常从而导致消费失败。

服务端输出日志如下:

[           main] cn.springdoc.demo.DemoApplication        : Started DemoApplication in 3.769 seconds (process running for 4.18)
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费消息: 10000
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费消息: 10010
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费消息: 10011
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费异常:For input string: "Nan"
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...

符合预期,前面三条消息都成功消费,最后一条消息消费失败。按照设计,这条消费失败的消息应该在 Pending 队列 pending_queue_orders 中存在。且应该只有这一条消息,因为其他三条消息都消费成功。

查看 pending_queue_orders 队列中的所有元素:

> lrange pending_queue_orders 0 -1
1) "Nan"

一切 OK,该队列中只有 Nan 这条消息,正是消费失败的那条消息。

此时,你如果想查看一下 Redis 中的所有 key,你会发现只有 pending_queue_orders 队列存在:

> keys *
1) "pending_queue_orders"

queue_orders 队列呢?这是 Redis List 的一个特性,当从 List 中弹出最后一个元素后,Redis 就会删除这个 Listqueue_orders 中的元素都被弹出了,所以它被删除了。当再次尝试往 queue_orders 中压入消息时,它会自动创建。也就是说 「我们不需要手动预先创建队列, Redis 会自己创建,也会在合适的时间删除,而这一切都是线程安全的」

由于这是线程安全的,所以队列中的 「一条消息只能被一个消费者(客户端)进行消费」,这非常适合在分布式或者是集群模式下使用,不必担心同一条消息被多个消费者消费到。

 

注意,Pending 队列中的消息可能存在重复消费的可能。例如,消费者成功消费消息后,在调用 remove 方法从 Pending 队列中删除消息时失败,那么 Pending 队列中的这条删除失败的消息其实已经是被成功消费了的,需要在业务中考虑到!

使用 BLMOVE 和 LMOVE 命令

上文说过,从 Redis 6.2.0 开始 BRPOPLPUSH 和 RPOPLPUSH 命令就被声明为废弃了,后续版本中推荐使用 BLMOVE 和 LMOVE 命令。

目前 StringRedisTemplate (Spring Boot 3.2.2)并未直接提供与 BLMOVE 和 LMOVE 命令对应的 API 方法,但是可以获取到底层连接对象来调用 BLMOVE 和 LMOVE 命令。

String item = this.stringRedisTemplate.execute(new RedisCallback<String>() {
    @Override
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        // 调用 bLMove 命令
        byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5);
        return ret == null ? null : new String(ret);
    }
});

Redis 的持久化方式

Redis 是一个内存数据库,为了保证数据的安全不丢失,它提供了两种数据备份(持久化)方式,即 「RDB」 和 「AOF」

  • 「RDB」:生成某一时刻的数据快照,通过子进程进行备份,数据可能不完整(取决于备份周期)。

  • 「AOF」:通过记录执行的指令到文件来实现数据备份,相对完整性较高,但是会记录每一条执行命令,性能会有一定影响。

这就需要根据你的业务场景来选择合适的持久化方式,也可以同时配合使用 「RDB」 和 「AOF」 两种方式,兼顾性能和数据安全。

总结

本文介绍了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH/BLMOVE 命令来实现一个线程安全且可靠的消息队列。

以上就是SpringBoot使用Redis实现消息队列的方法小结的详细内容,更多关于SpringBoot Redis消息队列的资料请关注脚本之家其它相关文章!

相关文章

  • Spring Boot 项目做性能监控的操作流程

    Spring Boot 项目做性能监控的操作流程

    这篇文章主要介绍了Spring Boot 项目如何做性能监控,本文通过实例代码图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • java 可重启线程及线程池类的设计(详解)

    java 可重启线程及线程池类的设计(详解)

    下面小编就为大家带来一篇java 可重启线程及线程池类的设计(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01
  • 简单实现Spring的IOC原理详解

    简单实现Spring的IOC原理详解

    这篇文章主要介绍了简单实现Spring的IOC原理详解,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • 讲解ssm框架整合(最通俗易懂)

    讲解ssm框架整合(最通俗易懂)

    这篇文章主要介绍了讲解ssm框架整合(最通俗易懂),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Java中的异常Exception详细解析

    Java中的异常Exception详细解析

    这篇文章主要介绍了Java中的异常Exception详细解析,Java语言中,将程序执行中发生的不正常情况称为"异常",(开发过程中的语法错误和逻辑错误不是异常),异常分为两大类,运行时异常和编译时异常,需要的朋友可以参考下
    2024-01-01
  • 浅谈spring-boot 允许接口跨域并实现拦截(CORS)

    浅谈spring-boot 允许接口跨域并实现拦截(CORS)

    本篇文章主要介绍了浅谈spring-boot 允许接口跨域并实现拦截(CORS),具有一定的参考价值,有兴趣的可以了解一下
    2017-08-08
  • Java中@ConditionalOnProperty注解使用

    Java中@ConditionalOnProperty注解使用

    在Spring Boot中,@ConditionalOnProperty注解是一种方便的工具,用于根据应用程序配置文件中的属性值来控制Bean的创建和加载,本文就来介绍一下Java中@ConditionalOnProperty注解使用,感兴趣的可以了解一下
    2023-11-11
  • Java中的信息摘要算法MessageDigest类用法详解

    Java中的信息摘要算法MessageDigest类用法详解

    这篇文章主要介绍了Java中的信息摘要算法MessageDigest类用法详解,java.security.MessageDigest类为应用程序提供信息摘要算法的功能,如MD5或SHA-1或SHA-256算法,信息摘要是安全的单向哈希函数,它接收任意大小的数据,并输出固定长度的哈希值,需要的朋友可以参考下
    2024-01-01
  • java操作mongodb之多表联查的实现($lookup)

    java操作mongodb之多表联查的实现($lookup)

    这篇文章主要介绍了java操作mongodb之多表联查的实现($lookup),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • Java @GlobalLock注解详细分析讲解

    Java @GlobalLock注解详细分析讲解

    这篇文章主要介绍了Java @GlobalLock注解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-11-11

最新评论