Workers指南

启动worker

你可以使用celery -A proj worker -l INFO命令在前台启动worker。

守护进程:

你可能希望以守护进程的方式在后台运行,Daemonization章节介绍了流行的服务管理程序以便后台运行。

完整的命令行选项参考workeropen in new window部分,或者执行celery worker --help

你可以在同一台机器上运行多个worker,但必须给每个worker指定名称并且需要附加--hostname argument:

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

hostname 变量可以展开为以下内容:

  • %h :Hostname,包括domain name。
  • %n : Hostname
  • %d : Domain name

比如当前hostname为george.example.com

VariableTemplateResult
%hworker1@%hworker1@george.example.com
%nworker1@%nworker1@george
%dworker1@%dworker1@example.com

Note for supervisoropen in new window user

%符号使用%%来转义

停止worker

关闭应该使用TERM信号完成。

一旦开始关闭,worker会在实际终止之前实际完成所有当前正在执行的任务。(When shutdown is initiated the worker will finish all currently executing tasks before it actually terminates.)如果这些任务很重要,则应等待任务完成后再执行任何激烈的操作,如发送KILL信号。

如果在considerate时间之后worker仍未关闭,比如陷入了无限循环或其他类似的事情,你可以发送KILL信号强制终止,不过这会导致正在执行的任务丢失(除非任务设置了acks_lateopen in new window

同样,由于进程不能重写KILL信号,worker将不能获取它的子进程;请确保手动执行此操作。此命令通常会以下面的形式操作:

$ pkill -9 -f 'celery worker'

如果没有pkill命令,你可以使用这个稍微长一点的版本:

$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9

5.2版本的变化:在Linux系统上,Celery现在支持在worker终止后向所有子进程发送KILL信号。是通过prctl(2)PR_SET_PDEATHSIG选项实现的

重启worker

要重启worker,应该发送一个TERM信号,然后启动一个新实例。对于开发环境更简单的办法是使用celery multi:

$ celery multi start 1 -A proj -l INFO -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

对于生产环境,你应该使用 init-scripts 或者进程管理工具(查看Daemonization章节)

除了停止然后启动的方式重启,还可以通过发送HUP信号重启。请注意,这种方式worker会负责重启自己,这很容易出现问题,不建议生产中使用:

$ kill -HUP $pid

仅当worker作为守护进程在后台运行时,通过HUP重新启动才有效(它没有控制终端)。

macOS上HUP不可用。

Process Signals:进程信号

worker 的主进程会重写下面的信号:

TERM温和停止,等待任务完成
QUIT尽快终止
USR1Dump traceback for all active threads.
USR2Remote debug, see celery.contrib.rdb``.

文件路径中的变量

文件路径参数--logfileopen in new window,--pidfileopen in new window,和--statedbopen in new window可以包含变量,worker会进行变量替换。

Node name替换

  • %p:完整的node name
  • %h:包含domain name的hostname
  • %n:仅hostname
  • %d:仅domain name
  • %i:Prefork pool process index or 0 if MainProcess.
  • %I:Prefork pool process index with separator.

比如当前hostname为george@foo.example.com那么:

  • --logfile=%p.log -> george@foo.example.com.log

  • --logfile=%h.log -> foo.example.com.log

  • --logfile=%n.log -> george.log

  • --logfile=%d.log -> example.com.log

Prefork pool process index

The prefork pool process index specifiers will expand into a different filename depending on the process that’ll eventually need to open the file.

这可用于为每个子进程指定一个日志文件。

Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. That is, the number is the process index not the process count or pid.

  • %i - Pool process index or 0 if MainProcess.

    Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:

    • worker1-0.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)
  • %I - Pool process index with separator.

    Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:

    • worker1.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)

Concurrency:并发

默认情况下,多进程(multiprocessing)用于处理并发任务,不过你也可以使用Eventlet。worker 的processes/threads可以通过--concurrencyopen in new window参数指定,默认是主机可用的CPUs。

Number of processes (multiprocessing/prefork pool)

