Tasks

任务是构成Celery应用的积木。

任务是可以从任何可调用对象创建的类。它扮演双重角色,因为它既定义了调用任务(发送消息)时发生的事情,也定义了当Worker收到该消息时发生的事情。

每个任务类都有一个独一无二的名称,这个名称在消息中引用以便于worker可以找到正确的函数执行。

队列中的任务在worker acknowledgedopen in new window之前不会被删除。worker可以提前储存多条消息,即便worker被终止、或者电源故障或是其他原因造成的worker意外终止,消息都会被重新调度到其他worker。

理想情况下,任务函数应该是幂等open in new window(幂等性)的:这意味着函数不会产生预期之外的后果,即便使用相同参数执行多次。由于worker无法检测到你的任务是否是幂等的,默认行为是在消息执行之前提前确认消息,这样,已经开始的任务调用永远不会再次执行。

如果你的任务是幂等的,你可以设置acks_late选项 to have the worker acknowledge the message after the task returns
instead. See also the FAQ entry Should I use retry or acks_late?open in new window.

请注意,如果执行任务的子进程终止(通过调用sys.exit()open in new window或者通过发送signal),worker将acknowledge该消息,即时acks_late是开启的。这个行为是故意设置的,因为:

  1. 我们不希望重新运行任务,通过内核强制发送一个SIGSEGV (segmentation fault)或其他类似信号给进程。
  2. 我们假设系统管理员故意终止该任务,而不希望它自动重新启动。
  3. 分配太多内存的任务有触发内核OOM KILL的危险,同样的情况可能会再次发生。
  4. 在重新传递时总是失败的任务可能会导致高频消息循环导致系统崩溃。

如果您确实希望在这些情况下重新交付任务,请考虑开启task_reject_on_worker_lost设置

Warning:

无限期阻塞的任务最终可能会阻止工作实例执行任何其他工作。
如果您的任务包含I/O操作,那么请确保为这些操作添加超时,比如使用requestsopen in new window库发送一个web请求时添加超时:

connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))

时间限制便于保证所有任务及时返回,不过时间限制事件实际上会强制杀死进程,因此,只使用它们来检测尚未使用手动超时的情况。

早期版本中,对长时间任务的调度并不友好,因此如果有那种需要运行分钟或小时的任务,建议开启celery worker-Ofairopen in new window命令行参数。不过,从4.0开始,-Ofair已经是默认策略。查看Prefetch Limits获取更多信息,并且将长时间任务和段时间任务分配给专用的worker来获取最佳性能(Automatic routing)。

如果您的worker挂起,那么请在提交issue之前检查正在运行的任务,因为挂起很可能是由一个或多个挂在网络操作上的任务引起的。

本章包含所有关于任务定义的内容。

Basics

您可以通过使用app.task()open in new window装饰器轻松地从任何可调用对象创建任务:

from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

有许多可配置的选项,可以通过给装饰器提供参数来进行配置:

@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)

如何导入任务装饰器?

如何导入装饰器?以及什么是"app"?

任务装饰器在Celery应用实例上,如果不清楚,请参考Celery起步。

如果使用Django(参考在Django中使用Celery),如果你正在编写某个库,你可能想使用shared_task()装饰器:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

Multiple decorators:多重装饰器

多重装饰器

当使用多个装饰器的时候,确保任务装饰器在最后边(在Python中意味着处于列表第一个:

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

Bound tasks:绑定任务

绑定任务意味着任务的第一个参数将始终是任务实例(Self),just like Python bound methods:

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)

绑定任务需要retries(使用app.Task.retry()open in new window),用于访问有关当前任务请求的信息,以及用于添加到自定义任务基类的任何附加功能。

任务继承

通过给任务装饰器指定base参数来指定任务基类:

import celery

class MyTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@app.task(base=MyTask)
def add(x, y):
    raise KeyError()

Names

每个任务都必须有一个独一无二的名字。

如果没有显示指定任务名称,那么装饰器会自动生成一个,任务名基于以下两点:

  1. 模块中定义的任务
  2. task函数的名称

显示指定名称的示例:

>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
...     return x + y

>>> add.name
'sum-of-two-numbers'

最佳实践是使用模块名作为名称空间,这种方式可以避免命名冲突。

>>> @app.task(name='tasks.add')
>>> def add(x, y):
...     return x + y

可以通过调用.name属性确定任务的名称:

>>> add.name
'tasks.add'

这个tasks.add就是通过在tasks.py 模块中定义的任务自动给我们生成的。

## tasks.py

@app.task
def add(x, y):
    return x + y
>>> from tasks import add
>>> add.name
'tasks.add'

Note:

你可以使用 inspect 命令来查看已注册的任务名称。

查看命令行管理工具(inspect/control)章节中的 inspect registered 命令

更改自动命名行为

4.0新增特性

有许多场景不适合用自动命名。想象一下在许多不同的模块中包含许多任务:

