详解RabbitMQ延迟队列的基本使用和优化

 更新时间:2023年05月08日 09:47:13   作者:不断前进的皮卡丘  
这篇文章主要介绍了详解RabbitMQ延迟队列的基本使用和优化,延迟队列中的元素都是带有时间属性的。延迟队列就是用来存放需要在指定时间被处理的元素的队列,需要的朋友可以参考下

1.延迟队列基本介绍

一般队列中的元素总是希望能够早点被取出来进行处理,但是延迟队列中的元素则是希望可以在指定时间内被取出和处理,延迟队列中的元素都是带有时间属性的。延迟队列就是用来存放需要在指定时间被处理的元素的队列

在这里插入图片描述

延迟队列就是想要消息延迟一段时间后被处理,TTL可以让消息在延迟一段时间后变成死信。变成死信的消息都会被投递到死信队列中,这样的话,只要消费者一直消费死信队列里面的消息就可以了,因为里面的消息都是希望被马上处理的消息 生产者生产一条延时消息,根据需要延时时间的不同,通过不同的routing key把消息路由到不同的延迟队列,每一个队列都设置了不同的TTL属性,并且绑定在同一个死信交换机中,消息过期了以后,根据routing key的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理就可以了。注意:不要造成重复消费

2.延迟队列使用场景

下面的场景需要使用延迟队列

  1. 订单在十分钟内没有支付就自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  3. 账单在一周内没有支付,就会自动结算
  4. 用户注册成功以后,如果三天内没有登录就进行短信题提醒
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议以后,需要提前十分钟通知各个参会人员参加会议。

3.Spring Boot集成RabbitMQ

3.1创建项目,引入依赖

在这里插入图片描述

相关依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

3.2application.properties配置文件

# RabbitMQ/配置
#服务器地址
spring.rabbitmq.host=服务器地址
#服务端口号
spring.rabbitmq.port=5672
#虚拟主机名称
spring.rabbitmq.virtual-host=/myhost
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=123456

3.3 队列TTL-代码结构图

在这里插入图片描述

3.4MQ配置类

package com.zyh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @author zengyihong
 * @create 2022--10--04 16:44
 */
@Configuration
public class TtlQueueConfiguration {
    //普通交换机
    public static final String X_EXCHANGE = "X";
    //普通队列
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信队列QD
    public static final String QUEUE_D = "QD";
    /**
     * 声明普通交换机X
     *
     * @return
     */
    @Bean
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }
    /**
     * 声明队列QA
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        //创建集合保存队列属性
        Map<String, Object> map = new HashMap<>();
        //设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routing key
        map.put("x-dead-letter-routing-key", "YD");
        //设置队列延迟时间 10秒
        map.put("x-message-ttl", 10000);
        //创建队列
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }
    /**
     * 把QA队列和交换机X进行绑定
     *
     * @return
     */
    @Bean
    public Binding queueA_BindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XA");
    }
    /**
     * 声明队列QB
     *
     * @return
     */
    @Bean
    public Queue queueB() {
        //创建集合保存队列属性
        Map<String, Object> map = new HashMap<>();
        //设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routing key
        map.put("x-dead-letter-routing-key", "YD");
        //设置队列延迟时间 10秒
        map.put("x-message-ttl", 40000);
        //创建队列
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }
    /**
     * 把QB队列和交换机X进行绑定
     *
     * @return
     */
    @Bean
    public Binding queueB_BindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XB");
    }
    /**
     * 声明死信交换机Y
     *
     * @return
     */
    @Bean
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    /**
     * 声明死信队列QD
     *
     * @return
     */
    @Bean
    public Queue queueD() {
        return new Queue(QUEUE_D);
    }
    /**
     * 把死信交换机和死信队列进行绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingQD(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("YD");
    }
}

3.5生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 生产者发送消息
     * @param message
     */
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //记录日志
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);
        //给QA队列发送消息
        rabbitTemplate.convertSendAndReceive("X","XA", "消息来自TTL为10秒的队列:"+message);
        rabbitTemplate.convertSendAndReceive("X","XB", "消息来自TTL为40秒的队列:"+message);
    }
}

3.6消费者代码

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D)
    public void receiveQD(Message message, Channel channel){
        //获取消息
        String msg=new String(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}",new Date(),msg);
    }
}

3.7测试

在这里插入图片描述

在这里插入图片描述

启动boot项目,在浏览器输入localhost:8080/ttl/sendMessage/Hello

在这里插入图片描述

但是这种方式有一种缺点,现在我们只有TTL为10s和40s的延迟队列,如果我们需要其他延时时间的队列的话,那么我们又得新增其他队列,这样其实并不方便,我们想要的是能够动态设置TTL,这样就不需要为每个TTL设置新的延迟队列了。

4.延迟队列优化

4.1代码结构图

在这里插入图片描述

4.2配置类

在之前写的代码基础上新增一个配置类