More pool processes are usually better, but there's a cut-off point where adding more pool processes affects performance in negative ways. There's even some evidence to support that having multiple worker instances running, may perform better than having a single worker. For example 3 workers with 10 pool processes each. You need to experiment to find the numbers that works best for you, as this varies based on application, work load, task run times and other factors.

这段话应该是想说processes理论上越多越好,不过有个临界值,过了之后性能会下降,至于最优解你得自己试。因为这基于应用程序、工作负载、任务运行时间和其它因素而变化。

Remote control:远程控制

2.0新增特性

pool support: prefork, eventlet, gevent, thread, blocking:solo (see note)

broker support: amqp, redis

The celery command

celery 程序/命令通常用来在命令行执行远程控制命令。下面列出的所有命令都支持。查看命令行管理实用程序获取更多信息。

Workers具有使用高优先级广播消息队列进行远程控制的能力。可以是所有Workers或者是指定的Workers。

命令也可以有回复。客户端会等待并收集这些回复。由于没有像类似注册中心这样的东西来获知集群中有多少worker,因此客户端有一个可以配置的超时时间,默认是1S。如果在该时间内没有收到回复,并不意味着就是没有回复或者worker挂了,也有可能是网络延迟或者worker处理的比较慢,因此可以根据需要调整超时时间。

In addition to timeouts, the client can specify the maximum number of replies to wait for. If a destination is specified, this limit is set to the number of destination hosts.

Note:

solo pool支持远程控制命令,但是命令的等待会被任务的执行阻塞,因此,如果worker比较忙,可能会收不到回复,这种情况必须增加客户端的等待时间。

broadcast()open in new window 方法

这是用来发送命令到workers的客户端方法。一些后台命令有高级接口,这些接口在后台使用broadcast()open in new window ,比如rate_limit()open in new windowping()open in new window

Some remote control commands also have higher-level interfaces using broadcast() in the background, like rate_limit(), and ping().

也许应该翻译成一些远程控制命令还具有在后台使用broadcast()open in new window 的高级接口,比如rate_limit()open in new windowping()open in new window

发送rate_limit()open in new window命令和关键字参数:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

这将异步发送命令,而不等待答复。要请求回复,您必须使用reply参数:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

使用 destination 参数必须指定一个workers的列表来接收命令:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]

当然,使用高级接口来设置 rate limit 是方便多了,不过这里有一些只能使用broadcast()的命令

Commands:命令

revoke: 撤销任务

pool **support:**all,terminate(终止)只支持prefork和eventlet

broker supp​ortamqp,redis

**command:**​celery -A proj control revoke <task_id>

所有工作者节点都保留一个已撤销任务ID的内存,该内存可以是内存中的,也可以是磁盘上的持久性内存(请参见Persistent revokes)。

当worker接收到撤销任务的请求时会跳过执行该任务,对于已经开始执行的任务并不会终止,除非设置了terminate选项。

Note:

当任务卡住时,终止选项是管理员的最后手段。 这不是为了终止任务,而是为了终止正在执行任务的进程,并且该进程可能在发送信号时已经开始处理另一个任务,因此您绝对不能以编程方式调用它。

PS:也就是说,不要把这东西写进代码里

如果设置了terminate 那么执行任务的worker的子进程会被终止。默认发送的信号是TERM, 不过你也可以使用signal参数指定其他的。Python signalopen in new window标准库中的所有信号都可以,不过得是大写的形式。

终止任务也会撤消该任务。

示例:

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')

Revoking multiple tasks:终止多个任务

3.1新增功能

revoke方法可以接收一个list参数,可以同时撤销多个任务。

示例:

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

3.1之后GroupResult.revoke方法就是这样干的。

Persistent revokes

撤销任务是通过发送一个广播消息给所有的workers,workers会在内存中维护一个已撤销任务的list。当一个worker启动时,它会同集群中的其他workers同步这个列表。

这个列表是在内存中的,因此当重启所有的worker时,这个列表会消失。如果你想在重启后依旧保留,可以通过给celery worker 指定--statedb参数来保存到一个文件:

$ celery -A proj worker -l INFO --statedb=/var/run/celery/worker.state

