信号
当应用程序的其他地方发生某些操作时,信号允许解耦的应用程序接收通知。
Celery 附带了许多信号,您的应用程序可以使用这些信号来增强某些操作的行为。
Basics:基础
有几种类型的事件触发信号,您可以连接到这些信号以在它们触发时执行操作。
比如连接到after_task_publish信号:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
有些信号也有发送者,您可以根据发送者进行筛选。比如after_task_publish使用任务名作为发送者,因此,通过为 connect 提供 sender 参数,您可以在每次发布名为“proj.tasks.add”的任务时连接要调用的处理程序:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
信号使用与django.core.dispatch相同的实现。因此,默认情况下,其他关键字参数(如信号)将传递给所有信号处理程序。
信号处理程序的最佳实践是接受任意关键字参数(i.e., **kwargs
)这样,新的Celery版本可以添加额外的参数,而不会破坏用户代码。
Signals:信号
Task Signals:任务信号
before_task_publish
3.1新增特性
在发布任务之前发出。请注意,这是在发送任务的过程中执行的。
Sender和正在发送的任务名同名。
提供的参数:
body
任务消息主题。
exchange
要发送的exchange或Exchange对象的名称
routing_key
发送消息时要使用的routing key
headers
headers(可修改)
properties
消息属性(可修改)
declare
retry_policy
重试选项的mapping,可以是kombu.Connection.ensure()的任何参数并且可以被修改。
after_task_publish
当任务已发送到broker时发出。请注意,这是在发送任务的进程中执行的。
Sender和正在发送的任务名同名。
提供的参数:
headers
body
exchange
使用的exchange或Exchange对象的名称
routing_key
使用的routing_key
task_prerun
任务执行前发出。
Sender是将要被执行的任务对象
提供的参数:
task_id
要执行的任务ID
task
要执行的任务
args
任务的位置参数
kwargs
任务的关键字参数
task_postrun
任务执行之后发送
Sender是执行的任务对象
提供的参数:
task_id
执行的任务ID
task
执行的任务
args
位置参数
kwargs
关键字参数
retval
任务返回值
state
结果状态的名称
task_retry
当任务被重试的时候发送
Sender是任务对象
提供的参数:
request
当前任务请求
reason
重试原因(通常是异常实例,但总是可以强制为 str)
einfo
异常的细节,包括traceback(一个billiard.einfo.ExceptionInfo对象
task_success
任务成功时发送。
Sender是被执行的任务实例
提供的参数:
result
任务的返回值
task_failure
任务失败时发送。
Sender是被执行的任务实例
提供的参数:
task_id
任务ID
exception
Exception实例
args
任务调用的位置参数
kwargs
任务调用的关键字参数
traceback
Stack trace 对象
einfo
billiard.einfo.ExceptionInfo实例
task_internal_error
执行任务的过程中Celery内部出现错误时发送
Sender是被执行的任务实例
提供的参数:
task_id
任务ID
args
任务调用的位置参数
kwargs
任务调用的关键字参数
request
原始请求字典。This is provided as the
task.request
may not be ready by the time
the exception is raised.exception
Exception实例
traceback
Stack trace 对象
einfo
billiard.einfo.ExceptionInfo实例
task_received
从broker接收到一个任务并准备好执行时发送
Sender是consumer对象
提供的参数:
request
这是一个Request实例,不是
task.request
,使用prefork pool时,这个信号在父进程中发送,因此task.request
不可用也不应该使用。请改用此对象,因为它们共享许多相同的字段。
task_revoked
任务被worker撤销/终止时发送。
Sender是被终止/撤销的任务对象
提供的参数:
request
这是一个Request实例,不是
task.request
,使用prefork pool时,这个信号在父进程中发送,因此task.request
不可用也不应该使用。请改用此对象,因为它们共享许多相同的字段。terminated
如果任务被终止,则设置为True
signum
用于终止任务的Signal number。如果是None且terminated是True则应假定为TERM
expired
如果任务过期则为True
task_unknown
当worker收到未注册的任务消息时发送。
Sender is the worker Consumer
提供的参数:
name
未注册的任务名
id
消息中发现的任务ID
message
原始消息对象
exc
发生的错误。
task_rejected
当worker在其任一任务队列中收到未知类型的消息时发送
Sender is the worker Consumer
提供的参数:
message
原始消息对象
exc
发生的错误(如果有)。
App Signals:应用信号
import_modules
当一个程序(worker, beat, shell等)要求导入include和imports设置中要导入的模块时发送
Sender是app实例
Worker Signals:Worker信号
celeryd_after_setup
此信号在设置worker实例后、调用前发送。这意味着启用了celery worker -Q选项中的所有队列,并设置了日志记录等。
可以用于添加一个自定义的固定消费队列,忽略celery worker -Q选项。这个示例给每一个worker设置了一个队列,然后,可以使用这些队列将任务路由到任何指定的worker:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
instance.app.amqp.queues.select_add(queue_name)
提供的参数:
sender
worker所在节点名称
instance
这是要初始化的celery.apps.worker.Worker实例。注意此时只有app和hostname(nodename)属性被设置,
__init__
的其余部分尚未执行。conf
当前app的配置
celeryd_init
这是celery worker启动后发送的第一个信号。The sender
is the host name of the worker,因此,此信号可用于设置worker的特定配置:
from celery.signals import celeryd_init
@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
conf.task_default_rate_limit = '10/m'
或者可以在连接时不指定sender来发送给多个worker
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
if sender in ('worker1@example.com', 'worker2@example.com'):
conf.task_default_rate_limit = '10/m'
if sender == 'worker3@example.com':
conf.worker_prefetch_multiplier = 0
提供的参数:
sender
worker所在节点名称
instance
这是要初始化的celery.apps.worker.Worker实例。注意此时只有app和hostname(nodename)属性被设置,
__init__
的其余部分尚未执行。conf
当前app的配置
options
通过命令行参数传递给worker的选项(包含默认选项)
worker_init
worker启动之前发送
worker_ready
worker准备好接受工作时发送
heartbeat_sent
当Celery发送了一个worker heartbeat时发送
Sender是celery.worker.heartbeat.Heart实例
worker_shutting_down
当worker开始关闭进程时发送
提供的参数:
sig
收到的POSIX信号
how
关闭方法,温和还是冷酷
exitcode
使用的退出状态码
worker_process_init
Dispatched in all pool child processes when they start.
请注意,该信号的处理程序阻塞时间不得超过4S,否则将视为启动失败从而杀死进程
worker_process_shutdown
Dispatched in all pool child processes just before they exit.
Note:不保证该信号一定会发送
提供的参数:
pid
即将关闭的子进程的pid
exitcode
子进程要使用的退出状态码
worker_shutdown
在工作进程即将关闭时发送
Beat Signals:Beat信号
beat_init
当celery beat启动时发送(包括独立和集成的
Sender是celery.beat.Service实例
beat_embedded_init
Dispatched in addition to the beat_init signal when celery beat is started as an embedded process.
Sender是celery.beat.Service实例
Eventlet Signals:Eventlet信号
eventlet_pool_started
当eventlet pool启动时发送
Sender是celery.concurrency.eventlet.TaskPool实例
eventlet_pool_preshutdown
Sent when the worker shutdown, just before the eventlet pool is requested to wait for remaining workers.
Sender是celery.concurrency.eventlet.TaskPool实例
eventlet_pool_postshutdown
Sent when the pool has been joined and the worker is ready to shutdown.
Sender是celery.concurrency.eventlet.TaskPool实例
eventlet_pool_apply
Sent whenever a task is applied to the pool.
Sender是celery.concurrency.eventlet.TaskPool实例
提供的参数:
target
目标函数
args
位置参数
kwargs
关键字参数
Logging Signals:日志信号
setup_logging
如果连接了此信号,Celery 将不会配置loggers,因此您可以使用它来完全覆盖您自己的log配置。
如果你想通过 Celery 增加日志配置设置,那么你可以使用 after_setup_logger 和 after_setup_task_logger 信号。
提供的参数:
loglevel
日志等级
logfile
日志文件名
format
日志格式
colorize
日志消息是否使用color
after_setup_logger
在设置每个全局logger(非任务logger)之后发送.用于增加日志配置
提供的参数:
logger
logger对象
loglevel
日志等级
logfile
日志文件名
format
日志格式
colorize
日志消息是否使用color
after_setup_task_logger
在设置每个任务logger之后发送.用于增加日志配置
提供的参数:
logger
logger对象
loglevel
日志等级
logfile
日志文件名
format
日志格式
colorize
日志消息是否使用color
Command Signals:命令信号
user_preload_options
This signal is sent after any of the Celery command line programs are finished parsing the user preload options.
It can be used to add additional command-line arguments to the celery umbrella command:
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
'--monitoring', action='store_true',
help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
if options['monitoring']:
enable_monitoring()
Sender is the Command instance, and the value depends on the program that was called (e.g., for the umbrella command it'll be a CeleryCommand) object).
提供的参数:
app
app实例
options
Mapping of the parsed user preload options (with default values).
Deprecated Signals:弃用的信号
task_sent
该信号已弃用,请改用after_task_publish