RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解

 更新时间:2023年12月12日 09:57:28   作者:warybee  
这篇文章主要介绍了RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解,生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能,需要的朋友可以参考下

1. 概述

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都 有可能。此时可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还 没接收到这个消息的回调,那么你可以重发。

在这里插入图片描述

在实际项目中,可以利用这一机制保障消息的可靠性投递,如果消息未发送成功,可以在监听事件中记录日志、重新发送消息等操作。

2.原生API中开启Confirm消息确认机制

  • 在生产者的channel上开启确认机制: channel.confirmSelect();
  • 在channel上添加Confirm监听事件: channel.addConfirmListener(new ConfirmListener() ...

2.1 代码演示

生产者代码

监听事件的两个方法:handleAck() 消息投递成功后回调,handleNack 消息未成功投递回调

public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建一个链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //消息的确认模式
        channel.confirmSelect();
        String exchangeName="test_confirm_exchange";
        String routeKey="confirm.test";
        String msg="RabbitMQ send message confirm test!";
        for (int i=0;i<5;i++){
            channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
        }
        //确定监听事件
        channel.addConfirmListener(new ConfirmListener() {
            /**
             *  消息成功发送
             * @param deliveryTag   消息唯一标签
             * @param multiple  是否批量
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********Ack*********");
            }
            /**
             *  消息没有成功发送
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********No Ack*********");
            }
        });
    }

消费者端代码

public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建链接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_confirm_exchange";
        String exchangeType="topic";
        //声明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_confirm_queue";
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="confirm.#";
        //绑定队列和交换机
        channel.queueBind(queueName,exchangeName,routeKey);
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                }
            });
    }

到此这篇关于RabbitMQ中Confirm消息确认机制保障生产端消息的可靠性详解的文章就介绍到这了,更多相关Confirm消息确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java HashMap 如何正确遍历并删除元素的方法小结

    Java HashMap 如何正确遍历并删除元素的方法小结

    这篇文章主要介绍了Java HashMap 如何正确遍历并删除元素的方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-05-05
  • 浅谈IDEA实用的Servlet模板

    浅谈IDEA实用的Servlet模板

    今天给大家分享一下IDEA实用的Servlet模板,文中有非常详细的图文介绍及代码示例,对小伙伴们很有帮助哦,需要的朋友可以参考下
    2021-05-05
  • SpringBoot项目整合拦截器详解

    SpringBoot项目整合拦截器详解

    这篇文章主要介绍了SpringBoot项目整合拦截器详解,java里的拦截器是动态拦截Action调用的对象,它提供了一种机制可以使开发者在一个Action执行的前后执行一段代码,拦截器用于在某个方法或者字段被访问之前进行拦截,然后再之前或者之后加入某些操作,需要的朋友可以参考下
    2023-10-10
  • 一文带你了解Java中的SPI机制

    一文带你了解Java中的SPI机制

    SPI 全称是 Service Provider Interface,是一种 JDK 内置的动态加载实现扩展点的机制,本文主要为大家介绍了SPI机制的原理与使用,需要的可以参考一下
    2023-04-04
  • 深入理解java泛型详解

    深入理解java泛型详解

    这篇文章主要介绍了Java中的泛型详解,什么是泛型,作用以及基础实例等,喜欢的朋友可以参考
    2017-04-04
  • JavaSE的三大接口:Comparator,Comparable和Cloneable详解

    JavaSE的三大接口:Comparator,Comparable和Cloneable详解

    这篇文章主要介绍了详解JavaSE中Comparator,Comparable和Cloneable接口的区别的相关资料,希望通过本文大家能彻底掌握这部分内容,需要的朋友可以参考下
    2021-10-10
  • mybatis plus 开启sql日志打印的方法小结

    mybatis plus 开启sql日志打印的方法小结

    Mybatis-Plus(简称MP)是一个 Mybatis 的增强工具,在 Mybatis 的基础上只做增强不做改变,为简化开发、提高效率而生。本文重点给大家介绍mybatis plus 开启sql日志打印的方法小结,感兴趣的朋友一起看看吧
    2021-09-09
  • SpringAOP中@Pointcut的用法详解

    SpringAOP中@Pointcut的用法详解

    这篇文章主要介绍了SpringAOP中@Pointcut的用法详解,Pointcut(切点)是面向切面编程中的一个非常重要的概念,此概念由spring框架定义,Pointcut只是一种筛选规则,需要的朋友可以参考下
    2023-08-08
  • spring消息转换器使用详解

    spring消息转换器使用详解

    这篇文章主要为大家详细介绍了spring消息转换器的使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-07-07
  • 浅谈Java编程中string的理解与运用

    浅谈Java编程中string的理解与运用

    这篇文章主要介绍了浅谈Java编程中string的理解与运用,还是比较不错的,这里分享给大家,供需要的朋友参考。
    2017-11-11

最新评论