Application

Celery库在使用的时候必须先实例化,它的示例叫做application,或者app。

application是线程安全的,所以多个不同配置、组件和任务的Celery应用可以同时运行在同一个进程空间中。

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最后一行包括app的类名(Celery),当前的main module(main),以及对象的内存地址(0x100469fd0).

Main Name

只有main module的名称是重要的。下面会解释为什么。

当你在Celery中发送任务消息时,消息内容不包含代码,只要一个你想要执行的任务名。这和主机名在网络上工作的方式类似:每个worker维护一个任务名和他们对应的实际函数的映射表,叫任务注册表(task registry)。

当你定义一个任务,这个任务会被添加到本地注册表中:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

这里又见到了__main__,当Celery检测不到函数属于那个模块时,就会使用main module的名称来生成任务名称的开头。

这只是在有限用例中会出现的问题:

  1. If the module that the task is defined in is run as a program(如果定义任务的模块作为程序运行)。
  2. 当application是在REPL中创建的情况下

例如,这里的tasks模块也用于启动worker使用app.worker_main()

## tasks.py
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

当运行这个模块时,任务会被以"__main__"开头命名,但是当这个模块被别的进程导入并调用时,任务会以"tasks"开头命名:

>>> from tasks import add
>>> add.name
tasks.add

可以给main module指定其他的名称:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

另请参阅:Names

Configuration

可以通过几个选项改变Celery如何工作。这些选项可以直接在app实例中设置,也可以使用专用配置模块。

配置以app.confopen in new window的形式提供:

>>> app.conf.timezone
'Europe/London'

您还可以直接设置配置值:

>>> app.conf.enable_utc = True

或者通过update方法一次更新多个配置:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

