详解RabbitMq如何做到消息的可靠性投递

 更新时间:2022年09月05日 14:37:05   作者:S1C  
这篇文章主要为大家介绍了RabbitMq如何做到消息的可靠性投递,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的。 引入消息队列可以给这个项目带来很多的好处:比如

  • 削峰

这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请求

  • 解耦合

比如现在有系统A,当系统A执行完成后,B、C系统需要拿到A系统的结果才可以继续执行,如果不引入MQ,A系统还要调用B、C系统,这样这A、B、C三个系统的耦合性就很大。引入MQ后A系统的执行结果只需要保证将消息投递到MQ就好,其它的两个系统只需要监听这个MQ的某个队列,这样就降低了这三个系统之间的耦合性。

  • 异步

再通过A、B、C这三个系统举例。A系统在返回给用户的执行结果前需要完成B、C系统的调用,这个总的执行时间是A+B+C的执行时间,如果引入MQ,A系统的执行完成后将数据投递到MQ,直接响应用户。B、C再这在通过监听完成数据的处理。这样也降低了用户的等待时间

除了这些好处,当然引入MQ还会有不好的地方:比如

  • 数据一致性问题
    • A系统执行完将数据投递到了MQ,B、C在消费的时候如果出现了问题,是不是就导致了数据不一致的问题
  • 可用性降低
    • 一个好好的系统,引入一个MQ,如果这个MQ拓机了呢?这个可能就需要集群来提高MQ的高可用。
  • 系统的复杂度提高
    • 引入了MQ,我们还需要关注消息是否被成功的投递,MQ中的消息被积压太多怎么办?消费端是否成功的消费的消息。

这些都是问题,所在是否要引入MQ还需要看业务需求

RabbitMq的投递及消费流程

这里有张投递消息到消费的流程图

从这张图上可看出这也是一种AMQP协议的实现。消息的提供者先是通过某一个信道将消息发送到交换机,然后交换机通通RoutingKey来将消息分发到某一个队列上。然后,消费者在临听某一个队列来进行消息的消费。

今天我们的主题是如何保证消息的投递可靠性。那么我们来想想在这个流程中那些位置可能会影响我们消息的投递可靠性?

从上图中我们可以总结出有二个因素影响着消息是否被成功投递和被成功消费

提供者

  • 提供者有没有将消成功的发送到MQ并被处理
  • 发送到MQ中的消息有没有成功的被路由到队列中

消费者

  • 消费者有没有成功的签收消息并成功处理。
  • 消费者是否可以保证消费者的稳定性

提供者如何确保消息的成功投递

解决这个问题,我们可以通过提供者的发送方确认机制来实现,这个发送方确认机制又分成三种:

  • 单条消息的同步确认
  • 多条消息的同步确认
  • 异步消息确认

单条消息的同步确认

首先要在当前的Channel上开启消息确认模式,然后通过waitForConfirms()方法进行消息确认是否发送成功。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            Map<String,String> mes = new HashMap<>();
            mes.put("name","1111");
            String messageStr = objectMapper.writeValueAsString(mes);
            channel.basicPublish(
                    "exchange.drinks",
                    "drinks.juzi",
                    null,
                    messageStr.getBytes());
            boolean isSendSuccess = channel.waitForConfirms();
            if(isSendSuccess){
                System.out.print("消息发送成功");
            }
        }
    }

这样做的话每次发完消息后,都会确保消息是否发送成功。如果发送失败的话进行相应的处理。

多条消息的同步确认

多条消息的确认和单条的差不多,比如我将发送消息的代码放到一个循环内。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            Map<String,String> mes = new HashMap<>();
            mes.put("name","1111");
            String messageStr = objectMapper.writeValueAsString(mes);
            for(int i = 0;i < 100;i++){
                channel.basicPublish(
                        "exchange.drinks",
                        "drinks.juzi",
                        null,
                        messageStr());
            }
            boolean isSendSuccess = channel.waitForConfirms();
            System.out.println(isSendSuccess);
        }
    }

这样的话当一批消息发送完成后,进行统一的消息确认是否发送成功,就成了多条的消息确认,不过并不推荐使用这种确认消息的方式

在多条的消息确认中,比如我先是发送了一批的消息,比如这批消息有100条,这个时候如果有其中的一条消息没有发送成功,这里返回的也是false,然尔我们并不能知道是具体的哪 一条消息发送失败。

异步消息确认

