基于Golang设计一套可控的定时任务系统

 更新时间:2023年07月25日 14:38:24   作者:第八共同体  
这篇文章主要为大家学习介绍了如何基于Golang设计一套可控的定时任务系统,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

现在的系统设计中,有许多规律性的功能特征需要用到定时任务来完成,比如每分钟需要执行一次清理数据的任务,每个月的第一天,需要处理一项什么任务等等这种,还有一种规律性的任务不是以时间间隔为第一维度切割的,而是如果任务执行完成,不管成功与否,都间隔一段时间执行一次任务等等。

像上面说描述的任务的特征,都需要我们去周期性的执行任务主体,如果没办法对定时任务进行严格的控制管理,在生产环境下是非常危险的。比如上周发生的一起生产事故,定时任务去拉取某服务器的数据,因为程序异常,导致产生庞大的协程拉取对方数据,致使对方数据库崩溃。

所以在享受一件技术带来好处的同时,要尝试在可控的范围内使用,才是我们调库人员的基本操守,接下里,我们就一起探究一下,我是如何设计一套可控的定时任务系统的。

功能点:

  • 任务的自动注册
  • 任务的信息管理,包括任务的cron表达式,任务的详情信息备注
  • 手动控制任务的启动和执行
  • 任务的实例管理,可查看单次运行实例的任务日志
  • 任务优先级
  • 保证至少执行一次
  • ...

整体结构如下:

系统分为四个主要部分,从下到上,依次为:

  • 任务的执行体,完成任务的主要功能,实现为单个rpc方法,方便进行独立部署,或者手动执行指定次数时进行单独调用的逻辑实现
  • Worker服务节点,该节点单独部署,对所有调度的任务进行执行
  • Schedule服务节点,该节点也是单独部署,对Cronweb管理的所有任务进行同步
  • Cronweb节点,任务的信息管理能力,可手动停止任务的执行,可调整任务的执行cron表达式,可定制任务执行的超时重启次数等等信息

最终,在调研了大多数任务管理的库后,选择了asynq.选择什么库,亦或是自己造轮子,都是根据业务需求来定的,以能满足功能需求为首要标准。

Asynq是一个用于排队任务并与woker异步处理的库。它由Redis支持,设计为可扩展但易于启动。

大致概述Asynq的工作方式:

  • Client将任务放在队列上
  • Server从队列中删除任务,并为每个任务启动一个woker goroutine
  • 任务是由多名woker同时处理的

任务队列被用作跨多个计算机分配工作的机制。系统可以由多个worker servers 和brokers组成,使位于高可用性和水平规模。

上面两段话为asynq的简单介绍,刚看起来可能会有点抽象,那接下来,我们先来详细的介绍一下这个库,然后看看我们是如何对它进行封装和调整的。

看一个简单的实例

package main
import (
    "log"
    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: redisAddr},
        asynq.Config{
            // Specify how many concurrent workers to use
            Concurrency: 10,
            // Optionally specify multiple queues with different priority.
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
            // See the godoc for other configuration options
        },
    )
    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...
    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

上面这块代码实现了一个简单的server,可以看到,

  • asynq库深度使用redis组件,所以你的系统如何没有接入redis那就要重新考虑了,当然现在的系统中redis的使用还是很普遍的。继续看,实例化Server时,可以指定有多少个并发任务来处理调度的task.此项可根据运行调整进行调整。
  • 另外的Queues是队列的配置,此处提供了三个队列,这里的队列你可以根据自己的需要进行调整,值为队列的优先级。
  • 此处需要注意的是,如果你指定了严格模式时,低优先级的任务,只有等到高优先级的任务执行完成后,才能执行。如果高优先级的队列中一直有任务,那么低优先级的任务可能会得不到执行的机会。默认的严格模式并没有打开,如果你需要此模式,进需要提供Config结构体中StrictPriority属性设置为true.
  • 然后就是类似于http server中的路由设置,你可在这里指定需要的不同任务
  • 最后启动了一个asynq的Server

任务的执行逻辑

package tasks
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
    "github.com/hibiken/asynq"
)
// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)
type EmailDeliveryPayload struct {
    UserID     int
    TemplateID string
}
type ImageResizePayload struct {
    SourceURL string
}
//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeEmailDelivery, payload), nil
}
func NewImageResizeTask(src string) (*asynq.Task, error) {
    payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
    if err != nil {
        return nil, err
    }
    // task options can be passed to NewTask, which can be overridden at enqueue time.
    return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
}
//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    var p EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
    // Email delivery code ...
    return nil
}
// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}
func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    var p ImageResizePayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Resizing image: src=%s", p.SourceURL)
    // Image resizing code ...
    return nil
}
func NewImageProcessor() *ImageProcessor {
	return &ImageProcessor{}
}

