Celery起步

选择中间件

中间件用来发送消息也可以存储结果数据。常用的包括 rabbitmq 以及 redis。其中 rabbitmq 不能用于存储结果数据。安装方式这里不啰嗦,只记录下连接方式:

broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'
broker_url = 'redis://localhost:6379/0'
broker_url = 'redis://:password@hostname:port/db_number'
result_backend = 'redis://localhost:6379/0'

更多信息参考:

  • 使用 Rabbitmq
  • Broker设置
  • 使用 Redis

第一个程序

创建 tasks.py 文件,内容如下:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

启动 worker,在 tasks.py 所在目录执行以下命令中的一个:

celery -A tasks worker --loglevel=INFO
celery -A tasks.app worker --loglevel=INFO

第一个是官方命令,不过也许在某种情况下启动会失败。如果发生了,可以试试第二种。Celery 会在前台运行,如果需要后台运行的话参考守护程序

保存结果

上述方式并不保存结果,甚至无法查询结果,它只是一个演示。要保存结果,需要配置一个 backend,配置方式以及支持的 backend 参考:任务结果后端设置

现在,修改刚才的代码,添加 backend 配置

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

然后重启 Celery 实例,重新进入 python 终端并导入,然后就可以调用结果了

root@ubuntu:~/proj# python3
Python 3.10.4 (main, Jun 29 2022, 12:14:53) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> s = add.delay(12,33)
>>> s.get(timeout=1)
45

这个示例中我们创建了一个 Celery 的实例,我们称之为 Celery 应用 并命名为 app,它是你在 Celery 中执行所有操作的入口。

The first argument to Celery is the name of the current module. This is only needed so that names can be automatically generated when the tasks are defined in the __main__ module.

The second argument is the broker keyword argument, specifying the URL of the message broker you want to use.

然后我们定义了一个叫做 add 的任务,该任务返回两个数之和。

delay() 方法是 apply_async() 的一个快捷方式,它提供了更好的任务控制。参考调用任务

调用任务后会返回一个 AsyncResult 实例,它可以用来检查任务状态、等待任务结束、或者获取一个返回值,也可以在任务失败时获取异常信息。

Warning: Backends use resources to store and transmit results. To ensure that resources are released, you must eventually call get() or forget() on EVERY AsyncResult instance returned after calling a task.

大概意思是 Backends 的存储和传输都需要消耗(占用)资源。为了确保资源被释放,在每次调用一个 task 后返回的 AsyncResult 实例上,都得调用一次 get()forget()

配置

Celery 像是一个消费级用品,开箱即用。他有一个输入和输出。输入必须连接一个 broker,还有一个可选的输出后端。但是如果你仔细观察这个设备,会发现后面有一个盖子,露出大量滑块、刻度盘和按钮,这就是它的配置。

默认配置对于大多数情况来说已经足够好,不过有许多选项可以让 Celery 满足个性需求。参考:配置和默认配置

你可以通过一个专用模块来对 Celery 进行配置,比如下面这个 task_serializer 设置的示例:

app.conf.task_serializer = 'json'

你也可以使用 update 一次更改多想配置:

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

对于大型项目,我们更推荐使用集中化的方式来进行配置。可以通过调用 app.config_from_object() 方法进行配置:

app.config_from_object('celeryconfig')

通常我们给这个配置模块命名为 celeryconfig,不过实际上你可以自定义任意名称,上面的示例中,celeryconfig.py 必须可以从当前目录加载。它的内容应该是这样的:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

你可以使用 python -m celeryconfig 来验证该文件是否可以正常工作。更加完整的配置参考配置和默认配置。

为了展示配置文件的强大功能,下面是如何将一个 misbehaving task 路由到一个专用队列:

To demonstrate the power of configuration files, this is how you’d route a misbehaving task to a dedicated queue:

task_routes = {
    'tasks.add': 'low-priority',
}

或者不使用路由而是限制其速率,这样其每分钟只能执行 10 个任务:

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

如果使用 Rabbitmq 或者 Redis,也可以直接在运行时指定新的限制速率:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully