RabbitMq如何做到消息的可靠性投递

 更新时间:2022年12月17日 16:32:07   作者:Onemorelight95  
现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的。 引入消息队列可以给这个项目带来很多的好处,这篇文章主要为大家介绍了RabbitMq如何做到消息的可靠性投递,有需要的朋友可以借鉴参考下

如何保证消息不丢失

在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:

消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c)。

发送端的确认

Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式

和return 退回模式。

confirm 确认模式:消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

return 退回模式:消息从 exchange–>queue 投递失败,会将消息退回给producer。

消费端的确认

消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack,有两种确认方式:自动确认和手动确认。

在编码中,关于消息的确认方式,我们需要在消费者端调用Consumer函数时,设置第三个参数:autoAck是false还是true(false表示手动,true表示自动)。

自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false),手动签收,如果出现异常,则调用d.Reject(true)让其自动重新发送消息。

Go 实现

安装操作库

安装API库

Go可以使用streadway/amqp库来操作rabbit,使用以下命令来安装:

go get github.com/streadway/amqp

封装rabbitmq

接下来我们对streadway/amqp库的内容进行一个二次封装,封装为一个rabbitmq.go文件:

package rabbitmq
import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)
// RabbitMQ RabbitMQ结构
type RabbitMQ struct {
	channel  *amqp.Channel
	Name     string
	exchange string
}
// Connect 连接服务器
func Connect(s string) *RabbitMQ {
	//连接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	mq := new(RabbitMQ)
	mq.channel = ch
	return mq
}
// New 初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string, name string) *RabbitMQ {
	//连接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	q, e := ch.QueueDeclare(
		name,  //队列名
		false, //是否开启持久化
		true,  //不使用时删除
		false, //排他
		false, //不等待
		nil,   //参数
	)
	failOnError(e, "初始化消息队列失败!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	return mq
}
// QueueDeclare 声明queue
func (q *RabbitMQ) QueueDeclare(queue string) {
	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
	failOnError(e, "声明queue失败!")
}
// QueueDelete 删除queue
func (q *RabbitMQ) QueueDelete(queue string) {
	_, e := q.channel.QueueDelete(queue, false, true, false)
	failOnError(e, "删除queue失败!")
}
// Qos 配置queue参数
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "无法设置QoS")
}
// NewExchange 初始化交换机
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string, name string, typename string) {
	//连接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	e = ch.ExchangeDeclare(
		name,     // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e, "初始化交换机失败!")
}
// ExchangeDelete 删除交换机
func (q *RabbitMQ) ExchangeDelete(exchange string) {
	e := q.channel.ExchangeDelete(exchange, false, true)
	failOnError(e, "删除交换机失败!")
}
// Bind 绑定消息队列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
	e := q.channel.QueueBind(
		q.Name,
		key,
		exchange,
		false,
		nil,
	)
	failOnError(e, "绑定队列失败!")
	q.exchange = exchange
}
// Send 向消息队列发送消息
//Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失败!")
	e = q.channel.Publish(
		"",    //交换
		queue, //路由键
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo: q.Name,
			Body:    []byte(str),
		})
	msg := "向队列:" + q.Name + "发送消息失败!"
	failOnError(e, msg)
}
// Publish 向exchange发送消息
//Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失败!")
	e = q.channel.Publish(
		exchange,
		key,
		false,
		false,
		amqp.Publishing{ReplyTo: q.Name,
			Body: []byte(str)},
	)
	failOnError(e, "向交换机发送消息失败!")
}
// Consume 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定从哪个队列中接收消息
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失败!")
	return c
}
// Close 关闭队列连接
func (q *RabbitMQ) Close() {
	q.channel.Close()
}
//错误处理函数
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

发送端的确认

首先初始化消息队列的时候,我们要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

func New(s string, name string) *RabbitMQ {
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	q, e := ch.QueueDeclare(
		name,  //队列名
		false, //是否开启持久化
		true,  //不使用时删除
		false, //排他
		false, //不等待
		nil,   //参数
	)
	failOnError(e, "初始化消息队列失败!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	// 设置为confirm模式
	mq.channel.Confirm(false)
	return mq
}

然后在封装库中创建一个函数handleConfirm()用于接收来自Borker的回复:

func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
	return q.channel.NotifyPublish(ch)
}

生产者

生产者端在向Broker发送消息的时候,我们使用一个无缓冲的通道来接收来自Broker的回复,然后创建一个协程监听这个无缓冲通道。

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定为topic类型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
	confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
	go handleConfirm(confirm)
	var i int
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
		i++
	}
}
func handleConfirm(confirm <-chan amqp.Confirmation) {
	for {
		select {
		case message := <-confirm:
			fmt.Println("接收到来自Broker的回复:", message)
		}
	}
}

