调用任务

Basic:基础

本文档介绍了任务实例和 canvas 对 Celery 统一的调用接口。

这些 API 定义了标准的执行选项集,也就是下面这三个方法:

  • apply_async(args[, kwargs[, ...]])

    发送任务消息

  • delay(*args, **kwargs)

    发送任务消息的快捷方式,但不支持执行选项

  • calling (call)

    应用一个支持调用API(例如,add(2,2))的对象,意味着任务不会被 worker 执行,而是在当前线程中执行(消息不会被发送)。

速查表

  • T.delay(arg, kwarg=value))

    调用 apply_async 的快捷方式(.delay(_args, *_kwargs)调用 .apply_async(args, kwargs))

  • T.apply_async((arg,), {'kwarg': value})

  • T.apply_async(countdown=10)

    从现在起, 十秒内执行。

  • T.apply_async(eta=now + timedelta(seconds=10))

    从现在起十秒内执行,指明使用eta。

  • T.apply_async(countdown=60, expires=120)

    从现在起一分钟执行,但在两分钟后过期。

  • T.apply_async(expires=now + timedelta(days=2))

    两天内过期,使用datetime对象。

示例

delay()方法很方便,因为它看起来像调用一个常规函数:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

使用apply_async()的话,得这样写:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

所以delay 是更方便的,但如果你想要附加执行选项,就必须要使用apply_async

Tip

If the task isn’t registered in the current process you can use send_task()``to call the task by name instead.

本文档的其余部分将详细介绍任务执行选项。所有示例都使用名为add的任务,返回两个参数的总和:

@app.task
def add(x, y):
    return x + y

还有一个办法。。

稍后在阅读Canvas的时候,可以学习到更多关于这部分的内容,but signature's`` are objects used to pass around the signature of a task invocation, (for example to send it over the network), and they also support the Calling API:

task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()

Linking(callbacks/errbacks)

Celery支持将任务连接起来。后边的会作为前边,直接看示例吧:

add.apply_async((2, 2), link=add.s(16))

等同于(2+2)+16也就是20。先计算2+2,然后结果再和16运算。

You can also cause a callback to be applied if task raises an exception ( errback ). The worker won’t actually call the errback as a task, but will instead call the errback function directly so that the raw request, exception and traceback objects can be passed to it.

This is an example error callback:

@app.task
def error_handler(request, exc, traceback):
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          request.id, exc, traceback))

它可以被添加到任务中,使用link_error执行选项:

add.apply_async((2, 2), link_error=error_handler.s())

此外,linklink_error选项可以表示为一个列表:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

callback/errbacks会按照顺序调用,所有回调都将使用父任务的返回值作为部分参数来调用。

On message

Celery支持捕获所有状态变化通过设置on_message回调。

比如一个长任务发送进度,您可以执行以下操作::

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
def on_raw_message(body):
    print(body)

a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))

将会生成以下输出:

{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 50},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 90},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': 'hello world: 10',
 'children': [],
 'status': 'SUCCESS',
 'traceback': None}
hello world: 10

ETA and Countdown:预计到达时间和倒计时

ETA(estimated time of arrival:预计到达时间)允许您设置一个特定的日期和时间,即执行任务的最早时间。countdown是按秒设置ETA的快捷方式。

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
20

任务会在指定时间到达后的某个时间执行,但具体时间不确定。超时原因可能是队列中任务过多或者严重的网络延迟。为确保任务及时执行,应监视队列是否拥塞。使用Munin或类似的工具来接收警告,以便可以及时处理。

虽然 countdown 是整数,但ETA必须是个datetime对象,并指定确切的日期和时间(包括毫秒精度和时区信息):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)

警告:

当使用RabbitMQ作为broker时,如果指定了超过15分钟的countdown,您可能会遇到worker终止的问题并伴随一个PreconditionFailedopen in new window的错误:

amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel

从RabbitMQ3.8.15开始,consumer_timeout的默认值是15分钟。3.8.17开始调整到了30分钟。如果consumer在此时间之后依然没有确认信息,那么这个channel将会被关闭,并伴随一个PRECONDITION_FAILED异常。更多信息查看:Delivery Acknowledgement Timeoutopen in new window

要解决这个问题,在RabbitMQ的配置文件rabbitmq.conf中指定consumer_timeout的值大于等于countdown的值。比如可以设置一个特别大的值consumer_timeout = 31622400000,其等于1年(以毫秒计),以避免将来出现问题。

Expiration:

expires 参数定义了可选的过期时间。可以是任务发布后的秒数,也可以使用datetime指定特定的日期时间:

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
...                 expires=datetime.now() + timedelta(days=1)