project/
    /__init__.py
    /celery.py
    /moduleA/
        /__init__.py
        /tasks.py
    /moduleB/
        /__init__.py
        /tasks.py

使用自动命名的话可能会生成一堆类似moduleA.tasks.taskA, moduleA.tasks.taskB, moduleB.tasks.test 等等。你可能想要在所有任务命中去掉'tasks',你可给像上面说的给所有任务指定任务名,或者可以通过重写app.gen_task_name()open in new window修改自动命名规则。继续本例,celery.py可能包含:

from celery import Celery

class MyCelery(Celery):

    def gen_task_name(self, name, module):
        if module.endswith('.tasks'):
            module = module[:-6]
        return super().gen_task_name(name, module)

app = MyCelery('main')

上述操作之后,任务名会变为moduleA.taskA, moduleA.taskB, moduleB.test 等等

Warning:

确保app.gen_askname()是一个纯函数:这意味着对于相同的输入,它必须始终返回相同的输出。

任务请求

app.Task.requestopen in new window包含与当前执行的任务相关的信息和状态。

request定义了下列属性:

属性名属性描述
id正在执行的任务的id,该id是独一无二的
group
任务组的ID,该id是独一无二的(如果此任务是成员)。
chordThe unique id of the chord this task belongs to (if the taskis part of the header).
correlation_id用于类似消除重复数据的自定义ID
args位置参数
kwargs关键字参数
origin发送该任务的主机名
retries当前任务重试次数。从0开始计数
is_eager设置为True任务将在本地执行而不是交由worker执行。
eta任务的最初预期时间(如果有的话)。使用UTC时间(取决于enable_utc中的配置)。
expires任务的最初过期时间(如果有的话)。使用UTC时间(取决于enable_utc中的配置)。
hostname执行任务的节点的主机名
delivery_infodelivery的附加信息。This is a mapping containing the exchange and routing key used to deliver this task。app.Task.retry()open in new window在重新发送消息到相同目标队列时会用到。该字典中的key是否可用取决于所使用的的broker。
reply-to要将响应返回使用的队列的名称(例如,与RPC结果后端一起使用)
called_directly如果任务不是由worker执行的,该项会被标记为true
timelimit此任务的当前(软、硬)时间限制的元组(如果有)
callbacks此任务成功返回时要调用的签名列表。
errback此任务失败时要调用的签名列表。
utc开启UTC之后,该项为true。
3.1新增特性
headers与此任务消息一起发送的headers(可能为None,headers为字典格式)。
reply_to将响应发送到何处(队列名)
correlation_id通常与任务 id 相同,在 amqp 中用于跟踪响应。
4.0新增特性
root_idThe unique id of the first task in the workflow this taskis part of (if any).
parent_idThe unique id of the task that called this task (if any).
chainReversed list of tasks that form a chain (if any).The last item in this list will be the next task to succeed the current task. If using version one of the task protocol the chain tasks will be in request.callbacks instead.
5.2新增特性
properties与此任务消息一起接收的消息属性的映射(可能是None 或者**{}** )
replaced_task_nesting任务被替换的次数,如果有的话。(可以是0)

示例

An example task accessing information in the context is:

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
            self.request))

绑定参数意味着该函数将是一个“bound method”,以便您可以访问任务类型实例上的属性和方法。

Logging

worker会自动设置日志记录,也可以手动配置。

有一个名为celery.task的特殊的日志记录器可用,您可以从该记录器继承,以自动获取任务名称和唯一ID作为日志的一部分。

最佳实践是在模块最上方定义一个给所有任务使用的公共的日志记录器。

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Celery使用标准的Python日志记录库,文档在这儿open in new window

也可以使用print()open in new window,因为写入标准输出/错误的任何内容都将重定向到日志记录系统(可以关闭该行为,查看worker_redirect_stdouts)

Note:

如果您在任务或任务模块中的某个位置创建了记录器实例,则Worker不会更新重定向。

如果想要重定向sys.stdout、sys.stderr 到自定义日志记录器,必须手动开启此功能。比如:

import sys

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    old_outs = sys.stdout, sys.stderr
    rlevel = self.app.conf.worker_redirect_stdouts_level
    try:
        self.app.log.redirect_stdouts_to_logger(logger, rlevel)
        print('Adding {0} + {1}'.format(x, y))
        return x + y
    finally:
        sys.stdout, sys.stderr = old_outs

Note:

If a specific Celery logger you need is not emitting logs, you should check that the logger is propagating properly. In this example celery.app.traceis enabled so that succeeded in logs are emitted:

import celery
import logging

@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
    logger = logging.getLogger('celery')
    logger.propagate = True
    logger = logging.getLogger('celery.app.trace')
    logger.propagate = True

Note:

如通想要彻底关闭日志功能,使用setup_logging信号:

import celery

@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
    pass

参数校验

4.0新增功能

Celery 会校验调用任务时传递的参数信息,就像 Python 调用普通函数时一样:

>>> @app.task
... def add(x, y):
...     return x + y

## Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

## Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

可以通过设置任务的 typing参数设置为False来关闭参数校验:

>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

## Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

隐藏参数中的敏感信息

4.0新增功能

使用 task_protocol 2 或更高版本的时(自 4.0 开始为默认值),可以重写位置参数和关键字参数在日志中的表现方式,并且可以使用 argsreprkwargsrepr 调用参数监控事件:

>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

>>> charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

Warning:

任何能够从Broker中读取任务消息或以其他方式拦截任务消息的人都可以访问敏感信息。 如果消息中包含敏感信息,应该针对该敏感信息进行加密,在本案例中使用信用卡号对实际的号码进行加密,并且进行安全存储,然后在任务中进行检索解密。

Retrying

当任务出现错误时,可以通过 app.Task.retry()open in new window 重新执行。

当调用 retry 时,会发送与原始任务相同的ID发送一条消息,将该消息发送到原始任务的对列中。当任务被重试时,也会被记录为一个任务状态,便于通过 result 实例来跟踪任务。这是一个 retry的例子:

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

Note:

app.Task.retry()open in new window 调用时会引发异常,在重试之后将无法访问任何代码。重试异常,并不是作为错误处理的,表示Worker需要重试任务,便于在启用结果后端时存储正确的任务状态。

重试异常是正常的,除非将 throw 设置为 False 可以避免该情况发生。

//这个翻译我感觉有问题。下面是原文:

The app.Task.retry()open in new window call will raise an exception so any code after the retry won't be reached. This is the Retryopen in new window
exception, it isn't handled as an error but rather as a semi-predicate to signify to the worker that the task is to be retried, so that it can store the correct state when a result backend is enabled.

This is normal operation and always happens unless the
throw argument to retry is set to False.

任务装饰器的 bind 参数将允许访问 self(任务类型实例)。

exc 参数主要用于传递日志和存储任务结果时使用的异常信息。exceptiontraceback 都将在任务状态中可用(如果启用了结果后端)。

任务如果设置了 max_retries 值,超出了重试的最大次数,则会重新引发当前的异常信息,但如果:

  • exc 参数没有设置

    该情况会引发 MaxRetriesExceededErroropen in new window 异常

  • 没有异常

    If there’s no original exception to re-raise the excargument will be used instead, so:

    self.retry(exc=Twitter.LoginError())
    

    will raise the excargument given.

自定义重试延迟

当任务需要重试时,可以等待一段时间来进行重试,默认延迟可以通过 default_retry_delayopen in new window 来进行设置的。默认时间为 3 分钟,延迟时间的单位是秒(类型可以为 int 或 float)。

也可以通过 retry()open in new window 中的 countdown 参数来覆盖默认值。

@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        something_raising()
    except Exception as exc:
        # overrides the default delay to retry after 1 minute
        raise self.retry(exc=exc, countdown=60)

自动重试已知异常

4.0 版本的新功能

有时,您只想在引发特定异常时重试任务。 可以通过 Celery 中 的app.task()open in new window装饰器的 autoretry_for 参数进行自动重试任务:

from twitter.exceptions import FailWhaleError

@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

如果要给retry()open in new window的内部调用指定参数,配置app.task()open in new window装饰器的retry_kwargs 参数