异步的消息确认是通过一个监听器来实现的,当消息发送后,会接着执行下面的逻辑,可能在稍会的一段时间,监听器监听到了Broker的返回,再进行逻辑的处理。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            ConfirmListener confirmListener = new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("发送成功:" + deliveryTag + " multiple:" + multiple);
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("发送失败:" + deliveryTag);
                }
            };
            channel.addConfirmListener(confirmListener);
            Map<String,String> mes = new HashMap<>();
            mes.put("name","11111");
            String messageStr = objectMapper.writeValueAsString(mes);
            for(int i = 0;i < 100;i++){
                channel.basicPublish(
                        "exchange.drinks",
                        "drinks.juzi",
                        null,
                        messageStr.getBytes());
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

当成功的发送消息的时候会回调监听器中的handleAck方法,如果没有发送成功会回调handleNack方法 在这个监听器里面有两个参数一个deliveryTagmultiple:

  • deliveryTag:表示当前的Channel发送的第几条消息
  • multiple:是否在确认多条消息

这个异步的虽然在听觉上感觉比较厉害些,这里也不推荐使用,原因和上面的一样,我们并不能具休的知道是哪一条消息没有被确认发送。

综上:这里更加推荐单条消息确认,具体选择哪一种还是要用业务做出选择

注:注意一点是当一条消息成功的发送到Broker,但是如果没有正确的路由到队列,那么这时borker也是会返回true,因为Broker确时接收到了消息只是RoutingKey不可达,所以这里也会返回true,并且直接将消息丢弃

消息的返回机制

这个消息返回机制的作用就是在当一个消息成功的发送,但是并没有正确路由到队列的时候所回调的。

这也弥补了上面确认消息是否发送成功但没有路由到队列所返回true的问题 在使用消息返回机制的时候在发送消息时需要将mandatory置成true。再添加对应的监听器。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return returnMessage) {
                    System.out.println("replyCode:" + returnMessage.getReplyCode() + " replyText:" + returnMessage.getReplyText() + " routingKey:"
                    + returnMessage.getRoutingKey() + " exchange:" + returnMessage.getExchange() + " body:" + new String(returnMessage.getBody()));
                }
            });
            Map<String,String> mes = new HashMap<>();
            mes.put("name","11111");
            String messageStr = objectMapper.writeValueAsString(mes);
            channel.basicPublish(
                    "exchange.drinks",
                    "drinks.juzi1",
                    true,
                    null,
                    messageStr.getBytes());
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

这里的addReturnListener方法有两个重载:只不过是handle的参数不同,一个是参数都显示在了参数列表内,一个是将参数封装到了Return对象内。当handle被回调的时候也可以获取到相应的参数比如:exchange routingkey body。

注:保证消息可靠性投递的前提是服务的高可用,服务不高可用谈其它的都是扯

以上就是详解RabbitMq如何做到消息的可靠性投递的详细内容,更多关于RabbitMq 消息可靠性投递的资料请关注脚本之家其它相关文章!

相关文章

  • 在Java内存模型中测试并发程序代码

    在Java内存模型中测试并发程序代码

    这篇文章主要介绍了在Java内存模型中测试并发程序代码,辅以文中所提到的JavaScript库JCStress进行,需要的朋友可以参考下
    2015-07-07
  • Java解析pdf格式发票的代码实现

    Java解析pdf格式发票的代码实现

    为了减少用户工作量及误操作的可能性,需要实现用户上传PDF格式的发票,系统通过解析PDF文件获取发票内容,并直接将其写入表单,以下文章记录了功能实现的代码,需要的朋友可以参考下
    2024-08-08
  • RocketMq事务消息发送代码流程详解

    RocketMq事务消息发送代码流程详解

    这篇文章主要介绍了RocketMq事务消息发送代码流程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • 使用springboot结合vue实现sso单点登录

    使用springboot结合vue实现sso单点登录

    这篇文章主要为大家详细介绍了如何使用springboot+vue实现sso单点登录,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-06-06
  • JAVA中StackOverflowError错误的解决

    JAVA中StackOverflowError错误的解决

    这篇文章主要介绍了JAVA中StackOverflowError错误的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04
  • JAVA生成八位不重复随机数最快的方法总结(省时间省空间)

    JAVA生成八位不重复随机数最快的方法总结(省时间省空间)

    随机数在实际中使用很广泛,比如要随即生成一个固定长度的字符串、数字,这篇文章主要给大家介绍了关于JAVA生成八位不重复随机数最快的方法,文中介绍的方法省时间省空间,需要的朋友可以参考下
    2024-03-03
  • SpringBoot Bean花式注解方法示例下篇

    SpringBoot Bean花式注解方法示例下篇

    这篇文章主要介绍了SpringBoot Bean花式注解方法,很多时候我们需要根据不同的条件在容器中加载不同的Bean,或者根据不同的条件来选择是否在容器中加载某个Bean
    2023-02-02
  • spring cloud zuul 与 sentinel的结合使用操作

    spring cloud zuul 与 sentinel的结合使用操作

    这篇文章主要介绍了spring cloud zuul 与 sentinel 的结合使用操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • Java继承方法重写实现原理及解析

    Java继承方法重写实现原理及解析

    这篇文章主要介绍了Java继承方法重写实现原理及解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • futuretask用法及使用场景介绍

    futuretask用法及使用场景介绍

    这篇文章主要介绍了futuretask用法及使用场景介绍,小编觉得挺不错的,这里分享给大家,供大家参考。
    2017-10-10

最新评论