调用任务
Basic:基础
本文档介绍了任务实例和 canvas 对 Celery 统一的调用接口。
这些 API 定义了标准的执行选项集,也就是下面这三个方法:
apply_async(args[, kwargs[, ...]])
发送任务消息
delay(*args, **kwargs)
发送任务消息的快捷方式,但不支持执行选项
calling (call)
应用一个支持调用API(例如,add(2,2))的对象,意味着任务不会被 worker 执行,而是在当前线程中执行(消息不会被发送)。
速查表
T.delay(arg, kwarg=value))
调用 apply_async 的快捷方式(.delay(_args, *_kwargs)调用 .apply_async(args, kwargs))
T.apply_async((arg,), {'kwarg': value})
T.apply_async(countdown=10)
从现在起, 十秒内执行。
T.apply_async(eta=now + timedelta(seconds=10))
从现在起十秒内执行,指明使用eta。
T.apply_async(countdown=60, expires=120)
从现在起一分钟执行,但在两分钟后过期。
T.apply_async(expires=now + timedelta(days=2))
两天内过期,使用datetime对象。
示例
delay()
方法很方便,因为它看起来像调用一个常规函数:
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
使用apply_async()
的话,得这样写:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
所以delay 是更方便的,但如果你想要附加执行选项,就必须要使用apply_async
。
Tip
If the task isn’t registered in the current process you can use send_task()``to call the task by name instead.
本文档的其余部分将详细介绍任务执行选项。所有示例都使用名为add的任务,返回两个参数的总和:
@app.task
def add(x, y):
return x + y
还有一个办法。。
稍后在阅读Canvas的时候,可以学习到更多关于这部分的内容,but signature's`` are objects used to pass around the signature of a task invocation, (for example to send it over the network), and they also support the Calling API:
task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()
Linking(callbacks/errbacks)
Celery支持将任务连接起来。后边的会作为前边,直接看示例吧:
add.apply_async((2, 2), link=add.s(16))
等同于(2+2)+16
也就是20。先计算2+2
,然后结果再和16运算。
You can also cause a callback to be applied if task raises an exception ( errback ). The worker won’t actually call the errback as a task, but will instead call the errback function directly so that the raw request, exception and traceback objects can be passed to it.
This is an example error callback:
@app.task
def error_handler(request, exc, traceback):
print('Task {0} raised exception: {1!r}\n{2!r}'.format(
request.id, exc, traceback))
它可以被添加到任务中,使用link_error
执行选项:
add.apply_async((2, 2), link_error=error_handler.s())
此外,link
和link_error
选项可以表示为一个列表:
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
callback/errbacks会按照顺序调用,所有回调都将使用父任务的返回值作为部分参数来调用。
On message
Celery支持捕获所有状态变化通过设置on_message回调。
比如一个长任务发送进度,您可以执行以下操作::
@app.task(bind=True)
def hello(self, a, b):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 50})
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 90})
time.sleep(1)
return 'hello world: %i' % (a+b)
def on_raw_message(body):
print(body)
a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))
将会生成以下输出:
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 50},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 90},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': 'hello world: 10',
'children': [],
'status': 'SUCCESS',
'traceback': None}
hello world: 10
ETA and Countdown:预计到达时间和倒计时
ETA(estimated time of arrival:预计到达时间)允许您设置一个特定的日期和时间,即执行任务的最早时间。countdown是按秒设置ETA的快捷方式。
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # this takes at least 3 seconds to return
20
任务会在指定时间到达后的某个时间执行,但具体时间不确定。超时原因可能是队列中任务过多或者严重的网络延迟。为确保任务及时执行,应监视队列是否拥塞。使用Munin或类似的工具来接收警告,以便可以及时处理。
虽然 countdown 是整数,但ETA必须是个datetime对象,并指定确切的日期和时间(包括毫秒精度和时区信息):
>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
警告:
当使用RabbitMQ作为broker时,如果指定了超过15分钟的countdown,您可能会遇到worker终止的问题并伴随一个PreconditionFailed的错误:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel
从RabbitMQ3.8.15开始,consumer_timeout的默认值是15分钟。3.8.17开始调整到了30分钟。如果consumer在此时间之后依然没有确认信息,那么这个channel将会被关闭,并伴随一个
PRECONDITION_FAILED
异常。更多信息查看:Delivery Acknowledgement Timeout。要解决这个问题,在RabbitMQ的配置文件
rabbitmq.conf
中指定consumer_timeout
的值大于等于countdown的值。比如可以设置一个特别大的值consumer_timeout = 31622400000
,其等于1年(以毫秒计),以避免将来出现问题。
Expiration:
expires 参数定义了可选的过期时间。可以是任务发布后的秒数,也可以使用datetime指定特定的日期时间:
>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)
>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
... expires=datetime.now() + timedelta(days=1)
当一个worker收到一个过期任务时,会将其标记为REVOKED(TaskRevokedError)。
Message Sending Retry:消息发送重试
Celery会自动重试发送消息当连接失败的时候,重试行为是可配置的,比如频率、最大重试次数,或者完全关闭重试。
比如关闭,可以通过设置retry
执行参数为False:
add.apply_async((2, 2), retry=False)
相关设置:
task_publish_retry
task_publish_retry_policy
Retry Policy:重试策略
重试策略是一个用来控制重试行为的映射,它包含下边这些keys:
max_retries
最大重试次数,设置为None表示无限重试,默认值是3.
interval_start
重试间隔,浮点数或整数。默认是0.
interval_step
连续重试时,该数字会被添加到重试间隔时间上,浮点数或者整数。默认是0.2.
interval_max
重试之间等待的最大秒数(浮点数或整数)。默认值为0.2。
比如,默认策略就像下边这样:
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
重试所花费的最大时间将是0.4秒。It's set relatively short by default because a connection failure could lead to a retry pile effect if the broker connection is down – For example, many web server processes waiting to retry, blocking other incoming requests.(默认被设置为很短的时间,因为如果broker连接关闭,连接失败可能会导致重试堆效应[雪崩?]。例如,许多Web服务器进程等待重试,从而阻塞其他传入请求。)
Connection Error Handling:连接错误处理
当您发送任务而消息传输连接丢失或无法启动连接时,将会触发OperationalError错误:
>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 388, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 503, in apply_async
**options
File "celery/app/base.py", line 662, in send_task
amqp.send_task_message(P, name, message, **options)
File "celery/backends/rpc.py", line 275, in on_task_call
maybe_declare(self.binding(producer.channel), retry=True)
File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
channel = self._channel = channel()
File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
self.transport.connect()
File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
self.sock.connect(sa)
kombu.exceptions.OperationalError: [Errno 61] Connection refused
如果开启了重试,那么该错误会在重试次数用完时触发,或者被立即关闭时。
你可以处理这个错误:
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception('Sending task raised: %r', exc)
Serializers:序列化
Clients和workers之间的数据传输需要序列化,因此Celery中的所有消息都有content_type 的header用来描述序列化所使用的的方法。
默认的序列化是JSON, 可以使用task_serializer改变这个设置,也可以为每个单独的任务甚至每个消息更改此设置。
Celery内置的序列化方法有JSON、pickle、YAML和msgpack,或者也可以将自定义序列化方法注册到Kombu serializer registry中使用。
另请参阅:
Kombu用户指南中的消息序列化。
Security
pickle模块允许执行任意功能,请参见安全指南。Celery还附带了一个特殊的序列化程序,它使用加密技术对消息进行签名。
每种方式都有其优缺点。
json - JSON支持许多编程语言,从Python2.6开始它是Python的标准的一部分,使用现代的Python库(如simplejson)进行解码相当快。
JSON最大的缺点是数据格式太少,只有string、Unicode、floats、Boolean、dict和list。小数和日期明显缺失。
二进制数据会被转换成Base64,与原生支持二进制类型的编码格式相比,所传送数据的大小增加了34%。
但是,如果您的数据符合上述约束,并且需要跨语言支持,那么JSON的默认设置可能是最佳选择。
http://json.org可以查看更多。
注意:
(内容来自Python官方文档:https://docs.python.org/3.6/library/json.html)JSON的键/值对中的键始终为字符串类型。当一个字典被转换为JSON之后,所有的Keys会被强制转换为string类型。由于这个特性,如果一个字典被转换为JSON然后在转换回字典,这个字典可能和原始字典不同。也就是loads(dumps(x))) != x在x包含非string类型的key时。(也就是说一个字典有非string类型的key的时候,在转换成JSON之后再转换为字典之后,所有的key都会变成string类型。)
pickle - 如果你不打算支持除Python以外的所有语言,那么使用pickle将会给你所有Python内置数据类型的支持(除了类实例),以及更小的消息当发送二进制文件时,并且处理速度比JSON快那么一丢丢。
查看pickle获取更多信息。
yaml - YAML有许多特性和JSON相同,除了它原生支持更多数据类型(包括日期、递归引用等
但是Python的YAML库比JSON库要慢
如果您需要一组更有表达力的数据类型,并且需要保持跨语言兼容性,那么YAML可能比上面的更适合。
See http://yaml.org/ for more information.
msgpack - msgpack是一种二进制序列化格式,更接近JSON功能。然而,它还很年轻,在这一点上,支持应该被认为是实验性的。
See http://msgpack.org/ for more information.
序列化使用的方法需要在header中标识,以便于worker可以反序列化。如果使用自定义序列化,那么worker也需要支持。
以下顺序用于确定发送任务时使用的序列化方法:
- serializer 执行选项
- Task.serializer属性
- task_serializer设置
为单个任务调用设置自定义序列化程序的示例:
>>> add.apply_async((10, 10), serializer='json')
Compression:压缩
Celery可以使用以下几种内置方法压缩消息:
brotli
Brotli是针对Web,特别是针对小文本文档而优化的。它对于提供静态内容(如字体和html页面)最为有效。
要使用的话需要安装对应的库:
pip install celery[brotli]
bzip2
bzip2可以创建比gzip更小的文件,但压缩和解压速度明显比gzip要慢
要使用的话得确保Python的编译选项中开启了对bzip2的支持。
如果收到了下面的ImportError,则表示没有开启对bzip2的支持:
>>> import bz2 Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'bz2'
gzip
gzip适用于需要小内存占用的系统,这使其成为内存有限的系统的理想选择。它通常用于生成扩展名为
.tar.gz
的文件。要使用的话得确保Python的编译选项中开启了对gzip的支持。
如果收到了下面的ImportError,则表示没有开启对gzip的支持:
>>> import gzip Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'gzip'
lzma
lzma提供良好的压缩比,并以较快的压缩和解压缩速度执行,但会占用较高的内存。
要使用的话得确保Python的编译选项中开启了对lzma的支持,且Python版本大于等于3.3.
如果收到了下面的ImportError,则表示没有开启对lzma的支持:
>>> import lzma Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'lzma'
或者可以使用下面的方法安装:
$ pip install celery[lzma]
zlib
zlib是一个库形式的Deflate算法的抽象,在其API中包括对gzip文件格式和轻量级流格式的支持。它是许多软件系统的重要组成部分 - Linux内核和Git VCS等。
要使用的话得确保Python的编译选项中开启了对zlib的支持
如果收到了下面的ImportError,则表示没有开启对zlib的支持:
>>> import zlib Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'zlib'
zstd
zstd以zlib级的实时压缩方案和更好的压缩比为目标。它由Huff0和FSE库提供一个非常快的熵级支持(entropy stage)。
要使用这个算法需要安装以下模块:
$ pip install celery[zstd]
你也可以在kombu compression registry中注册自己的算法来使用。
以下顺序用于确定发送任务时使用的压缩方案:
- compression 执行选项
- Task.compression属性
- task_compression属性
指定调用任务时使用的压缩的示例:
>>> add.apply_async((2, 2), compression='zlib')
Connections:连接
自动池支持
- 从2.3版开始,支持自动连接池,因此您不必手动处理连接和发布者即可复用连接。
- 从2.5版开始,默认情况下启用连接池。
- 有关 broker_pool_limit 更多信息,请参见设置。
您可以通过创建发布者来手动处理连接:
results = []
with add.app.pool.acquire(block=True) as connection:
with add.get_publisher(connection) as publisher:
try:
for args in numbers:
res = add.apply_async((2, 2), publisher=publisher)
results.append(res)
print([res.get() for res in results])
尽管这个特殊的例子用一个组来表达要好得多:
>>> from celery import group
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()
>>> res.get()
[4, 8, 16, 32]
Routing options:路由选项
Celery可以将任务路由到不同的队列。
使用queue
选项可以完成简单的路由(name <-> name)
add.apply_async(queue='priority.high')
然后可以通过 -Q
选项将workers分配到priority.high
队列:
$ celery -A proj worker -l INFO -Q celery,priority.high
另请参阅:
硬编码队列名是不推荐的,最佳实践是使用configuration routers(task_routes)
更多关于路由的信息可以查看任务路由
Results options:结果选项
可以使用task_ignore_result设置或使用ignore_result
选项来控制开关结果存储。
>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None
>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3
如果您想在结果后端中存储有关任务的其他元数据,请将result_extended设置为True
。
另请参阅:
更多关于tasks的信息可以查看Tasks。
Advanced Options:高级选项
这部分是给想要发挥AMQP的完整路由特性的高级用户准备的。感兴趣的用户可阅读任务路由。
交换(exchange)
发送信息的 exchange(或者 kombu.entity.Exchange) 的名称。
routing_key
用于确定路由的密钥。
优先(priority)
0~255 之间的数字,其中255是最高优先级。
支持:RabbitMQ,Redis(优先级颠倒,最高为0)。