周期性、定时任务

Introduction:引言

Celery beat 是一个调度器;它定期启动任务,然后由集群中的可用工作节点执行。

默认情况下,这些条目取自beat_schedule设置,但也可以使用自定义存储,例如将条目存储在SQL数据库中。

必须确保一个调度只有一个调度程序进行调度,否则任务会重复运行。Using a centralized approach means the schedule doesn’t have to be synchronized, and the service can operate without using locks.

Time Zones:时区

默认情况下周期性任务的调度使用UTC时间,可以通过timezone设置进行更改。

比如:

timezone = 'Europe/London'

该设置必须被添加到app中,不管是直接通过配置指定(app.conf.timezone = 'Europe/London'),还是通过配置模块,如果您已经使用app.config_from_object设置了一个的话。查看Configuration获取更多信息。

默认调度器会自动检测时区是否改变,并自动重载设置。不过其他调度器可能没这么智能(比如Django database 调度器),此时你需要手动重载。


Django Users:

Celery推荐并兼容Django 1.4中引入的新的USE_TZ设置。

对于Django用户,配置中设置的TIME_ZONE会被使用,或者也可以单独给Celery指定timezone。

database 调度器不会自动重载时区配置,因此需要手动操作:

$ python manage.py shell
>>> from djcelery.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

Django-Celery仅支持Celery 4.0及以下版本,对于Celery 4.0及以上版本,请执行以下操作:

$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

Entries

要调用周期性(periodically,好像也能翻译成定时任务)任务,您必须在beat调度列表中添加一个条目。

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

@app.task
def add(x, y):
    z = x + y
    print(z)

Setting these up from within the on_after_configureopen in new window handler means that we’ll not evaluate the app at module level when using test.s(). Note that on_after_configureopen in new window is sent after the app is set up, so tasks outside the module where the app is declared (e.g. in a tasks.py file located by celery.Celery.autodiscover_tasks()open in new window) must use a later signal, such as on_after_finalizeopen in new window.

add_periodic_task()open in new window函数会在后台将entry添加到beat_schedule设置中,并且同样的设置也可以用来手动设置周期性任务:

示例:每30s运行一次tasks.add任务。

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

Note:

如果您想知道这些设置应该放在哪里,请参阅Configuration。 您可以直接在您的应用程序上设置这些选项,也可以保留一个单独的模块进行配置。

If you want to use a single item tuple for args, don’t forget that the constructor is a comma, and not a pair of parentheses.

给调度一个timedeltaopen in new window意味着任务将以30秒间隔发送(第一次发送将在beat启动后30秒执行,然后每隔30s执行)

还有一个类似于Crontab的调度器,查看Crontab schedules章节。

类似cron, 如果一个任务在下一个任务到来之前没有结束,则会造成任务重叠。如果这不符合预期,应该使用合适的锁机制来确保特定时间只有一个实例在运行(查看示例:Ensuring a task is only executed one at a timeopen in new window)。

Available Fields:可用字段

Crontab schedules

如果希望更好地控制任务的执行时间,例如,一天中的特定时间或一周中的某一天,则可以使用crontab:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

这些Crontab表达式的语法非常灵活。

以下是一些例子:

ExampleMeaning
crontab()
每分钟执行一次
crontab(minute=0, hour=0)
每天0点0分执行
crontab(minute=0, hour='*/3')
每3小时执行一次: 0点, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,hour='0,3,6,9,12,15,18,21')
和上一个一样
crontab(minute='*/15')
15分钟一次
crontab(day_of_week='sunday')
每周日时每分钟执行一次
crontab(minute='', day_of_week='sun')*',hour='*
同上
crontab(minute='*/10',hour='3,17,22', day_of_week='thu,fri')
十分钟一次, 但只在周四和周五的3-4 am, 5-6 pm, 以及 10-11 pm执行.
crontab(minute=0, hour='/3')*/2,*
偶数以及可以被3整除的,每小时执行一次. 也就是说: 除了这些每小时执行一次 : 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5')
可以被5整除的,使用24小时格式
crontab(minute=0, hour='*/3,8-17')
8am-5pm每小时执行一次,其余时间可以被3整除的每小时一次
crontab(0, 0, day_of_month='2')
每月第二天执行
crontab(0, 0,day_of_month='2-30/2')
Execute on every even numbered day.
crontab(0, 0,day_of_month='1-7,15-21')
每月第一和第三周执行
crontab(0, 0, day_of_month='11',month_of_year='5')
每年的5月11号执行
crontab(0, 0,month_of_year='*/3')
每季度的第一个月执行