@app.task(autoretry_for=(FailWhaleError,),
          retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

这是手动处理异常的替代方法,上面的示例与在 try ... except语句中包含的代码块一样:

@app.task
def refresh_timeline(user):
    try:
        twitter.refresh_timeline(user)
    except FailWhaleError as exc:
        raise div.retry(exc=exc, max_retries=5)

如果希望所有的错误都自动重试,只需使用:

@app.task(autoretry_for=(Exception,))
def x():
    ...

4.2 版本新功能

如果你的任务依赖于别的服务,比如向API发送请求,可以使用 exponential backoff (指数补偿

)来避免大量服务请求。 幸运的是我们可以通过指定retry_backoff参数来让Celery的自动重试支持它,就像这样:

from requests.exceptions import RequestException

@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
    ...

默认情况下,exponential backoff 还会引入随机 jitter(抖动)来避免所有任务在同一时间执行。最大延迟限制在10分钟,可以通过下面的选项进行自定义配置。

Task.autoretry_for

异常类的列表或元组,如果任务在执行的过程中引发异常,任务将自动重试。默认情况下任何异常都不会自动重试。

Task.max_retries

最大重试次数。值为None的话代表一直重试,默认是3

Task.retry_backoff

一个布尔值或者数字。如果选项为True,自动重试将遵循exponential backoff。第一次重试延迟1s,第二次2s,第三次4s,第四次8s以此类推(延迟的值受Task.retry_jitter影响,如果开启了的话)。如果设置为数字,那么该值将作为延迟因子。比如,将该值设置为3,则第一次重试延迟3s,第二次6s,第三次12s,第四次24s以此类推。该选项默认配置为False,自动重试不会做延迟处理。

Task.retry_backoff_max

如果Task.retry_backoff开启的话,该值表示以秒为单位的最大重试延迟时长,默认是10分钟,也就是600.

Task.retry_jitter

一个布尔值,用与在exponential backoff引入随机数来避免队列中的所有任务同时执行。如果将此选项设置为True,则retry_backoff计算的延迟值将被视为最大值,实际延迟值将是介于零和该最大值之间的随机数。默认情况下,此选项设置为True。

选项列表

任务装饰器可以使用许多选项来更改任务的行为方式,例如,您可以使用rate_limit选项限制任务的速率。

传递给任务装饰器的任何关键字参数都为任务结果类的属性,这是一个内置的属性列表。

General

Task.name

任务注册的名字。可以手动设置,否则名字会通过模块和类名自动生成。更多信息查看Names。

Task.request

如果正在执行任务,则此消息将包含有关当前请求的信息。使用线程本地存储。更多信息查看任务请求

Task.max_retries

当前任务调用 self.retry 或使用 autoretry_for 参数时才会启用。

最大重试次数,如果重试的次数超过最大限制,会引发 MaxRetriesExceededErroropen in new window 异常。

Note:

您必须手动调用retry()open in new window,因为它不会在出现异常时自动重试。

默认值是3,如果设置为None则会一直重试。

Task.throws

预期内的异常,如果在元组中含有该异常类,将不会被视为异常。列表中的错误将错误信息记录到结果后端中,但Worker不会将该事件记录为错误信息,也没有traceback。

示例:

@task(throws=(KeyError, HttpNotFound)):
def get_foo():
    something()

错误类型:

  • 预期错误(在 Task.throws)

    记录 INFO 信息,但回溯排除。

  • 非预期错误

    记录 ERROR 信息,可回溯。

Task.default_retry_delay

在执行任务重试之前的默认时间(秒)。可以是intfloat。默认值为三分钟延迟。

Task.rate_limit

限制指定任务类型的速率(限制在指定时间内运行的任务数量)。当速率限制生效时,任务仍然会完成,但是可能需要一些时间才能开始。
如果限制速率为 None,表示速率限制无效。速率可以为 int 也可以为 float 类型,则被表示为“每秒任务数”。
速率限制也可以在数值后面添加 "/s"、"/m" 或 "/h",以秒、分钟或以小时为单位。任务将在指定的时间内平均分配。
例如:"100/m" (每分钟100个任务)。则强制会在同一个Worker实例上启动俩个任务之间至少 600ms 的延迟。
默认值通过 task_default_rate_limit 进行设定:如果未指定,表示默认情况禁用任务的速率限制。
注意,该速率限制是对每一个Worker实例的限制,并非全局速率限制。配置全局速率限制(例如,API每秒最多请求的次数),您必须限制到给定队列

Task.time_limit

此任务的硬性时间限制(秒)。未设置时,使用默认。

Task.soft_time_limit

此任务的软时间限制。未设置时,使用默认。

Task.ignore_result

不存储任务状态信息,如果配置该选项 AsyncResultopen in new window 将失效,无法进行检测任务情况以及返回内容。

Task.store_errors_even_if_ignored

如果为True,那么即使配置了ignore_result也会存储结果。

Task.serializer

要使用的序列化的方法,默认使用task_serializer中的设置。可以是pickle,json,yaml 或者其他通过kombu.serialization.registry注册的自定义方法。

更多信息查看Serializers:序列化。

Task.compression

默认使用的压缩格式。

默认使用task_compression中的设置。可以是gzip 或者 bzip2 或者其他通过kombu.compressionopen in new window注册的方法。

更多信息查看Compression:压缩。

Task.backend

该任务使用的结果存储后端。是celery.backends 后端类的一个实例 [An instance of one of the backend classes in celery.backends.] 默认是 app.backend, 通过result_backend配置。

Task.acks_late

如果设置为 True,任务执行后(而不是执行前,默认为执行前)才会确认该任务的消息。

注意:如果Worker执行过程中崩溃,任务可能会执行多次。确保你的tasks是幂等

可以通过 task_acks_late 参数来进行全局配置。

Task.track_started

如果设置为 True,当Worker执行任务时,任务状态为 stared。默认为 False,因为在正常情况下是不需要该颗粒度级别的。任务要么挂起、完成要么等待重试。当存在长时间运行的任务并且需要报告当前正在运行的任务时,具有“已启动”状态可能很有用。

执行任务的worker所在主机的主机名和进程 id 可以在状态的元数据中进行查看(例如:result.info['pid'])。

可以通过 task_track_started 进行全局配置。

另请参阅:

Taskopen in new window的API引用

States

Celery可以持续最终任务的当前状态。这个状态也包含成功任务的执行结果或者失败任务的异常信息。

有一些不同的的result backends 可供选择,它们有不同的优缺点(查看Result Backends)

在任务的执行周期中,可能会经历几种状态,每种状态都会附加当前状态信息。当任务进入下一个状态时,上一个状态的信息会被移除,但部分信息是可以推导的(例如,一个任务处于执行 FAILED 状态,则表示在某个时刻是处于 STARTED 状态的)。

也有状态集,如FAILURE_STATES集和READY_STATESopen in new window集。客户端使用这些集合的成员关系来决定是否应该重新引发异常(PROPAGATE_STATESopen in new window),或者是否可以缓存状态(如果任务准备好了,可以缓存状态)。

也可以自定义状态

Result Backends

如果需要跟踪任务信息或返回值,需要提供一个 Celery 存储的结果后端,便于检索。有几个内置的结果后端可以考虑使用:SQLAlchemy/Django ORM、Memcached、RabbitMQ/QPid(rpc) 和 Redis,也可以自定义后端。

每一个后端适用于所有的情况,每一个后端都有优缺点,应该选择适合结果后端。

Warning:

后端传输和存储需要消耗资源,为确保资源被释放,需要对调用任务后返回的每一个 AsyncResultopen in new window 实例调用 get()open in new windowforget()open in new window

更多信息:任务结果后端设置

RPC Result Backend (RabbitMQ/QPid)

RPC 后端(rpc://)比较特殊,因为它实际上不存储状态信息,只是作为消息发送。这是一个重要区别,因为这意味着一个结果只能被检索一次,而且只能由启动任务的客户端检索。两个不同的进程无法获取相同的结果。

即使有这样的限制,如果您需要实时接收状态更改,它也是一个很好的选择。使用消息传递意味着客户端不必轮询新状态。

默认情况下消息时非持久化的,broker重启后消息就会消失。可以通过配置result_persistent来实现结果后端发送持久化消息。

Database Result Backend

对于多数人来说,使用数据库保存任务状态信息是比较方便的,特别是 web 应用程序使用的数据库这一类,但也有一些限制。

  • 使用数据库轮询获取任务状态信息会导致数据库压力很大,应该设置轮询的间隔时间,例如 result.get()

  • 某些数据库使用的默认事务隔离级别不适合轮询表以查找更改。

    在 MySQL 中,默认的事务隔离级别是 REPEATABLE-READ,在提交当前事务之前,事务不会看到其他事务所做的更改。建议将其更改为 READ-COMMITTED 级别。

Built-in States

PENDING

等待执行的或未知的任务会处于此状体。任务ID未知的话也会处于此状态。

STARTED

表示任务已开启。默认不报告此状态,开启该状态请查看app.Task.track_startedopen in new window.

**meta-data:** pid以及执行任务的worker所在主机的主机名。

SUCCESS

任务已经成功执行

**meta-data:** 包含任务返回值的结果

**propagates**: Yes

**ready**: Yes

FAILURE

任务执行失败的结果

**meta-data:** 执行异常时的任务信息,其中 traceback 包含引发错误的堆栈信息。

propagates: Yes

RETRY

任务已经被重试。

**meta-data:** 结果信息包含导致重试的异常信息,traceback 包含引发异常时堆栈的回溯。

propagates: Yes

REVOKED

任务被撤销。

 **propagates**: Yes

Custom states

要自定义任务状态只需要提供一个独一无二的名称。名称通常是大写的字符串。作为示例,你可以查看一个定义了ABORTED状态的可终止的任务。

使用update_state()open in new window来更新任务状态:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

在这里,创建了一个名称为“ PROGRESS”的状态,通过将currenttotal作为状态元数据的一部分,告知任何知道该状态的应用程序该任务当前正在进行中,并且还告知该任务的进度,这可以用来创建任务进度条。

​没看懂

Creating pickleable exceptions

一个关于Python的冷知识是,Python 的异常必须要符合一些简单规则,才能被 pickle 模块支持以及序列化。

使用 Pickle 作为序列化器时,异常如果不是pickleable的,则任务无法正常工作。

为确保异常是可被处理的,异常必须要在 .args 属性中提供实例化它时的初始参数。最简单的方法就是使用异常调用 Exception.__init__

让我们来看一些有用的例子,还有一个不适用的例子:

## OK:
class HttpError(Exception):
    pass

## BAD:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code

## OK:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code
        Exception.__init__(self, status_code)  # <-- REQUIRED

所以规则是:对于任何支持自定义参数 *args 的异常,都必须使用 Exception.__init__(self, *args)

关键字参数没有特殊支持,如果需要保存关键字参数,当异常被 unpickled 时,需要将它们作为普通的参数进行传递:

class HttpError(Exception):

    def __init__(self, status_code, headers=None, body=None):
        self.status_code = status_code
        self.headers = headers
        self.body = body

        super(HttpError, self).__init__(status_code, headers, body)

Semipredicates

worker将任务封装在一个跟踪函数中,该函数记录任务的最终状态。有许多异常可用于向此函数发出信号,以更改它处理任务返回的方式。

Ignore(忽略)

任务可能会触发Ignoreopen in new window来强制worker终止任务。这意味着不会记录该任务的任何状态,但消息会被确认(从任务队列中删除)。

如果要实现类似于撤消的自定义功能,或手动存储任务的结果,则可以使用此方法。

示例:将撤销的任务保存在Redis集合中:

from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if redis.ismember('tasks.revoked', self.request.id):
        raise Ignore()

手动存储结果的例子:

from celery import states
from celery.exceptions import Ignore

@app.task(bind=True)
def get_tweets(self, user):
    timeline = twitter.get_timeline(user)
    if not self.request.called_directly:
        self.update_state(state=states.SUCCESS, meta=timeline)
    raise Ignore()

Reject(拒绝)

使用AMQPs的basic_reject方法可以使任务触发Rejectopen in new window来拒绝任务消息。正常情况下是无效的,除非 Task.acks_late 是启用的。

Rejecting和acking(确认)具有相同的效果,不过有些brokers提供了一些额外功能。例如,RabbitMQ支持死信队列(Dead Letter Exchangesopen in new window)的概念,可以配置一个队列用于将被拒绝的消息发送到其中。

Reject也可以用来给消息重新排队,不过务必小心,稍有不慎就会陷入死循环。

当任务导致内存不足时,使用 Reject 的例子:

import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
    file = get_file(path)
    try:
        renderer.render_scene(file)

    # if the file is too big to fit in memory
    # we reject it so that it's redelivered to the dead letter exchange
    # and we can manually inspect the situation.
    except MemoryError as exc:
        raise Reject(exc, requeue=False)
    except OSError as exc:
        if exc.errno == errno.ENOMEM:
            raise Reject(exc, requeue=False)

    # For any other error we retry after 10 seconds.
    except Exception as exc:
        raise self.retry(exc, countdown=10)

重新排队的例子:

from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def requeues(self):
    if not self.request.delivery_info['redelivered']:
        raise Reject('no reason', requeue=True)
    print('received two times')

Consult your broker documentation for more details about the basic_reject method.

该功能应该和broker相关。也就是不同的broker的支持程度是不同的。

Retry

Retryopen in new window异常由Task.retry触发,用来告诉Worker任务正在重试。

Custom task classes

所有的任务都继承 app.Task 类,run() 方法为任务体。

例如:

@app.task
def add(x, y):
    return x + y

在内部大概会是这样:

class _AddTask(app.Task):

    def run(self, x, y):
        return x + y
add = app.tasks[_AddTask.name]

Instantiation(实例化)

一个任务不会在每次请求都实例化一个对象,而是在任务注册表注册一个全局实例。这意味着每一个进程只调用一次 __init__ 构造函数,任务类在语义上更接近 Actor。

比如你有一个任务:

from celery import Task

class NaiveAuthenticateServer(Task):

    def __init__(self):
        self.users = {'george': 'password'}

    def run(self, username, password):
        try:
            return self.users[username] == password
        except KeyError:
            return False

并且您将每个请求路由到同一个进程,那么它将保持请求之间的状态。

这对于缓存资源也很有用,例如,缓存数据库连接的基本Task类:

from celery import Task

class DatabaseTask(Task):
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

Per task usage

可以将上述内容添加到每个任务中,如下所示:

@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        process_row(row)

然后process_rows任务的db属性在每个进程中将始终保持不变。

App-wide usage

你也可以将自定义类用在你所有的Celery app中,只需要在实例化时将其作为task_cls参数传递即可。该参数应该是一个指向你的自定义Task类的python路径或者类本身:

from celery import Celery

app = Celery('tasks', task_cls='your.module.path:DatabaseTask')

这将使所有在应用中使用装饰器语法声明的任务都使用DatabaseTask类,并且都具有db属性。

默认值是Celery提供的类:celery.app.task:Task

Handlers

before_start(self, task_id, args, kwargs)

在任务开始执行之前由worker执行。

5.2新增特性。

Parameters:

  • task_id - 要执行的任务的ID
  • args - 任务执行使用的初始参数
  • kwargs - 任务执行使用的初始关键字参数

忽略此处理程序的返回值。

after_return(self, status, retval, task_id, args, kwargs, einfo)

任务返回后调用。

Parameters:

  • status - 当前任务状态
  • retval - 任务返回值/异常信息
  • task_id - 任务ID
  • args - 返回的任务初始参数
  • kwargs - 返回的任务初始关键字参数

Keyword Argumentseinfo - 异常信息实例,包括traceback

忽略此处理程序的返回值。

Requests and custom requests

在收到运行任务的消息时,worker创建requestopen in new window来表示这种需求。

自定义任务类可以通过更改celery.app.task.Task.Requestopen in new window属性来覆盖要使用的请求类。您可以分配自定义请求类本身,也可以分配其完整名称。

该请求具有多个职责。自定义请求类应该涵盖所有这些,它们负责实际运行和跟踪任务。我们强烈建议从celery.worker.request.Requestopen in new window继承

当使用pre-forking worker时,on_timeout()open in new window方法和onfailure()open in new window方法会在main worker process中执行。应用程序可以利用这种工具来检测未被celery.app.task.Task.on_failure()open in new window检测到故障。

比如这个示例,探测并记录硬时间限制,以及其他故障。

import logging
from celery import Task
from celery.worker.request import Request

logger = logging.getLogger('my.package')

class MyRequest(Request):
    'A minimal custom request to log failures and hard time limits.'

    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super().on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )
        logger.warning(
            'Failure detected for task %s',
            self.task.name
        )

class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

@app.task(base=MyTask)
def some_longrunning_task():
    # use your imagination

How it works|它是如何工作的

技术细节来了。这部分你不需要知道,但你可能会感兴趣。

所有定义的任务都在注册表中。注册表中包含任务名称和它们的任务类。你可以自己查看这个注册表:

>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
    <@task: celery.chord_unlock>,
 'celery.backend_cleanup':
    <@task: celery.backend_cleanup>,
 'celery.chord':
    <@task: celery.chord>}

