Java RabbitMQ的TTL和DLX全面精解

 更新时间:2021年09月23日 08:59:31   作者:没头脑遇到不高兴  
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。DLX, 可以称之为死信交换机,当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列

本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机、死信队列)

RabbitMQ的TTL

1、TTL概述

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。设置TTL有两种方式:

  1. 第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;
  2. 第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL。

如果两种方式都设置了,则以设置的较小的为准。两者的区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。至于为啥要这样处理很容易想清楚:第一种方式队列的消息有效期都一样,先入队的在队列头部,头部也是最早要过期的消息,RabbitMQ起一个定时任务从队列的头部开始扫描是否有过期消息即可;第二种方式每条消息的过期时间不同,所以只有遍历整个队列才可以筛选出来过期的消息,这样效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投递给消费者时再判断删除,虽然删除的不够及时但是不影响功能,其实就是用空间换时间。

如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

2、设置消息有效期

2.1、通过队列设置有效期

还记得我们之前声明队列的方法吗,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),该方法的最后一个参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。如果不清楚队列属性有哪些,可以查看web控制台的添加队列的地方。

具体代码如下:

//设置队列上所有的消息的有效期,单位为毫秒
Map<String, Object> argss = new HashMap<String , Object>();
arguments.put("x-message-ttl " , 5000);//5秒钟
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

查看控制台的队列列表如下:D表示持久化,TTL表示设置了消息的有效期。

过了几秒钟后发现消息已经不存在了。

也可以用RabbitMQ的命令行模式来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

还可以通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' 
http://ip:15672/api/queues/{vhost}/{queuename}

2.2、通过发送消息时设置有效期

发送消息时basicPublish方法可以设置属性参数,里面通过expiration属性设置消息有效期,单位为毫秒,代码如下所示

Builder bd = new AMQP.BasicProperties().builder();
bd.deliveryMode(2);//持久化
bd.expiration("100000");//设置消息有效期100秒钟
BasicProperties pros = bd.build();
String message = "测试ttl消息";
channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

另外也可以通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}'  
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

完整的通过队列设置消息有效期、发布消息时通过属性设置有效期的代码如下:可以运行后,观察下控制台,可以发现同时设置时,消息的有效期是以较小的为准的。项目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git

package cn.wkp.rabbitmq.newest.ttl;
 
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
/**
 * 
 * @ClassName: Send
 * @Description: 消息有效期 
 * @author wkg
 * @date: 2019年4月1日 下午11:28:22
 */
public class Send {
 
	private final static String EXCHANGE_NAME = "ttl_exchange";
	private final static String QUEUE_NAME = "ttl_queue";
 
	public static void main(String[] argv) throws Exception {
		// 获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		// 从连接中创建通道
		Channel channel = connection.createChannel();
 
		// 声明交换机
		channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
		
		//*****1:通过队列设置有效期 2:通过消息属性设置有效期,如果都设置了以较小的为准*****
		//声明队列
		Map<String, Object> arguments=new HashMap<String,Object>();
		//设置队列上所有的消息的有效期,单位为毫秒
		arguments.put("x-message-ttl", 5000);//5秒钟
		channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
		//绑定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		
		Builder bd = new AMQP.BasicProperties().builder();
		bd.deliveryMode(2);//持久化
		bd.expiration("100000");//设置消息有效期100秒钟
		BasicProperties pros = bd.build();
		String message = "测试ttl消息";
		channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
		System.out.println("Sent message:" + message);
//		 关闭通道和连接
		channel.close();
		connection.close();
	}
}

3、设置队列有效期(不常用,仅作了解)

上面在web管控台添加队列的时候,我们看到有一个x-expires参数,可以让队列在指定时间内 "未被使用" 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 18000);  //队列有效期18秒
channel.queueDeclare("myqueue", false, false, false, args);  

RabbitMQ的DLX

1、DLX是什么

DLX是Dead-Letter-Exchange的简写,意思是死信交换机。

它的作用其实是用来接收死信消息(dead message)的。那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  • 消息过期
  • 队列达到最大长度

当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。

2、DLX有什么用

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。DLX还有一个非常重要的作用,就是结合TTL实现延迟队列(延迟队列的使用范围还是挺广的:比如下单超过多长时间自动关闭;比如我们接入过第三方支付系统的同学一定知道,我们的订单中会传一个notify_url用于接收支付结果知,如果我们给第三方支付响应的不是成功的消息,其会隔一段时间继续调用通知我们的notify_url,超过几次后不再进行通知,一般通知频率都是 0秒-5秒-30秒-5分钟-30分钟-1小时-6小时-12小时;比如我们的家用电器定时关机。。。。。。这些场景都是可以用延迟队列实现的)。

3、DLX使用方式

下面在web管控台添加队列的时候,我们看到有两个DLX相关的参数:x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是设置队列的DLX的;x-dead-letter-routing-key是设置死信消息进入DLX时的routing key的,这个是可以不设置的,如果不设置,则默认使用原队列的routing key。

客户端可以通过channel.queueDeclare方法声明队列时设置x-dead-letter-exchange参数,具体代码如下所示

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置DLX
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)
//为队列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);