配置对象由多个字典组成,这些字典按如下顺序进行查询:

  1. 在运行时所做的更改
  2. 配置模块(如果有)
  3. 默认配置(celery.app.defaultsopen in new window

还可以使用app.add_defaults()open in new window方法添加新的默认

See also:
配置和默认配置章节有完整的配置列表以及默认配置。

config_from_object

app.config_from_object()open in new window方法从配置对象中加载配置。可以是配置文件,或者其他包含配置属性的对象。

注意,调用该函数会重置之前的配置,如果需要额外配置,请在调用该函数之后进行。

Example 1: Using the name of a module

app.config_from_object()open in new window方法的参数可以是完整的Python包名,甚至一个Python属性,比如:celeryconfig,myproj.config.celery或者myproj.config:CeleryConfig

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

celeryconfig 模块可能看起来像下面这样:

## celeryconfig.py
enable_utc = False
timezone = 'Asia/Shanghai'
result_expires = 3600
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
...

import celeryconfg可以导入的时候,app就可以使用这些配置。

Warning:
上面的示例讲道理有点难以理解,不过可以参考下面的写法:

from celery import Celery


app = Celery('proj',         
             include=['proj.tasks'])

## Optional configuration, see the application user guide.
app.config_from_object('proj.celeryconfig')


if __name__ == '__main__':
    app.start()

此时的目录结构为:

root@genshin:~# tree proj
proj
├── __init__.py
├── __pycache__
│   ├── __init__.cpython-310.pyc
│   ├── celery.cpython-310.pyc
│   ├── celeryconfig.cpython-310.pyc
│   └── tasks.cpython-310.pyc
├── celery.py
├── celeryconfig.py
└── tasks.py

Example 2: Passing an actual module object

还可以传递一个已经导入的模块对象,但不总是推荐这么做。

TIP:
建议使用模块的名称,因为这意味着在使用prefork池时不需要序列化该模块。如果您遇到配置问题或Pickle错误,请尝试使用模块的名称。

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

Example 3: Using a configuration class/object

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
## or using the fully qualified name of the object:
##   app.config_from_object('module:Config')

config_from_envvar

。。。。。你以为是从环境变量去取配置?其实是从环境变量取模块名。比如下边这个示例:

import os
from celery import Celery

## 这里应该是设置 CELERY_CONFIG_MODULE 变量的默认值,不过我实测没用。可能是我操作不对。。
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

然后执行:

$ CELERY_CONFIG_MODULE="proj.celeryconfig" celery worker -l INFO

这个其实和config_from_object是一个原理,区别就是这里的config_from_object()是从变量中取参数。至于好处。。好处就是你可以准备多份配置文件,然后执行的时候通过使用不同的变量来使用不同的配置文件。

Censored configuration

注:Censored这个词可以翻译成审查、剔除。

如果您希望将配置打印出来,作为调试信息或其他类似作用,您可能还希望过滤掉密码和API密钥等敏感信息。Celery提供了用来表示配置的实用程序,其中一个是humanize()open in new window:

from proj.celery import app
>>> app.conf.humanize(with_defaults=False, censored=True)
"broker_url: 'redis://127.0.0.1:6379/0'\nenable_utc: True\nresult_backend: 'redis://localhost:6379/1'\ntimezone: 'Europe/London'\ninclude: ['proj.tasks']\ndeprecated_settings: None"

该方法返回一个字符串,默认情况下只返回修改过的配置,如果你想要展示默认配置,将with_default参数设置为True即可。

另外一个方法是table()open in new window,返回一个字典:

from proj.celery import app
>>> app.conf.table(with_defaults=False, censored=True)
{'broker_url': 'redis://127.0.0.1:6379/0', 'enable_utc': True, 'result_backend': 'redis://localhost:6379/1', 'timezone': 'Europe/London', 'include': ['proj.tasks'], 'deprecated_settings': None}

请注意Celery并不能从中剔除所有敏感信息,因为剔除是基于正则表达式进行的。如果想要自定义的敏感信息也被剔除,那么在命名时包含特定关键字以便Celery将其视为secret,关键字列表包括:API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE,只要包含这些字符即可。

Laziness

应用程序实例是惰性的,这意味着只有在实际需要它时才会对其进行运算。

创建一个 Celeryopen in new window 实例只会做以下事情:

  1. 创建用于事件的逻辑时钟实例。
  2. 创建任务注册表。
  3. 将自身设置为当前应用程序(但如果禁用了set_as_current参数,则不会设置)
  4. 回调app.on_init()(默认情况下不执行任何操作)。

app.task()装饰器不会在定义任务时创建任务,相反,它将推迟任务的创建,使其在任务使用时或应用程序完成后进行,此示例展示了在使用任务或访问属性(在本例中为repr())之前不创建任务:

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

Finalization of the app happens either explicitly by calling app.finalize()open in new window – or implicitly by accessing the app.tasks attribute.

Finalizing the object will:

  1. Copy tasks that must be shared between apps

    Tasks are shared by default, but if the shared argument to the task decorator is disabled, then the task will be private to the app it’s bound to.

  2. Evaluate all pending task decorators.

  3. Make sure all tasks are bound to the current app.

    Tasks are bound to an app so that they can read default values from the configuration.

The “default app”

Celery didn’t always have applications, it used to be that there was only a module-based API. A compatibility API was available at the old location until the release of Celery 5.0,but has been removed.

Celery always creates a special app - the “default app”, and this is used if no custom application has been instantiated.

The celery.task module is no longer available. Use the methods on the app instance, not the module based API:

from celery.task import Task   # << OLD Task base class.

from celery import Task        # << NEW base class.

本小节没有看明白。

Breaking the chain

虽然可以依赖于当前设置的应用程序,但最佳实践是始终将应用程序实例传递给需要它的任何东西。

我称之为“app chain”,因为它根据传递的应用程序创建一系列实例。

下面的示例被认为是不好的做法:

from celery import current_app

class Scheduler:

    def run(self):
        app = current_app

它应该将应用程序作为参数:

class Scheduler:

    def __init__(self, app):
        self.app = app

Celery 在内部使用celery.app.app_or_default()open in new window函数,这样所有东西都可以在基于模块的兼容性 API 中工作

from celery.app import app_or_default

class Scheduler:
    def __init__(self, app=None):
        self.app = app_or_default(app)

在开发中,您可以设置CELERY_TRACE_APP环境变量,以便在应用程序链中断时引发异常:

$ CELERY_TRACE_APP=1 celery worker -l INFO

API的演化:

Celery自从2009年被制作之后。

例如,在开始时,可以将任何可调用的内容用作任务:

def hello(to):
    return 'hello {0}'.format(to)

>>> from celery.execute import apply_async

>>> apply_async(hello, ('world!',))

或者,您也可以创建一个任务类来设置某些选项,或覆盖其他行为

from celery import Task
from celery.registry import tasks

class Hello(Task):
    queue = 'hipri'

    def run(self, to):
        return 'hello {0}'.format(to)
tasks.register(Hello)

>>> Hello.delay('world!')

Later, it was decided that passing arbitrary call-able's was an anti-pattern, since it makes it very hard to use serializers other than pickle, and the feature was removed in 2.0, replaced by task decorators:

from celery import app

@app.task(queue='hipri')
def hello(to):
    return 'hello {0}'.format(to)

Abstract Tasks(任务摘要

所有使用app.task()open in new window装饰器创建的任务都是从应用的基础Taskopen in new window类继承的

也可以通过base参数指定一个不同的基类:

@app.task(base=OtherTask):
def add(x, y):
    return x + y

若要创建自定义任务类,应从中立基类继承:

celery.Task.

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return self.run(*args, **kwargs)

提示:

如果你重写了task's __call__方法,然后,您还需要调用self.run来执行任务主体,这一点非常重要。不要调用super().__call__中立基类celery.Task__call__方法仅供参考

For optimization,this has been unrolled into celery.app.trace.build_tracer.trace_task
which calls run directly on the custom task class if no __call__method is defined.

中立基类是特殊的,因为它还没有绑定到任何特定的app。一旦任务被绑定到app,它就会读取配置以设置默认值,依此类推。

要实现一个基类,你需要使用app.task()open in new window装饰器创建一个任务

@app.task(base=DebugTask)
def add(x, y):
    return x + y

甚至可以通过更改应用程序的 app.Task()open in new window属性来更改应用程序的默认基类:

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]