RabbitMQ延时消息队列在golang中的使用详解

 更新时间:2023年11月01日 08:18:52   作者:zhuyasen  
延时队列常使用在某些业务场景,使用延时队列可以简化系统的设计和开发、提高系统的可靠性和可用性、提高系统的性能,下面我们就来看看如何在golang中使用RabbitMQ的延时消息队列吧

延时队列常使用在某些业务场景,例如订单支付超时、接收到外卖后自动确认完成订单、定时任务、促销过期等,使用延时队列可以简化系统的设计和开发提高系统的可靠性和可用性提高系统的性能。下面介绍使用RabbitMQ的延时消息队列,使用之前先要让RabbitMQ支持延时队列。

在docker安装单机版rabbitMQ

docker-compose.yaml配置文件内容如下:

version: '3'
 
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    hostname: rabbitmq-service
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - $PWD/data:/var/lib/rabbitmq
      - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins
      - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez:/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /

rabbitMQ默认不支持延时消息队列类型,需要另外安装插件来实现:

启动rabbitmq:

docker-compose up -d

可以在浏览器访问管理后台 http://localhost:15672 ,用户名和密码都是guest

点击菜单【exchange】--> 【Add a new exchange】-->【Type】,在下拉列表中看到x-delayed-message类型的话,说明已经支持延时队列了。

使用延时队列需要指定具体某一种消息类型(direct、topic、fanout、headers),下面以direct类型的延时消息队列为例。

生产端示例代码

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

var (
	url          = "amqp://guest:guest@127.0.0.1:5672/"
	exchangeName = "delayed-message-exchange-demo"
)

func main() {
	conn, err := amqp.Dial(url)
	checkErr(err)
	defer conn.Close()

	ctx := context.Background()

	queueName := "delayed-message-queue"
	routingKey := "delayed-key"
	delayedMessageType := "direct"
	exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
	q, err := NewProducer(queueName, conn, exchange)
	checkErr(err)
	defer q.Close()
	for i := 1; i <= 5; i++ {
		body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i)
		err = q.Publish(ctx, time.Second*5, []byte(body)) // 发送消息
		checkErr(err)
		time.Sleep(time.Second)
	}
}

// Exchange 交换机
type Exchange struct {
	Name                string // exchange名称
	Type                string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message
	RoutingKey          string // 路由key
	XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers
}

// NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
	return &Exchange{
		Name:                exchangeName,
		Type:                "x-delayed-message",
		RoutingKey:          routingKey,
		XDelayedMessageType: delayedMessageType,
	}
}

// Producer 生产者对象
type Producer struct {
	queueName string
	exchange  *Exchange
	conn      *amqp.Connection
	ch        *amqp.Channel
}

