在Django中使用Celery
在Django中使用Celery
Note:
以前版本的 Celery 需要一个单独的库来与 Django 一起工作,但从 3.1 开始不再是这种情况。 Django现在提供现成的支持,因此本文档仅包含集成 Celery 和 Django 的基本方法。 您将使用与非 Django 用户相同的 API,因此建议您先阅读 Celery起步,然后再返回本教程。 当您有一个工作示例时,您可以继续阅读后续步骤指南。
Note:
Celery 5.0.x 支持Django 1.11以及更新版的Django,如果低于1.11 请使用Celery 4.4.x。
要在Django项目中使用Celery,你需要首先定义一个Celery 库(称作"app")
如果你的Django项目目录结构如下:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那么推荐的方式是创建一个 proj/proj/celery.py
模块来定义Celery实例:
import os
from celery import Celery
## Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
## Using a string here means the worker doesn't have to serialize
## the configuration object to child processes.
## - namespace='CELERY' means all celery-related configuration keys
## should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
## Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
然后,你需要在 proj/proj/__init__.py
模块中导入这个app。这可以确保Django启动时app可以被加载,以便 @shared_task
装饰器可以使用它:
## This will make sure the app is always imported when
## Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
需要注意的是,这种结构适合大型项目,对于小型项目你可以在一个模块中定义app和tasks,就像Celery起步中的示例那样。
让我们分解一下发生了什么,首先我们给celery命令行程序设置了DJANGO_SETTINGS_MODULE环境变量的默认值:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
你不需要这一行,但它让你不用总是将设置模块传递给 celery 程序。 它必须始终在创建应用程序实例之前出现,就像我们接下来要做的那样:
app = Celery('proj')
这是我们的库实例,您可以有许多实例,但在使用Django时可能没有理由这样做。
我们还添加了Django设置模块作为Celery的配置源。这意味着我们不必维护多个配置文件,而是直接从Django设置中配置Celery,不过你也可以将它们分开配置:
app.config_from_object('django.conf:settings', namespace='CELERY')
大写的名称空间意味着Celery配置选项必须使用大写,并且以CELERY_
开始,比如说task_always_eager变为CELERY_TASK_ALWAYS_EAGERA,broker_url变为CELERY_BROKER_URL。这同样适用于workers配置,比如worker_concurrency变为CELERY_WORKER_CONCURRENCY。
比如,一个Django项目配置文件可能包括:
settings.py
...
## Celery Configuration Options
CELERY_TIMEZONE = "Australia/Tasmania"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
你也可以将设置作为对象传递,不过更推荐string,这样的话worker可以免去序列化对象的步骤。CELERY_名称空间同样是可选的,不过建议使用(为了避免和其他Django设置冲突)
接下来,可重用应用程序的一个常见做法是在单独的 tasks.py 模块中定义所有任务,Celery 确实有一种自动发现这些模块的方法:
app.autodiscover_tasks()
在上面的行中,按照tasks.py的约定,Celery将自动发现所有已安装应用程序中的任务
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
这种方法你不用将个人模块手动添加到CELERY_IMPORTS设置中去。
最后,debug_task
示例是一个dumps自身请求的任务。这是Celery3.1引入的新的任务选项bind=True
,它可以更简单的引用当前任务实例。
@shared_task
装饰器
使用 您编写的任务可能会存在于可重用的应用程序中,而可重用的应用程序不能依赖于项目本身,因此您也不能直接导入您的应用程序实例。
@Shared_task
装饰器允许您在没有任何具体应用程序实例的情况下创建任务:
demoapp/tasks.py
## Create your tasks here
from demoapp.models import Widget
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
完整的示例代码:https://github.com/celery/celery/tree/master/examples/django/
扩展
django-celery-results - 使用Django ORM/Cache作为结果后端
django-celery-results扩展使用Django ORM或Django缓存框架提供结果后端。
按照以下步骤使用:
$ pip install django-celery-results
在Django项目的settings.py文件中将
django_celery_results
添加到INSTALLED_APPS
中。INSTALLED_APPS = ( ..., 'django_celery_results', )
注意模块名中只有下划线,没有破折号
创建Celery数据库表
$ python manage.py migrate django_celery_results
配置Celery使用django-celery-results后端。
假设你使用Django的
settings.py
配置Celery,添加以下设置:CELERY_RESULT_BACKEND = 'django-db'
对于cache后端:
CELERY_CACHE_BACKEND = 'django-cache'
我们还可以使用django设置中
CACHES
定义的缓存。# celery setting. CELERY_CACHE_BACKEND = 'default' # django setting. CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'my_cache_table', } }
对于附加的配置选项,参考任务结果后端设置
django-celery-beat
- Database-backed Periodic Tasks with Admin interface
查看自定义调度器类获取更多信息。
启动workers进程
生产环境中你可能需要将worker作为守护进程在后台运行,查看Daemonization。但对于测试和开发环境来说,使用celery worker命令启动worker实例更加有用,就像使用manage.py runserver启动Django服务器一样:
$ celery -A proj worker -l INFO
完整的命令行选项列表,可以使用帮助命令:
$ celery help
接下来怎么做
如果想要了解更多,应该继续学习下一步教程,然后你可以学习用户指南。