这是 celery内置的任务列表,注意:当导入定义的模块时任务才会被注册。

默认加载器会导入imports设置中列出的所有模块。

app.task()open in new window装饰器负责在应用程序任务注册表中注册任务。

当任务被发送时,实际功能代码不会随任务一起发送,只有要执行的任务的名称。当worker接收到消息之后就可以在注册表中查找到实际要执行的代码。

This means that your workers should always be updated with the same software as the client. This is a drawback, but the alternative is a technical challenge that’s yet to be solved.

提示和最佳实践

忽略不想要的结果

如果你不关心任务执行结果,可以使用ignore_resultopen in new window选项,毕竟存储结果既费时又消耗资源。

@app.task(ignore_result=True)
def mytask():
    something()

也可以使用task_ignore_result来进行全局配置。

或者也可以通过在调用apply_asyncdelay时通过传递ignore_result参数来控制每一次执行时是否保存结果

@app.task
def mytask(x, y):
    return x + y

## No result will be stored
result = mytask.apply_async(1, 2, ignore_result=True)
print result.get() # -> None

## Result will be stored
result = mytask.apply_async(1, 2, ignore_result=False)
print result.get() # -> 3

如果配置了结果后端,那么ignore_result默认为False

这几种配置的优先级顺序为(粒度越细优先级越高):

  1. 全局task_ignore_result
  2. ignore_resultopen in new window选项
  3. 任务执行选项中的ignore_result

