RabbitMQ实现延时消息的两种方法实战教程

 更新时间:2023年09月16日 11:17:59   作者:柳落青  
这篇文章主要介绍了RabbitMQ实现延时消息的两种方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

RabbitMQ实现延时消息的两种方法

1、死信队列

1.1消息什么时候变为死信(dead-letter)

  • 消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue 重回队列属性设为false。
  • 消息在队列里得时间超过了该消息设置的过期时间(TTL)。
  • 消息队列到达了它的最大长度,之后再收到的消息。

1.2死信队列的原理

当一个消息再队列里变为死信时,它会被重新publish到另一个exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个交换机就可以了。

1.3 代码实现

引入amqp依赖

声明交换机,队列

package com.lank.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
    //死信交换机,队列,路由相关配置
    public static final String DLK_EXCHANGE = "dlk.exchange";
    public static final String DLK_ROUTEKEY = "dlk.routeKey";
    public static final String DLK_QUEUE = "dlk.queue";
    //业务交换机,队列,路由相关配置
    public static final String DEMO_EXCHANGE = "demo.exchange";
    public static final String DEMO_QUEUE = "demo.queue";
    public static final String DEMO_ROUTEKEY = "demo.routeKey";
    //延时插件DelayedMessagePlugin的交换机,队列,路由相关配置
    public static final String DMP_EXCHANGE = "dmp.exchange";
    public static final String DMP_ROUTEKEY = "dmp.routeKey";
    public static final String DMP_QUEUE = "dmp.queue";
    @Bean
    public DirectExchange demoExchange(){
        return new DirectExchange(DEMO_EXCHANGE,true,false);
    }
    @Bean
    public Queue demoQueue(){
        //只需要在声明业务队列时添加x-dead-letter-exchange,值为死信交换机
        Map<String,Object> map = new HashMap<>(1);
        map.put("x-dead-letter-exchange",DLK_EXCHANGE);
        //该参数x-dead-letter-routing-key可以修改该死信的路由key,不设置则使用原消息的路由key
        map.put("x-dead-letter-routing-key",DLK_ROUTEKEY);
        return new Queue(DEMO_QUEUE,true,false,false,map);
    }
    @Bean
    public Binding demoBind(){
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
    }
    @Bean
    public DirectExchange dlkExchange(){
        return new DirectExchange(DLK_EXCHANGE,true,false);
    }
    @Bean
    public Queue dlkQueue(){
        return new Queue(DLK_QUEUE,true,false,false);
    }
    @Bean
    public Binding dlkBind(){
        return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
    }
    //延迟插件使用
    //1、声明一个类型为x-delayed-message的交换机
    //2、参数添加一个x-delayed-type值为交换机的类型用于路由key的映射
    @Bean
    public CustomExchange dmpExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments);
    }
    @Bean
    public Queue dmpQueue(){
        return new Queue(DMP_QUEUE,true,false,false);
    }
    @Bean
    public Binding dmpBind(){
        return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
    }
}

声明一个类用于发送带过期时间的消息

package com.lank.demo.rabbitmq;
import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 1. @author lank
 2. @since 2020/12/14 10:33
 */
@Component
@Slf4j
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //使用死信队列发送消息方法封装
    public void send(String message,Integer time){
        String ttl = String.valueOf(time*1000);
        //exchange和routingKey都为业务的就可以,只需要设置消息的过期时间
        rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息的过期时间,是以毫秒为单位的
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        log.info("使用死信队列消息:{}发送成功,过期时间:{}秒。",message,time);
    }
    //使用延迟插件发送消息方法封装
    public void send2(String message,Integer time){
        rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
            //使用延迟插件只需要在消息的header中添加x-delay属性,值为过期时间,单位毫秒
                message.getMessageProperties().setHeader("x-delay",time*1000);
                return message;
            }
        });
        log.info("使用延迟插件发送消息:{}发送成功,过期时间:{}秒。",message,time);
    }
}

编写一个类用于消费消息

package com.lank.demo.rabbitmq;
import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MessageReceiver {
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
    public void onMessage(Message message){
        log.info("使用死信队列,收到消息:{}",new String(message.getBody()));
    }
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
    public void onMessage2(Message message){
        log.info("使用延迟插件,收到消息:{}",new String(message.getBody()));
    }
}

编写Controller调用发送消息方法测试结果

package com.lank.demo.controller;
import com.lank.demo.rabbitmq.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
    @Autowired
    public MessageSender messageSender;
    //死信队列controller
    @GetMapping("/send")
    public String send(@RequestParam String msg,Integer time){
        messageSender.send(msg,time);
        return "ok";
    }
    //延迟插件controller
    @GetMapping("/send2")
    public String sendByPlugin(@RequestParam String msg,Integer time){
        messageSender.send2(msg,time);
        return "ok";
    }
}

配置文件application.properties

server.port=4399
#virtual-host使用默认的/就好,如果需要/demo需自己在控制台添加
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

启动项目,打开rabbitmq控制台,可以看到交换机和队列已经创建好。

在这里插入图片描述

在这里插入图片描述

在浏览器中请求http://localhost:4399/send?msg=hello&time=5,从控制台的输出来看,刚好5s后接收到消息。

2020-12-16 22:47:28.071  INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:hello发送成功,过期时间:5秒。
2020-12-16 22:47:33.145  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:hello

