golang gin 监听rabbitmq队列无限消费的案例代码
更新时间:2022年12月01日 10:12:09 作者:lj907722644
这篇文章主要介绍了golang gin 监听rabbitmq队列无限消费,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
golang gin 监听rabbitmq队列无限消费
连接rabbitmq
package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func InitRabbitmq() { var err error RabbitConn, err = amqp.Dial(config.Config.RabbitUrl) if err != nil { log.Println("连接RabbitMQ失败") panic(err) } RabbitChannel, err = RabbitConn.Channel() if err != nil { log.Println("获取RabbitMQ channel失败") panic(err) } } // 0表示channel未关闭,1表示channel已关闭 func CheckRabbitClosed(ch amqp.Channel) int64 { d := reflect.ValueOf(ch) i := d.FieldByName("closed").Int() return i }
创建生产者
package service import ( "encoding/json" "github.com/streadway/amqp" "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Producer() { // 声明队列,没有则创建 // 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列) declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil) if err != nil { log.Printf("声明队列 %v 失败, error: %v", config.Config.HawkSaveQueueName, err) panic(err) } request := model.Request{} marshal, _ := json.Marshal(request ) // exchange、routing key、mandatory、immediate err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(marshal), }) if err != nil { log.Printf("生产者发送消息失败, error: %v", err) } else { log.Println("生产者发送消息成功") } }
创建消费者
package service import ( "encoding/json" "log" "os" "strings" "sync" "time" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Consumer() { // 声明队列,没有则创建 // 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列) _, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil) if err != nil { log.Printf("声明队列 %v 失败, error: %v", config.Config.QueueName, err) panic(err) } // 队列名称、consumer、auto-ack、是否独享 // deliveries是一个管道,有消息到队列,就会消费,消费者的消息只需要从deliveries这个管道获取 deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil) if err != nil { log.Printf("从队列 %v 获取数据失败, error: %v", config.Config.QueueName, err) } else { log.Println("从消费队列获取任务成功") } // 阻塞住 for { select { case message := <-deliveries: closed := database.CheckRabbitClosed(*database.RabbitChannel) if closed == 1 { // channel 已关闭,重连一下 database.InitRabbitmq() } else { msgData := string(message.Body) request := model.Request{} err := json.Unmarshal([]byte(msgData), &request) if err != nil { log.Printf("解析rabbitmq数据 %v 失败, error: %v", msgData, err) } else { // TODO... // 处理逻辑 } } } } }
main方法协程调用
package main import ( "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/router" "yy-data-processing/service" ) func main() { // 初始化路由 routers := router.InitRouters() // 初始化RabbitMQ database.InitRabbitmq() go service.Producer() go service.Consumer() port := config.Config.Port if err := routers.Run(":" + port); err != nil { log.Printf("启动服务失败: ", err) } }
到此这篇关于golang gin 监听rabbitmq队列无限消费的文章就介绍到这了,更多相关golang监听rabbitmq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
golang服务报错: write: broken pipe的解决方案
在开发在线客服系统的时候,看到日志里有一些错误信息,下面这篇文章主要给大家介绍了关于golang服务报错: write: broken pipe的解决方案,需要的朋友可以参考下2022-09-09
最新评论