当一个worker收到一个过期任务时,会将其标记为REVOKEDTaskRevokedErroropen in new window)。

Message Sending Retry:消息发送重试

Celery会自动重试发送消息当连接失败的时候,重试行为是可配置的,比如频率、最大重试次数,或者完全关闭重试。

比如关闭,可以通过设置retry执行参数为False:

add.apply_async((2, 2), retry=False)

相关设置:

task_publish_retry

task_publish_retry_policy

Retry Policy:重试策略

重试策略是一个用来控制重试行为的映射,它包含下边这些keys:

  • max_retries

    最大重试次数,设置为None表示无限重试,默认值是3.

  • interval_start

    重试间隔,浮点数或整数。默认是0.

  • interval_step

    连续重试时,该数字会被添加到重试间隔时间上,浮点数或者整数。默认是0.2.

  • interval_max

    重试之间等待的最大秒数(浮点数或整数)。默认值为0.2。

比如,默认策略就像下边这样:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

重试所花费的最大时间将是0.4秒。It's set relatively short by default because a connection failure could lead to a retry pile effect if the broker connection is down – For example, many web server processes waiting to retry, blocking other incoming requests.(默认被设置为很短的时间,因为如果broker连接关闭,连接失败可能会导致重试堆效应[雪崩?]。例如,许多Web服务器进程等待重试,从而阻塞其他传入请求。)

Connection Error Handling:连接错误处理

当您发送任务而消息传输连接丢失或无法启动连接时,将会触发OperationalError错误:

>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 503, in apply_async
    **options
  File "celery/app/base.py", line 662, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/backends/rpc.py", line 275, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
    channel = self._channel = channel()
  File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
    self.sock.connect(sa)
  kombu.exceptions.OperationalError: [Errno 61] Connection refused

如果开启了重试,那么该错误会在重试次数用完时触发,或者被立即关闭时。

你可以处理这个错误:

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception('Sending task raised: %r', exc)

Serializers:序列化

Clients和workers之间的数据传输需要序列化,因此Celery中的所有消息都有content_type 的header用来描述序列化所使用的的方法。

默认的序列化是JSON, 可以使用task_serializer改变这个设置,也可以为每个单独的任务甚至每个消息更改此设置。

Celery内置的序列化方法有JSON、pickle、YAML和msgpack,或者也可以将自定义序列化方法注册到Kombu serializer registry中使用。

另请参阅:

Kombu用户指南中的消息序列化open in new window

Security

pickle模块允许执行任意功能,请参见安全指南。Celery还附带了一个特殊的序列化程序,它使用加密技术对消息进行签名。

