信号

当应用程序的其他地方发生某些操作时,信号允许解耦的应用程序接收通知。

Celery 附带了许多信号,您的应用程序可以使用这些信号来增强某些操作的行为。

Basics:基础

有几种类型的事件触发信号,您可以连接到这些信号以在它们触发时执行操作。

比如连接到after_task_publish信号:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

有些信号也有发送者,您可以根据发送者进行筛选。比如after_task_publish使用任务名作为发送者,因此,通过为 connect open in new window提供 sender 参数,您可以在每次发布名为“proj.tasks.add”的任务时连接要调用的处理程序:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

信号使用与django.core.dispatch相同的实现。因此,默认情况下,其他关键字参数(如信号)将传递给所有信号处理程序。

信号处理程序的最佳实践是接受任意关键字参数(i.e., **kwargs)这样,新的Celery版本可以添加额外的参数,而不会破坏用户代码。

Signals:信号

Task Signals:任务信号

before_task_publish

3.1新增特性

在发布任务之前发出。请注意,这是在发送任务的过程中执行的。

Sender和正在发送的任务名同名。

提供的参数:

after_task_publish

当任务已发送到broker时发出。请注意,这是在发送任务的进程中执行的。

Sender和正在发送的任务名同名。

提供的参数:

task_prerun

任务执行前发出。

Sender是将要被执行的任务对象

提供的参数:

  • task_id

    要执行的任务ID

  • task

    要执行的任务

  • args

    任务的位置参数

  • kwargs

    任务的关键字参数

task_postrun

任务执行之后发送

Sender是执行的任务对象

提供的参数:

  • task_id

    执行的任务ID

  • task

    执行的任务

  • args

    位置参数

  • kwargs

    关键字参数

  • retval

    任务返回值

  • state

    结果状态的名称

task_retry

当任务被重试的时候发送

Sender是任务对象

提供的参数:

  • request

    当前任务请求

  • reason

    重试原因(通常是异常实例,但总是可以强制为 str)

  • einfo

    异常的细节,包括traceback(一个billiard.einfo.ExceptionInfo对象

task_success

任务成功时发送。

Sender是被执行的任务实例

提供的参数:

  • result

    任务的返回值

task_failure

任务失败时发送。

Sender是被执行的任务实例

提供的参数:

  • task_id

    任务ID

  • exception

    Exception实例

  • args

    任务调用的位置参数

  • kwargs

    任务调用的关键字参数

  • traceback

    Stack trace 对象

  • einfo

    billiard.einfo.ExceptionInfo实例

task_internal_error

执行任务的过程中Celery内部出现错误时发送

Sender是被执行的任务实例

提供的参数:

  • task_id

    任务ID

  • args

    任务调用的位置参数

  • kwargs

    任务调用的关键字参数

  • request

    原始请求字典。This is provided as the task.request may not be ready by the time
    the exception is raised.

  • exception

    Exception实例

  • traceback

    Stack trace 对象

  • einfo

    billiard.einfo.ExceptionInfo实例

task_received

从broker接收到一个任务并准备好执行时发送

Sender是consumer对象

提供的参数:

  • request

    这是一个Requestopen in new window实例,不是task.request,使用prefork pool时,这个信号在父进程中发送,因此task.request不可用也不应该使用。请改用此对象,因为它们共享许多相同的字段。

task_revoked

任务被worker撤销/终止时发送。

Sender是被终止/撤销的任务对象

提供的参数:

  • request

    这是一个Requestopen in new window实例,不是task.request,使用prefork pool时,这个信号在父进程中发送,因此task.request不可用也不应该使用。请改用此对象,因为它们共享许多相同的字段。

  • terminated

    如果任务被终止,则设置为True

  • signum

    用于终止任务的Signal number。如果是None且terminated是True则应假定为TERM

  • expired

    如果任务过期则为True

task_unknown

当worker收到未注册的任务消息时发送。

Sender is the worker Consumeropen in new window

提供的参数:

  • name

    未注册的任务名

  • id

    消息中发现的任务ID

  • message

    原始消息对象

  • exc

    发生的错误。

task_rejected

当worker在其任一任务队列中收到未知类型的消息时发送

Sender is the worker Consumeropen in new window

提供的参数:

  • message

    原始消息对象

  • exc

    发生的错误(如果有)。

App Signals:应用信号

import_modules

当一个程序(worker, beat, shell等)要求导入include和imports设置中要导入的模块时发送

Sender是app实例

Worker Signals:Worker信号

celeryd_after_setup

此信号在设置worker实例后、调用前发送。这意味着启用了celery worker -Qopen in new window选项中的所有队列,并设置了日志记录等。

可以用于添加一个自定义的固定消费队列,忽略celery worker -Qopen in new window选项。这个示例给每一个worker设置了一个队列,然后,可以使用这些队列将任务路由到任何指定的worker:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)

