聊聊通过celery_one避免Celery定时任务重复执行的问题

 更新时间:2021年10月31日 10:44:36   作者:吾星喵  
Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,今天通过本文给大家介绍通过celery_one避免Celery定时任务重复执行的问题,感兴趣的朋友一起看看吧

在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。

参考:https://www.jb51.net/article/226849.htm

有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。

Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={'graceful': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

另外如果要手动设置任务的 key,可以指定 keys 参数

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

解决步骤

Celery One允许你将Celery任务排队,防止多次执行

安装

pip install -U celery_once

要求,需要Celery4.0,老版本可能运行,但不是官方支持的。

使用celery_once,tasks需要继承一个名为QueueOnce的抽象base tasks

Once安装完成后,需要配置一些关于ONCE的选项在Celery配置中

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')

# 一般之前的配置没有这个,需要添加上
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

# 在原本没有参数的里面加上base
@celery.task(base=QueueOnce)
def slow_task():
    sleep(30)
    return "Done!"

要确定配置,需要取决于使用哪个backend进行锁定,查看Backends

在后端,这将覆盖apply_async和delay。它不影响直接调用任务。

在运行任务时,celery_once检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发AlreadyQueued异常。

example.delay(10)
example.delay(10)
Traceback (most recent call last):
    ..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
    ..
AlreadyQueued()

graceful:如果在任务的选项中设置了once={'graceful': True},或者在运行时设置了apply_async,则任务可以返回None,而不是引发AlreadyQueued异常。

from celery_once import AlreadyQueued
# Either catch the exception,
try:
    example.delay(10)
except AlreadyQueued:
    pass
# Or, handle it gracefully at run time.
result = example.apply(args=(10), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
    sleep(30)
    return "Done!"

其他功能请访问:https://pypi.org/project/celery_once/

到此这篇关于通过celery_one避免Celery定时任务重复执行的文章就介绍到这了,更多相关Celery定时任务重复执行内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python小练习之爬鱿鱼游戏的评价生成词云

    python小练习之爬鱿鱼游戏的评价生成词云

    读万卷书不如行万里路,只学书上的理论是远远不够的,只有在实战中才能获得能力的提升,本篇文章手把手带你用Python爬取热火的鱿鱼游戏评价,大家可以在过程中查缺补漏,提升水平
    2021-10-10
  • python 中如何获取列表的索引

    python 中如何获取列表的索引

    这篇文章主要介绍了python 中如何获取列表的索引,在文中给大家提到了python 返回列表中某个值的索引,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-07-07
  • python爬取w3shcool的JQuery课程并且保存到本地

    python爬取w3shcool的JQuery课程并且保存到本地

    本文主要介绍python爬取w3shcool的JQuery的课程并且保存到本地的方法解析。具有很好的参考价值。下面跟着小编一起来看下吧
    2017-04-04
  • 一文掌握python中的时间包

    一文掌握python中的时间包

    这篇文章主要介绍了python中的时间包,主要包括datetime时间包,获取当前时间,获取时间间隔及时间对象转时间字符串的相关知识,本文通过示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-08-08
  • Python实现读写INI配置文件的方法示例

    Python实现读写INI配置文件的方法示例

    这篇文章主要介绍了Python实现读写INI配置文件的方法,结合实例形式分析了Python针对ini配置文件的读写操作类定义及使用方法,需要的朋友可以参考下
    2018-06-06
  • Python的Twisted框架中使用Deferred对象来管理回调函数

    Python的Twisted框架中使用Deferred对象来管理回调函数

    当说起Twisted的异步与非阻塞模式等特性时,回调函数的使用在其中自然就显得不可或缺,接下来我们就来看Python的Twisted框架中使用Deferred对象来管理回调函数的用法.
    2016-05-05
  • python爬虫实战之爬取京东商城实例教程

    python爬虫实战之爬取京东商城实例教程

    这篇文章主要介绍了python爬取京东商城的相关资料,文中通过爬取一个实例页面进行了讲解,通过示例代码和图文介绍的非常详细,相信对大家具有一定的参考价值,需要的朋友们下面来一起学习学习吧。
    2017-04-04
  • python绘制规则网络图形实例

    python绘制规则网络图形实例

    今天小编大家分享一篇python绘制规则网络图形实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-12-12
  • Python使用os.listdir和os.walk获取文件路径

    Python使用os.listdir和os.walk获取文件路径

    这篇文章主要介绍了Python使用os.listdir和os.walk获取文件路径,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Python采集某度贴吧排行榜实战示例

    Python采集某度贴吧排行榜实战示例

    这篇文章主要为大家介绍了Python采集某度贴吧排行榜实战示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04

最新评论