RabbitMQ的Direct Exchange模式实现的消息发布案例(示例代码)
Producer生产者代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQProducer { private final static String EXCHANGE_NAME = "direct_message_exchange"; private final static String EXCHANGE_TYPE = "direct"; public static void main(String[] args) { // 1. 创建连接工厂,设置连接参数 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); // RabbitMQ默认端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { // 2. 声明交换机 (direct类型,持久化) channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true); // 3. 声明队列 (持久化,非独占,连接断开时不自动删除) channel.queueDeclare("queue5", true, false, false, null); channel.queueDeclare("queue6", true, false, false, null); channel.queueDeclare("queue7", true, false, false, null); // 4. 绑定队列到交换机,设置路由键 channel.queueBind("queue5", EXCHANGE_NAME, "order"); channel.queueBind("queue6", EXCHANGE_NAME, "order"); channel.queueBind("queue7", EXCHANGE_NAME, "course"); // 5. 准备要发送的消息 String message = "你好,学相伴:www.kuangstudy.com"; // 6. 向交换机发送消息,使用路由键 "course" channel.basicPublish(EXCHANGE_NAME, "course", null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (Exception ex) { // 捕获异常并打印堆栈信息 ex.printStackTrace(); System.out.println("消息发送出现异常..."); } finally { // 在try-with-resources中,不再需要显式关闭连接和通道 // 会自动关闭连接和通道 } } }
功能点:
- 声明了一个Direct类型的交换机,并绑定了三个队列(
queue5
,queue6
,queue7
)。其中queue5
和queue6
都绑定到order
路由键,而queue7
绑定到course
路由键。 - 发送了一条消息到
course
路由键绑定的队列中(即queue7
)。
Consumer消费者代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class RabbitMQConsumer { private final static String QUEUE_NAME = "queue7"; // 与生产者的绑定一致 private final static String EXCHANGE_NAME = "direct_message_exchange"; private final static String EXCHANGE_TYPE = "direct"; public static void main(String[] args) { // 1. 创建连接工厂,设置连接参数 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); // RabbitMQ默认端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { // 2. 声明交换机和队列,与生产者保持一致 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 3. 绑定队列到交换机,路由键为"course" channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "course"); System.out.println(" [*] 等待接收消息..."); // 4. 定义接收消息的回调函数 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收到的消息: '" + message + "'"); // 这里可以添加进一步的消息处理逻辑 }; // 5. 开始消费消息 (自动应答) channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (Exception ex) { // 捕获异常并打印堆栈信息 ex.printStackTrace(); System.out.println("消费者运行中出现异常..."); } } }
功能点:
1. 与生产者保持一致:消费者的队列名称、交换机名称和路由键与生产者保持一致,即监听queue7
队列,并接收路由键为course
的消息。
2. 回调函数处理消息:使用DeliverCallback
来定义收到消息后的处理逻辑。在回调函数中,delivery.getBody()
获取消息内容,随后可以对消息进行处理、存储或其他业务逻辑操作。
3 自动应答:basicConsume
中的true
表示自动应答(auto-acknowledge),即消息处理完毕后,RabbitMQ会自动确认消息已成功处理。如果需要手动应答,可以将true
替换为false
,并在处理完成后调用channel.basicAck()
来手动确认消息。
到此这篇关于RabbitMQ的Direct Exchange模式实现的消息发布案例的文章就介绍到这了,更多相关RabbitMQ Direct Exchange消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
spring boot+jwt实现api的token认证详解
这篇文章主要给大家介绍了关于spring boot+jwt实现api的token认证的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一学习学习吧2018-12-12深入了解Java核心类库--BigDecimal和System类
这篇文章主要为大家详细介绍了javaBigDecimal和System类定义与使用的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能给你带来帮助2021-07-07Java内存模型(JMM)及happens-before原理
这篇文章主要介绍了java内存模型(JMM)及happens-before原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2020-04-04
最新评论