更多优化建议

可以在优化章节找到更多优化建议。

避免启动同步子任务

让一个任务等待另一个任务的结果往往是非常低效的,并在工作池耗尽时,可能会导致死锁

建议使用异步替代,比如使用回调(callbacks)

错误示例:

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

推荐的方式:

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

将不同任务的signature进行链接,组成任务链来达成目的。您可以在Canvas: 设计工作流程中了解任务链和其他强大的结构

默认情况下,Celery不允许使用同步子任务,但极少数情况下你可能需要使用。注意: 不推荐使用!

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get(disable_sync_subtasks=False)
    info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

性能和策略

粒度

任务粒度是指每个子任务所需的计算量。一般而言,将任务分解为多个小任务是要比一个长任务要好的。

对于较小的任务,您可以并行处理更多的任务,并且这些任务的运行时间不会长到阻止worker处理其他等待的任务。

不过,执行一个任务是由开销的,需要传递消息,也可能需要存储数据等。所以如果任务粒度太细,可能会降低性能。

《并发的艺术》一书中有一节专门讨论任务粒度的主题。

Breshears, Clay. Section 2.2.1, “The Art of Concurrency”. O’Reilly Media, Inc. May 15, 2009. ISBN-13 978-0-596-52153-0.

Data locality

处理任务的worker应尽可能接近数据,最好的是在内存中有一个副本,最坏的是完全从另一个大陆传输。

