关于RabbitMQ的Channel默认线程
前言
最近做了一个小功能,是通过一个客户端消费者监听队列消息, 代码如下:
Connection conn = getConnection(); Channel channel = conn.createChannel(); MessageConsumer consumer = ... channel.basicConsume(realQueue, true, consumer);
通过jvm工具观察rabbitmq的线程使用情况,发现生产者每发一条消息,消费者这边就会创建一条线程, 言下之意,一个channel当消息来到时就会异步处理这些消息.
定位
通过断点查找发现原来是 ConsumerWorkService这个类控制的。
这个类顾名思义,就是消费者工作 ExecutorService, 这里的Service表示的是ExecutorService
这个类构造函数里有一个executor参数,当这个参数为空时,就会创建一个Executors.newFixedThreadPool,代码如下:
final public class ConsumerWorkService { private static final int MAX_RUNNABLE_BLOCK_SIZE = 16; private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2; private final ExecutorService executor; private final boolean privateExecutor; private final WorkPool<Channel, Runnable> workPool; private final int shutdownTimeout; public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) { this.privateExecutor = (executor == null); this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) : executor; this.workPool = new WorkPool<>(queueingTimeout); this.shutdownTimeout = shutdownTimeout; } ...
默认的executor 会使用 CPU核数的2倍 作为线程池里线程的数量。
所以到底是要用多个channel,还是单个channel,这个就是其中一个参考依据。
executor是怎么传进来的
答案:
ConnectionFactory -> AMQConnection -> ChannelManager -> ConsumerWorkService
ConnectionFactory有一个属性是 shareExecutorService ,这个属性表示内部使用共享的唯一一个ExecutorService 设置这个属性就可以一直传到ConsumerWorkService中。
除了ConnectionFactory.setShareExecutorService方法以外, 还可以在Connection被创建时,设置executorService ConnectionFactory的newConnection方法:
public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;
总结
通过设置shareExecutorService,无论多少个channel,都可以统一控制线程数量、队列数量, 根据实际情况进行配置。
public class RabbitMqUtil { public static Channel getChannel() throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //连接服务器 factory.setHost("114.***.***.***"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123"); //创建连接 // ExecutorService executor = Executors.newFixedThreadPool(1); 设置线程池中的个数,把executor传给newConnection() Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); return channel; } }
到此这篇关于关于RabbitMQ的Channel默认线程的文章就介绍到这了,更多相关RabbitMQ的Channel内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
使用递归删除树形结构的所有子节点(java和mysql实现)
下面小编就为大家带来一篇使用递归删除树形结构的所有子节点(java和mysql实现)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧2017-10-10
最新评论