下一步

本章将会介绍Celery的更多细节,包括如何给你的应用和库添加Celery支持。

在你的应用中使用Celery

我们的项目

目录结构

proj/__init__.py
    /celery.py
    /tasks.py

celery.py

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])

## Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

这个模块中创建了一个Celery``实例。要在自己的项目中使用Celery,只需要导入该实例即可。

  • broker参数指定了要使用的broker的URL

    查看 选择中间件 获取更多信息。

  • backend参数指定了要使用的backend的URL

    这用来追踪任务状态和结果。为了演示如何检索结果,这里配置了RPC后端,默认情况下是没有后端配置的。也可以自定义后端,或者不用。也可以使用@task(ignore_result=True) 选项来禁用单个任务的后端。可以查看保存结果获取更多信息

  • include参数是要导入的任务列表。

tasks.py

from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启动worker

celery程序可以用来启动worker,在proj目录上层执行celery -A proj worker -l INFO来启动worker。成功启动后应该能看到如下信息:

--------------- celery@halcyon.local v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
  • broker 即之前在celery模块中配置的broker参数的URL,也可以通过在命令行使用 -bopen in new window 指定一个新的(覆盖配置。

  • concurrency 是启动的worker线程的数量。如果所有线程忙碌,那么新任务将会等待,直到某个线程空闲。

    该参数默认的数量是CPU的核心数量。可以使用 celery worker -copen in new window 选项指定,没有推荐的数值,最佳数量通常取决于多个因素,但是如果你的任务大部分是 I/O 限制的,那么你可以尝试增加它。实验表明,改数量大于两倍CPU数量时效果不明显,甚至可能导致性能下降。

    Celery还支持Eventlet,Gevent和单线程运行。查看并发with Eventlet获取更多信息。

  • Events 用来发送监控信息。如celery events 和 Flower(Celery的实时监控)之类的监控程序。更多信息可以查看监控和管理指南。

  • Queues 是worker用来消费tasks的队列清单。可以指定worker一次使用多个队列,这用于将消息路由到特定的worker,以实现服务质量、关注点分离和优先级划分,所有这些都在任务路由中进行了描述

使用 --help 选项获取命令行的完整参数列表。这些选项在Workers指南有更详细的介绍。

celery worker --help

后台运行

生产中你可能需要后台运行Celery,更详细的内容在Daemonization。

守护进程脚本使用celery multi 命令来后台启动一个或多个workers:

$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

你也可以重启它:

$ celery  multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

或者停止:

$ celery multi stop w1 -A proj -l INFO

stop命令是异步的,它不会等待worker进程结束。你可能更想要使用 stopwait 来替代,这可以确保退出之前任务被正确执行:

$ celery multi stopwait w1 -A proj -l INFO

Note:
celery multi 不存储关于workers的信息,因此重启时你需要使用相同的命令行参数。停止的时候只需要确保pidfile和logfile相同即可。

默认情况下会在当前目录创建pid和log文件。推荐放在专用文件夹,以避免multiple workers launching on top of each other:

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

使用 multi 命令可以启动多个 worker,并且有一个强大的命令行语法来为不同的 worker 指定参数,比如:

$ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

更多示例参考 multiopen in new window 模块的API,示例搬过来了。

$ # Single worker with explicit name and events enabled.
$ celery multi start Leslie -E

$ # Pidfiles and logfiles are stored in the current directory
$ # by default.  Use --pidfile and --logfile argument to change
$ # this.  The abbreviation %n will be expanded to the current
$ # node name.
$ celery multi start Leslie -E --pidfile=/var/run/celery/%n.pid
                               --logfile=/var/log/celery/%n%I.log

$ # You need to add the same arguments when you restart,
$ # as these aren't persisted anywhere.
$ celery multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
                                 --logfile=/var/log/celery/%n%I.log

$ # To stop the node, you need to specify the same pidfile.
$ celery multi stop Leslie --pidfile=/var/run/celery/%n.pid

$ # 3 workers, with 3 processes each
$ celery multi start 3 -c 3
celery worker -n celery1@myhost -c 3
celery worker -n celery2@myhost -c 3
celery worker -n celery3@myhost -c 3

$ # override name prefix when using range
$ celery multi start 3 --range-prefix=worker -c 3
celery worker -n worker1@myhost -c 3
celery worker -n worker2@myhost -c 3
celery worker -n worker3@myhost -c 3

$ # start 3 named workers
$ celery multi start image video data -c 3
celery worker -n image@myhost -c 3
celery worker -n video@myhost -c 3
celery worker -n data@myhost -c 3

$ # specify custom hostname
$ celery multi start 2 --hostname=worker.example.com -c 3
celery worker -n celery1@worker.example.com -c 3
celery worker -n celery2@worker.example.com -c 3

$ # specify fully qualified nodenames
$ celery multi start foo@worker.example.com bar@worker.example.com -c 3

$ # fully qualified nodenames but using the current hostname
$ celery multi start foo@%h bar@%h

$ # Advanced example starting 10 workers in the background:
$ #   * Three of the workers processes the images and video queue
$ #   * Two of the workers processes the data queue with loglevel DEBUG
$ #   * the rest processes the default' queue.
$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
    -Q default -L:4,5 DEBUG

$ # You can show the commands necessary to start the workers with
$ # the 'show' command:
$ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
    -Q default -L:4,5 DEBUG

$ # Additional options are added to each celery worker's command,
$ # but you can also modify the options for ranges of, or specific workers

$ # 3 workers: Two with 3 processes, and one with 10 processes.
$ celery multi start 3 -c 3 -c:1 10
celery worker -n celery1@myhost -c 10
celery worker -n celery2@myhost -c 3
celery worker -n celery3@myhost -c 3

$ # can also specify options for named workers
$ celery multi start image video data -c 3 -c:image 10
celery worker -n image@myhost -c 10
celery worker -n video@myhost -c 3
celery worker -n data@myhost -c 3

$ # ranges and lists of workers in options is also allowed:
$ # (-c:1-3 can also be written as -c:1,2,3)
$ celery multi start 5 -c 3  -c:1-3 10
celery worker -n celery1@myhost -c 10
celery worker -n celery2@myhost -c 10
celery worker -n celery3@myhost -c 10
celery worker -n celery4@myhost -c 3
celery worker -n celery5@myhost -c 3

$ # lists also works with named workers
$ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
celery worker -n foo@myhost -c 10
celery worker -n bar@myhost -c 10
celery worker -n baz@myhost -c 10
celery worker -n xuzzy@myhost -c 3

关于 --appopen in new window 参数

这个选项指定要使用的Celery app的实例,以 module.path:attribute的形式指定,也可以使用简写,如果只指定了包名,会按照下边的顺序搜索app实例:

使用 --app=proj:

  1. 名为 proj.app 的属性
  2. 名为 proj.celery 的属性
  3. proj模块中,任何属性值是Celery应用的,或者

如果都没有找到,会尝试寻找名为 proj.celery 的子模块:

  1. 名为 proj.celery.app 的属性
  2. 名为 proj.celery.celery 的属性
  3. proj.celery 模块中,任何属性值是Celery应用的

这个方案模仿了文档中的实践即: proj:app 用于包含单个模块,proj.celery:app 用于大型项目

调用任务

使用delay() 方法调用一个任务:

>>> from proj.tasks import add

>>> add.delay(2, 2)

这个方法实际上是 apply_async() 方法的一个快捷方式:

>>> add.apply_async((2, 2))

后者允许您指定执行选项,比如运行时间(倒计时)、应该将其发送到的队列,等等:

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

上面的示例中,任务将发送到名为“lopri”的队列中,并且最早10s后执行。

直接调用的话,任务将在当前进程中执行,(没有异步:

>>> add(2, 2)
4

这三个方法 delay()apply_sync()、和(__call__)构成了Celery调用接口,也用来签名。

关于这些API更详细的介绍在调用任务章节

每个任务被调用的时候都会赋予一个唯一的UUID,也叫作任务 id

delayapply_sync这两个方法会返回一个AsyncResult实例,可以用来追踪任务的执行状态。不过需要先进行任务结果后端设置

结果默认不保存,大多数任务保存返回值不是十分有用。所以,保持默认也是十分明智的。另外,结果后端不能用来监视任务和workers,Celery有专用的事件消息。参考监控和管理指南。

如果配置了结果后端,可以获取任务返回值:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

可以使用 id 属性获取任务id

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

如果任务抛出异常,你也可以检查异常信息,in fact result.get() will propagate any errors by default:

>>> res = add.delay(2, '2')
>>> res.get(timeout=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/result.py", line 221, in get
    return self.backend.wait_for_pending(
  File "celery/backends/asynchronous.py", line 195, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "celery/result.py", line 333, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "celery/result.py", line 326, in throw
    self.on_ready.throw(*args, **kwargs)
  File "vine/promises.py", line 244, in throw
    reraise(type(exc), exc, tb)
  File "vine/five.py", line 195, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'

或者

>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")

上述示例会返回异常实例,如果只是检查任务是否成功,有对应的方法可以使用:

>>> res.failed()
True

>>> res.successful()
False

如何做到的呢?通过查看任务 state:

>>> res.state
'FAILURE'

任务只能处于一个状态,但是会经过多个状态,阶段如下:

PENDING -> STARTED -> SUCCESS

started 是一个特殊状态,只有当task_track_started是开启时或者给任务设置了@task(track_started=True)才记录该状态。

pending 实际上也不是一个状态,任何任务 id unknown时默认为此状态:

>>> from proj.celery import app

>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

如果任务进行了重试,阶段可能变得更复杂,比如下面是重试两次的任务经历的阶段:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

更多任务状态的信息可以在States章节获取

任务调用更详细的细节在调用任务章节

Canvas: 设计 Work-flows

你已经学习了如何使用 delay 方法调用任务,通常这些就足够了。但有时您可能希望将任务调用的签名传递给另一个进程,或者作为另一个函数的参数,Celery为此使用了称为签名的东西。

签名包装了单个任务调用的参数和执行选项,以便可以将其传递给函数,甚至可以通过网络进行序列化和发送。

你可以按照下面的方式给 add 添加一个参数为(2, 2),countdown 为10的签名:

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

还有一个快捷方式:

>>> add.s(2, 2)
tasks.add(2, 2)

说实话这段没看明白,有没有演示。

And there's that calling API again...

签名实例支持API调用,意味着它可以使用 delayapply_async方法

下面一段罗里吧嗦说的其实就是任务参数可以在delay()方法和signature()方法中指定,也可以两边指定,简单说就是这两个方法的参数合起来能满足最终task所需参数即可。

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

这个示例是在signature()中指定

## incomplete partial: add(?, 2)
>>> s2 = add.s(2)
>>> res = s2.delay(8)
>>> res.get()
10

这个是两边指定

>>> s1 = add.s()
>>> res = s1.delay(2,3)
>>> res.get()
5

这个是在delay()中指定

注意:
这个参数是追加上去的,而且是加到前边,不是覆盖。在第二个示例中,任务最终收到的参数实际为add(8, 2),所以,如果参数给多了,会报错,少了也不行。不过关键字参数的逻辑是覆盖,即delay()中指定的会覆盖signatu()中指定的。

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

As stated, signatures support the calling API: meaning that

  • sig.apply_async(args=(), kwargs={}, **options)

    Calls the signature with optional partial arguments and partial keyword arguments. Also supports partial execution options.

  • sig.delay(*args, **kwargs)

    Star argument version of apply_async. Any arguments will be prepended to the arguments in the signature, and keyword arguments is merged with any existing keys.

So this all seems very useful, but what can you actually do with these? To get to that I must introduce the canvas primitives…

The Primitives

关于Primitives(原语)的解释可以看看Primitives。大概指的是某种事物的基本构成单元。

  • group

  • chain

  • chord

  • map

  • starmap

  • chunks

这些原语(Primitives)本身就是签名对象,所以它们可以任意组合为复杂的工作流。

Note:
这些示例需要后端存储,所以,要尝试的话,需要事先配置好。

一些示例:

Groups

groupopen in new window以并发形式调用任务列表,并且返回一个特殊的结果实例。该实例允许您将结果作为一组进行检查,并按顺序检索返回值

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


>>> s1 = add.s(1,2)
>>> s2 = add.s(3,4)
>>> s3 = add.s(5,6)
>>> s4 = add.s(7,8)
>>> group(s1,s2,s3,s4)().get()
[3, 7, 11, 15]
>>> s = [s1, s2, s3, s4]
>>> group(s)().get()
[3, 7, 11, 15]

或者:

>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

直接看示例吧,只能说,有点东西:

>>> from celery import chain
>>> from proj.tasks import add, mul

## (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

或者像这样:

>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(5).get()
72

甚至还可以简写:

>>> (add.s(5, 4) | mul.s(8))().get()
72

Chords

chord是一个带有带有回调的group:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

解释:
(add.s(i, i) for i in range(10)的结果当做参数传递给xsum.s(),然后返回结果。

等效于把 group 链接到另一个任务:

>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90

由于这些primitives都是signature类型,所以你可以随意组合,比如:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

更多信息参考设计工作流。(务必阅读)

Routing/路由

Celery支持AMQP提供的所有路由功能,也支持将消息发送到命名队列的简单路由

task_routes设置允许您按名称路由任务,并将所有内容集中在一个位置:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

也可以在apply_async中通过queue参数指定队列:

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

然后就可以通过celery worker -Q指定 worker 从此队列中消费:

$ celery -A proj worker -Q hipri

也可以指定从多个队列消费,多个队列用,隔开,默认队列的队列名为celery

$ celery -A proj worker -Q hipri,celery

指定的队列顺序不影响执行,多个队列权重是相同的。

更多 routing 的信息,包括如何充分使用 AMQP 的路由功能,参照任务路由

远程控制

如果使用的是RabbitMQ(AMQP)、Redis、Qpid作为broker,那么你可以在worker运行时控制、检查它。

比如查看现在worker正在执行功能的任务是:

$ celery -A proj inspect active

这个是通过广播消息实现的,因此集群中所有worker都可以收到远程控制信息。

也可以通过--destinationopen in new window选项指定一个或多个worker相应请求,该参数是一个以逗号分割的worker的主机名列表:

$ celery -A proj inspect active --destination=celery@example.com

如果不指定,那么所有的worker都会响应。

celery inspect 命令中包含的所有命令都不会对worker造成任何改变,它只返回worker内部情况的信息和统计信息。要查看inspect命令列表可以执行:

$ celery -A proj inspect --help

然后是celery control 命令,包含会改变worker工作行为的命令:

$ celery -A proj control --help

比如你可以强制开启workers的event message(用于监控任务和workers):

$ celery -A proj control enable_events

当events开启之后,你可以启动event dumper开查看workers正在做什么:

$ celery -A proj events --dump

或者开启curses接口:

$ celery -A proj events

监视结束后还可以关闭events:

$ celery -A proj control disable_events

celery status 命令用来获取在线的workers列表:

$ celery -A proj status

更多信息参考监控和管理指南

时区

默认使用UTC时间,想改的话使用timezone设置:

app.conf.timezone = 'Europe/London'

优化

默认没有对负载进行优化,默认配置对大量短时间任务以及少量长时间任务进行平衡,如果需要的话,参考优化