1.4死信队列的一个小注意点

当我往死信队列中发送两条不同过期时间的消息时,如果先发送的消息A的过期时间大于后发送的消息B的过期时间时,由于消息的顺序消费,消息B过期后并不会立即重新publish到死信交换机,而是会等到消息A过期后一起被消费。

依次发送两个请求http://localhost:4399/send?msg=消息A&time=30和http://localhost:4399/send?msg=消息B&time=10,消息A先发送,过期时间30S,消息B后发送,过期时间10S,我们想要的结果应该是10S收到消息B,30S后收到消息A,但结果并不是,控制台输出如下:

2020-12-16 22:54:47.339  INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:消息A发送成功,过期时间:30秒。
2020-12-16 22:54:54.278  INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:消息B发送成功,过期时间:10秒。
2020-12-16 22:55:17.356  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:消息A
2020-12-16 22:55:17.357  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:消息B

消息A30S后被成功消费,紧接着消息B被消费。因此当我们使用死信队列时应该注意是否消息的过期时间都是一样的,比如订单超过10分钟未支付修改其状态。如果当一个队列各个消息的过期时间不一致时,使用死信队列就可能达不到延时的作用。这时候我们可以使用延时插件来实现这需求。

2 、延时插件

RabbitMQ Delayed Message Plugin是一个rabbitmq的插件,所以使用前需要安装它,可以参考的GitHub地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2.1如何实现

  • 安装好插件后只需要声明一个类型type为"x-delayed-message"的exchange,并且在其可选参数下配置一个key为"x-delayed-typ",值为交换机类型(topic/direct/fanout)的属性。
  • 声明一个队列绑定到该交换机
  • 在发送消息的时候消息的header里添加一个key为"x-delay",值为过期时间的属性,单位毫秒。
  • 代码就在上面,配置类为DMP开头的,发送消息的方法为send2()。
  • 启动后在rabbitmq控制台可以看到一个类型为x-delayed-message的交换机。

在这里插入图片描述

在这里插入图片描述

继续在浏览器中发送两个请求http://localhost:4399/send2?msg=消息A&time=30和http://localhost:4399/send2?msg=消息B&time=10,控制台输出如下,不会出现死信队列出现的问题:

2020-12-16 23:31:19.819  INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延迟插件发送消息:消息A发送成功,过期时间:30秒。
2020-12-16 23:31:27.673  INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延迟插件发送消息:消息B发送成功,过期时间:10秒。
2020-12-16 23:31:37.833  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延迟插件,收到消息:消息B
2020-12-16 23:31:49.917  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延迟插件,收到消息:消息A

到此这篇关于RabbitMQ实现延时消息的两种方法的文章就介绍到这了,更多相关RabbitMQ延时消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java高级之HashMap中的entrySet()方法使用

    Java高级之HashMap中的entrySet()方法使用

    这篇文章主要介绍了Java高级之HashMap中的entrySet()方法使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • Java数据结构及算法实例:朴素字符匹配 Brute Force

    Java数据结构及算法实例:朴素字符匹配 Brute Force

    这篇文章主要介绍了Java数据结构及算法实例:朴素字符匹配 Brute Force,本文直接给出实例代码,代码中包含详细注释,需要的朋友可以参考下
    2015-06-06
  • 基于Java8实现提高Excel读写效率

    基于Java8实现提高Excel读写效率

    这篇文章主要介绍了基于Java8实现提高Excel读写效率,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • Mybatis空值关联的具体实现

    Mybatis空值关联的具体实现

    在复杂的数据库查询中,处理空值关联是一项常见的需求,本文就来介绍一下Mybatis空值关联的具体实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-07-07
  • 自己编写IOC控制反转及AOP面向切面

    自己编写IOC控制反转及AOP面向切面

    本文展示通过一个案例来自己手写IOC和AOP代码,通过银行转账案例详细的代码编写和文档解释来说明IOC和AOP的思想,会分享存在的问题和解决问题的思路
    2021-06-06
  • Toolbar制作菜单条过程详解

    Toolbar制作菜单条过程详解

    Toolbar制作菜单条过程详解...
    2006-12-12
  • java地理坐标系及投影间转换代码示例

    java地理坐标系及投影间转换代码示例

    在地图投影中,经常需要将坐标从不同的坐标系之间进行转换,下面这篇文章主要给大家介绍了关于java地理坐标系及投影间转换的相关资料,需要的朋友可以参考下
    2024-08-08
  • Springboot项目如何使用apollo配置中心

    Springboot项目如何使用apollo配置中心

    这篇文章主要介绍了Springboot项目如何使用apollo配置中心,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • 深入讲解SPI 在 Spring 中的应用

    深入讲解SPI 在 Spring 中的应用

    这篇文章主要介绍了深入讲解SPI在Spring中的应用,SPI是Java内置的一种服务提供发现机制,可以用来提高框架的扩展性,主要用于框架的开发中
    2022-06-06
  • Mac下设置Java默认版本的方法

    Mac下设置Java默认版本的方法

    今天工作的时候发现了一个错误,提示java版本太低,无法启动!想起自己装过高版本的Java,但是却没有默认启动,从网上找了一些资料,整理下现在分享给大家,有需要的可以参考借鉴。
    2016-10-10

最新评论