Extensions and Bootsteps
Custom Message Consumers
You may want to embed custom Kombu consumers to manually process your messages.
For that, a special ConsumerStep bootstep class exists, where you only need to define the get_consumers method, which must return a list of kombu.Consumer objects to start whenever the connection is established:
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://')
class MyConsumerStep(bootsteps.ConsumerStep):
    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]
    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)
def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )
if __name__ == '__main__':
    send_me_a_message('world!')
Note:
Kombu Consumers can use two different message callback dispatching mechanisms. The first one is the callbacks argument, which accepts a list of callbacks with a (body, message) signature. The second one is the on_message argument, which takes a single callback with a (message,) signature. The latter won't automatically decode and deserialize the payload.
def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]
def on_message(self, message):
    payload = message.decode()
    print(
        'Received message: {0!r} {props!r} rawlen={s}'.format(
            payload, props=message.properties, s=len(message.body),
        ))
    message.ack()
Blueprints
Bootsteps is a technique to add functionality to the workers. A bootstep is a custom class that defines hooks to do custom actions at different stages in the worker. Every bootstep belongs to a blueprint, and the worker currently defines two blueprints: Worker and Consumer.
Figure A: Bootsteps in the Worker and Consumer blueprints. Starting from the bottom up, the first step in the worker blueprint is the Timer, and the last step is to start the Consumer blueprint, which then establishes the broker connection and starts consuming messages.
 
Worker
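The Worker is the first blueprint to start, and with it starts major components like the event loop, the processing pool, and the timer used for ETA tasks and other timed events.
When the worker is fully started it continues with the Consumer blueprint, which sets up how tasks are executed, connects to the broker and starts the message consumers.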
Attributes
app
The current app instance.
hostname
The worker's node name (e.g., worker1@example.com).
blueprint
This is the worker Blueprint.
hub
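Event loop object (Hub). You can use this to register callbacks in the event loop.
This is only supported by async I/O enabled transports (amqp, redis), in which case the worker.use_eventloop attribute should be set.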
Your worker bootstep must require the Hub bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}
pool
The current process/eventlet/gevent/thread pool. See celery.concurrency.base.BasePool.
Your worker bootstep must require the Pool bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}
timer
Timer used to schedule functions.
Your worker bootstep must require the Timer bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}
statedb
Database (celery.worker.state.Persistent) to persist state between worker restarts.
This is only defined if the statedb argument is enabled.
Your worker bootstep must require the Statedb bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Statedb'}
autoscaler
Autoscaler used to automatically grow and shrink the number of processes in the pool.
This is only defined if the autoscale argument is enabled.
Your worker bootstep must require the Autoscaler bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler',)
autoreloader
Autoreloader used to automatically reload user code when the file system changes.
This is only defined if the autoreload argument is enabled.
Your worker bootstep must require the Autoreloader bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader',)
Example worker bootstep
An example Worker bootstep could be:
from celery import bootsteps
class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}
    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))
    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self
    def start(self, worker):
        print('Called when the worker is started.')
    def stop(self, worker):
        print('Called when the worker shuts down.')
    def terminate(self, worker):
        print('Called when the worker terminates')
Every method is passed the current WorkController instance as the first argument.
Another example could use the timer to wake up at regular intervals:
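from time import time
from celery import bootsteps
class DeadlockDetection(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}
    # **kwargs absorbs the other worker options passed to every bootstep.
    def __init__(self, worker, deadlock_timeout=3600, **kwargs):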
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None
    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            30.0, self.detect, (worker,), priority=10,
        )
    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None
    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()
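The timeout check inside detect() can be pulled out into a pure function, which makes it easy to test without a running worker. This is a minimal sketch: the Request stand-in and the find_stuck_requests() helper are hypothetical, and the clock value is passed in explicitly instead of calling time().

```python
from collections import namedtuple

# Hypothetical stand-in for a worker request; only time_start is used here.
Request = namedtuple('Request', ['id', 'time_start'])


def find_stuck_requests(requests, timeout, now):
    """Return requests that started more than ``timeout`` seconds before ``now``.

    Requests that have not started yet (falsy time_start) are skipped,
    mirroring the ``req.time_start and ...`` guard in detect().
    """
    return [
        req for req in requests
        if req.time_start and now - req.time_start > timeout
    ]


requests = [Request('a', 100.0), Request('b', 3000.0), Request('c', None)]
stuck = find_stuck_requests(requests, timeout=3600, now=4000.0)
print([req.id for req in stuck])  # ['a']
```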
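Customizing Task Handling Logs
The Celery worker emits messages to the Python logging subsystem for various events throughout the lifecycle of a task. These messages can be customized by overriding the LOG_<TYPE> format strings defined in celery/app/trace.py. For example: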
import celery.app.trace
celery.app.trace.LOG_SUCCESS = "This is a custom message"
The various format strings are all provided with the task name and ID for % formatting, and some of them receive extra fields like the return value or the exception which caused a task to fail. These fields can be used in custom format strings like so:
import celery.app.trace
celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"
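Because these are plain %-style format strings, a custom string can be checked before it is installed by rendering it against a dictionary of sample fields. A minimal sketch: the sample values are made up, and only the name and exc fields used above are assumed to be supplied by the worker.

```python
# Same custom format string as above.
LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"

# Hypothetical sample fields; the worker supplies the real values at runtime.
sample = {
    'name': 'proj.tasks.add',
    'id': 'hypothetical-task-id',
    'exc': 'Reject(requeue=False)',
}

# Extra keys in the mapping are ignored by %-formatting,
# but a missing key raises KeyError, which catches typos early.
print(LOG_REJECTED % sample)
```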
Consumer
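The Consumer blueprint establishes a connection to the broker, and is restarted every time this connection is lost. Consumer bootsteps include the worker heartbeat, the remote control command consumer, and, importantly, the task consumer.
When you create consumer bootsteps you must take into account that it must be possible to restart your blueprint. An additional 'shutdown' method is defined for consumer bootsteps; this method is called when the worker is shut down.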
Attributes
app
The current app instance.
controller
The parent WorkController object that created this consumer.
hostname
The worker's node name (e.g., worker1@example.com).
blueprint
This is the worker Blueprint.
hub
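Event loop object (Hub). You can use this to register callbacks in the event loop.
This is only supported by async I/O enabled transports (amqp, redis), in which case the worker.use_eventloop attribute should be set.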
Your worker bootstep must require the Hub bootstep to use this:
class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}
connection
The current broker connection (kombu.Connection).
A consumer bootstep must require the 'Connection' bootstep to use this:
class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.connection:Connection'}
event_dispatcher
An app.events.Dispatcher object that can be used to send events.
A consumer bootstep must require the Events bootstep to use this:
class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.events:Events'}
gossip
Worker-to-worker broadcast communication (Gossip).
A consumer bootstep must require the Gossip bootstep to use this.
class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = {'celery.worker.consumer.gossip:Gossip'}
    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        # the Consumer (c) holds the app; the list items need commas
        self.tasks = [
            c.app.tasks['proj.tasks.add'],
            c.app.tasks['proj.tasks.mul'],
        ]
        self.last_size = None
    def on_cluster_size_change(self, worker):
        cluster_size = len(list(self.c.gossip.state.alive_workers()))
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit = 1.0 / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size
    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up soon
        # in order to see if the worker recovered.
        self.c.timer.call_after(10.0, self.on_cluster_size_change, (worker,))
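In RatelimitStep the cluster-wide budget is one task per second, split evenly so that each of N workers gets 1.0 / N. A minimal sketch of that arithmetic as a standalone helper; per_worker_rate is a hypothetical name, and the zero-worker guard is an addition not present in the step above.

```python
def per_worker_rate(cluster_rate, cluster_size):
    """Split a cluster-wide rate (tasks per second) evenly across workers.

    Returns None when there are no workers, since there is nothing to split.
    """
    if cluster_size <= 0:
        return None
    return cluster_rate / cluster_size


# With the 1 task/second budget used above and 4 live workers,
# each worker may run 0.25 tasks per second.
print(per_worker_rate(1.0, 4))
```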
Callbacks
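- <set> gossip.on.node_join - Called whenever a new node joins the cluster, providing a Worker instance.
- <set> gossip.on.node_leave - Called whenever a node leaves the cluster (shuts down), providing a Worker instance.
- <set> gossip.on.node_lost - Called whenever the heartbeat of a worker instance in the cluster is missed (not received or not processed in time), providing a Worker instance. This doesn't necessarily mean the worker is actually offline, so use a timeout mechanism if the default heartbeat timeout isn't sufficient.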
pool
The current process/eventlet/gevent/thread pool. See celery.concurrency.base.BasePool.
timer
Timer (celery.utils.timer2.Schedule) used to schedule functions.
heart
Responsible for sending worker event heartbeats (Heart).
Your consumer bootstep must require the Heart bootstep to use this:
class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.heart:Heart'}
task_consumer
The kombu.Consumer object used to consume task messages.
Your consumer bootstep must require the Tasks bootstep to use this:
class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}
strategies
Every registered task type has an entry in this mapping, where the value is used to execute an incoming message of this task type (the task execution strategy). This mapping is generated by the Tasks bootstep when the consumer starts:
for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )
Your consumer bootstep must require the Tasks bootstep to use this:
class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}
task_buckets
A defaultdict used to look up the rate limit for a task by type. Entries in this dict may be None (for no limit) or a TokenBucket instance implementing consume(tokens) and expected_time(tokens).
TokenBucket implements the token bucket algorithm, but any algorithm may be used as long as it conforms to the same interface and defines the two methods above.
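To make the interface concrete, here is a minimal token bucket exposing the same two methods. It is a sketch, not kombu's TokenBucket: the class name, the constructor arguments (fill_rate, capacity) and the injectable clock are assumptions made for illustration and testing.

```python
import time


class SimpleTokenBucket:
    """Token bucket: tokens refill at ``fill_rate`` per second up to ``capacity``."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = float(fill_rate)
        self.capacity = float(capacity)
        self.clock = clock
        self._tokens = self.capacity  # the bucket starts full
        self._last = clock()

    def _refill(self):
        # Add the tokens earned since the last call, capped at capacity.
        now = self.clock()
        elapsed = now - self._last
        self._last = now
        self._tokens = min(self.capacity, self._tokens + elapsed * self.fill_rate)

    def consume(self, tokens=1):
        """Take ``tokens`` and return True if available, else return False."""
        self._refill()
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Seconds to wait until ``tokens`` can be consumed (0.0 if available now)."""
        self._refill()
        missing = tokens - self._tokens
        return max(missing / self.fill_rate, 0.0)


# Usage with a fake clock so the timing is deterministic.
fake_now = [0.0]
bucket = SimpleTokenBucket(fill_rate=2, capacity=1, clock=lambda: fake_now[0])
print(bucket.consume())        # True, the bucket starts full
print(bucket.consume())        # False, empty until it refills
print(bucket.expected_time())  # 0.5 seconds at 2 tokens/second
fake_now[0] = 0.5
print(bucket.consume())        # True
```

Any object with the same consume()/expected_time() pair could take its place, which is the point the paragraph above makes about swapping algorithms.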
qos
The QoS object can be used to change the task channel's current prefetch_count value:
# increment at next cycle
consumer.qos.increment_eventually(1)
# decrement at next cycle
consumer.qos.decrement_eventually(1)
consumer.qos.set(10)
consumer.qos.set(10)
Methods
consumer.reset_rate_limits()
Updates the task_buckets mapping for all registered task types.
consumer.bucket_for_task(type, Bucket=TokenBucket)
Creates rate limit bucket for a task using its task.rate_limit attribute.
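A bucket needs a per-second fill rate, while task.rate_limit is usually written as a number (tasks per second) or a string such as '10/m' (per minute) or '100/h' (per hour). A sketch of that conversion; rate_per_second is a hypothetical helper, not Celery's own implementation, and it returns None for "no limit" to match the None entries in task_buckets.

```python
# Seconds per unit for the suffixes used in rate limit strings.
_UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 60 * 60}


def rate_per_second(rate_limit):
    """Convert 10, '10/s', '10/m' or '10/h' to tasks per second.

    Returns None for falsy values (None, 0, ''), meaning "no limit".
    """
    if not rate_limit:
        return None
    if isinstance(rate_limit, str):
        count, _, unit = rate_limit.partition('/')
        return float(count) / _UNIT_SECONDS[unit.strip() or 's']
    return float(rate_limit)


print(rate_per_second('120/m'))  # 2.0
```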
consumer.add_task_queue(name, exchange=None, exchange_type=None, routing_key=None, **options)
Adds new queue to consume from. This will persist on connection restart.
consumer.cancel_task_queue(name)
Stop consuming from queue by name. This will persist on connection restart.
apply_eta_task(request)
Schedule an ETA task to execute based on the request.eta attribute (Request: https://docs.celeryq.dev/en/stable/reference/celery.worker.request.html#celery.worker.request.Request).
Installing Bootsteps
app.steps['worker'] and app.steps['consumer'] can be modified to add new bootsteps:
>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)
>>> app.steps['consumer'].update([StepA, StepB])
>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}}
The order of steps isn’t important here as the order is decided by the resulting dependency graph (Step.requires).
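The idea can be illustrated with the standard library's graphlib (Python 3.9+): steps are listed in an arbitrary order, and the start order is derived only from each step's requirements. The step names and the requires mapping are hypothetical, and this is not how Celery builds its graph internally.

```python
from graphlib import TopologicalSorter

# Hypothetical steps, each listing the steps it requires (like Step.requires).
requires = {
    'StepB': {'StepA'},
    'MyConsumerStep': {'Connection'},
    'StepA': {'Connection'},
    'Connection': set(),
}


def boot_order(requires):
    """Return an order in which every step starts after the steps it requires."""
    return list(TopologicalSorter(requires).static_order())


print(boot_order(requires))
```

Reordering the dictionary can change which of the independent steps comes first, but never lets a step start before its requirements.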
To illustrate how you can install bootsteps and how they work, this is an example step that prints some useless debugging information. It can be added both as a worker and consumer bootstep:
from celery import Celery
from celery import bootsteps
class InfoStep(bootsteps.StartStopStep):
    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))
    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))
    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))
    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))
app = Celery(broker='amqp://')
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)
Starting the worker with this step installed will give us the following logs:
<Worker: w@example.com (initializing)> is in init
<Consumer: w@example.com (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <Worker: w@example.com (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <Consumer: w@example.com (running)> is starting
<Consumer: w@example.com (closing)> is stopping
<Worker: w@example.com (closing)> is stopping
<Consumer: w@example.com (terminating)> is shutting down
The print statements will be redirected to the logging subsystem after the worker has been initialized, so the "is starting" lines are time-stamped. You may notice that this no longer happens at shutdown; this is because the stop and shutdown methods are called inside a signal handler, and it's not safe to use logging inside such a handler. Logging with the Python logging module isn't reentrant, meaning you cannot interrupt the function and then call it again later. It's important that the stop and shutdown methods you write are also reentrant.
Starting the worker with --loglevel=debug will show us more information about the boot process:
[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
Command-line programs
Adding new command-line options
Command-specific options
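You can add additional command-line options to the worker, beat, and events commands by modifying the user_options attribute of the application instance.
Celery commands use the click module to parse command-line arguments, so to add custom arguments you need to add click.Option instances to the relevant set.
Example adding a custom option to the celery worker command: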
from celery import Celery
from click import Option
app = Celery(broker='amqp://')
app.user_options['worker'].add(Option(('--enable-my-option',),
                                      is_flag=True,
                                      help='Enable custom option.'))
All bootsteps will now receive this argument as a keyword argument to Bootstep.__init__:
from celery import bootsteps
class MyBootstep(bootsteps.Step):
    def __init__(self, parent, enable_my_option=False, **options):
        super().__init__(parent, **options)
        if enable_my_option:
            party()
app.steps['worker'].add(MyBootstep)
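Since every bootstep receives the option as a keyword argument, a step that doesn't care about it must still accept **options, or instantiation fails. The toy model below shows this; ToyStep, instantiate_steps and the step classes are hypothetical stand-ins, not Celery's Blueprint.

```python
class ToyStep:
    """Hypothetical base class mirroring the (parent, **kwargs) signature."""

    def __init__(self, parent, **kwargs):
        pass


class OptionAwareStep(ToyStep):
    def __init__(self, parent, enable_my_option=False, **options):
        super().__init__(parent, **options)
        self.enabled = enable_my_option


class UnrelatedStep(ToyStep):
    # Accepts **options so that options meant for other steps don't raise TypeError.
    def __init__(self, parent, **options):
        super().__init__(parent, **options)


def instantiate_steps(step_classes, parent, **options):
    """Toy stand-in for a blueprint: pass the same options to every step."""
    return [cls(parent, **options) for cls in step_classes]


steps = instantiate_steps([OptionAwareStep, UnrelatedStep], parent=None,
                          enable_my_option=True)
print(steps[0].enabled)  # True
```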
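Preload options
The celery umbrella command supports the concept of "preload options". These are special options passed to all sub-commands.
You can add new preload options, for example to specify a configuration template: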
from celery import Celery
from celery import signals
from click import Option
app = Celery()
app.user_options['preload'].add(Option(('-Z', '--template'),
                                       default='default',
                                       help='Configuration template to use.'))
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])
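Adding new celery sub-commands
New commands can be added to the celery umbrella command by using setuptools entry-points.
Entry-points are special metadata that can be added to your package's setup.py program and then, after installation, read from the system using the importlib module.
Celery recognizes celery.commands entry-points to install additional sub-commands, where the value of the entry-point must point to a valid click command.
This is how the Flower monitoring extension may add the celery flower command, by adding an entry-point in setup.py: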
from setuptools import setup
setup(
    name='flower',
    entry_points={
        'celery.commands': [
            'flower = flower.command:flower',
        ],
    }
)
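The command definition is in two parts separated by the equal sign, where the first part is the name of the sub-command (flower), and the second part is the fully qualified symbol path to the function that implements the command: flower.command:flower
The module path and the name of the attribute should be separated by a colon, as above.
In the module flower/command.py, the command function may be defined as follows: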
import click
@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
    print('Running our command')
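The two halves of an entry-point definition such as 'flower = flower.command:flower' can be split with plain string operations, and the installed celery.commands entry-points can be listed with importlib.metadata. A sketch; parse_entry_point() and installed_celery_commands() are hypothetical helpers, and the select()/get() branch covers the entry_points() API change in Python 3.10.

```python
from importlib.metadata import entry_points


def parse_entry_point(spec):
    """Split 'name = package.module:attr' into (name, module, attr)."""
    name, _, target = spec.partition('=')
    module, _, attr = target.strip().partition(':')
    return name.strip(), module, attr


def installed_celery_commands():
    """Return the sorted names of installed 'celery.commands' entry-points."""
    eps = entry_points()
    if hasattr(eps, 'select'):  # Python 3.10+
        group = eps.select(group='celery.commands')
    else:  # Python 3.8 and 3.9 return a dict of groups
        group = eps.get('celery.commands', [])
    return sorted(ep.name for ep in group)


print(parse_entry_point('flower = flower.command:flower'))
# ('flower', 'flower.command', 'flower')
```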
Worker API
Hub - The workers async event loop
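Supported transports: amqp, redis
New in version 3.0.
The worker uses asynchronous I/O when the amqp or redis broker transports are used.
The eventual goal is for all transports to use the event loop, but that will take some time, so other transports still use a threading-based solution.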
hub.add(fd, callback, flags)
hub.add_reader(fd, callback, *args)
Add callback to be called when fd is readable.
The callback will stay registered until explicitly removed using hub.remove(fd), or the file descriptor is automatically discarded because it’s no longer valid.
Note that only one callback can be registered for any given file descriptor at a time, so calling add a second time will remove any callback that was previously registered for that file descriptor.
A file descriptor is any file-like object that supports the fileno method, or it can be the file descriptor number (int).
hub.add_writer(fd, callback, *args)
Add callback to be called when fd is writable. See also notes for hub.add_reader() above.
hub.remove(fd)
Remove all callbacks for file descriptor fd from the loop.
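The registration rules above (one callback per file descriptor, a second registration replaces the first, removal drops the callback) can be modelled with the standard selectors module. ToyHub is a hypothetical, reader-only illustration of those rules, not Celery's Hub.

```python
import selectors
import socket


class ToyHub:
    """Minimal reader-only loop: one callback per file descriptor."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()

    def add_reader(self, fd, callback, *args):
        # Registering the same fd again replaces the previous callback.
        data = (callback, args)
        try:
            self.selector.register(fd, selectors.EVENT_READ, data)
        except KeyError:  # already registered
            self.selector.modify(fd, selectors.EVENT_READ, data)

    def remove(self, fd):
        # Drop whatever callback is registered for fd, if any.
        try:
            self.selector.unregister(fd)
        except KeyError:
            pass

    def run_once(self, timeout=0):
        """Call the callback of every readable fd once."""
        for key, _ in self.selector.select(timeout):
            callback, args = key.data
            callback(key.fileobj, *args)


received = []
hub = ToyHub()
a, b = socket.socketpair()
hub.add_reader(b, lambda sock, tag: received.append(('first', tag)), 'x')
# The second registration for b replaces the first callback.
hub.add_reader(b, lambda sock, tag: received.append((tag, sock.recv(16))), 'second')
a.send(b'hi')
hub.run_once(timeout=1)
print(received)  # [('second', b'hi')]
hub.remove(b)
a.close()
b.close()
```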
Timer - Scheduling events
timer.call_after(secs, callback, args=(), kwargs=(), priority=0)
timer.call_repeatedly(secs, callback, args=(), kwargs=(), priority=0)
timer.call_at(eta, callback, args=(), kwargs=(), priority=0)