提供的参数:

celeryd_init

这是celery worker启动后发送的第一个信号。The sender is the host name of the worker,因此,此信号可用于设置worker的特定配置:

from celery.signals import celeryd_init

@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'

或者可以在连接时不指定sender来发送给多个worker

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.task_default_rate_limit = '10/m'
    if sender == 'worker3@example.com':
        conf.worker_prefetch_multiplier = 0

提供的参数:

  • sender

    worker所在节点名称

  • instance

    这是要初始化的celery.apps.worker.Workeropen in new window实例。注意此时只有apphostname(nodename)属性被设置,__init__的其余部分尚未执行。

  • conf

    当前app的配置

  • options

    通过命令行参数传递给worker的选项(包含默认选项)

worker_init

worker启动之前发送

worker_ready

worker准备好接受工作时发送

heartbeat_sent

当Celery发送了一个worker heartbeat时发送

Sender是celery.worker.heartbeat.Heartopen in new window实例

worker_shutting_down

当worker开始关闭进程时发送

提供的参数:

  • sig

    收到的POSIX信号

  • how

    关闭方法,温和还是冷酷

  • exitcode

    使用的退出状态码

worker_process_init

Dispatched in all pool child processes when they start.

请注意,该信号的处理程序阻塞时间不得超过4S,否则将视为启动失败从而杀死进程

worker_process_shutdown

Dispatched in all pool child processes just before they exit.

Note:不保证该信号一定会发送

提供的参数:

  • pid

    即将关闭的子进程的pid

  • exitcode

    子进程要使用的退出状态码

worker_shutdown

在工作进程即将关闭时发送

Beat Signals:Beat信号

beat_init

celery beat启动时发送(包括独立和集成的

Sender是celery.beat.Serviceopen in new window实例

beat_embedded_init

Dispatched in addition to the beat_initopen in new window signal when celery beat is started as an embedded process.

Sender是celery.beat.Serviceopen in new window实例

Eventlet Signals:Eventlet信号

eventlet_pool_started

当eventlet pool启动时发送

Sender是celery.concurrency.eventlet.TaskPoolopen in new window实例

eventlet_pool_preshutdown

Sent when the worker shutdown, just before the eventlet pool is requested to wait for remaining workers.

Sender是celery.concurrency.eventlet.TaskPoolopen in new window实例

eventlet_pool_postshutdown

Sent when the pool has been joined and the worker is ready to shutdown.

Sender是celery.concurrency.eventlet.TaskPoolopen in new window实例

eventlet_pool_apply

Sent whenever a task is applied to the pool.

Sender是celery.concurrency.eventlet.TaskPoolopen in new window实例

提供的参数:

  • target

    目标函数

  • args

    位置参数

  • kwargs

    关键字参数

Logging Signals:日志信号

setup_logging

如果连接了此信号,Celery 将不会配置loggers,因此您可以使用它来完全覆盖您自己的log配置。

如果你想通过 Celery 增加日志配置设置,那么你可以使用 after_setup_logger 和 after_setup_task_logger 信号。

提供的参数:

  • loglevel

    日志等级

  • logfile

    日志文件名

  • format

    日志格式

  • colorize

    日志消息是否使用color

after_setup_logger

在设置每个全局logger(非任务logger)之后发送.用于增加日志配置

提供的参数:

  • logger

    logger对象

  • loglevel

    日志等级

  • logfile

    日志文件名

  • format

    日志格式

  • colorize

    日志消息是否使用color

after_setup_task_logger

在设置每个任务logger之后发送.用于增加日志配置

提供的参数:

  • logger

    logger对象

  • loglevel

    日志等级

  • logfile

    日志文件名

  • format

    日志格式

  • colorize

    日志消息是否使用color

Command Signals:命令信号

user_preload_options

This signal is sent after any of the Celery command line programs are finished parsing the user preload options.

It can be used to add additional command-line arguments to the celery umbrella command:

from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()

Sender is the Command instance, and the value depends on the program that was called (e.g., for the umbrella command it'll be a CeleryCommand) object).

提供的参数:

  • app

    app实例

  • options

    Mapping of the parsed user preload options (with default values).

Deprecated Signals:弃用的信号

task_sent

该信号已弃用,请改用after_task_publish