RabbitMQ延时队列详解与Java代码实现

 更新时间:2023年04月28日 10:26:07   作者:小威要向诸佬学习呀  
这篇文章主要介绍了RabbitMQ延时队列详解与Java代码实现,RabbitMQ 延时队列是指消息在发送到队列后,并不立即被消费者消费,而是等待一段时间后再被消费者消费。这种队列通常用于实现定时任务,需要的朋友可以参考下

在这里插入图片描述

RabbitMQ 延时队列介绍

RabbitMQ 延时队列是指消息在发送到队列后,并不立即被消费者消费,而是等待一段时间后再被消费者消费。这种队列通常用于实现定时任务,例如,订单超时未支付系统取消订单释放所占库存等。

RabbitMQ实现延时队列的方法有多种,其中比较常见的是使用插件或者通过DLX(Dead Letter Exchange)机制实现。

使用插件实现延时队列

RabbitMQ提供了rabbitmq_delayed_message_exchange插件,可以通过该插件实现延时队列。该插件的原理是在消息发送时,将消息发送到一个特定的Exchange中,然后该Exchange会根据消息中的延时时间将消息转发到指定的队列中,从而实现延时队列的功能

使用该插件需要先安装插件,然后创建一个Exchange,并将该Exchange的类型设置为x-delayed-message,然后将该Exchange与队列绑定即可。

使用DLX机制实现延时队列

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。而对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果

DLX机制是RabbitMQ提供的一种消息转发机制,它可以将无法被处理的消息转发到指定的Exchange中,从而实现消息的延时处理。具体实现步骤如下:

  • 创建一个普通的Exchange和Queue,并将它们绑定在一起。
  • 创建一个DLX Exchange,并将普通Exchange绑定到该DLX Exchange上。
  • 将Queue设置为具有TTL(Time To Live)属性,并设置消息过期时间。
  • 将Queue绑定到DLX Exchange上。

当消息过期后,会被发送到DLX Exchange中,然后再由DLX Exchange将消息转发到指定的Exchange中,从而实现延时队列的功能。

使用DLX机制实现延时队列的优点是不需要安装额外的插件,但是需要对消息的过期时间进行精确控制,否则可能会出现消息过期时间不准确的情况。

Java语言设置延时队列

下面是使用 Java 语言通过 RabbitMQ 设置延时队列的步骤:

安装插件

首先,需要安装 rabbitmq_delayed_message_exchange 插件。可以通过以下命令安装:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

创建延时交换机

延时队列需要使用延时交换机。可以使用 x-delayed-message 类型创建一个延时交换机。以下是创建延时交换机的示例代码:

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);

创建延时队列

创建延时队列时,需要将队列绑定到延时交换机上,并设置队列的 TTL(Time To Live)参数。以下是创建延时队列的示例代码:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "delayed-exchange");
args.put("x-dead-letter-routing-key", "delayed-queue");
args.put("x-message-ttl", 5000);
channel.queueDeclare("delayed-queue", true, false, false, args);
channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");

在上述代码中,将队列绑定到延时交换机上,并设置了队列的 TTL 参数为 5000 毫秒,即消息在发送到队列后,如果在 5000 毫秒内没有被消费者消费,则会被转发到 delayed-exchange 交换机上,并发送到 delayed-queue 队列中。

发送延时消息

发送延时消息时,需要设置消息的 expiration 属性,该属性表示消息的过期时间。以下是发送延时消息的示例代码:

Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .headers(headers)
        .expiration("5000")
        .build();
channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());

在上述代码中,设置了消息的 expiration 属性为 5000 毫秒,并将消息发送到 delayed-exchange 交换机上,路由键为 delayed-queue,消息内容为 “Hello, delayed queue!”。

消费延时消息

消费延时消息时,需要设置消费者的 QOS(Quality of Service)参数,以控制消费者的并发处理能力。以下是消费延时消息的示例代码:

channel.basicQos(1);
channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received message: " + message);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});

在上述代码中,设置了 QOS 参数为 1,即每次只处理一个消息。然后使用 basicConsume 方法消费 delayed-queue 队列中的消息,并在消费完成后,使用 basicAck 方法确认消息已被消费。

通过上述步骤,就可以实现 RabbitMQ 延时队列,用于实现定时任务等功能。

RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

RabbitMQ延时队列具体代码

下面是具体代码(附注释):

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayedQueueExample {
    private static final String EXCHANGE_NAME = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "delayed_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /*
         Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;
                                              */
        // 创建一个支持延时队列的Exchange
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);

        // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
        Map<String, Object> queueArguments = new HashMap<>();
        queueArguments.put("x-dead-letter-exchange", "");
        queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
        queueArguments.put("x-message-ttl", 5000);
        channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // 发送消息到延时队列中,设置expiration参数
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000")
                .build();
        String message = "Hello, delayed queue!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
        System.out.println("Sent message to delayed queue: " + message);
        channel.close();
        connection.close();
    }
}

在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

到此这篇关于RabbitMQ延时队列详解与Java代码实现的文章就介绍到这了,更多相关Java实现RabbitMQ延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 简述JAVA同步、异步、阻塞和非阻塞之间的区别

    简述JAVA同步、异步、阻塞和非阻塞之间的区别

    这篇文章主要介绍了JAVA同步、异步、阻塞和非阻塞之间的区别,文中讲解非常细致,帮助大家更好的理解和学习,感兴趣的朋友可以了解下
    2020-08-08
  • spring boot权限管理的几种常见方式

    spring boot权限管理的几种常见方式

    这篇文章主要给大家介绍了关于spring boot权限管理的几种常见方式,在Web应用程序中,用户权限管理是至关重要的,文中通过代码示例介绍的非常详细,需要的朋友可以参考下
    2023-08-08
  • java安全 ysoserial CommonsCollections1示例解析

    java安全 ysoserial CommonsCollections1示例解析

    这篇文章主要介绍了java安全 ysoserial CommonsCollections1示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • Java8 Stream流的常用方法汇总

    Java8 Stream流的常用方法汇总

    Java8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据,下面这篇文章主要给大家介绍了关于Java8 Stream流的常用方法,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • 一文秒懂Java enum常见的用法讲解

    一文秒懂Java enum常见的用法讲解

    这篇文章主要介绍了一文秒懂Java enum常见的用法讲解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • java  interface 接口的使用好处分析

    java interface 接口的使用好处分析

    这篇文章主要介绍了java interface 接口的使用好处,结合实例形式分析了java interface接口的功能、基本使用方法及多态性的使用优点,需要的朋友可以参考下
    2019-11-11
  • springboot中JSONObject遍历并替换部分json值

    springboot中JSONObject遍历并替换部分json值

    这篇文章主要介绍了springboot中JSONObject遍历并替换部分json值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • 使用Java生成32位16进制密钥的代码实现

    使用Java生成32位16进制密钥的代码实现

    在许多加密和安全应用中,生成随机的密钥是至关重要的一步,密钥通常以16进制形式表示,并且具有特定的长度,在这篇博客中,我们将探讨如何使用Java生成一个32位长度的16进制密钥,并展示详细的代码示例和运行结果,需要的朋友可以参考下
    2024-08-08
  • Idea中导入新模块无法被识别的问题

    Idea中导入新模块无法被识别的问题

    这篇文章主要介绍了Idea中导入新模块无法被识别的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-11-11
  • Java实现导出ZIP压缩包的方法

    Java实现导出ZIP压缩包的方法

    这篇文章主要介绍了Java实现导出ZIP压缩包的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11

最新评论