RabbitMQ进阶之消息可靠性详解

 更新时间:2023年08月10日 10:53:23   作者:程序员阿红  
这篇文章主要介绍了RabbitMQ进阶之消息可靠性详解,abbitmq消息的投递过程中,怎么确保消息能不丢失,这是一个很重要的问题,哪怕我们做了Rabbitmq持久化,也不能保证我们的业务消息不会被丢失,需要的朋友可以参考下

消息的可靠性

Rabbitmq消息的投递过程中,怎么确保消息能不丢失,这是一个很重要的问题。哪怕我们做了Rabbitmq持久化,也不能保证我们的业务消息不会被丢失。

我们可以从消息的收发过程中来分析,消息首先要从生产者producer发送到broker,再从broker把消息发送给消费者consumer。

image-20230321151738456

所以我们总的可以从发送方(生产者)确认和接收方(消费者)确认来保证消息的可靠性。

image-20230321150208466

异常捕获机制

先执行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常, 在异常处理理的代码块中执行回滚业务操作或者执行重发操作等。

这是一种最大努力确保的方式, 并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

image-20230321152637400

另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试。

AMQP/RabbitMQ的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。 一直到事务提交后都没有异常,确实就说明消息是投递成功了。

但是,这种方式在性能方面的开销 比较大,一般也不推荐使用。

  • 事务实现
channel.txSelect(): 将当前信道设置成事务模式
channel.txCommit(): 用于提交事务
channel.txRollback(): 用于回滚事务

image-20230321152859934

发送端确认机制

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信 道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么 确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。

image-20230321153131995

RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号,另外,通过设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响应。

如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理理该 nack 命令。

package confirm;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherConfirmsProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
        try {
        // 发送消息
        for (int i = 1 ; i < 10000 ; i++){
            channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes());
        }
            // 同步的方式等待RabbitMQ的确认消息
            channel.waitForConfirmsOrDie(5000);
            System.out.println("发送的消息已经得到确认");
        } catch (IOException ex) {
            System.out.println("消息被拒收");
        } catch (IllegalStateException ex) {
            System.out.println("发送消息的通道不是PublisherConfirms通道");
        } catch (TimeoutException ex) {
            System.out.println("等待消息确认超时");
        }
        channel.close();
        connection.close();
    }
}

waitForConfirm方法有个重载的,可以自定义timeout超时时间,超时后会抛TimeoutException。类似的有几个waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后该方法会抛出java.io.IOException。

需要根据异常类型来做区别处理理, TimeoutException超时是属于第三状态(无法确定成功还是失败),而返回Basic.Nack抛出IOException这种是明确的失败。上面的代码主要只是演示confirm机制,实际上还是同步阻塞模式的,性能并不不是太好。

实际上,我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次 waitForConfirms方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),如此看来,批量重发消息肯定会造成部分消息重复。

另外,我们可以通过异步回调的方式来处理Broker的响应。addConfirmListener 方法可以添加ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法:handleAck 和handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。

package confirm;
/**
 * 创建者: 魏红
 * 创建时间: 2023-02-28
 * 描述:
 */
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
public class PublisherConfirmsProducer2 {
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
        String message = "hello-";
        // 批处理的大小
        int batchSize = 10;
        // 用于对需要等待确认消息的计数
        int outstrandingConfirms = 0;
        for (int i = 0; i < 10000; i++) {
            channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
            outstrandingConfirms++;
            if (outstrandingConfirms == batchSize) {
                // 此时已经有一个批次的消息需要同步等待broker的确认消息
                // 同步等待
                channel.waitForConfirmsOrDie(5000);
                System.out.println("消息已经被确认了");
                outstrandingConfirms = 0;
            }
        }
        if (outstrandingConfirms > 0) {
            channel.waitForConfirmsOrDie(5000);
            System.out.println("剩余消息已经被确认了");
        }
        channel.close();
        connection.close();
    }
}

还可以使用异步方法:

package confirm;
/**
 * 创建者: 魏红
 * 创建时间: 2023-02-28
 * 描述:
 */
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import javax.management.loading.MLet;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class PublisherConfirmsProducer3 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
//        ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() {
//            @Override
//            public void handle(long deliveryTag, boolean multiple) throws IOException {
//                if (multiple) {
//                    System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");
//                } else {
//                    System.out.println("编号为:" + deliveryTag + " 的消息被确认");
//                }
//            }
//        };
        ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
        ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");
                final ConcurrentNavigableMap<Long, String> headMap
                        = outstandingConfirms.headMap(deliveryTag, true);
                // 清空outstandingConfirms中已经被确认的消息信息
                headMap.clear();
            } else {
                // 移除已经被确认的消息
                outstandingConfirms.remove(deliveryTag);
                System.out.println("编号为:" + deliveryTag + " 的消息被确认");
            }
        };
        ConfirmCallback confirmCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 将没有确认的消息记录到一个集合中
                // 此处省略实现
                System.out.println("消息编号小于等于:" + deliveryTag + " 的消息 不确认");
            } else {
                System.out.println("编号为:" + deliveryTag + " 的消息不确认");
            }
        };
        // 设置channel的监听器,处理确认的消息和不确认的消息
        channel.addConfirmListener(clearOutstandingConfirms, confirmCallback);
        String message = "hello-";
        for (int i = 0; i < 500000; i++) {
            // 获取下一条即将发送的消息的消息ID
            final long nextPublishSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
            System.out.println("编号为:" + nextPublishSeqNo + " 的消息已经发送成功,尚未确认");
            outstandingConfirms.put(nextPublishSeqNo, (message + i));
        }
        // 等待消息被确认
        Thread.sleep(10000);
        channel.close();
        connection.close();
    }
}

持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

  • Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不丢失。
  • Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不丢失。
  • 消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2即可实现消息的持久化,保证消息自身不丢失。

image-20230321153556625

接收端确认机制

如何保证消息被消费者成功消费?

前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的 可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我 们又没有任何重试,那结果跟消息丢失没什么分别。

RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自 己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。

一般而言,我们有如下处理手段:

  • 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险
  • 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
  • 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack
package workmode;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法
 */
public class Recer2 {
    public static void main(String[] args) throws  Exception {
        // 1.获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 2.获得通道(信道)
        final Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue",false,false,false,null);
        // 3.从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
//                System.out.println("【顾客2】吃掉 " + s+" ! 总共吃【"+i+++"】串!");
                System.out.println("【消费者2】得到 " + s);
                // 模拟网络延迟
                try{
                    Thread.sleep(400);
                }catch (Exception e){
                }
                // 手动确认(收件人信息,是否同时确认多个消息)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 4.监听队列 false:手动消息确认
        channel.basicConsume("work_queue", false,consumer);
    }
}

本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式

image-20230321154156414

到此这篇关于RabbitMQ进阶之消息可靠性详解的文章就介绍到这了,更多相关RabbitMQ消息可靠性内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java 深入理解创建型设计模式之原型模式

    Java 深入理解创建型设计模式之原型模式

    原型(Prototype)模式的定义如下:用一个已经创建的实例作为原型,通过复制该原型对象来创建一个和原型相同或相似的新对象。在这里,原型实例指定了要创建的对象的种类。用这种方式创建对象非常高效,根本无须知道对象创建的细节
    2022-02-02
  • Java 中使用数组存储和操作数据

    Java 中使用数组存储和操作数据

    本文将介绍Java中常用的数组操作方法,通过详细的示例和解释,帮助读者全面理解和掌握这些方法,具有一定的参考价值,感兴趣的可以了解一下
    2023-09-09
  • 解决Springboot配置excludePathPatterns不生效的问题

    解决Springboot配置excludePathPatterns不生效的问题

    这篇文章主要介绍了解决Springboot配置excludePathPatterns不生效的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-10-10
  • Java设计模式之动态代理

    Java设计模式之动态代理

    今天小编就为大家分享一篇关于Java设计模式之动态代理,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • nacos配置中心持久化相关配置方式

    nacos配置中心持久化相关配置方式

    这篇文章主要介绍了nacos配置中心持久化相关配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • java如何多线程批量更新10万级的数据

    java如何多线程批量更新10万级的数据

    在处理大数据量的批量更新时,直接使用mybatis的updateBatch可能导致效率低下甚至OOM,通过每次处理5000条数据的方式虽然安全但效率低,更优的解决方案是使用多线程处理,将数据分批并多线程执行,有效提高了处理速度并保证了系统稳定性
    2024-10-10
  • MyBatis工厂类封装与简化实现

    MyBatis工厂类封装与简化实现

    工厂类的目的是将对象的创建逻辑封装在一个类中,以便客户端代码无需了解具体的实现细节,本文主要介绍了MyBatis工厂类封装与简化实现,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01
  • nacos单机本地配置文件存储位置方式

    nacos单机本地配置文件存储位置方式

    这篇文章主要介绍了nacos单机本地配置文件存储位置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • Java编程环境搭建和变量基本使用图文教程

    Java编程环境搭建和变量基本使用图文教程

    这篇文章主要介绍了Java编程环境搭建和变量基本使用,结合图文形式详细分析了java编程语言环境搭建、配置、变量、注释的基本使用方法,需要的朋友可以参考下
    2020-02-02
  • 一起来学习Java IO的转化流

    一起来学习Java IO的转化流

    这篇文章主要为大家详细介绍了Java IO的转化流,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03

最新评论