如果数据不再worker本地,可以尝试在数据地启动一个另一个worker,如果不行的话,经常缓存使用的数据,或者预加载将要使用的数据。

最简单的办法是通过比如memcachedopen in new window这样的分布式缓存系统在worker间共享数据。

如果愿意的话,这还有篇论文:[Distributed Computing Economics.pdf](assets/Distributed Computing Economics-20220904225204-2tjuhh4.pdf)

State

由于Celery是一个分布式系统,因此您无法知道任务将在哪个进程或哪个机器上执行。您甚至无法知道任务是否会及时运行。

The ancient async sayings tells us that “asserting the world is the responsibility of the task”. What this means is that the world view may have changed since the task was requested, so the task is responsible for making sure the world is how it should be; If you have a task that re-indexes a search engine, and the search engine should only be re-indexed at maximum every 5 minutes, then it must be the tasks responsibility to assert that, not the callers.

另一个问题是Django模型对象。它们不应该作为参数传递给任务。当任务正在运行时,从数据库中重新获取对象几乎总是更好的,因为使用旧数据可能会导致争用情况。

假设您有一篇文章和一个自动展开其中某些缩写的任务,请想象以下场景:

class Article(models.Model):
    title = models.CharField()
    body = models.TextField()

@app.task
def expand_abbreviations(article):
    article.body.replace('MyCorp', 'My Corporation')
    article.save()