每种方式都有其优缺点。

  • json - JSON支持许多编程语言,从Python2.6开始它是Python的标准的一部分,使用现代的Python库(如simplejsonopen in new window)进行解码相当快。

    JSON最大的缺点是数据格式太少,只有string、Unicode、floats、Boolean、dict和list。小数和日期明显缺失。

    二进制数据会被转换成Base64,与原生支持二进制类型的编码格式相比,所传送数据的大小增加了34%。

    但是,如果您的数据符合上述约束,并且需要跨语言支持,那么JSON的默认设置可能是最佳选择。

    http://json.orgopen in new window可以查看更多。

    注意:

    (内容来自Python官方文档:https://docs.python.org/3.6/library/json.htmlopen in new window)JSON的键/值对中的键始终为字符串类型。当一个字典被转换为JSON之后,所有的Keys会被强制转换为string类型。由于这个特性,如果一个字典被转换为JSON然后在转换回字典,这个字典可能和原始字典不同。也就是loads(dumps(x))) != x在x包含非string类型的key时。(也就是说一个字典有非string类型的key的时候,在转换成JSON之后再转换为字典之后,所有的key都会变成string类型。)

  • pickle - 如果你不打算支持除Python以外的所有语言,那么使用pickle将会给你所有Python内置数据类型的支持(除了类实例),以及更小的消息当发送二进制文件时,并且处理速度比JSON快那么一丢丢。

    查看pickle获取更多信息。

  • yaml - YAML有许多特性和JSON相同,除了它原生支持更多数据类型(包括日期、递归引用等

    但是Python的YAML库比JSON库要慢

    如果您需要一组更有表达力的数据类型,并且需要保持跨语言兼容性,那么YAML可能比上面的更适合。

    See http://yaml.org/open in new window for more information.

  • msgpack - msgpack是一种二进制序列化格式,更接近JSON功能。然而,它还很年轻,在这一点上,支持应该被认为是实验性的。

    See http://msgpack.org/open in new window for more information.

序列化使用的方法需要在header中标识,以便于worker可以反序列化。如果使用自定义序列化,那么worker也需要支持。

以下顺序用于确定发送任务时使用的序列化方法:

  1. serializer 执行选项
  2. Task.serializer属性
  3. task_serializer设置

为单个任务调用设置自定义序列化程序的示例:

>>> add.apply_async((10, 10), serializer='json')

Compression:压缩

Celery可以使用以下几种内置方法压缩消息:

  • brotli

    Brotli是针对Web,特别是针对小文本文档而优化的。它对于提供静态内容(如字体和html页面)最为有效。

    要使用的话需要安装对应的库:

    pip install celery[brotli]
    
  • bzip2

    bzip2可以创建比gzip更小的文件,但压缩和解压速度明显比gzip要慢

    要使用的话得确保Python的编译选项中开启了对bzip2的支持。

    如果收到了下面的ImportError,则表示没有开启对bzip2的支持:

    >>> import bz2
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'bz2'
    
  • gzip

    gzip适用于需要小内存占用的系统,这使其成为内存有限的系统的理想选择。它通常用于生成扩展名为.tar.gz的文件。

    要使用的话得确保Python的编译选项中开启了对gzip的支持。

    如果收到了下面的ImportError,则表示没有开启对gzip的支持:

    >>> import gzip
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'gzip'
    
  • lzma

    lzma提供良好的压缩比,并以较快的压缩和解压缩速度执行,但会占用较高的内存。

    要使用的话得确保Python的编译选项中开启了对lzma的支持,且Python版本大于等于3.3.

    如果收到了下面的ImportError,则表示没有开启对lzma的支持:

    >>> import lzma
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'lzma'
    

    或者可以使用下面的方法安装:

    $ pip install celery[lzma]
    
  • zlib

    zlib是一个库形式的Deflate算法的抽象,在其API中包括对gzip文件格式和轻量级流格式的支持。它是许多软件系统的重要组成部分 - Linux内核和Git VCS等。

    要使用的话得确保Python的编译选项中开启了对zlib的支持

    如果收到了下面的ImportError,则表示没有开启对zlib的支持:

    >>> import zlib
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'zlib'
    
  • zstd

    zstd以zlib级的实时压缩方案和更好的压缩比为目标。它由Huff0和FSE库提供一个非常快的熵级支持(entropy stage)。

    要使用这个算法需要安装以下模块:

    $ pip install celery[zstd]
    

你也可以在kombu compression registryopen in new window中注册自己的算法来使用。

以下顺序用于确定发送任务时使用的压缩方案:

  1. compression 执行选项
  2. Task.compression属性
  3. task_compression属性

指定调用任务时使用的压缩的示例:

>>> add.apply_async((2, 2), compression='zlib')

Connections:连接

自动池支持

  • 从2.3版开始,支持自动连接池,因此您不必手动处理连接和发布者即可复用连接。
  • 从2.5版开始,默认情况下启用连接池。
  • 有关 broker_pool_limit 更多信息,请参见设置。

您可以通过创建发布者来手动处理连接:

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        try:
            for args in numbers:
                res = add.apply_async((2, 2), publisher=publisher)
                results.append(res)
print([res.get() for res in results])

尽管这个特殊的例子用一个组来表达要好得多:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]

Routing options:路由选项

Celery可以将任务路由到不同的队列。

使用queue选项可以完成简单的路由(name <-> name)

add.apply_async(queue='priority.high')

然后可以通过 -Q 选项将workers分配到priority.high队列:

$ celery -A proj worker -l INFO -Q celery,priority.high

另请参阅:

硬编码队列名是不推荐的,最佳实践是使用configuration routers(task_routes)

更多关于路由的信息可以查看任务路由

Results options:结果选项

可以使用task_ignore_result设置或使用ignore_result选项来控制开关结果存储。

>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None

>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3

如果您想在结果后端中存储有关任务的其他元数据,请将result_extended设置为True

另请参阅:

更多关于tasks的信息可以查看Tasks。

Advanced Options:高级选项

这部分是给想要发挥AMQP的完整路由特性的高级用户准备的。感兴趣的用户可阅读任务路由。

  • 交换(exchange)

    发送信息的 exchange(或者 kombu.entity.Exchange) 的名称。

  • routing_key

    用于确定路由的密钥。

  • 优先(priority)

    0~255 之间的数字,其中255是最高优先级。

    支持:RabbitMQ,Redis(优先级颠倒,最高为0)。