Java RabbitMQ的工作队列与消息应答详解

 更新时间:2022年03月08日 14:47:32   作者:江海i  
这篇文章主要为大家详细介绍了Python实现学生成绩管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助

Work Queues

工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

其实就是生产者发送大量的消息,发送到队列之后,由多个消费者(工作线程)来处理消息,并且每个消息只能被处理一次。

在这里插入图片描述

1. 轮询分发消息

多个工作线程按照次序每来一个消息执行一次。

1.1 抽取工具类

直接通过信息获取信道

/**
 * @Description RabbitMQ工具类
 * @date 2022/3/5 10:02
 */
public class RabbitMQUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

1.2 编写两个工作线程

Work2和Work1代码没有区别,只需要对它做出区分即可。

public class Worker1 {
    // 指定队列名称
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        // 获取信道
        Channel channel = RabbitMQUtils.getChannel();

        // 声明:接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("工作线程01:"+ new String(message.getBody()));
        };

        // 声明:取消消费回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("工作线程01取消接收:"+consumerTag);
        };

        System.out.println("工作线程01启动完成......");

        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

1.3 编写生产者

public class Producer {

    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();


        // 产生队列
        channel.queueDeclare(QUEUE_NAME,false,false,true,null);

        // 消息体
        Scanner scanner = new Scanner(System.in);
        int i = 1;
        while (scanner.hasNext()){
            String msg = scanner.next();
            msg = msg + i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("发送成功:" + msg);
        }

        System.out.println("----------==========发送完毕==========----------");
    }

}

1.4 运行测试

先启动两个工作线程,再启动生产者。

出现404异常请参考下方1.6

生产者发送情况:

在这里插入图片描述

轮询状态下两个工作队列接收状态:

在这里插入图片描述

在这里插入图片描述

1.5 异常情况

在先启动两个消费者线程时,会提示404找不到队列

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)

发生这个情况的原因很显然是因为先启动了消费者,但是在RabbitMQ中没有创建相对应的队列名称,解决方法可以:

1.先启动生产者创建队列(也可以在RabbitMQ中创建队列);

2.再启动消费者就不会产生这个错误;

3.再在生产者中使用Scanner类去发送消息测试。

2. 消息应答

消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以删除消息。其目的就是为了保护消息在被处理之前不会消失。

2.1 自动应答

这种方式发送后就被认定为已经传送成功,所以在消息接收到之前消费者的连接或者channel关闭,那么这个消息就会丢失。其特点是消费者可以传递过载的消息,对传递的消息没有限制,但如果因内存耗尽消费者线程被系统杀死,就会使得多条消息丢失。所以这个模式需要在数据安全性和吞吐量之间选择,适合使用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

所以自动应答的方式局限性很高。

2.2 手动应答

优点:可以批量应答和减少网络拥挤。

1.channel.basicAck(long deliveryTag, boolean multiple);:肯应应答,处理完消息之后提醒RabbitMQ可以删除当前队列,deliveryTag:当前队列中选中的消息;multiple:是否批量应答。

2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):否定应答,

3.channel.basicReject(long deliveryTag, boolean requeue):否定并且拒绝应答。

2.3 消息自动重新入队

如果消费者因为一些原因失去了对RabbitMQ的连接,导致没有发送ACK确认,RabbitMQ就会对该消息进行重新排队,并且分发给可以处理该消息的消费者,所以即使某个消费者死亡,也可以保证消息不会丢失。

2.4 手动应答测试

测试目的:在手动应答状态下不会发生消息丢失的情况。

测试方法:

1.创建两个消费者;

2.使用工具类使线程睡眠一定时间;

3.在睡眠时关闭线程,看能否自动重新入队。

2.4.1 生产者代码

/**
 * @Description 手动应答生产者
 * @date 2022/3/5 19:03
 */
public class Producer1 {

    // 指定队列名
    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:'" + msg + "'成功");
        }
    }
}

