Extensions and Bootsteps

Custom Message Consumers

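You may want to embed custom Kombu consumers to manually process your messages.

For that purpose a special ConsumerStep bootstep class exists, where you only need to define the get_consumers method, which must return a list of kombu.Consumer objects to start whenever the connection is established: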

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()


app.steps['consumer'].add(MyConsumerStep)


def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')

Note:

Kombu Consumers can make use of two different message callback dispatching mechanisms. The first one is the callbacks argument, which accepts a list of callbacks with a (body, message) signature. The second one is the on_message argument, which takes a single callback with a (message,) signature. The latter won't automatically decode and deserialize the payload.

def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]


def on_message(self, message):
    payload = message.decode()
    print(
        'Received message: {0!r} {props!r} rawlen={s}'.format(
        payload, props=message.properties, s=len(message.body),
    ))
    message.ack()
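
The difference between the two dispatch mechanisms can be illustrated with a small stand-in that uses only the standard library. FakeMessage and dispatch are hypothetical names for this sketch, not Kombu classes, and the sketch assumes a JSON payload. It only mimics the rule described above: callbacks receive the decoded body, while on_message receives the raw message and has to decode it itself.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for a received message (not kombu.Message)."""

    def __init__(self, body):
        self.body = body  # raw, still-serialized payload (bytes)

    def decode(self):
        # Deserialize the raw payload; this sketch assumes JSON.
        return json.loads(self.body)


def dispatch(message, callbacks=(), on_message=None):
    """Deliver a message using one of the two callback styles."""
    if on_message is not None:
        # Single callback, (message,) signature: nothing is decoded for it.
        on_message(message)
    else:
        # List of callbacks, (body, message) signature: decode once, then call.
        body = message.decode()
        for callback in callbacks:
            callback(body, message)


seen = []
msg = FakeMessage(b'{"hello": "world!"}')
dispatch(msg, callbacks=[lambda body, message: seen.append(('callbacks', body))])
dispatch(msg, on_message=lambda message: seen.append(('on_message', message.body)))
```

After both calls, the first entry in seen holds a Python dict and the second holds the untouched bytes, which is why an on_message handler calls message.decode() explicitly.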

Blueprints

Bootsteps is a technique to add functionality to the workers. A bootstep is a custom class that defines hooks to do custom actions at different stages in the worker. Every bootstep belongs to a blueprint, and the worker currently defines two blueprints: Worker and Consumer.


Figure A: Bootsteps in the Worker and Consumer blueprints. Starting from the bottom up, the first step in the worker blueprint is the Timer, and the last step is to start the Consumer blueprint, which then establishes the broker connection and starts consuming messages.


Worker

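The Worker is the first blueprint to start, and with it start major components such as the event loop, the processing pool, and the timer used for ETA tasks and other timed events.

When the worker is fully started it continues with the Consumer blueprint, which sets up how tasks are executed, connects to the broker, and starts the message consumers.

Attributes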

app

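The current app instance.

hostname

The worker's node name (for example: worker1@example.com).

blueprint

This is the worker Blueprint.

hub

Event loop object (Hub). You can use this to register callbacks in the event loop.

This is only supported by async I/O enabled transports (amqp, redis), in which case the worker.use_eventloop attribute should be set.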

Your worker bootstep must require the Hub bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}

pool

The current process/eventlet/gevent/thread pool. See celery.concurrency.base.BasePool.

Your worker bootstep must require the Pool bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

timer

Timer used to schedule functions.

Your worker bootstep must require the Timer bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

statedb

Database (celery.worker.state.Persistent) used to persist state between worker restarts.

This is only defined if the statedb argument is enabled.

Your worker bootstep must require the Statedb bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Statedb'}

autoscaler

The Autoscaler is used to automatically grow and shrink the number of processes in the pool.

This is only defined if the autoscale argument is enabled.

Your worker bootstep must require the Autoscaler bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler',)

autoreloader

The Autoreloader is used to automatically reload user code when the file system changes.

This is only defined if the autoreload argument is enabled.

Your worker bootstep must require the Autoreloader bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader',)

Example worker bootstep

An example Worker bootstep could be:

from celery import bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))

    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self

    def start(self, worker):
        print('Called when the worker is started.')

    def stop(self, worker):
        print('Called when the worker shuts down.')

    def terminate(self, worker):
        print('Called when the worker terminates')

Every method is passed the current WorkController instance as the first argument.

Another example could use the timer to wake up at regular intervals:

from time import time

from celery import bootsteps


class DeadlockDetection(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, deadlock_timeout=3600):
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None

    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            30.0, self.detect, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None

    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()

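Customizing Task Handling Logs

The Celery worker emits messages to the Python logging subsystem for various events throughout the lifecycle of a task. These messages can be customized by overriding the LOG_<TYPE> format strings defined in celery/app/trace.py. For example: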

import celery.app.trace

celery.app.trace.LOG_SUCCESS = "This is a custom message"

The various format strings are all provided with the task name and ID for % formatting, and some of them receive extra fields like the return value or the exception that caused a task to fail. These fields can be used in custom format strings like so:

import celery.app.trace

celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"
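
To see how such a format string is filled in, the following standalone sketch applies mapping-style % formatting to the LOG_REJECTED template above. The context dictionary and its values are hypothetical stand-ins for what the worker would supply; only the name, id, and exc fields mentioned above are assumed.

```python
# Hypothetical context values; the worker supplies the real ones.
context = {
    'name': 'proj.tasks.add',
    'id': 'd9f0c1e2',
    'exc': ValueError('unlucky arguments'),
}

template = "%(name)r is cursed and I won't run it: %(exc)s"

# Mapping-style % formatting looks each field up by key; keys that the
# template does not mention (here 'id') are simply ignored.
rendered = template % context
print(rendered)
```

A template that names a field the worker does not provide for that message type would raise KeyError at formatting time, so it is worth checking which fields a given LOG_<TYPE> string receives before relying on them.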

Consumer

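The Consumer blueprint establishes a connection to the broker, and is restarted every time this connection is lost. Consumer bootsteps include the worker heartbeat, the remote control command consumer, and importantly, the task consumer.

When you create consumer bootsteps you must take into account that it must be possible to restart your blueprint. An additional 'shutdown' method is defined for consumer bootsteps; this method is called when the worker is shut down.

Attributes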

app

The current app instance.

controller

The parent WorkController object that created this consumer.

hostname

The worker's node name (for example: worker1@example.com).

blueprint

This is the consumer Blueprint.

hub

Event loop object (Hub). You can use this to register callbacks in the event loop.

This is only supported by async I/O enabled transports (amqp, redis), in which case the worker.use_eventloop attribute should be set.

Your worker bootstep must require the Hub bootstep to use this:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}

connection

The current broker connection (kombu.Connection).

A consumer bootstep must require the 'Connection' bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.connection:Connection'}

event_dispatcher

An app.events.Dispatcher object that can be used to send events.

A consumer bootstep must require the Events bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.events:Events'}

gossip

Worker to worker broadcast communication (Gossip).

A consumer bootstep must require the Gossip bootstep to use this:

class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = {'celery.worker.consumer.gossip:Gossip'}

    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        self.tasks = [
            c.app.tasks['proj.tasks.add'],
            c.app.tasks['proj.tasks.mul'],
        ]
        self.last_size = None

    def on_cluster_size_change(self, worker):
        cluster_size = len(list(self.c.gossip.state.alive_workers()))
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit = 1.0 / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size

    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up soon
        # in order to see if the worker recovered.
        self.c.timer.call_after(10.0, self.on_cluster_size_change, (worker,))
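
The division in on_cluster_size_change assumes that at least one worker is reported alive. A guarded version of that calculation, written as a standalone helper, might look like the sketch below. per_worker_rate and total_rate are hypothetical names for illustration and are not part of Celery.

```python
def per_worker_rate(cluster_size, total_rate=1.0):
    """Split a cluster-wide rate evenly across the workers that are alive.

    Returns None when no workers are known, so the caller can skip the
    update instead of dividing by zero.
    """
    if cluster_size < 1:
        return None
    return total_rate / cluster_size


rates = {size: per_worker_rate(size) for size in (0, 1, 4)}
```

With four workers each one gets a quarter of the cluster-wide rate, and an empty cluster yields None rather than an exception.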

Callbacks

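  • <set> gossip.on.node_join

    Called whenever a new node joins the cluster, providing a Worker instance.

  • <set> gossip.on.node_leave

    Called whenever a node leaves the cluster (shuts down), providing a Worker instance.

  • <set> gossip.on.node_lost

    Called whenever the heartbeat is missed for a worker instance in the cluster (heartbeat not received or processed in time), providing a Worker instance.

    This doesn't necessarily mean the worker is actually offline, so use a timeout mechanism if the default heartbeat timeout isn't sufficient.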

pool

The current process/eventlet/gevent/thread pool. See celery.concurrency.base.BasePool.

timer

Timer (celery.utils.timer2.Schedule) used to schedule functions.

heart

Responsible for sending worker event heartbeats (Heart).

Your consumer bootstep must require the Heart bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.heart:Heart'}

task_consumer

The kombu.Consumer object used to consume task messages.

Your consumer bootstep must require the Tasks bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

strategies

Every registered task type has an entry in this mapping, where the value is used to execute an incoming message of this task type (the task execution strategy). This mapping is generated by the Tasks bootstep when the consumer starts:

for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )

Your consumer bootstep must require the Tasks bootstep to use this:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

task_buckets

A defaultdict used to look up the rate limit for a task by type. Entries in this dict may be None (for no limit) or a TokenBucket instance implementing consume(tokens) and expected_time(tokens).

TokenBucket implements the token bucket algorithm, but any algorithm may be used as long as it conforms to the same interface and defines the two methods above.
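
As an illustration of that interface, here is a minimal token bucket sketch using only the standard library. SimpleTokenBucket, its constructor parameters, and its return conventions (a boolean from consume, seconds from expected_time) are assumptions made for this example; it is not the TokenBucket class that Celery uses.

```python
import time


class SimpleTokenBucket:
    """Illustrative bucket exposing consume() and expected_time()."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = float(fill_rate)   # tokens added per second
        self.capacity = float(capacity)     # maximum tokens held
        self._clock = clock
        self._tokens = self.capacity        # start full
        self._stamp = clock()

    def _refill(self):
        # Add the tokens earned since the last check, capped at capacity.
        now = self._clock()
        earned = (now - self._stamp) * self.fill_rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._stamp = now
        return self._tokens

    def consume(self, tokens=1):
        """Take tokens if available; return True on success."""
        if tokens <= self._refill():
            self._tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Seconds to wait until `tokens` can be consumed (0.0 if now)."""
        missing = tokens - self._refill()
        return max(0.0, missing / self.fill_rate)


# A manually advanced clock keeps the demonstration deterministic.
now = [0.0]
bucket = SimpleTokenBucket(fill_rate=2, capacity=1, clock=lambda: now[0])
first = bucket.consume()        # bucket starts full
second = bucket.consume()       # empty, and no time has passed
wait = bucket.expected_time()   # one token at 2 tokens per second
now[0] += wait
third = bucket.consume()        # refilled after waiting
```

The injected clock is a design choice for testability; with the default time.monotonic the same class works against real elapsed time.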

qos

The QoS object can be used to change the task channel's current prefetch_count value:

# increment at next cycle
consumer.qos.increment_eventually(1)
# decrement at next cycle
consumer.qos.decrement_eventually(1)
consumer.qos.set(10)

Methods

consumer.reset_rate_limits()

Updates the task_buckets mapping for all registered task types.

consumer.bucket_for_task(type, Bucket=TokenBucket)

Creates rate limit bucket for a task using its task.rate_limit attribute.

consumer.add_task_queue(name, exchange=None, exchange_type=None, routing_key=None, **options)

Adds new queue to consume from. This will persist on connection restart.

consumer.cancel_task_queue(name)

Stop consuming from queue by name. This will persist on connection restart.

consumer.apply_eta_task(request)

Schedule ETA task to execute based on the request.eta attribute. ([Request](https://docs.celeryq.dev/en/stable/reference/celery.worker.request.html#celery.worker.request.Request))

Installing Bootsteps

app.steps['worker'] and app.steps['consumer'] can be modified to add new bootsteps:

>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)

>>> app.steps['consumer'].update([StepA, StepB])

>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}}

The order of steps isn’t important here as the order is decided by the resulting dependency graph (Step.requires).
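
The idea that dependencies, rather than insertion order, decide the boot order can be sketched with the standard library's graphlib module (Python 3.9+). This is a stand-in for illustration only: the step names and their requirements below are hypothetical, and Celery builds its own dependency graph rather than using graphlib.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical steps: each name maps to the set of steps it requires.
requires = {
    'Timer': set(),
    'Hub': {'Timer'},
    'Pool': {'Hub'},
    'MyStep': {'Pool'},
    'Consumer': {'Pool', 'MyStep'},
}


def boot_order(requires):
    """Return step names so every step comes after its requirements."""
    return list(TopologicalSorter(requires).static_order())


# Insertion order of the mapping does not matter.
shuffled = dict(reversed(list(requires.items())))
order = boot_order(shuffled)
print(order)
```

Because every step in this toy graph depends on the previous one, there is exactly one valid order, and it is the same whichever order the steps were added in.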

To illustrate how you can install bootsteps and how they work, this is an example step that prints some useless debugging information. It can be added both as a worker and consumer bootstep:

from celery import Celery
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

app = Celery(broker='amqp://')
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)

Starting the worker with this step installed will give us the following logs:

<Worker: w@example.com (initializing)> is in init
<Consumer: w@example.com (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <Worker: w@example.com (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <Consumer: w@example.com (running)> is starting
<Consumer: w@example.com (closing)> is stopping
<Worker: w@example.com (closing)> is stopping
<Consumer: w@example.com (terminating)> is shutting down

The print statements will be redirected to the logging subsystem after the worker has been initialized, so the "is starting" lines are time-stamped. You may notice that this no longer happens at shutdown. This is because the stop and shutdown methods are called inside a signal handler, and it's not safe to use logging inside such a handler. Logging with the Python logging module isn't reentrant, meaning you cannot interrupt the function and then call it again later. It's important that the stop and shutdown methods you write are also reentrant.

Starting the worker with --loglevel=debug will show us more information about the boot process:

[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.

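Command-line programs

Adding new command-line options

Command-specific options

You can add additional command-line options to the worker, beat, and events commands by modifying the user_options attribute of the application instance.

Celery commands use the click module to parse command-line arguments, so to add custom arguments you need to add click.Option instances to the relevant set.

Example adding a custom option to the celery worker command: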

from celery import Celery
from click import Option

app = Celery(broker='amqp://')

app.user_options['worker'].add(Option(('--enable-my-option',),
                                      is_flag=True,
                                      help='Enable custom option.'))

All bootsteps will now receive this argument as a keyword argument to Bootstep.__init__:

from celery import bootsteps

class MyBootstep(bootsteps.Step):

    def __init__(self, parent, enable_my_option=False, **options):
        super().__init__(parent, **options)
        if enable_my_option:
            party()

app.steps['worker'].add(MyBootstep)

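Preload options

The celery umbrella command supports the concept of "preload options". These are special options passed to all sub-commands.

You can add new preload options, for example to specify a configuration template: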

from celery import Celery
from celery import signals
from click import Option

app = Celery()

app.user_options['preload'].add(Option(('-Z', '--template'),
                                       default='default',
                                       help='Configuration template to use.'))

@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])

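Adding new celery sub-commands

New commands can be added to the celery umbrella command by using setuptools entry-points.

Entry-points are special meta-data that can be added to your package's setup.py program and then, after installation, read from the system using the importlib module.

Celery recognizes celery.commands entry-points to install additional sub-commands, where the value of the entry-point must point to a valid click command.

This is how the Flower monitoring extension may add the celery flower command, by adding an entry-point in setup.py: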

setup(
    name='flower',
    entry_points={
        'celery.commands': [
           'flower = flower.command:flower',
        ],
    }
)

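The command definition is in two parts separated by the equal sign. The first part is the name of the sub-command (flower), and the second part is the fully qualified symbol path to the function that implements the command: flower.command:flower

The module path and the name of the attribute should be separated by a colon, as above.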
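
The naming convention can be made concrete with a short sketch that splits such a definition and resolves the target by hand. parse_entry_point and load_target are hypothetical helpers written for this example; in practice the packaging tools and importlib perform this resolution for you.

```python
from importlib import import_module


def parse_entry_point(definition):
    """Split 'name = module.path:attribute' into its three parts."""
    name, _, target = definition.partition('=')
    module_path, _, attribute = target.partition(':')
    return name.strip(), module_path.strip(), attribute.strip()


def load_target(module_path, attribute):
    """Import the module and fetch the named attribute from it."""
    return getattr(import_module(module_path), attribute)


parts = parse_entry_point('flower = flower.command:flower')

# A standard-library target stands in for a real command here, so the
# sketch runs without Flower installed.
dumps = load_target(*parse_entry_point('demo = json:dumps')[1:])
```

Here parts holds the sub-command name, the module path, and the attribute name, and dumps is the json.dumps function found by following the same module:attribute convention.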

In the module flower/command.py, the command function may be defined as follows:

import click

@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
    print('Running our command')

Worker API

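Hub - The worker's async event loop

Supported transports: amqp, redis

New in version 3.0.

The worker uses asynchronous I/O when the amqp or redis broker transports are used.

The eventual goal is for all transports to use the event loop, but that will take some time, so other transports still use a threading-based solution.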

hub.add(fd, callback, flags)

hub.add_reader(fd, callback, *args)

Add callback to be called when fd is readable.

The callback will stay registered until explicitly removed using hub.remove(fd), or until the file descriptor is automatically discarded because it's no longer valid.

Note that only one callback can be registered for any given file descriptor at a time, so calling add a second time will remove any callback that was previously registered for that file descriptor.

A file descriptor is any file-like object that supports the fileno method, or it can be the file descriptor number (int).
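
The reader semantics described above (a callback per readable descriptor, a second registration replacing the first, and file-like objects accepted through fileno) can be tried out with the standard library's selectors module. TinyLoop is a hypothetical toy written for this example; it is not Celery's Hub and only imitates the behavior described in the text.

```python
import selectors
import socket


class TinyLoop:
    """Toy stand-in for an event loop; not Celery's Hub."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()

    def add_reader(self, fd, callback, *args):
        # Mirror the one-callback-per-descriptor rule: a second
        # registration replaces the first.
        try:
            self._selector.register(fd, selectors.EVENT_READ, (callback, args))
        except KeyError:
            self._selector.modify(fd, selectors.EVENT_READ, (callback, args))

    def remove(self, fd):
        self._selector.unregister(fd)

    def run_once(self, timeout=1.0):
        # Call the registered callback for every descriptor that is readable.
        for key, _events in self._selector.select(timeout):
            callback, args = key.data
            callback(*args)

    def close(self):
        self._selector.close()


def demo():
    received = []
    left, right = socket.socketpair()  # sockets support fileno()
    loop = TinyLoop()
    try:
        loop.add_reader(right, lambda: received.append('first'))
        # Registering again for the same socket replaces the callback above.
        loop.add_reader(right, lambda: received.append(right.recv(1024)))
        left.sendall(b'ping')
        loop.run_once()
    finally:
        loop.remove(right)
        loop.close()
        left.close()
        right.close()
    return received


result = demo()
```

Only the second callback runs, so result contains the bytes read from the socket and never the 'first' marker.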

hub.add_writer(fd, callback, *args)

Add callback to be called when fd is writable. See also notes for hub.add_reader() above.

hub.remove(fd)

Remove all callbacks for file descriptor fd from the loop.

Timer - Scheduling events

timer.call_after(secs, callback, args=(), kwargs=(), priority=0)

timer.call_repeatedly(secs, callback, args=(), kwargs=(), priority=0)

timer.call_at(eta, callback, args=(), kwargs=(), priority=0)
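
The difference between scheduling after a delay and scheduling at an absolute time can be explored with the standard library's sched module. This is a different scheduler from the worker's timer and is used here only as an analogy; FakeClock is a hypothetical helper that lets the example finish instantly.

```python
import sched


class FakeClock:
    """Manually advanced clock so the example runs instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
fired = []

# Relative delay, comparable in spirit to call_after(secs, ...).
scheduler.enter(5.0, 0, fired.append, argument=('after 5s',))
# Absolute time, comparable in spirit to call_at(eta, ...).
scheduler.enterabs(2.0, 0, fired.append, argument=('at t=2',))

scheduler.run()
```

The absolute event fires first because its time (2.0) comes before the relative one (0.0 + 5.0). sched has no direct counterpart to call_repeatedly; a repeating callback would have to reschedule itself each time it runs.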