如果你使用celery muliti 的话,你可能希望每个worker实例保存到单独的文件中,使用类似于%n这样的格式代表节点名称:

celery multi start 2 -l INFO --statedb=/var/run/celery/%n.state

参见文件路径中的变量

请注意,远程控制命令必须正常工作,撤销才能正常工作。目前只有RabbitMQ(amqp)和Redis支持远程控制命令。

Time Limits:时间限制

2.0新增特性

pool support: prefork/gevent

一个任务有几率永远运行下去,这会阻塞所有新任务。避免这种情况发生的最好办法是使用时间限制。

时间限制(--time-limit)是一个任务在执行它的进程被终止并被新进程取代之前可以运行的最大秒数。你可以开启软限制(--soft-time-limit),这会触发一个可捕获的异常以便在硬时间到来之前清理该任务。

Soft,or hard?

时间限制设置为软和硬两个值。 软时间限制允许任务在被杀死之前捕获异常以进行清理:硬超时是不可捕获的并且强制终止任务。

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

时间限制也可以使用task_time_limit以及task_soft_time_limit这两个设置。

Note:

不支持SIGUSR1信号的平台时间限制无法正常工作

Note:

gevent pool没有实现软限制。同时,其硬限制也不会强制执行。

Changing time limits at run-time:在运行时更改时间限制

2.3新增

broker support: amqp, redis

time_limit远程控制命令可以让你同时更改一个任务的两种时间限制。

比如,更改tasks.crawl_the_web任务的软限制为1分钟,硬限制为2分钟:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

只有在时间限制更改后开始执行的任务才会受到影响。

Rate Limits:速率限制

Changing rate-limits at run-time

示例:更改myapp.mytask任务的速率限制,使其每分钟最多执行200个该类型的任务

>>> app.control.rate_limit('myapp.mytask', '200/m')

上述内容未指定destination,因此更改请求将影响群集中的所有工作实例。如果您只想影响特定的工作线程列表,则可以包含destination参数:

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

警告:

对于开启了worker_disable_rate_limits的worker,该配置无效。

Max tasks per child setting

2.0新增

pool support: prefork

使用该选项指定一个worker被替换之前可以执行的最大任务数量。

如果您无法控制内存泄漏(例如,来自不开源的C语言写的扩展),这将非常有用。

该选项还可以通过workers的--max-tasks-per-childopen in new window命令行选项或者worker_max_tasks_per_child设置。

Max memory per child setting

4.0新增

pool support: prefork

使用该选项指定一个worker被替换之前可以使用的最大任务常驻内存。

如果您无法控制内存泄漏(例如,来自不开源的C语言写的扩展),这将非常有用。

该选项还可以通过workers的--max-memory-per-childopen in new window命令行选项或者worker_max_memory_per_child设置。

Autoscaling:自动扩展

2.2新增

pool support: preforkgevent

autoscaler 组件用来根据负载动态调整pool的大小:

  • 当有工作要做时,autoscaler会添加更多的pool processes,

    • 在工作负载较低时开始删除进程。

通过--autoscaleopen in new window选项来开启,该选项有两个参数:pool processes的最大和最小值。

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

或者可以写一个Autoscaler的子类自定义自动扩展的规则,比如基于负载均衡或者内存使用之类的。你可以使用worker_autoscaler设置指定自定义的autoscaler

Queues:队列

一个worker示例可以从任意多队列中消费。默认情况下会从task_queues设置中定义的队列中消费(如果没指定,那就是默认队列celery

你可以在启动时指定要消费的队列,通过给-Q选项制定一个冒号分割的列表

$ celery -A proj worker -l INFO -Q foo,bar,baz

如果队列名在task_queues中定义了,则使用其定义的队列,如果没有定义,则生成一个新的队列(取决于task_create_missing_queues选项

你还可以通过远程控制命令控制worker开始或停止从某个队列中消费通过add_consumercancel_consumer

Queues: Adding consumers

add_comsumer命令会通知一个或多个workers开始从某个队列中消费。这个操作时幂等的。

要通知集群中的所有workers开始从名为"foo"的队列中消费,你可以使用 celery control 程序:

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

如果想要指定一个特定的worker可以使用--destinationopen in new window选项

$ celery -A proj control add_consumer foo -d celery@worker1.local

同样的操作也可以通过app.control.add_consumer()open in new window方法动态完成

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

到目前为止,我们只展示了使用自动队列的示例,如果您需要更多控制,还可以指定 exchangerouting_key 甚至其他选项:

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])