// NewProducer 实例化一个生产者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
	// 创建管道
	ch, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	// 声明交换机类型
	err = ch.ExchangeDeclare(
		exchange.Name, // 交换机名称
		exchange.Type, //  x-delayed-message
		true,          // 是否持久化
		false,         // 是否自动删除
		false,         // 是否公开,false即公开
		false,         // 是否等待
		amqp.Table{
			"x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers
		},
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 声明队列,如果队列不存在则自动创建,存在则跳过创建
	q, err := ch.QueueDeclare(
		queueName, // 消息队列名称
		true,      // 是否持久化
		false,     // 是否自动删除
		false,     // 是否具有排他性(仅创建它的程序才可用)
		false,     // 是否阻塞处理
		nil,       // 额外的属性
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 绑定队列和交换机
	err = ch.QueueBind(
		q.Name,
		exchange.RoutingKey,
		exchange.Name,
		false,
		nil,
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	return &Producer{
		queueName: queueName,
		conn:      conn,
		ch:        ch,
		exchange:  exchange,
	}, nil
}

// Publish 发送消息
func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error {
	err := p.ch.PublishWithContext(
		ctx,
		p.exchange.Name,       // exchange name
		p.exchange.RoutingKey, // key
		false,                 // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者
		false,                 // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // 如果队列的声明是持久化的,那么消息也设置为持久化
			ContentType:  "text/plain",
			Body:         body,
			Headers: amqp.Table{
				"x-delay": int(delayTime / time.Millisecond), // 延迟时间: 毫秒
			},
		},
	)
	if err != nil {
		return err
	}
	fmt.Printf("[send]: %s\n", body)
	return nil
}

// Close 关闭生产者
func (p *Producer) Close() {
	if p.ch != nil {
		_ = p.ch.Close()
	}
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

消费端示例代码

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

var (
	url          = "amqp://guest:guest@127.0.0.1:5672/"
	exchangeName = "delayed-message-exchange-demo"
)

func main() {
	conn, err := amqp.Dial(url)
	checkErr(err)
	defer conn.Close()

	ctx := context.Background()

	queueName := "delayed-message-queue"
	routingKey := "delayed-key"
	delayedMessageType := "direct"
	exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
	c, err := NewConsumer(ctx, queueName, exchange, conn)
	checkErr(err)
	c.Consume() // 消费消息
	defer c.Close()

	fmt.Println("exit press CTRL+C")
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-interrupt
	fmt.Println("exit consume messages")
}

// Exchange 交换机
type Exchange struct {
	Name                string // exchange名称
	Type                string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message
	RoutingKey          string // 路由key
	XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers
}

// NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
	return &Exchange{
		Name:                exchangeName,
		Type:                "x-delayed-message",
		RoutingKey:          routingKey,
		XDelayedMessageType: delayedMessageType,
	}
}

// Consumer 消费者
type Consumer struct {
	ctx       context.Context
	queueName string
	conn      *amqp.Connection
	ch        *amqp.Channel
	delivery  <-chan amqp.Delivery
	exchange  *Exchange
}

// NewConsumer 实例化一个消费者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
	// 创建管道
	ch, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	// 声明交换机类型
	err = ch.ExchangeDeclare(
		exchange.Name, // 交换机名称
		exchange.Type, // 交换机的类型,支持direct、topic、fanout、headers
		true,          // 是否持久化
		false,         // 是否自动删除
		false,         // 是否公开,false即公开
		false,         // 是否等待
		amqp.Table{
			"x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers
		},
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 声明队列,如果队列不存在则自动创建,存在则跳过创建
	q, err := ch.QueueDeclare(
		queueName, // 消息队列名称
		true,      // 是否持久化
		false,     // 是否自动删除
		false,     // 是否具有排他性(仅创建它的程序才可用)
		false,     // 是否阻塞处理
		nil,       // 额外的属性
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 绑定队列和交换机
	err = ch.QueueBind(
		q.Name,
		exchange.RoutingKey,
		exchange.Name,
		false,
		nil,
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 为消息队列注册消费者
	delivery, err := ch.ConsumeWithContext(
		ctx,
		queueName, // queue 名称
		"",        // consumer 用来区分多个消费者
		true,      // auto-ack 是否自动应答
		false,     // exclusive 是否独有
		false,     // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
		false,     // no-wait 是否阻塞
		nil,       // args
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	return &Consumer{
		queueName: queueName,
		conn:      conn,
		ch:        ch,
		delivery:  delivery,
		exchange:  exchange,
	}, nil
}

// Consume 接收消息
func (c *Consumer) Consume() {
	go func() {
		fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
		for d := range c.delivery {
			// 处理消息
			fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.exchange.RoutingKey, d.Body)
			// _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack
		}
	}()
}

// Close 关闭
func (c *Consumer) Close() {
	if c.ch != nil {
		_ = c.ch.Close()
	}
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

总结

上面介绍了rabbitMQ延时消息队列简单使用示例,在实际使用中,连接rabbitMQ应该有网络断开重连功能。

rabbitMQ需要依赖插件rabbitmq_delayed_message_exchange,目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万以上)的场景,另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。

如果你采用了 Delayed Message 插件这种方式来实现,对于消息可靠性要求非常高,在发送消息之前可以先保存到 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。

这是在github.com/rabbitmq/amqp091-go基础上封装的 rabbitmq 库,开箱即用各种消息类型(directtopicfanoutheadersdelayed messagepublisher subscriber)。

到此这篇关于RabbitMQ延时消息队列在golang中的使用详解的文章就介绍到这了,更多相关go RabbitMQ延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Go语言通过smtp发送邮件的方法

    Go语言通过smtp发送邮件的方法

    这篇文章主要介绍了Go语言通过smtp发送邮件的方法,涉及Go语言发送邮件的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-02-02
  • GoLang抽奖系统简易实现流程

    GoLang抽奖系统简易实现流程

    这篇文章主要介绍了GoLang抽奖系统实现流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-12-12
  • Golang 空map和未初始化map的注意事项说明

    Golang 空map和未初始化map的注意事项说明

    这篇文章主要介绍了Golang 空map和未初始化map的注意事项说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04
  • Golang处理gRPC请求/响应元数据的示例代码

    Golang处理gRPC请求/响应元数据的示例代码

    前段时间实现内部gRPC框架时,为了实现在服务端拦截器中打印请求及响应的头部信息,便查阅了部分关于元数据的资料,因为中文网络上对于该领域的信息较少,于是在这做了一些简单的总结,需要的朋友可以参考下
    2024-03-03
  • golang实现浏览器导出excel文件功能

    golang实现浏览器导出excel文件功能

    这篇文章主要介绍了golang实现浏览器导出excel文件功能,文章通过golang导出excel文件返回给web,实现浏览器导出excel文件功能,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-03-03
  • Go实现各类限流的方法

    Go实现各类限流的方法

    这篇文章主要介绍了Go实现各类限流的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • golang利用redis和gin实现保存登录状态校验登录功能

    golang利用redis和gin实现保存登录状态校验登录功能

    这篇文章主要介绍了golang利用redis和gin实现保存登录状态校验登录功能,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2024-01-01
  • Go语言的type func()用法详解

    Go语言的type func()用法详解

    在Go语言中,函数的基本组成为:关键字func、函数名、参数列表、返回值、函数体和返回语句,这篇文章主要介绍了Go语言的type func()用法,需要的朋友可以参考下
    2022-03-03
  • Go语言基础学习之Context的使用详解

    Go语言基础学习之Context的使用详解

    在Go语言中,Context是一个非常重要的概念,它用于在不同的 goroutine 之间传递请求域的相关数据,本文将深入探讨Go语言中 Context特性和Context的高级使用方法,希望对大家有所帮助
    2023-05-05
  • 手把手教你如何在Goland中创建和运行项目

    手把手教你如何在Goland中创建和运行项目

    欢迎来到本指南!我们将手把手地教您在Goland中如何创建、配置并运行项目,通过简单的步骤,您将迅速上手这款强大的集成开发环境(IDE),轻松实现您的编程梦想,让我们一起开启这段精彩的旅程吧!
    2024-02-02

最新评论