See https://docs.celeryq.dev/en/stable/reference/celery.schedules.html#celery.schedules.crontabopen in new windowcelery.schedules.crontab`` for more documentation.

Solar scheduler

如果你有任务需要根据日出、日落、黎明和黄昏执行,那你应该使用solaropen in new window scheduler:

from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

参数是很简单的:solar(event, latitude, longitude)

请确保使用正确的纬度和经度符号:

SignArgumentMeaning
+
纬度
North
-
纬度South
+经度East
-经度West

可能的事件类型包括:

EventMeaning
dawn_astronomical在天空不再完全黑暗的那一刻执行。 这是太阳在地平线以下18度的时候。
dawn_nautical当地平线有足够的阳光和一些物体可以区分时执行; 正式地,当太阳在地平线以下 12 度时。
dawn_civil当有足够的光线可以区分物体时执行,以便可以开始户外活动; 正式地,当太阳在地平线以下 6 度时。Execute when there’s enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the Sun is 6 degrees below the horizon.
sunrise当早晨太阳的上边缘出现在东方地平线上时执行。
solar_noon当天太阳在地平线以上最高时执行。
sunset当晚上太阳的后缘消失在西方地平线上时执行。
dusk_civil在暮光时分(civil twilight)结束时执行,此时物体仍然可以区分并且一些恒星和行星是可见的。 正式地,当太阳低于地平线 6 度时。
dusk_nautical在太阳低于地平线 12 度时执行。 物体不再可分辨,地平线不再是肉眼可见的。
dusk_astronomical在天空完全变暗的那一刻执行; 正式地,当太阳在地平线以下 18 度时。

所有solar事件基于UTC时间,且不受时区设置影响。

在极地地区,太阳可能不会每天都升起或落下。 调度程序能够处理这些情况(即,日出事件不会在太阳没有升起的那一天运行)。 一个例外是 solar_noon,它被正式定义为太阳穿过天子午线的时刻,即使太阳在地平线以下,也会每天发生。

暮光时分被定义为黎明和日出之间的时期以及在日落和黄昏之间。 您可以根据“暮光”来安排活动,具体取决于您对暮光的定义(民用、航海或天文),以及您希望事件发生在暮光的开始还是结束时,使用上面列表中的适当事件 .

Seehttps://docs.celeryq.dev/en/stable/reference/celery.schedules.html#celery.schedules.solaropen in new windowcelery.schedules.solar``for more documentation.

Starting the Scheduler:开始调度

启动celery beat 服务:

$ celery -A proj beat

您还可以通过启用 workers -B 选项将 beat 嵌入到 worker 中,如果您永远不会运行多个 worker 节点,这很方便,但它不常用,因此不建议在生产环境中使用:

$ celery -A proj worker -B

Beat 需要将任务的最后运行时间存储在本地数据库文件中(默认命名为 celerybeat-schedule),因此它需要访问当前目录的写入权限,或者您可以为此文件指定自定义位置:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

Note:

要将beat以守护进程方式运行,参考Daemonization

Using custom scheduler classes:自定义调度器类

可以通过命令行指定一个自定义的调度类(--scheduleropen in new window选项)

默认的调度程序是celery.beat.PersistentScheduleropen in new window,它只跟踪本地shelveopen in new window数据库文件中的最后运行时间。

还有一个django-celery-beatopen in new window扩展,它将schedule存储在Django数据库中,并提供了一个方便的管理界面来管理运行时的周期性任务。

安装并使用这个扩展:

  1. 使用pip安装:

    $ pip install django-celery-beat

  2. 在你的Django项目的settings.py文件中添加django_celery_beat模块到INSTALLED_APPS

    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    

    请注意,模块名称中没有破折号,只有下划线。

  3. 执行migrate来创建相关表

    $ python manage.py migrate

  4. 使用django_celery_beat.schedulers:DatabaseScheduler调度器启动celery beat 服务:

    $ celery -A proj beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

    注意:您也可以直接将其添加为beat_scheduler设置。

  5. 访问Django-Admin界面来设置一些定期任务。