Queues: Canceling consumers

可以使用 cancel_consumer 命令取消一个队列

要强制取消集群中所有worker消费的某个队列,可以使用 celery control 程序:

$ celery -A proj control cancel_consumer foo

使用 --destinationopen in new window选项可以自定要操作的worker,一个或者一个worker列表:

$ celery -A proj control cancel_consumer foo -d celery@worker1.local

或者你也可以是在代码中使用app.control.cancel_consumer()open in new window方法取消:

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]

Queues: List of active queues

你可以通过active_queues控制命令获取worker从那些队列中消费:

$ celery -A proj inspect active_queues
[...]

和其他远程控制命令一样,也支持--destinationopen in new window参数

$ celery -A proj inspect active_queues -d celery@worker1.local
[...]

也可以在代码中使用active_queues()open in new window方法实现:

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]

Inspecting workers

app.control.inspectopen in new window可以让你检查运行中的workers。其背后使用的是远程控制命令。

你也可以使用celery命令检查workers,和app.controlopen in new window接口支持同样的命令

>>> # Inspect all nodes.
>>> i = app.control.inspect()

>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')

Dump of registered tasks

使用registered()open in new window方法以获取worker中注册的任务列表

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]

Dump of currently executing tasks

使用active()open in new window方法获取活动任务列表

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

Dump of scheduled (ETA) tasks

使用scheduled()open in new window方法获取待调度的任务列表

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

Note:包括ETA/countdown任务,不包括周期性任务

Dump of reserved tasks

Reserved 任务是已经被接收到但仍在等待执行的任务。

可以通过reserved()open in new window方法获取此类任务列表:

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

Statistics

远程控制命令inspect stats(或者stats()open in new window方法)可以为您提供一长串有关该worker的有用(或不太有用)统计信息:

$ celery -A proj inspect stats

关于输出的详情,参考stats()open in new window

Additional Commands:其他指令

Remote shutdown

此命令可以用远程的方式优雅的关闭worker:

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')

Ping

对worker发送ping请求,如果worker还活着会返回'pong',默认1s超时,也可以自定义超时时间。

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping()open in new window方法支持 destination 选项,因此你可以指定worker来ping:

>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

Enable/disable events

可以 enable/disable 事件通过使用 enable_events, disable_events 命令。这对于临时监视使用celery events/celerymon的worker非常有用。

>>> app.control.enable_events()
>>> app.control.disable_events()

编写自己的远程控制命令

有两种远程控制命令:

  • 检查类命令

    无副作用,只是返回从worker中获取到的值,比如当前注册的任务、活动任务列表等

  • 控制类命令

    会产生副作用,比如添加一个要消费的队列

远程控制命令在控制面板中注册,它们采用单个参数:当前ControlDispatch实例。From there you have access to the active Consumer`` if needed.

这是一个获取预读取任务数量的命令:

from celery.worker.control import control_command

@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}

请确保将此代码添加到由worker导入的模块中,这应该和你定义Celery app模块的地方是一样的,或者可以添加到imports设置中

Make sure you add this code to a module that is imported by the worker: this could be the same module as where your Celery app is defined, or you can add the module to the imports`` setting.

重启worker以便注册命令,之后就可以通过celery control 调用该命令:

$ celery -A proj control increase_prefetch_count 3

也可以给 celery inspect 添加动作,比如这是一个读取当前预读数量的示例:

from celery.worker.control import inspect_command

@inspect_command()
def current_prefetch_count(state):
    return {'prefetch_count': state.consumer.qos.value}

重启之后就可以使用辣:

$ celery -A proj inspect current_prefetch_count