golang实现redis的延时消息队列功能示例

 更新时间:2019年11月25日 16:08:33   作者:菜菜jjj  
这篇文章主要介绍了golang实现redis的延时消息队列功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言

在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo。

提前准备 安装redis, redis-go

因为用的是macOS, 直接

$ brew install redis
$ go get github.com/garyburd/redigo/redis

又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectId,所以:

$ go get gopkg.in/mgo.v2/bson

唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。

生产者

通过一个for循环生成10w个任务, 每一个任务有不同的时间

func producer() {
 count := 0
 //生成100000个任务
 for count < 100000 {
 count++
 dealTime := int64(rand.Intn(5)) + time.Now().Unix()
 uuid := bson.NewObjectId().Hex()
 redis.Client.AddJob(&job.JobMessage{
 Id: uuid,
 DealTime: dealTime,
 }, + int64(dealTime))
 }
}

其中AddJob函数在另一个包中, 将上一个函数中随机生成的时间作为需要处理的时间戳.

// 添加任务
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}

消费者

消费者处理流程分为两个步骤:

  • 获取小于等于当前时间戳的任务
  • 通过删除当前任务来判断谁获得了当前任务

因为在获取小于等于当前时间戳的任务时,可能有多个go routine同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过redis的删除操作来获取,因为删除指定value时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个go routine拿到了当前的任务。

下面是代码:

// 消费者
func consumer() {
 // 启动10个go routine一起去拿
 count := 0
 for count < 10 {
 go func() {
 for {
 jobs := redis.Client.GetJob()
 if len(jobs) <= 0 {
  time.Sleep(time.Second * 1)
  continue
 }
 currentJob := jobs[0]
 // 如果当前抢redis队列成功,
 if redis.Client.DelJob(currentJob) > 0 {
  var jobMessage job.JobMessage
  util.JsonDecode(currentJob, &jobMessage) //自定义的json解析函数
  handleMessage(&jobMessage)
 }

 }

 }()
 count++
 }
}

// 处理任务用函数
func handleMessage(msg *job.JobMessage) {
 fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
 go func() {
 countChan <- true
 }()
}

redis部分的代码,获取任务和删除任务

// 获取任务
func (client *RedisClient) GetJob() []string {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 timeNow := time.Now().Unix()
 ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
 if err != nil {
 panic(err)
 }
 return ret
}

// 删除当前任务, 用来判断是否抢到了当前任务
func (client *RedisClient) DelJob(value string) int {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 ret, err := redis.Int(conn.Do("zrem", key, value))
 if err != nil {
 panic(err)
 }
 return ret
}

代码大抵如此。最后跑起来之后,大概每3-4秒钟能够处理掉1w个任务,速度上确实是...

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • Go语言中排序的3种实现方法

    Go语言中排序的3种实现方法

    在写代码过程中,排序是经常会遇到的需求,这篇文章主要为大家介绍三种常用的方法,文中的示例代码简洁易懂,需要的小伙伴可以参考下
    2023-08-08
  • Go语言实现JSON解析的神器详解

    Go语言实现JSON解析的神器详解

    php转go是大趋势,越来越多公司的php服务都在用go进行重构,重构过程中,会发现php的json解析操作是真的香。本文和大家分享了一个Go语言实现JSON解析的神器,希望对大家有所帮助
    2023-01-01
  • 详解Go语言如何实现并发安全的map

    详解Go语言如何实现并发安全的map

    go语言提供的数据类型中,只有channel是并发安全的,基础map并不是并发安全的,本文为大家整理了三种实现了并发安全的map的方案,有需要的可以参考下
    2023-12-12
  • golang关闭chan通道的方法示例

    golang关闭chan通道的方法示例

    在go语言中,通道(channel)是一个非常重要的概念,通道提供了一种在不同 goroutine 之间安全地传递数据的方式,在本文中,我们将讨论如何关闭通道以及在关闭通道时需要考虑的事项,需要的朋友可以参考下
    2024-02-02
  • Go中JSON解析时tag的使用

    Go中JSON解析时tag的使用

    本文主要介绍了Go中JSON解析时tag的使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-01-01
  • Go 语言结构实例分析

    Go 语言结构实例分析

    在本篇文章里小编给大家整理的是一篇关于Go 语言结构实例分析的相关知识点,有兴趣的朋友们可以学习下。
    2021-07-07
  • 使用Go重试机制代码更可靠

    使用Go重试机制代码更可靠

    这篇文章主要为大家介绍了使用Go重试机制的使用,使你的代码更加可靠,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • 一文搞懂Golang 时间和日期相关函数

    一文搞懂Golang 时间和日期相关函数

    这篇文章主要介绍了Golang 时间和日期相关函数,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-12-12
  • go语言Timer计时器的用法示例详解

    go语言Timer计时器的用法示例详解

    Go语言的标准库里提供两种类型的计时器Timer和Ticker。这篇文章通过实例代码给大家介绍go语言Timer计时器的用法,代码简单易懂,感兴趣的朋友跟随小编一起看看吧
    2020-05-05
  • Go泛型的理解和使用小结

    Go泛型的理解和使用小结

    泛型是一种非常强大的编程技术,可以提高代码的复用性和可读性,通过泛型容器和类型参数化,Go语言中的泛型可以实现更加灵活和通用的编程,提高代码的复用性和可维护性,本文给大家介绍Go泛型的理解和使用,感兴趣的朋友一起看看吧
    2023-12-12

最新评论