如何利用rabbitMq的死信队列实现延时消息

 更新时间:2023年01月18日 15:43:03   作者:好大的月亮  
这篇文章主要介绍了如何利用rabbitMq的死信队列实现延时消息问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

前言

使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期。

mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信.

mq基本的消息模型

mq死信队列的消息模型

简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。

然后监听这个死信队列的消费.

一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)

通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.

maven依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

配置普通队列和死信队列

yml文件

spring:
  application:
    name: ttl-queue
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan
    connection-timeout: 15000
    # 发送确认
    #publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 签收模式为手动签收-那么需要在代码中手动ACK
        acknowledge-mode: manual
package com.fchan.mq.mqDelay.dlx;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabConfig {

    //定义普通任务队列(其实就是一个普通队列,只是没有消费者)
    @Bean("delayQue")
    public Queue delayQue(){
        //普通队列绑定死信交换机及路由key

        //delayQueue延时队列
        return QueueBuilder.durable("delayQueue")
                //指定这个延时队列下的死信交换机
                //如果消息过时则会被投递到当前对应的my-dlx-exchange
                .withArgument("x-dead-letter-exchange", "my-dlx-exchange")
                //死信交换机绑定的路由key
                //如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列
                //mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式
                .withArgument("x-dead-letter-routing-key", "routing-key-delay")
                .build();
    }

    //定义普通队列的交换机
    @Bean("delayExchange")
    public Exchange delayExchange(){
        return ExchangeBuilder.directExchange("delayExchange").build();
    }

    //绑定普通队列的交换机
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
    }



    //定义死信队列
    @Bean("dlxQueue")
    public Queue dlxQueue(){
        return QueueBuilder.durable("my-dlx-queue").build();
    }


    //定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致
    @Bean("dlxExchange")
    public Exchange dlxExchange(){
        return ExchangeBuilder.directExchange("my-dlx-exchange").build();
    }


    //绑定交换机与死信队列
    @Bean("dlxBinding")
    public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
        //这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致
        return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
    }

}

死信队列消费者

package com.fchan.mq.mqDelay;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = {"my-dlx-queue"})
    public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
        log.info("收到死信信息:{}",map);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
		//手动确认消息        
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

发送消息测试

/**
 * 发送消息到延时队列 delayQueue
 * 消息5秒后会被投递到死信队列
 * @return
 */
@GetMapping("sendMsgToDelay")
public String sendMsgToDelay(String msg){
    Map<String,Object> map = new HashMap<>();
    map.put("msg", msg);
    rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //设置消息过期时间,5秒过期
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    });
    return msg;
}

测试成功

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot+微信小程序实现文件上传与下载功能详解

    SpringBoot+微信小程序实现文件上传与下载功能详解

    这篇文章主要为大家介绍了SpringBoot整合微信小程序实现文件上传与下载功能,文中的实现步骤讲解详细,快跟随小编一起学习一下吧
    2022-03-03
  • Java微服务的打包问题解决

    Java微服务的打包问题解决

    本文主要介绍了Java微服务的打包问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • Java后台如何处理日期参数格式

    Java后台如何处理日期参数格式

    这篇文章主要介绍了Java后台如何处理日期参数格式问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Java中序列化和反序列化的完整讲解

    Java中序列化和反序列化的完整讲解

    序列化是将对象转换成二进制字节流的过程;反序列化是从二进制字节流中恢复对象的过程。文中将为大家详细讲讲二者的原理与实现,需要的可以参考一下
    2022-11-11
  • 解决javaBean规范导致json传参首字母大写将永远获取不到问题

    解决javaBean规范导致json传参首字母大写将永远获取不到问题

    这篇文章主要介绍了解决javaBean规范导致json传参首字母大写将永远获取不到问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Spring实战之FileSystemResource加载资源文件示例

    Spring实战之FileSystemResource加载资源文件示例

    这篇文章主要介绍了Spring实战之FileSystemResource加载资源文件,结合实例形式分析了spring FileSystemResource加载xml资源文件的具体实现步骤与相关操作技巧,需要的朋友可以参考下
    2019-12-12
  • IDEA Ultimate2020.2版本配置Tomcat详细教程

    IDEA Ultimate2020.2版本配置Tomcat详细教程

    这篇文章主要介绍了IDEA Ultimate2020.2版本配置Tomcat教程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • 解决feign调用接口不稳定的问题

    解决feign调用接口不稳定的问题

    这篇文章主要介绍了解决feign调用接口不稳定的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • StackTraceElement获取方法调用栈信息实例详解

    StackTraceElement获取方法调用栈信息实例详解

    这篇文章主要介绍了StackTraceElement获取方法调用栈信息实例详解,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-02-02
  • Springboot如何使用mybatis实现拦截SQL分页

    Springboot如何使用mybatis实现拦截SQL分页

    这篇文章主要介绍了Springboot使用mybatis实现拦截SQL分页,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06

最新评论