上面的代码定义了不同的任务类型,以及任务在调度过程中的Payload信息。这个payload是一个可以利用的点,在你需要实现更加高级的控制能力的时候,这里我们先简单的看看如何使用的即可。

NewEmailDeliveryTask

NewImageResizeTask

其实就是定义了两个不同的任务类型,在实现时,你可以指定不同的属性,比如最大的重试次数,以及单个任务的实例的超时时间等等。

接下来的两段代码,我摘出来着重说一下:

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    var p EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
    // Email delivery code ...
    return nil
}
// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}
func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    var p ImageResizePayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Resizing image: src=%s", p.SourceURL)
    // Image resizing code ...
    return nil
}
func NewImageProcessor() *ImageProcessor {
	return &ImageProcessor{}
}

这其实就是说明了如何将一个task对象转换为可以在Server中进行路由注册的工具方法。你可以把它当做是http HandlerFunc一样对待,当然了,你也可以使用第二种方式,自己定义一个结构体,然后实现ProcessTask方法.

Client如何让任务进行调度的,三种不同场景下的使用方式

package main
import (
    "log"
    "time"
    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()
    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------
    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------
    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------
    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

上面这部分代码实现了三种场景下的使用方法,你可以立即调用,也可以在未来的某个时间点调用,还可以更加详尽的控制任务执行。

上面的代码仅仅是关于asynq的简单的一个介绍。在生产环境下,如何使用呢,一般情况下,我们会提供一个provider.provider来提供配置的源,源可以是文件,也可以是Mysql还可以是其他存储源,最重要的是需要实现对应的方法。下面一个文件源为例来说明如何实现一个源

// FileBasedConfigProvider implements asynq.PeriodicTaskConfigProvider interface.
type FileBasedConfigProvider struct {
        filename string
}
type PeriodicTaskConfigContainer struct {
        Configs []*Config `yaml:"configs"`
}
type Config struct {
        Cronspec string `yaml:"cronspec"`
        TaskType string `yaml:"task_type"`
}  
// Parses the yaml file and return a list of PeriodicTaskConfigs.
func (p *FileBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
        data, err := os.ReadFile(p.filename)
        if err != nil {
                return nil, err
        }
        var c PeriodicTaskConfigContainer
        if err := yaml.Unmarshal(data, &c); err != nil {
                return nil, err
        }
        var configs []*asynq.PeriodicTaskConfig
        for _, cfg := range c.Configs {
                configs = append(configs, &asynq.PeriodicTaskConfig{Cronspec: cfg.Cronspec, Task: asynq.NewTask(cfg.TaskType, nil)})
        }
        return configs, nil
}

如何使用

        provider := &FileBasedConfigProvider{filename: "./periodic_task_config.yml"}
        mgr, err := asynq.NewPeriodicTaskManager(
                asynq.PeriodicTaskManagerOpts{
                        RedisConnOpt: asynq.RedisClientOpt{
                                Addr:     "127.0.0.1:6379",
                                Password: "123456",
                                DB:       1,
                        },
                        PeriodicTaskConfigProvider: provider,         // this provider object is the interface to your config source
                        SyncInterval:               10 * time.Second, // this field specifies how often sync should happen
                })
        if err != nil {
                log.Fatal(err)
        }
        if err := mgr.Run(); err != nil {
                log.Fatal(err)
        }

在任务调度的逻辑中,我们一般会实现一个调度器来进行任务调度,而不是单个任务进行独立的调度,例如:

        loc, err := time.LoadLocation("Asia/Shanghai")
        if err != nil {
            panic(err)
        }
        scheduler := asynq.NewScheduler(
              asynq.RedisClientOpt{
                      Addr:     "127.0.0.1:6379",
                      Password: "123456",
                      DB:       1,
              },
              &asynq.SchedulerOpts{
                      Location: loc,
              },
         )
         task := asynq.NewTask("example_task", nil)
         // You can use cron spec string to specify the schedule.
         entryID, err := scheduler.Register("*/1 * * * *", task)
         if err != nil {
             log.Fatal(err)
         }
         fmt.Println(entryID)
         if err := scheduler.Run(); err != nil {
              log.Fatal(err)
         }

该库还提供了一个工具用于监控任务的运行情况。Asynqmon是一个基于web的工具,用于监控和管理Asynq队列和任务。下面是Web UI的一些截图

以上就是asynq的全部介绍,下面,看看我对任务系统的改动

首先,增加任务的注册和发现

  • 客户端在启动时,注册节点信息到etcd,并且注册该rpc服务器提供的所有远程调用方法,供web页面建立cron任务时,进行选择执行相应的定时方法,如果采用任务对列不需要提供调用方法,框架自己提供
  • 调度器定期拉取etcd注册信息,维护web端和客户端的rpc连接池,方便web端调度器进行rpc连接和rpc调用

订阅任务取消的信号,对任务取消的信号进行处理,在页面增加任务启停的功能

pubsub := AsyncClient.Subscribe(context.Background(), "asynq:cancel")
	cancelCh := pubsub.Channel()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	now := time.Now()
	ch1 := make(chan int)
	stop := make(chan bool)
	go func(ctx context.Context, xxx string) {
		err := xxxx
		if err != nil {
			ch1 <- 2
			logx.Error(fmt.Sprintf("本次定时任务执行失败,失败原因: %s", err))
			return
		}
		 for i := 0; i < 100; i++ {
		 	select {
		 	default:
		 		time.Sleep(1 * time.Second)
		 		logx.Info("我是任务主流程,我在运行中...")
		 	case <-stop:
		 		logx.Info("收到退出信号,任务结束...")
		 		return
		 	}
		 }
		usedTime := time.Since(now)
		msg := fmt.Sprintf("本次定时任务成功结束执行,用时: %f秒", usedTime.Seconds())
		// Signal the goroutine has completed
		ch1 <- 1
	}(ctx, in.Args)
	for {
		select {
		case cd := <-ch1:
			if cd == 1 {
				return &CommonReply{Message: "本次定时任务成功结束执行"}, nil
			} else {
				return &CommonReply{Message: "本次定时任务执行失败"}, nil
			}
		case msg := <-cancelCh:
			if msg.Payload == in.TaskId {
				// stop <- true
				respMsg := fmt.Sprintf("任务[%s]已被取消.Exist.", msg.Payload)
				logx.Info(respMsg)
				return &CommonReply{Message: respMsg}, nil
			}
		}
	}

针对自身系统的功能需求,可以对asynq库已有的功能进行更多的丰富和改进,同时,你也可以借鉴其他优秀的库,对现有的库进行丰富和改造。

以上就是基于Golang设计一套可控的定时任务系统的详细内容,更多关于Golang定时任务系统的资料请关注脚本之家其它相关文章!

相关文章

  • 解决golang时间字符串转time.Time的坑

    解决golang时间字符串转time.Time的坑

    这篇文章主要介绍了解决golang时间字符串转time.Time的坑,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04
  • Go实现文件上传和下载

    Go实现文件上传和下载

    这篇文章主要为大家详细介绍了Go实现文件上传和下载,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-07-07
  • golang语言中wasm 环境搭建的过程详解

    golang语言中wasm 环境搭建的过程详解

    将 golang 打包为 WASM,通常有两种打包方式,一种是 golang 自带的,另外是使用 tinygo ,接下来通过本文给大家介绍golang语言中wasm 环境搭建的过程,感兴趣的朋友一起看看吧
    2021-11-11
  • golang jsoniter extension 处理动态字段的实现方法

    golang jsoniter extension 处理动态字段的实现方法

    这篇文章主要介绍了golang jsoniter extension 处理动态字段的实现方法,我们使用实例级别的 extension, 而非全局,可以针对不同业务逻辑有所区分,jsoniter 包提供了比较完善的定制能力,通过例子可以感受一下扩展性,需要的朋友可以参考下
    2023-04-04
  • 利用Go语言快速实现一个极简任务调度系统

    利用Go语言快速实现一个极简任务调度系统

    任务调度(Task Scheduling)是很多软件系统中的重要组成部分,字面上的意思是按照一定要求分配运行一些通常时间较长的脚本或程序。本文将利用Go语言快速实现一个极简任务调度系统,感兴趣的可以了解一下
    2022-10-10
  • 一键定位Golang线上服务内存泄露的秘籍

    一键定位Golang线上服务内存泄露的秘籍

    内存泄露?别让它拖垮你的Golang线上服务!快速掌握Go语言服务内存泄漏排查秘籍,从此问题无处遁形,一文读懂如何精准定位与有效解决Golang应用中的内存问题,立即阅读,让性能飞升!
    2024-01-01
  • Go语言实现常见限流算法的示例代码

    Go语言实现常见限流算法的示例代码

    计数器、滑动窗口、漏斗算法、令牌桶算法是我们常见的几个限流算法,本文将依次用Go语言实现这几个限流算法,感兴趣的可以了解一下
    2023-05-05
  • Golang连接Redis数据库的方法

    Golang连接Redis数据库的方法

    这篇文章主要介绍了Golang连接Redis数据库的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • Go 语言简单实现Vigenere加密算法

    Go 语言简单实现Vigenere加密算法

    这篇文章主要介绍了Go语言简单实现Vigenere加密算法,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下
    2022-09-09
  • Golang并发编程之Channel详解

    Golang并发编程之Channel详解

    传统的并发编程模型是基于线程和共享内存的同步访问控制的,共享数据受锁的保护,使用线程安全的数据结构会使得这更加容易。本文将详细介绍Golang并发编程中的Channel,,需要的朋友可以参考下
    2023-05-05

最新评论