首先,作者创建一篇文章并保存它,然后作者单击启动缩写任务的按钮:

>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)

现在,队列非常忙碌,因此任务在2分钟内不会运行。同时,另一位作者对文章进行了更改,因此当任务最终运行时,文章的正文将还原为旧版本,因为任务的参数中包含旧正文。

修复条件竞争(Race Conditions)很容易,只需使用文章id,并在任务主体中重新获取文章:

竞态条件是指在并发环境中,当有多个事件同时访问同一个临界资源时,由于多个事件的并发执行顺序的不确定,从而导致程序输出结果的不确定,这种情况我们称之为竞态条件 (Race Conditions)或者竞争冒险(race hazard)。

@app.task
def expand_abbreviations(article_id):
    article = Article.objects.get(id=article_id)
    article.body.replace('MyCorp', 'My Corporation')
    article.save()
>>> expand_abbreviations.delay(article_id)

由于发送大消息的开销可能很大,因此这种方法甚至可能具有性能优势。

Database transactions

我们来看另一个示例:

from django.db import transaction
from django.http import HttpResponseRedirect

@transaction.atomic
def create_article(request):
    article = Article.objects.create()
    expand_abbreviations.delay(article.pk)
    return HttpResponseRedirect('/articles/')

这是一个Django视图,它在数据库中创建了一个article对象,然后将主键传递给一个任务。它使用transaction.atomic装饰器, 作用是view返回时提交事物,当view触发异常时将事物回滚。

这里存在一个条件竞争,如果任务在事务被提交之前开始执行;数据库对象尚不存在!

解决方案是使用on_commit回调,以便所有事务都成功提交后启动Celery任务。

from django.db.transaction import on_commit

def create_article(request):
    article = Article.objects.create()
    on_commit(lambda: expand_abbreviations.delay(article.pk))

注:

on_commit在Django1.9之后才支持。

示例

让我们看一个实际的案例:需要对评论进行垃圾邮件过滤的博客。创建评论时,垃圾邮件过滤器会在后台运行,因此用户不必等待它完成。

我有一个Django博客应用程序,允许对博客文章发表评论。我将描述这个应用程序的部分模型/视图和任务。

blog/models.py

评论model大概这样:

from django.db import models
from django.utils.translation import ugettext_lazy as _


class Comment(models.Model):
    name = models.CharField(_('name'), max_length=64)
    email_address = models.EmailField(_('email address'))
    homepage = models.URLField(_('home page'),
                               blank=True, verify_exists=False)
    comment = models.TextField(_('comment'))
    pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_add_now=True)
    is_spam = models.BooleanField(_('spam?'),
                                  default=False, editable=False)

    class Meta:
        verbose_name = _('comment')
        verbose_name_plural = _('comments')

在view中,当comment发送后,首先将其写入数据库,然后在后台启动过滤器。

blog/views.py

from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response

from blog import tasks
from blog.models import Comment


class CommentForm(forms.ModelForm):

    class Meta:
        model = Comment


def add_comment(request, slug, template_name='comments/create.html'):
    post = get_object_or_404(Entry, slug=slug)
    remote_addr = request.META.get('REMOTE_ADDR')

    if request.method == 'post':
        form = CommentForm(request.POST, request.FILES)
        if form.is_valid():
            comment = form.save()
            # Check spam asynchronously.
            tasks.spam_filter.delay(comment_id=comment.id,
                                    remote_addr=remote_addr)
            return HttpResponseRedirect(post.get_absolute_url())
    else:
        form = CommentForm()

    context = RequestContext(request, {'form': form})
    return render_to_response(template_name, context_instance=context)

过滤器我使用的是Akismetopen in new window,这个东西被WordPress广泛使用。个人使用时免费的,商业使用则需要付费。你需要注册一个他们的服务来获取API key。

对Akismet进行API调用,可以使用Michael Foord编写的akismet模块。[原链接已经不可用,不过似乎已经提交到了pypi仓库,可以使用pip install akismet安装。]

blog/tasks.py

from celery import Celery

from akismet import Akismet

from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site

from blog.models import Comment


app = Celery(broker='amqp://')


@app.task
def spam_filter(comment_id, remote_addr=None):
    logger = spam_filter.get_logger()
    logger.info('Running spam filter for comment %s', comment_id)

    comment = Comment.objects.get(pk=comment_id)
    current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
    if not akismet.verify_key():
        raise ImproperlyConfigured('Invalid AKISMET_KEY')


    is_spam = akismet.comment_check(user_ip=remote_addr,
                        comment_content=comment.comment,
                        comment_author=comment.name,
                        comment_author_email=comment.email_address)
    if is_spam:
        comment.is_spam = True
        comment.save()

    return is_spam