信号
当应用程序的其他地方发生某些操作时,信号允许解耦的应用程序接收通知。
Celery 附带了许多信号,您的应用程序可以使用这些信号来增强某些操作的行为。
Basics:基础
有几种类型的事件触发信号,您可以连接到这些信号以在它们触发时执行操作。
比如连接到after_task_publish信号:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))
有些信号也有发送者,您可以根据发送者进行筛选。比如after_task_publish使用任务名作为发送者,因此,通过为 connect 提供 sender 参数,您可以在每次发布名为“proj.tasks.add”的任务时连接要调用的处理程序:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))
信号使用与django.core.dispatch相同的实现。因此,默认情况下,其他关键字参数(如信号)将传递给所有信号处理程序。
信号处理程序的最佳实践是接受任意关键字参数(i.e., **kwargs)这样,新的Celery版本可以添加额外的参数,而不会破坏用户代码。
Signals:信号
Task Signals:任务信号
before_task_publish
3.1新增特性
在发布任务之前发出。请注意,这是在发送任务的过程中执行的。
Sender和正在发送的任务名同名。
提供的参数:
- body - 任务消息主题。 
- exchange - 要发送的exchange或Exchange对象的名称 
- routing_key - 发送消息时要使用的routing key 
- headers - headers(可修改) 
- properties - 消息属性(可修改) 
- declare 
- retry_policy - 重试选项的mapping,可以是kombu.Connection.ensure()的任何参数并且可以被修改。 
after_task_publish
当任务已发送到broker时发出。请注意,这是在发送任务的进程中执行的。
Sender和正在发送的任务名同名。
提供的参数:
- headers 
- body 
- exchange - 使用的exchange或Exchange对象的名称 
- routing_key - 使用的routing_key 
task_prerun
任务执行前发出。
Sender是将要被执行的任务对象
提供的参数:
- task_id - 要执行的任务ID 
- task - 要执行的任务 
- args - 任务的位置参数 
- kwargs - 任务的关键字参数 
task_postrun
任务执行之后发送
Sender是执行的任务对象
提供的参数:
- task_id - 执行的任务ID 
- task - 执行的任务 
- args - 位置参数 
- kwargs - 关键字参数 
- retval - 任务返回值 
- state - 结果状态的名称 
task_retry
当任务被重试的时候发送
Sender是任务对象
提供的参数:
- request - 当前任务请求 
- reason - 重试原因(通常是异常实例,但总是可以强制为 str) 
- einfo - 异常的细节,包括traceback(一个billiard.einfo.ExceptionInfo对象 
task_success
任务成功时发送。
Sender是被执行的任务实例
提供的参数:
- result - 任务的返回值 
task_failure
任务失败时发送。
Sender是被执行的任务实例
提供的参数:
- task_id - 任务ID 
- exception - Exception实例 
- args - 任务调用的位置参数 
- kwargs - 任务调用的关键字参数 
- traceback - Stack trace 对象 
- einfo - billiard.einfo.ExceptionInfo实例 
task_internal_error
执行任务的过程中Celery内部出现错误时发送
Sender是被执行的任务实例
提供的参数:
- task_id - 任务ID 
- args - 任务调用的位置参数 
- kwargs - 任务调用的关键字参数 
- request - 原始请求字典。This is provided as the - task.requestmay not be ready by the time
 the exception is raised.
- exception - Exception实例 
- traceback - Stack trace 对象 
- einfo - billiard.einfo.ExceptionInfo实例 
task_received
从broker接收到一个任务并准备好执行时发送
Sender是consumer对象
提供的参数:
- request - 这是一个Request实例,不是 - task.request,使用prefork pool时,这个信号在父进程中发送,因此- task.request不可用也不应该使用。请改用此对象,因为它们共享许多相同的字段。
task_revoked
任务被worker撤销/终止时发送。
Sender是被终止/撤销的任务对象
提供的参数:
- request - 这是一个Request实例,不是 - task.request,使用prefork pool时,这个信号在父进程中发送,因此- task.request不可用也不应该使用。请改用此对象,因为它们共享许多相同的字段。
- terminated - 如果任务被终止,则设置为True 
- signum - 用于终止任务的Signal number。如果是None且terminated是True则应假定为TERM 
- expired - 如果任务过期则为True 
task_unknown
当worker收到未注册的任务消息时发送。
Sender is the worker Consumer
提供的参数:
- name - 未注册的任务名 
- id - 消息中发现的任务ID 
- message - 原始消息对象 
- exc - 发生的错误。 
task_rejected
当worker在其任一任务队列中收到未知类型的消息时发送
Sender is the worker Consumer
提供的参数:
- message - 原始消息对象 
- exc - 发生的错误(如果有)。 
App Signals:应用信号
import_modules
当一个程序(worker, beat, shell等)要求导入include和imports设置中要导入的模块时发送
Sender是app实例
Worker Signals:Worker信号
celeryd_after_setup
此信号在设置worker实例后、调用前发送。这意味着启用了celery worker -Q选项中的所有队列,并设置了日志记录等。
可以用于添加一个自定义的固定消费队列,忽略celery worker -Q选项。这个示例给每一个worker设置了一个队列,然后,可以使用这些队列将任务路由到任何指定的worker:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)
提供的参数:
- sender - worker所在节点名称 
- instance - 这是要初始化的celery.apps.worker.Worker实例。注意此时只有app和hostname(nodename)属性被设置, - __init__的其余部分尚未执行。
- conf - 当前app的配置 
celeryd_init
这是celery worker启动后发送的第一个信号。The sender is the host name of the worker,因此,此信号可用于设置worker的特定配置:
from celery.signals import celeryd_init
@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'
或者可以在连接时不指定sender来发送给多个worker
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.task_default_rate_limit = '10/m'
    if sender == 'worker3@example.com':
        conf.worker_prefetch_multiplier = 0
