Java中RabbitMQ延迟队列实现详解

 更新时间:2023年09月20日 10:13:11   作者:CD4356  
这篇文章主要介绍了Java中RabbitMQ延迟队列实现详解,消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可,需要的朋友可以参考下

一、RabbitMQ延迟队列实现

1.1、RabbitMQ延迟队列实现流程

cd

  1. 生产者生产一条延迟消息,根据延迟时间的不同,利用不同的routing-key将消息路由到不同的延迟队列,每个队列都设置了不同的 TTL 属性 ( TTL ( Time To Live ) 生存时间 ),并绑定到同一个死信交换机中。
  2. 消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可。

1.2、配置RabbitMQ连接

#[ RabbitMQ相关配置 ]
#rabbitmq服务器IP
spring.rabbitmq.host=安装RabbitMQ的服务器IP
#rabbitmq服务器端口(默认为5672)
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#用户密码
spring.rabbitmq.password=guest
#虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列)
#vhost虚拟主机地址( 默认为/ )
spring.rabbitmq.virtual-host=/

1.3、创建配置类

配置两个交换机、四个队列、以及根据路由键配置交换机和队列的绑定关系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfiguration {
    //延迟交换机
    public static final String DELAY_EXCHANGE = "delay_exchange";
    //延迟队列A
    public static final String DELAY_QUEUE_A = "delay_queue_a";
    //延迟队列B
    public static final String DELAY_QUEUE_B = "delay_queue_b";
    //延迟路由键10S
    public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key";
    //延迟路由键60S
    public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key";
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    //死信队列A
    public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a";
    //死信队列B
    public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b";
    //死信路由键10S
    public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key";
    //死信路由键60S
    public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key";
    //延迟交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE, true, false);
    }
    //延迟队列A
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>();
        //设置延迟队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //设置延迟队列绑定的死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
        //设置延迟队列的 TTL 消息存活时间
        args.put("x-message-ttl", 10*1000);
        return new Queue(DELAY_QUEUE_A, true, false, false, args);
    }
    //延迟队列B
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>();
        //设置延迟队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //设置延迟队列绑定的死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
        //设置延迟队列的 TTL 消息存活时间
        args.put("x-message-ttl", 60*1000);
        return new Queue(DELAY_QUEUE_B, true, false, false, args);
    }
    //延迟队列A的绑定关系
    @Bean("delayBindingA")
    public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY);
    }
    //延迟队列B的绑定关系
    @Bean("delayBindingB")
    public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY);
    }
    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }
    //死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A, true);
    }
    //死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B, true);
    }
    //死信队列A的绑定关系
    @Bean("deadLetterBindingA")
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
                                 @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
    }
    //死信队列B的绑定关系
    @Bean("deadLetterBindingB")
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
                                      @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
    }
}

1.4、创建一个枚举类来配置延迟类型

@Getter
@AllArgsConstructor
public enum DelayTypeEnum {
    //10s
    DELAY_10s(1),
    //60s
    DELAY_60s(2);
    private Integer type;
    /**
     * 延迟类型
     * @param type
     * @return 延迟类型
     */
    public static DelayTypeEnum getDelayTypeEnum(Integer type){
        if(Objects.equals(type, DELAY_10s.type)){
            return DELAY_10s;
        }
        if(Objects.equals(type, DELAY_60s.type)){
            return DELAY_60s;
        }
        return null;
    }
}

1.5、创建生产者类发送消息

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY;
/**
 * 延迟消息生产者
 */
@Component
public class DelayMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送延迟消息
     * @param message  要发送的消息
     * @param type  延迟类型(延时10s的延迟队列 或 延时60s的延迟队列)
     */
    public void sendDelayMessage(String message, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message);
                break;
            default:
                break;
        }
    }
}

1.6、创建消费者类消费消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 监听死信队列A
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_a")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_A)
    public void receiveA(Message message) {
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列A收到的消息:{}", LocalDateTime.now(), msg);
    }
    /**
     * 监听死信队列B
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_b")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_B)
    public void receiveB(Message message){
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},死信队列B收到的消息:{}", LocalDateTime.now(), msg);
    }
}

1.7、创建控制类

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Objects;
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private DelayMessageProducer producer;
    @RequestMapping("/send")
    public void send(String message, Integer delayType){
        // 记录日志
        log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, delayType);
        // 发送延迟消息
        producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType)));
    }
}

1.8、测试

在浏览器中先后提交下面两个请求:

1)localhost:8080/rabbitmq/send?message=测试自定义延迟处理60s&delayType=2

2)localhost:8080/rabbitmq/send?message=测试自定义延迟处理10s&delayType=1

查看idea控制台:

cd

到此这篇关于Java中RabbitMQ延迟队列实现详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java实现斐波那契数列的3种方法

    java实现斐波那契数列的3种方法

    这篇文章主要介绍了java实现斐波那契数列的3种方法,有需要的朋友可以参考一下
    2014-01-01
  • 浅析JavaMail发送邮件后再通过JavaMail接收格式问题

    浅析JavaMail发送邮件后再通过JavaMail接收格式问题

    这篇文章主要介绍了JavaMail发送邮件后再通过JavaMail接收格式问题 ,本文通过代码实例给大家详细解说,需要的朋友可以参考下
    2019-06-06
  • Java算法之位图的概念和实现详解

    Java算法之位图的概念和实现详解

    这篇文章主要介绍了Java算法之位图的概念和实现详解,位图可以利用每一位来对应一个值,比如可以利用int类型的数去存储0~31这个集合的数字,如果该集合内的数字存在,则把对应的位设置位1默认为0,需要的朋友可以参考下
    2023-10-10
  • Mybatisplus自动填充实现方式及代码示例

    Mybatisplus自动填充实现方式及代码示例

    这篇文章主要介绍了Mybatisplus自动填充实现方式及代码示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • SpringBoot如何使用ApplicationContext获取bean对象

    SpringBoot如何使用ApplicationContext获取bean对象

    这篇文章主要介绍了SpringBoot 如何使用ApplicationContext获取bean对象,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • java线程并发countdownlatch类使用示例

    java线程并发countdownlatch类使用示例

    javar的CountDownLatch是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。
    2014-01-01
  • SpringBoot数据层处理方案精讲

    SpringBoot数据层处理方案精讲

    这篇文章主要介绍了SpringBoot数据层技术的解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-10-10
  • springboot运行jar生成的日志到指定文件进行管理方式

    springboot运行jar生成的日志到指定文件进行管理方式

    这篇文章主要介绍了springboot运行jar生成的日志到指定文件进行管理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • Java调用用户芝麻信用分

    Java调用用户芝麻信用分

    这篇文章主要为大家详细介绍了Java调用用户芝麻信用分,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-11-11
  • Java Reference源码解析

    Java Reference源码解析

    这篇文章主要为大家详细解析了Java Reference源码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-03-03

最新评论