运行结果:

接收到来自Broker的回复: {1 true}
接收到来自Broker的回复: {2 true}
接收到来自Broker的回复: {3 true}
接收到来自Broker的回复: {4 true}
接收到来自Broker的回复: {5 true}

消费端的确认

首先将Consume函数的第三个参数autoAck参数标记为false:

// Consume 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name,
		"",
		false, // 不自动确认消息
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失败!")
	return c
}

在消费者端我们采用公平派遣模式,即队列发送消息给消费者的时候,不再采用轮询机制,而是一个消费者消费完消息之后,会调用Ack(false)函数向队列发送一个回复,队列每次会将消息优先发送给消费完消息的消费者(回复过)。

消费端限流:

实现公平派遣模式我们需要设置消费者端一次只能消费一条消息,之前我们已经进行了封装,直接在消费者端调用即可:

// Qos 配置queue参数
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "无法设置QoS")
}

生产者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定为direct类型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
	i := 0
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
		i = i + 1
	}
}

消费者1

消费者2在消费第三条消息的时候,假设发生了错误,我们调用d.Reject(true)函数让队列重新发送消息。

func main() {
	//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消费一条消息,直到消费完才重新接收
	consumer1.Qos()
	// 队列绑定到exchange
	consumer1.Bind("exchange", "key1")
	//接收消息
	msgs := consumer1.Consume()
	go func() {
		var i int
		for d := range msgs {
			time.Sleep(time.Second * 1)
			log.Printf("Consumer1 received a message: %s", d.Body)
			// 假设消费第三条消息的时候出现了错误,我们就调用d.Reject(true),队列会重新发送消息给消费者
			if i == 2 {
				d.Reject(true)
			} else {
				// 消息消费成功之后就回复
				d.Ack(false)
			}
			i++
		}
	}()
	select {}
}

消费者2

func main() {
	//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消费一条消息,直到消费完才重新接收
	consumer2.Qos()
	// 队列绑定到exchange
	consumer2.Bind("exchange", "key1")
	//接收消息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			time.Sleep(time.Second * 5)
			log.Printf("Consumer2 received a message: %s", d.Body)
			// 消息消费成功之后就回复
			d.Ack(false)
		}
	}()
	select {}
}

运行结果:

# 消费者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"

# 消费者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"

到此这篇关于RabbitMq如何做到消息的可靠性投递的文章就介绍到这了,更多相关RabbitMq消息可靠性投递内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Go并发编程中的错误恢复机制与代码持续执行实例探索

    Go并发编程中的错误恢复机制与代码持续执行实例探索

    这篇文章主要为大家介绍了Go并发编程中的错误恢复机制与代码持续执行实例探索,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • 数据竞争和内存重分配Golang slice并发不安全问题解决

    数据竞争和内存重分配Golang slice并发不安全问题解决

    这篇文章主要为大家介绍了数据竞争和内存重分配Golang slice并发不安全问题解决,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • Go语言使用buffer读取文件的实现示例

    Go语言使用buffer读取文件的实现示例

    本文主要介绍了Go语言使用buffer读取文件的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • go语言中的return语句

    go语言中的return语句

    这篇文章主要介绍了go语言中的return语句,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下,希望对你的学习有所帮助
    2022-05-05
  • 让goland支持proto文件类型的实现

    让goland支持proto文件类型的实现

    这篇文章主要介绍了让goland支持proto文件类型的实现操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Go语言zip文件的读写操作

    Go语言zip文件的读写操作

    本文主要介绍了Go语言zip文件的读写操作,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • go通过benchmark对代码进行性能测试详解

    go通过benchmark对代码进行性能测试详解

    在开发中我们要想编写高性能的代码,或者优化代码的性能时,你首先得知道当前代码的性能,在go中可以使用testing包的benchmark来做基准测试 ,文中有详细的代码示例,感兴趣的小伙伴可以参考一下
    2023-04-04
  • golang结构体与json格式串实例代码

    golang结构体与json格式串实例代码

    本文通过实例代码给大家介绍了golang结构体与json格式串的相关知识,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-10-10
  • go语言中排序sort的使用方法示例

    go语言中排序sort的使用方法示例

    golang中也实现了排序算法的包sort包,下面这篇文章就来给大家介绍了关于go语言中排序sort的使用方法,文中通过示例代码介绍的非常详细,需要的朋友可以参考借鉴,下面随着小编来一起学习学习吧
    2018-06-06
  • Go语言对JSON进行编码和解码的方法

    Go语言对JSON进行编码和解码的方法

    这篇文章主要介绍了Go语言对JSON进行编码和解码的方法,涉及Go语言操作json的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-02-02

最新评论