提供的参数:
- sender - worker所在节点名称 
- instance - 这是要初始化的celery.apps.worker.Worker实例。注意此时只有app和hostname(nodename)属性被设置, - __init__的其余部分尚未执行。
- conf - 当前app的配置 
- options - 通过命令行参数传递给worker的选项(包含默认选项) 
worker_init
worker启动之前发送
worker_ready
worker准备好接受工作时发送
heartbeat_sent
当Celery发送了一个worker heartbeat时发送
Sender是celery.worker.heartbeat.Heart实例
worker_shutting_down
当worker开始关闭进程时发送
提供的参数:
- sig - 收到的POSIX信号 
- how - 关闭方法,温和还是冷酷 
- exitcode - 使用的退出状态码 
worker_process_init
Dispatched in all pool child processes when they start.
请注意,该信号的处理程序阻塞时间不得超过4S,否则将视为启动失败从而杀死进程
worker_process_shutdown
Dispatched in all pool child processes just before they exit.
Note:不保证该信号一定会发送
提供的参数:
- pid - 即将关闭的子进程的pid 
- exitcode - 子进程要使用的退出状态码 
worker_shutdown
在工作进程即将关闭时发送
Beat Signals:Beat信号
beat_init
当celery beat启动时发送(包括独立和集成的
Sender是celery.beat.Service实例
beat_embedded_init
Dispatched in addition to the beat_init signal when celery beat is started as an embedded process.
Sender是celery.beat.Service实例
Eventlet Signals:Eventlet信号
eventlet_pool_started
当eventlet pool启动时发送
Sender是celery.concurrency.eventlet.TaskPool实例
eventlet_pool_preshutdown
Sent when the worker shutdown, just before the eventlet pool is requested to wait for remaining workers.
Sender是celery.concurrency.eventlet.TaskPool实例
eventlet_pool_postshutdown
Sent when the pool has been joined and the worker is ready to shutdown.
Sender是celery.concurrency.eventlet.TaskPool实例
eventlet_pool_apply
Sent whenever a task is applied to the pool.
Sender是celery.concurrency.eventlet.TaskPool实例
提供的参数:
- target - 目标函数 
- args - 位置参数 
- kwargs - 关键字参数 
Logging Signals:日志信号
setup_logging
如果连接了此信号,Celery 将不会配置loggers,因此您可以使用它来完全覆盖您自己的log配置。
如果你想通过 Celery 增加日志配置设置,那么你可以使用 after_setup_logger 和 after_setup_task_logger 信号。
提供的参数:
- loglevel - 日志等级 
- logfile - 日志文件名 
- format - 日志格式 
- colorize - 日志消息是否使用color 
after_setup_logger
在设置每个全局logger(非任务logger)之后发送.用于增加日志配置
提供的参数:
- logger - logger对象 
- loglevel - 日志等级 
- logfile - 日志文件名 
- format - 日志格式 
- colorize - 日志消息是否使用color 
after_setup_task_logger
在设置每个任务logger之后发送.用于增加日志配置
提供的参数:
- logger - logger对象 
- loglevel - 日志等级 
- logfile - 日志文件名 
- format - 日志格式 
- colorize - 日志消息是否使用color 
Command Signals:命令信号
user_preload_options
This signal is sent after any of the Celery command line programs are finished parsing the user preload options.
It can be used to add additional command-line arguments to the celery umbrella command:
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()
Sender is the Command instance, and the value depends on the program that was called (e.g., for the umbrella command it'll be a CeleryCommand) object).
提供的参数:
- app - app实例 
- options - Mapping of the parsed user preload options (with default values). 
Deprecated Signals:弃用的信号
task_sent
该信号已弃用,请改用after_task_publish