上面说的可能比较抽象,下面我们通过一个具体的例子,来演示一下DLX的具体使用:

package cn.wkp.rabbitmq.newest.dlx;
 
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class SendDLX {
 
	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明一个交换机,做死信交换机用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//声明一个队列,做死信队列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//队列绑定到交换机上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
		
		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键(可以不设置)
		//为队列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");
		
		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());
		System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));
		
		channel.close();
		connection.close();
	}
}

上面是发送者的代码,运行后观察控制台可以看到如下所示:

死信队列dlx_queue的绑定如下,其已与死信交换机dlx_exchange(topic类型)进行了绑定,routing key为"dlx.*"

队列normal_queue的绑定如下,其已与交换机normal_exchange(fanout类型)进行了绑定

queues视图如下:DLX和DLK表示设置给normal_queue设置了死信交换机和死信消息的routing key,我们看到消息已经被路由到了死信队列上面。整个流程为:

  • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上
  • 因为队列normal_queue没有消费者,消息过期后成为死信消息
  • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage
  • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

然后我们给死信队列添加消费者如下:我们测试一下死信消息进入DLX的时间,先将之前的那个死信消息删除

package cn.wkp.rabbitmq.newest.dlx;
 
import java.io.IOException;
import java.util.Date;
 
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class RecvDLX {
 
	public static void main(String[] argv) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		channel.queueDeclare("dlx_queue", true, false, false, null);
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
 
		// 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
		channel.basicQos(1);
 
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ConnectionUtil.formatDate(new Date()));
				// 消费者手动发送ack应答
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		System.out.println("消费死信队列中的消息======================");
		// 监听队列
		channel.basicConsume("dlx_queue", false, consumer);
	}
}

运行结果如下(先运行的死信队列消费者,然后运行生产者):我们看到消息过期后10毫秒就被死信队列的消费者消费到了,显然,消息成为死信后是立即被发送到了DLX中。

消费死信队列中的消息======================
消费者收到消息:测试死信消息,当前时间:2019-04-13 16:30:05:740

发送消息时间:2019-04-13 16:30:00:730

关于RabbitMQ的TTL和DLX就先介绍到这里,下一节会继续介绍RabbitMQ的高级特性:RabbitMQ的延迟队列。

参考 朱忠华《RabbitMQ实战指南》

到此这篇关于Java RabbitMQ的TTL和DLX全面精解的文章就介绍到这了,更多相关Java RabbitMQ TTL DLX内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • idea 实现git rebase操作应用场景

    idea 实现git rebase操作应用场景

    本文结合idea工具进行rebase的各种场景的操作,借助工具更能直观地观察到分支之间地操作差异,方便我们理解rebase的各种操作以及场景的使用,对idea  git rebase操作知识感兴趣的朋友一起看看吧
    2024-01-01
  • Java设计模式之责任链模式

    Java设计模式之责任链模式

    这篇文章介绍了Java设计模式之责任链模式,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-10-10
  • Java硬币翻转倍数递增试算实例

    Java硬币翻转倍数递增试算实例

    这篇文章主要介绍了Java硬币翻转倍数递增试算实例,有需要的朋友可以参考一下
    2013-12-12
  • 分析讲解Java Random类里的种子问题

    分析讲解Java Random类里的种子问题

    Random类中实现的随机算法是伪随机,也就是有规则的随机。在进行随机时,随机算法的起源数字称为种子数(seed),在种子数的基础上进行一定的变换,从而产生需要的随机数字
    2022-05-05
  • java控制台输出版多人聊天室

    java控制台输出版多人聊天室

    这篇文章主要为大家详细介绍了java控制台输出版多人聊天室,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-09-09
  • 使用Java第三方实现发送短信功能

    使用Java第三方实现发送短信功能

    这篇文章主要介绍了使用Java第三方实现发送短信功能,在一些开发中,经常需要有给用户发送短信接收验证码的功能,那么在Java中该如何实现呢,今天我们就一起来看一看
    2023-03-03
  • Springboot使用jsp具体案例解析

    Springboot使用jsp具体案例解析

    这篇文章主要介绍了Springboot使用jsp具体案例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • java中@SuppressWarnings注解用法详解

    java中@SuppressWarnings注解用法详解

    这篇文章主要介绍了java中@SuppressWarnings注解用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-02-02
  • Java的JSON格式转换库GSON的初步使用笔记

    Java的JSON格式转换库GSON的初步使用笔记

    GSON是Google开发并在在GitHub上开源的Java对象与JSON互转功能类库,在Android开发者中也大受欢迎,这里我们就来看一下Java的JSON格式转换库GSON的初步使用笔记:
    2016-06-06
  • Java基础知识精通循环结构与break及continue

    Java基础知识精通循环结构与break及continue

    循环结构是指在程序中需要反复执行某个功能而设置的一种程序结构。它由循环体中的条件,判断继续执行某个功能还是退出循环,选择结构用于判断给定的条件,根据判断的结果判断某些条件,根据判断的结果来控制程序的流程
    2022-04-04

最新评论