C#使用RabbitMQ发送和接收消息工具类的实现

 更新时间:2023年12月19日 11:36:48   作者:让梦想疯狂  
RabbitMQ是一个消息的代理器,用于接收和发送消息,本文主要介绍了C#使用RabbitMQ发送和接收消息工具类的实现,具有一定的参考价值,感兴趣的可以了解一下

下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码:

工具类

通过NuGet安装RabbitMQ.Client

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class RabbitMQHelper : IDisposable
    {
        private readonly ConnectionFactory _factory;
        private IConnection _connection;
        private IModel _channel;
        public RabbitMQHelper()
        {
            // 设置连接参数
            _factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="message"></param>
        public void SendMessage<T>(string queueName, T message)
        {
            try
            {
                InitConnection();

                // 声明队列
                _channel.QueueDeclare(queue: queueName,
                    durable: true,// 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                _channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to send message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="messageHandler"></param>
        public void ReceiveMessage<T>(string queueName, Action<T> messageHandler)
        {
            try
            {
                InitConnection();

                // 声明队列(接收需声明队列,否则队列不存在时,无法接收消息)
                _channel.QueueDeclare(queue: queueName,
                    durable: true, // 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                //设置消费者数量(并发度),每个消费者每次只能处理一条消息
                _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // 创建消费者
                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

                        var convertedMessage = JsonConvert.DeserializeObject<T>(message);

                        //委托方法
                        messageHandler.Invoke(convertedMessage);

                        // 消息处理成功,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        // 消息处理异常,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                };

                _channel.BasicConsume(queue: queueName,
                    autoAck: false,// 设置为true表示自动确认消息
                    consumer: consumer);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to receive message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 初始化链接
        /// </summary>
        private void InitConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                _connection = _factory.CreateConnection();
                _channel = _connection.CreateModel();
            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();
        }
    }
}

使用示例

using System;
using System.Text;
using System.Threading.Tasks;
using WorkerService1;

public class Program
{
    private static string QueueName = "myqueue_key";
    public static void Main()
    {

        var rabbitMQHelper = new RabbitMQHelper();
        for (long i = 0; i < 30; i++)
        {
            rabbitMQHelper.SendMessage(QueueName, i);
        }

        rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);

        Console.ReadLine();
    }

    /// <summary>
    /// 接收处理
    /// </summary>
    /// <param name="index"></param>
    private static void ReceivedHandle(long index)
    {
        try
        {
            Console.WriteLine($"第{index}次开始{DateTime.Now}");
            Thread.Sleep(2000);
            Console.WriteLine($"第{index}次结束{DateTime.Now}");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

到此这篇关于C#使用RabbitMQ发送和接收消息工具类的实现的文章就介绍到这了,更多相关C# RabbitMQ发送和接收内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • C#实现 Server-sent Events的步骤

    C#实现 Server-sent Events的步骤

    这篇文章主要介绍了C#实现 Server-sent Events的步骤,帮助大家更好的理解和使用c#,感兴趣的朋友可以了解下
    2021-01-01
  • C# 9.0 特性全面总结

    C# 9.0 特性全面总结

    这篇文章主要介绍了C# 9.0 特性的相关资料,帮助大家更好的理解和学习使用c#,感兴趣的朋友可以了解下
    2021-02-02
  • C#.NET实现网页自动登录的方法

    C#.NET实现网页自动登录的方法

    这篇文章主要介绍了C#.NET实现网页自动登录的方法,以实例形式分析了C#实现点击自动登录的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-09-09
  • C#中for循环、while循环循环执行的方法

    C#中for循环、while循环循环执行的方法

    这篇文章主要介绍了C#中for循环、while循环循环执行的方法的相关资料,非常不错,具有参考借鉴价值,感兴趣的朋友一起学习吧
    2016-06-06
  • Unity实现通用的信息提示框

    Unity实现通用的信息提示框

    这篇文章主要为大家详细介绍了Unity实现通用的信息提示框,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-06-06
  • C#简单读取、改变文件的创建、修改及访问时间的方法

    C#简单读取、改变文件的创建、修改及访问时间的方法

    这篇文章主要介绍了C#简单读取、改变文件的创建、修改及访问时间的方法,涉及C#文件类SetCreationTime、SetLastWriteTime及SetLastAccessTime的相关使用技巧,需要的朋友可以参考下
    2015-07-07
  • OpenXml合并Table单元格代码实例

    OpenXml合并Table单元格代码实例

    在本篇文章里小编给大家整理了关于OpenXml合并Table单元格的相关实例详解,需要的朋友们参考下。
    2019-08-08
  • C#和JavaScript实现交互的方法

    C#和JavaScript实现交互的方法

    最近做一个小项目不可避免的需要前端脚本与后台进行交互。由于是在asp.net中实现,故问题演化成asp.net中jiavascript与后台c#如何进行交互。
    2015-05-05
  • C# #define条件编译详解

    C# #define条件编译详解

    这篇文章主要介绍了C# #define条件编译,告诉大家#define是用来做什么?如何使用#define,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-01-01
  • C#实现在匿名方法中捕获外部变量的方法

    C#实现在匿名方法中捕获外部变量的方法

    这篇文章主要介绍了C#实现在匿名方法中捕获外部变量的方法,本文直接给出代码实例,然后分析了代码中的一些知识点,需要的朋友可以参考下
    2015-03-03

最新评论