2.4.2 消费者代码

/**
 * @Description 手动应答消费者1
 * @date 2022/3/5 19:17
 */
public class Worker1 {

    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args)  throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("线程A等待接收......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模拟并发沉睡一秒
            try {
                Thread.sleep(1000);
                System.out.println("线程A接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                /**
                 * basicAck:
                 *          1. 消息标记
                 *          2. 是否批量
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消费者取消消费");
                });

    }
}

Worker2类和1区别不大,将名称改成B再将睡眠事件改成30即可。

2.4.3 测试

测试方法:

1.先启动生产者创建队列;

2.启动两个消费者接收消息;

3.因为是轮询方式,所以A线程接收之后肯定是B线程接收,在睡眠时关闭B线程,如果A线程接收到说明测试成功。

发送消息:

在这里插入图片描述

线程A接收:

在这里插入图片描述

再发送消息:

在这里插入图片描述

关闭线程B线程A接收到消息:

在这里插入图片描述

测试成功!

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注脚本之家的更多内容!   

相关文章

  • Spring处理@Async导致的循环依赖失败问题的方案详解

    Spring处理@Async导致的循环依赖失败问题的方案详解

    这篇文章主要为大家详细介绍了SpringBoot中的@Async导致循环依赖失败的原因及其解决方案,文中的示例代码讲解详细,感兴趣的可以学习一下
    2022-07-07
  • 快速定位Java 内存OOM的问题

    快速定位Java 内存OOM的问题

    这篇文章主要介绍了快速定位Java 内存OOM的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • Java Web实现登录页面验证码验证功能

    Java Web实现登录页面验证码验证功能

    这篇文章主要介绍了Java Web登录页面验证码验证功能,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-12-12
  • 关于springcloud集成nacos遇到的问题

    关于springcloud集成nacos遇到的问题

    这篇文章主要介绍了关于springcloud集成nacos遇到的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • Java数据结构之链表的增删查改详解

    Java数据结构之链表的增删查改详解

    今天带大家来学习Java链表的增删改查的相关知识,文中有非常详细的代码示例,对正在学习Java的小伙伴们有很好的帮助,需要的朋友可以参考下
    2021-05-05
  • 本地启动RocketMQ未映射主机名产生的超时问题最新解决方案

    本地启动RocketMQ未映射主机名产生的超时问题最新解决方案

    这篇文章主要介绍了本地启动RocketMQ未映射主机名产生的超时问题,本文给大家分享最新解决方案,感兴趣的朋友跟随小编一起看看吧
    2024-02-02
  • JVM垃圾回收算法的概念与分析

    JVM垃圾回收算法的概念与分析

    这篇文章主要给大家介绍了关于JVM垃圾回收算法的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用JVM具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-12-12
  • Java基础第五篇 实施接口

    Java基础第五篇 实施接口

    在public和private的封装机制,我们实际上同时定义了类和接口,类和接口混合在一起。Java还提供了interface这一语法。这一语法将接口从类的具体定义中剥离出来,构成一个独立的主体,下面文章内容将为大家做详细介绍
    2021-09-09
  • SpringBoot实现过滤器和拦截器的方法

    SpringBoot实现过滤器和拦截器的方法

    大家应该都晓得实现过滤器需要实现 javax.servlet.Filter 接口,而拦截器会在处理指定请求之前和之后进行相关操作,配置拦截器需要两步,本文通过实例代码给大家介绍SpringBoot 过滤器和拦截器的相关知识,感兴趣的朋友一起看看吧
    2022-11-11
  • Java基础之throw和throws的示例详解

    Java基础之throw和throws的示例详解

    throw是用来抛出一个具体的异常实例,而throws是用来声明方法可能会抛出哪些类型的异常,是对调用者的一种通知和要求,这篇文章主要介绍了Java基础:throw和throws的详解,需要的朋友可以参考下
    2024-06-06

最新评论