package com.zyh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @author zengyihong
 * @create 2022--10--05 10:44
 */
@Configuration
public class MessageTtlQueueConfiguration {
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列
    public static final String QUEUE_C = "QC";
    /**
     * 声明QC队列
     * @return
     */
    @Bean
    public Queue queueC(){
        //创建集合保存队列属性
        Map<String, Object> map = new HashMap<>();
        //设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routing key
        map.put("x-dead-letter-routing-key", "YD");
        //设置队列延迟时间 10秒
        map.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_C).withArguments(map).build();
    }
    /**
     * 把QC队列和正常交换机X进行绑定
     *
     * @return
     */
    @Bean
    public Binding queueC_BindingX(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XC");
    }
}

4.3生产者

package com.zyh.controller;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.annotation.Resources;
import java.util.Date;
/**
 * @author zengyihong
 * @create 2022--10--04 19:36
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 生产者发送消息
     *
     * @param message
     */
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //记录日志
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), message);
        //给QA队列发送消息
        rabbitTemplate.convertSendAndReceive("X", "XA", "消息来自TTL为10秒的队列:" + message);
        rabbitTemplate.convertSendAndReceive("X", "XB", "消息来自TTL为40秒的队列:" + message);
    }
    /**
     * 生产者发送消息(动态设置有效期)
     *
     * @param message
     */
    @GetMapping("/sendMessage/{message}/{ttlTime}")
    public void sendMessage(@PathVariable String message, @PathVariable String ttlTime) {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息有效期
                message.getMessageProperties().setExpiration(ttlTime);
                return message;
            }
        };
        //记录日志
        log.info("当前时间:{},发送一条时长{}毫秒信息给队列QC:{}", new Date(),ttlTime, message);
        //给QC队列发送消息
        rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
    }
}

4.4消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D)
    public void receiveQD(Message message, Channel channel){
        //获取消息
        String msg=new String(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}",new Date(),msg);
    }
}

4.5测试

启动boot项目

image.jpg

在浏览器输入

http://localhost:8080/ttl/sendMessage/Hello/20000
http://localhost:8080/ttl/sendMessage/你好/2000

image.jpg

如果在消息属性上设置TTL的方式,那么消息可能不会按时死亡,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

到此这篇关于详解RabbitMQ延迟队列的基本使用和优化的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解Java中的实例初始化块(IIB)

    详解Java中的实例初始化块(IIB)

    在本篇文章里我们针对Java中的实例初始化块(IIB)做想详细分析,有需要的朋友们可以跟着学习参考下。
    2018-10-10
  • SWT(JFace)体验之StyledText类

    SWT(JFace)体验之StyledText类

    有的时候Text需要实现这种那种的样式。先提供在不使用StyledText类的情况:
    2009-06-06
  • Springmvc异常映射2种实现方法

    Springmvc异常映射2种实现方法

    这篇文章主要介绍了Springmvc异常映射2种实现方法以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。,需要的朋友可以参考下
    2020-05-05
  • spring bean标签的primary属性用法讲解

    spring bean标签的primary属性用法讲解

    这篇文章主要介绍了spring bean标签的primary属性用法讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 基于SpringBoot实现自定义插件的流程详解

    基于SpringBoot实现自定义插件的流程详解

    在SpringBoot中,插件是一种扩展机制,它可以帮助我们在应用程序中快速地添加一些额外的功能,在本文中,我们将介绍如何使用 SpringBoot实现自定义插件,需要的朋友可以参考下
    2023-06-06
  • Java线程状态运行原理解析

    Java线程状态运行原理解析

    这篇文章主要介绍了Java线程状态运行原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • spring data jpa分页查询示例代码

    spring data jpa分页查询示例代码

    本篇文章主要介绍了spring data jpa分页查询示例代码,分页在很多项目中都能使用,具有一定的参考价值,有兴趣的可以了解一下。
    2017-03-03
  • java数据结构与算法之奇偶排序算法完整示例

    java数据结构与算法之奇偶排序算法完整示例

    这篇文章主要介绍了java数据结构与算法之奇偶排序算法,较为详细的分析了奇偶算法的原理并结合完整示例形式给出了实现技巧,需要的朋友可以参考下
    2016-08-08
  • Springboot整合Thymeleaf引入公共的CSS和JS文件的方法及注意点

    Springboot整合Thymeleaf引入公共的CSS和JS文件的方法及注意点

    有时候很多css文件是公共的,我们必须要在每个html文件中引入它们,下面这篇文章主要给大家介绍了关于Springboot整合Thymeleaf引入公共的CSS和JS文件的方法及注意点,需要的朋友可以参考下
    2024-06-06
  • java基础之String知识总结

    java基础之String知识总结

    今天带大家来回顾一下Java基础,文中详细总结了String的相关知识,对正在学习java基础的小伙伴们有很好的帮助,需要的朋友可以参考下
    2021-05-05

最新评论