RabbitMQ中的prefetch_count参数详解
前提
在某一次用户标签服务中大量用到异步流程,使用了RabbitMQ
进行解耦。
其中,为了提高消费者的处理效率针对了不同节点任务的消费者线程数和prefetch_count
参数都做了调整和测试,得到一个相对合理的组合。
这里深入分析一下prefetch_count
参数在RabbitMQ
中的作用。
prefetch_count参数的含义
先从AMQP
(Advanced Message Queuing Protocol
,即高级消息队列协议,RabbitMQ
实现了此协议的0-9-1
版本的大部分内容)和RabbitMQ
的具体实现去理解prefetch_count
参数的含义,可以查阅对应的文档(见文末参考资料)。
AMQP 0-9-1
定义了basic.qos
方法去限制消费者基于某一个Channel
或者Connection
上未进行ack
的最大消息数量上限。
basic.qos
方法支持两个参数:
global
:布尔值。prefetch_count
:整数。
这两个参数在AMQP 0-9-1
定义中的含义和RabbitMQ
具体实现时有所不同,见下表:
global参数值 | AMQP 0-9-1中prefetch_count参数的含义 | RabbitMQ中prefetch_count参数的含义 |
---|---|---|
false | prefetch_count值在当前Channel的所有消费者共享 | prefetch_count对于基于当前Channel创建的消费者生效 |
true | prefetch_count值在当前Connection的所有消费者共享 | prefetch_count值在当前Channel的所有消费者共享 |
或者用简洁的英文表格理解:
global | prefetch_count in AMQP 0-9-1 | prefetch_count in RabbitMQ |
---|---|---|
false | Per channel limit | Per customer limit |
true | Per connection limit | Per channel limit |
这里画一个图理解一下:
上图仅仅为了区分协议本身和RabbitMQ
中实现的不同,接着说说prefetch_count
对于消费者(线程)和待消费消息的作用。
假定一个前提:RabbitMQ
客户端从RabbitMQ
服务端获取到队列消息的速度比消费者线程消费速度快,目前有两个消费者线程共用一个Channel
实例。
当global
参数为false
时候,效果如下:
而当global
参数为true
时候,效果如下:
在消费者线程处理速度远低于RabbitMQ
客户端从RabbitMQ
服务端获取到队列消息的速度的场景下,prefetch_count
条未进行ack
的消息会暂时存放在一个队列(准确来说是阻塞队列,然后阻塞队列中的消息任务会流转到一个列表中遍历回调消费者句柄,见下一节的源码分析)中等待被消费者处理。
这部分消息会占据JVM
的堆内存,所以在性能调优或者设定应用程序的初始化和最大堆内存的时候,如果刚好用到RabbitMQ
的消费者,必须要考虑这些"预取消息"的内存占用量。
不过值得注意的是:「prefetch_count
是RabbitMQ
服务端的参数,它的设置值或者快照都不会存放在RabbitMQ
客户端」。
同时需要注意prefetch_count
生效的条件和特性(从参数设置的一些demo
和源码上感知):
prefetch_count
参数仅仅在basic.consume
的autoAck
参数设置为false
的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。basic.consume
如果在非自动确认模式下忘记了手动调用basic.ack
,那么prefetch_count
正是未ack
消息数量的最大上限。prefetch_count
是由RabbitMQ
服务端控制,一般情况下能保证各个消费者线程中的未ack
消息分发是均衡的,这点笔者猜测是consumerTag
起到了关键作用。
RabbitMQ客户端中prefetch_count源码跟踪
编写本文的时候引入的RabbitMQ客户端版本为:com.rabbitmq:amqp-client:5.9.0
上面说了这么多都只是根据官方的文档或者博客中的理论依据进行分析,其实更加根本的分析方法是直接阅读RabbitMQ
的Java
客户端源码,主要是针对basic.qos
和basic.consume
两个方法,对应的是com.rabbitmq.client.impl.ChannelN#basicQos()
和com.rabbitmq.client.impl.ChannelN#basicConsume()
两个方法。
先看ChannelN#basicQos()
:
这里的basicQos()
方法多了一个prefetchSize
参数,用于限制分发内容的大小上限,默认值0
代表无限制,而prefetchCount
的取值范围是[0,65535]
,取值为0
也是代表无限制。
这里的ChannelN#basicQos()
实现中直接封装basic.qos
方法参数进行一次RPC
调用,意味着直接更变RabbitMQ
服务端的配置,即时生效,同时参数值完全没有保存在客户端代码中,印证了前面一节的结论。
接着看ChannelN#basicConsume()
方法:
上图已经把关键部分用红圈圈出,因为整个消息消费过程是异步的,涉及太多的类和方法,这里不全量贴出,整理了一个流程图:
整个消息消费过程,prefetch_count
参数并未出现在客户端代码中,又再次印证了前面一节的结论,即prefetch_count
参数的行为和作用完全由RabbitMQ
服务端控制。
而最终Customer
或者常用的DefaultCustomer
句柄是在WorkPoolRunnable
中回调的,这类任务的执行线程来自于ConsumerWorkService
内部的线程池,而这个线程池又使用了Executors.newFixedThreadPool()
去构建,使用了默认的线程工厂类,因此在Customer#handleDelivery()
方法内部打印的线程名称的样子是pool-1-thread-*
。
这里VariableLinkedBlockingQueue就是前一节中的message queue的原型
prefetch_count参数使用
设置prefetch_count
参数比较简单,就是调用Channel#basicQos()
方法:
public class RabbitQos { static String QUEUE = "qos.test"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, true, false, false, null); channel.basicQos(2); channel.basicConsume("qos.test", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("1------" + Thread.currentThread().getName()); sleep(); } }); channel.basicConsume("qos.test", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("2------" + Thread.currentThread().getName()); sleep(); } }); for (int i = 0; i < 20; i++) { channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes()); } sleep(); } private static void sleep() { try { Thread.sleep(Long.MAX_VALUE); } catch (Exception ignore) { } } }
上面是原生的amqp-client
的写法,如果使用了spring-amqp
(spring-boot-starter-amqp
),可以通过配置文件中的spring.rabbitmq.listener.direct.prefetch
属性指定所有消费者线程的prefetch_count
,如果要针对部分消费者线程进行该属性的设置,则需要针对RabbitListenerContainerFactory
进行改造。
prefetch_count参数最佳实践
关于prefetch_count
参数的设置,RabbitMQ
官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。
该文章分析了消息流控的整个流程,其中提到了prefetch_count
参数的一些指标:
这里指出了,如果prefetch_count
的值超过了30
,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count
的值就会变得收效甚微。
也就是说,「官方是建议把prefetch_count
设置为30
」。
这里再参看一下spring-boot-starter-amqp
中对此参数定义的默认值,具体是AbstractMessageListenerContainer
中的DEFAULT_PREFETCH_COUNT
:
如果没有通过spring.rabbitmq.listener.direct.prefetch
进行覆盖,那么使用spring-boot-starter-amqp
中的注解定义的消费者线程中设置的prefetch_count
就是250
。
笔者认为,应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count
的设置。
总结如下(个人经验仅供参考):
- 当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把
prefetch_count
设置为1
。 - 当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未
ack
总消息量的内存极限,从而设计一个合理的prefetch_count
值。 - 当消费者线程的处理速度十分快,远远大于
RabbitMQ
服务端的消息分发,在网络带宽充足的前提下,设置可以把prefetch_count
值设置为0
,不做任何的消息流控。 - 一般场景下,建议使用
RabbitMQ
官方的建议值30
或者spring-boot-starter-amqp
中的默认值250
。
总结
prefetch_count
是RabbitMQ
服务端的参数,设置后即时生效。prefetch_count
对于AMQP-0-9-1
中的定义与RabbitMQ
中的实现不完全相同。prefetch_count
值设置建议使用框架提供的默认值或者通过分组实验结合数据报大小进行计算和评估出一个合理值。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
SpringBoot如何读取xml配置bean(@ImportResource)
这篇文章主要介绍了SpringBoot如何读取xml配置bean(@ImportResource),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-01-01MyBatis关闭一级缓存的两种方式(分注解和xml两种方式)
这篇文章主要介绍了MyBatis关闭一级缓存的两种方式(分注解和xml两种方式),mybatis默认开启一级缓存,执行2次相同sql,但是第一次查询sql结果会加工处理这个时候需要关闭一级缓存,本文给大家详细讲解需要的朋友可以参考下2022-11-11springboot如何静态加载@configurationProperties
这篇文章主要介绍了springboot如何静态加载@configurationProperties,本文一个错误案例和成功案例结合实例代码给大家介绍的非常详细,需要的朋友可以参考下2022-07-07SpringBoot @FixMethodOrder 如何调整单元测试顺序
这篇文章主要介绍了SpringBoot @FixMethodOrder 调整单元测试顺序方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-09-09
最新评论