RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解
1. 概述
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都 有可能。此时可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还 没接收到这个消息的回调,那么你可以重发。
在实际项目中,可以利用这一机制保障消息的可靠性投递,如果消息未发送成功,可以在监听事件中记录日志、重新发送消息等操作。
2.原生API中开启Confirm消息确认机制
- 在生产者的channel上开启确认机制: channel.confirmSelect();
- 在channel上添加Confirm监听事件: channel.addConfirmListener(new ConfirmListener() ...
2.1 代码演示
生产者代码
监听事件的两个方法:handleAck() 消息投递成功后回调,handleNack 消息未成功投递回调
public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟主机 connectionFactory.setVirtualHost("/"); //创建一个链接 Connection connection = connectionFactory.newConnection(); //创建channel Channel channel = connection.createChannel(); //消息的确认模式 channel.confirmSelect(); String exchangeName="test_confirm_exchange"; String routeKey="confirm.test"; String msg="RabbitMQ send message confirm test!"; for (int i=0;i<5;i++){ channel.basicPublish(exchangeName,routeKey,null,msg.getBytes()); } //确定监听事件 channel.addConfirmListener(new ConfirmListener() { /** * 消息成功发送 * @param deliveryTag 消息唯一标签 * @param multiple 是否批量 * @throws IOException */ @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("**********Ack*********"); } /** * 消息没有成功发送 * @param deliveryTag * @param multiple * @throws IOException */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("**********No Ack*********"); } }); }
消费者端代码
public static void main(String[] args) throws Exception{ System.out.println("======消息接收start=========="); ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟主机 connectionFactory.setVirtualHost("/"); //创建链接 Connection connection = connectionFactory.newConnection(); //创建channel Channel channel = connection.createChannel(); String exchangeName="test_confirm_exchange"; String exchangeType="topic"; //声明Exchange channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); String queueName="test_confirm_queue"; //声明队列 channel.queueDeclare(queueName,true,false,false,null); String routeKey="confirm.#"; //绑定队列和交换机 channel.queueBind(queueName,exchangeName,routeKey); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息::"+new String(body)); } }); }
到此这篇关于RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解的文章就介绍到这了,更多相关Confirm消息确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
JavaSE的三大接口:Comparator,Comparable和Cloneable详解
这篇文章主要介绍了详解JavaSE中Comparator,Comparable和Cloneable接口的区别的相关资料,希望通过本文大家能彻底掌握这部分内容,需要的朋友可以参考下2021-10-10
最新评论