Springboot整合RabbitMq测试TTL的方法详解
什么是TTL?
在RabbitMq
中,存在一种高级特性
TTL
。
TTL
即Time To Live
的缩写,含义为存活时间
或者过期时间
。即:
设定消息在队列中存活的时间。
当指定时间内
,消息依旧未被消费
,则由队列自动将其删除。
如何设置TTL?
既然涉及到设定消息的存活时间
,在RabbitMq
中,存在两种设置方式:
- 设置整个队列的过期时间。
- 设置单个消息的过期时间。
设定整个队列的过期时间
按照上一篇文章的依赖导入和配置编写方式进行。
配置类编写
在原有基础之上,新创建几个配置的bean
类,申明bean对象,并进行交换机
与队列
的关联,如下所示:、
package cn.linkpower.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQConfiguration { // =========================== Direct 直连模式 ================================== //队列名称 public static final String QUEUQ_NAME = "xiangjiao.queue"; //交换器名称 public static final String EXCHANGE = "xiangjiao.exchange"; //路由key public static final String ROUTING_KEY = "xiangjiao.routingKey"; // =========================== Direct 普通队列申明 和 交换机绑定 =================== //创建队列 @Bean(value = "getQueue") public Queue getQueue(){ //QueueBuilder.durable(QUEUQ_NAME).build(); return new Queue(QUEUQ_NAME); } //实例化交换机 @Bean(value = "getDirectExchange") public DirectExchange getDirectExchange(){ //DirectExchange(String name, boolean durable, boolean autoDelete) /** * 参数一:交换机名称;<br> * 参数二:是否永久;<br> * 参数三:是否自动删除;<br> */ //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); return new DirectExchange(EXCHANGE, true, false); //绑定消息队列和交换机 @Bean public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange, @Qualifier(value = "getQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); // =========================== TTL ================================ public static final String ttl_queue_name = "xiangjiao.ttl.queue"; public static final String ttl_exchange_name = "xiangjiao.ttl.exchange"; public static final String ttl_routing_key = "xiangjiao.ttl.routingKey"; @Bean(value = "getTtlQueue") public Queue getTtlQueue(){ // 设置 ttl 队列,并设定 x-message-ttl 参数,表示 消息存活最大时间,单位 ms //return QueueBuilder.durable(ttl_queue_name).withArgument("x-message-ttl",10000).build(); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl",10000); return new Queue(ttl_queue_name,true,false,false,arguments); @Bean(value = "getTTlExchange") public DirectExchange getTTlExchange(){ // 设置交换机属性,并保证交换机持久化 return new DirectExchange(ttl_exchange_name, true, false); public Binding bindExchangeAndQueueTTL(@Qualifier(value = "getTTlExchange") DirectExchange getTTlExchange, @Qualifier(value = "getTtlQueue") Queue queue){ return BindingBuilder.bind(queue).to(getTTlExchange).with(ttl_routing_key); }
对比原有的配置类,不难发现区别:
对
队列
设置过期属性
,只需要传递一个x-message-ttl
的属性值即可。(单位:ms)
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl",10000); return new Queue(ttl_queue_name,true,false,false,arguments);
然后定义交换机类型,并将指定的交换机和队列进行绑定。
为了测试效果,暂未定义任何该队列的消费者信息。
测试
为了便于测试,需要定义一个接口,生产新的数据信息,并将数据向对应的Exchange
中传递。
/** * 发送消息,指定ttl参数信息(队列) * @return */ @RequestMapping("/sendQueueTtl") @ResponseBody public String sendQueueTtl(){ //发送10条消息 for (int i = 0; i < 10; i++) { String msg = "msg"+i; System.out.println("发送消息 msg:"+msg); rabbitmqService.sendMessage(MQConfiguration.ttl_exchange_name,MQConfiguration.ttl_routing_key,msg); //每两秒发送一次 try { Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); } } return "send ok"; }
两条消息之间的过期时间为
8s
。
请求链接进行测试,查看Rabbitmq web
视图信息:
http://localhost/sendQueueTtl
查看控制台输出日志:
消息正常发送到了Exchange
,同时Exchange 也将消息推送到了指定的队列 !
设置有
Confirm
和Return
监听。
【说明:】
给队列设定时间后,单位时间内的消息如果未被消费,则队列会将其中的数据进行删除处理。
对单个消息设定过期时间
上面的操作和测试,已经验证对队列
设定过期时间
,会导致所有的消息过期时间都是一样
的现象。
但实际开发中,可能一个队列
需要存放不同过期时间
的消息信息,如果需要进行实现,就不能再设定队列的过期时间信息了,需要采取下面要说到的针对单个消息,设置不同过期时间
。
配置
既然是针对单个消息
设定不同的过期时间
操作,则需要去掉队列过期
设置。
为了测试的简单化,此处采取直连 Direct 交换机
类型,进行交换机和队列数据的绑定方式。如下所示:
// =========================== Direct 直连模式 ================================== //队列名称 public static final String QUEUQ_NAME = "xiangjiao.queue"; //交换器名称 public static final String EXCHANGE = "xiangjiao.exchange"; //路由key public static final String ROUTING_KEY = "xiangjiao.routingKey"; // =========================== Direct 普通队列申明 和 交换机绑定 =================== //创建队列 @Bean(value = "getQueue") public Queue getQueue(){ //QueueBuilder.durable(QUEUQ_NAME).build(); return new Queue(QUEUQ_NAME); } //实例化交换机 @Bean(value = "getDirectExchange") public DirectExchange getDirectExchange(){ //DirectExchange(String name, boolean durable, boolean autoDelete) /** * 参数一:交换机名称;<br> * 参数二:是否永久;<br> * 参数三:是否自动删除;<br> */ //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); return new DirectExchange(EXCHANGE, true, false); } //绑定消息队列和交换机 @Bean public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange, @Qualifier(value = "getQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
对于消息的发送,依旧沿用之前写的发送处理方式
设定
confirm
和return
监听,保证消息能够正常到达指定的队列中。
测试
编写一个测试的接口,设定单个消息的过期时间属性
,保证不同消息具备不同的过期时间
。
在之前博客中,针对消息的持久化
设置,需要保证消息向队列设定属性时,传递一个deliveryMode
参数值信息。
同理,设定每个消息的过期时间,也需要设定对应的属性信息。如下所示:
/** * 发送消息,指定ttl参数信息(单个消息); * 测试需要将消息消费者关闭监听 * @return */ @RequestMapping("/sendTtl") @ResponseBody public String sendTtl(){ //发送10条消息 for (int i = 0; i < 10; i++) { String msg = "msg"+i; System.out.println("发送消息 msg:"+msg); MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000"); // 针对消息设定时限 // 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中 Message message = new Message(msg.getBytes(), messageProperties); rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, MQConfiguration.ROUTING_KEY,message); //每两秒发送一次 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } return "send ok"; }
上面代码的编写核心为将消息内容体和消息对象属性进行封装
。
MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000"); // 针对消息设定时限 // 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中 Message message = new Message(msg.getBytes(), messageProperties);
引申一点:消息的持久化
Springboot 2.x ——RabbitTemplate为什么会默认消息持久化?
请求连接进行测试:
http://localhost/sendTtl
查看控制台打印日志情况:
总结
1、设置队列过期
时间使用参数:x-message-ttl
,单位:ms
(毫秒),会对整个队列消息统一过期
。
2、设置消息过期
时间使用参数:expiration
。单位:ms
(毫秒),当该消息在队列头部
时(消费时),会单独判断
这一消息是否过期
。
3、如果两者都进行了设置,以时间短的为准。
代码下载
到此这篇关于Springboot整合RabbitMq测试TTL的文章就介绍到这了,更多相关Springboot整合RabbitMq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Spring扩展之基于HandlerMapping实现接口灰度发布实例
这篇文章主要介绍了Spring扩展之基于HandlerMapping实现接口灰度发布实例,灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式,灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度,需要的朋友可以参考下2023-08-08Java属性文件操作之Properties与ResourceBundle详解
这篇文章主要介绍了Java属性文件操作之Properties与ResourceBundle详解,两个类都可以读取属性文件中以key/value形式存储的键值对,ResourceBundle读取属性文件时操作相对简单,需要的朋友可以参考下2023-11-11SpringCloud应用idea实现可相互调用的多模块程序详解
IDEA 全称 IntelliJ IDEA,是java编程语言的集成开发环境。IntelliJ在业界被公认为最好的Java开发工具,尤其在智能代码助手、代码自动提示、重构、JavaEE支持、各类版本工具(git、svn等)、JUnit、CVS整合、代码分析、 创新的GUI设计等方面的功能可以说是超常的2022-07-07
最新评论