任务路由

Note:
Alternate routing concepts like topic and fanout is not available for all transports, please consult the transport comparison tableopen in new window.

Basics

Automatic routing:自动路由

做路由最简单的方法是使用task_create_missing_queues设置(默认开启)

启用此设置后,将自动创建task_queues中尚未定义的命名队列。这使得执行简单的路由任务变得很容易。

假设您有两台服务器 x , y 用来处理常规任务,另一个服务器 z 用来处理 feed 相关任务。你可以这样配置:

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

在启用了该路由的情况下,导入feed任务将被路由到“feeds”队列,其他任务将会被路由到默认队列(由于历史原因,默认队列名为"celery")

或者,可以使用模式匹配甚至正则表达式来匹配 feed.tasks名称空间中的所有任务:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

如果匹配模式的顺序很重要,则应改为以items格式指定路由:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

Note:

task_routes设置可以是字典,也可以是路由器对象列表。因此,在这种情况下,我们需要将设置指定为包含列表的元组。

安装路由之后,你可以启动服务器 z 来只处理feeds队列:

user@z:/$ celery -A proj worker -Q feeds

您可以根据需要指定任意多个队列,因此可以使此服务器也处理默认队列:

user@z:/$ celery -A proj worker -Q feeds,celery

更改默认队列的名称

你可以通过以下配置来修改默认队列的名称:

app.conf.task_default_queue = 'default'

队列是如何被定义的

此功能的目的是为只有基本需求的用户隐藏复杂的AMQP协议。但是,您可能仍然对这些队列是如何声明的感兴趣。

使用下面的设置将会创建一个名为"*video" *的队列

{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

类似 *Redis *或 *SQS *这种非AMQP的backends不支持交换机,因此需要给他们的交换机和队列指定为同一个名称。使用这种设计确保他们也可以正常工作。

The non-AMQP backends like Redis or SQS don't support exchanges, so they require the exchange to have the same name as the queue. Using this design ensures it will work for them as well.

Manual routing:手动路由

假设您有两台服务器 x , y 用来处理常规任务,另一个服务器 z 用来处理 feed 相关任务。你可以这样配置:

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'

task_queues是一个Queueopen in new window实例列表。如果没有设置exchange或exchange类型的值,则会取task_default_exchange和task_default_exchange_type中的设置。

要将任务路由到feed_tasks队列,可以增加一条到task_routes设置:

task_routes = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

也可以在**Task.apply_async()或者send_task()**中使用routing_key来覆盖该设置。

from feeds.tasks import import_feed

import_feed.apply_async(args=['http://cnn.com/rss'],

                        queue='feed_tasks',

                        routing_key='feed.import')

想要服务器 *z *只从feed队列消费消息,你可以使用celery worker -Qopen in new window选项:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

服务器xy必须配置为从默认队列消费:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

如果需要,你还可以让处理feed的worker处理常规任务:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

如果您有另一个队列但要在另一个交换机上添加,只需指定自定义交换机和交换机类型:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

如果您对这些术语感到困惑,您应该阅读 AMQP

See also:

除了下面的Redis消息优先级之外,还有Rabbits and Warrensopen in new window(已不可访问),这是一篇描述队列和交换的优秀博客文章。还有 The CloudAMQP tutorial,对于 RabbitMQ 的用户,RabbitMQ FAQopen in new window可能是有用的信息来源。

定义路由选项

RabbitMQ消息优先级

supported transports:RabbitMQ

4.0新增特性

可以通过设置x-max-priority参数来配置队列以支持优先级:

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]

可以使用task_queue_max_priority设置所有队列的默认值:

app.conf.task_queue_max_priority = 10

可以使用task_default_priority设置所有任务的默认优先级:

app.conf.task_default_priority = 5

Redis消息优先级

supported transports:Redis

虽然 Celery Redis 传输确实尊重优先级字段,但 Redis 本身没有优先级的概念。 请在尝试使用 Redis 实现优先级之前阅读此说明,因为您可能会遇到一些意外行为。

要开始基于优先级的任务调度,你需要配置queue_order_strategy选项:

app.conf.broker_transport_options = {
    'queue_order_strategy': 'priority',
}

通过为每个队列创建 n 个列表来实现优先级支持。这意味着即使有 10 (0-9) 个优先级,默认情况下也会合并为 4 个级别以节省资源。这意味着一个名为 celery 的队列实际上将被拆分为 4 个队列。

最高优先级的队列将被命名为 celery,其他队列将有一个分隔符(默认为 x06x16)并将它们的优先级编号附加到队列名称中。

['celery', 'celery\x06\x163', 'celery\x06\x166', 'celery\x06\x169']

如果您想要更多优先级或不同的分隔符,您可以设置 priority_steps 和 sep 传输选项:

app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'sep': ':',
    'queue_order_strategy': 'priority',
}

上面的配置将会生成下面的队列名:

['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']

That said, note that this will never be as good as priorities implemented at the server level, and may be approximate at best. But it may still be good enough for your application.

没那么好,但也够了。

AMQP Primer:AMQP入门

Messages:消息

消息由headers和body组成。Celery 使用 headers 来存储消息的content type及其编码。Content-type 通常是序列化格式。正文包含要执行的任务的名称、任务 ID (UUID)、应用它的参数和一些额外的元数据——比如重试次数或 ETA。

这是一个表示为 Python 字典的示例任务消息:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}

生产者、消费者、brokers

发送消息的客户端通常叫做发布者或者生产者,接收消息的通常叫做消费者。

broker是消息服务,将消息从生产者路由到消费者

您可能会看到这些术语在 AMQP 相关材料中被大量使用。

Exchanges, queues, and routing keys

  1. 消息被发送到 exchanges.
  2. exchanges将消息路由到一个或多个队列。存在几种交换类型,提供不同的路由方式,或实现不同的消息传递场景。
  3. 消息在队列中等待,直到被消费
  4. 消息被确认后从队列中删除

发送和接收消息所需的步骤是:

  1. 创建一个exchange
  2. 创建一个队列
  3. 将exchange和队列绑定

Celery会自动创建task_queues中worker所需队列,除非auto_declare为False

这是一个具有三个队列的示例配置;一个用于视频,一个用于图像,一个用于其他所有内容的默认队列:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

Exchange types:交换机类型

Exchange type定义了消息如何通过exchange进行路由。有四种标准类型direct、topic、fanout、headers。此外还有一些作为RabbitMQ的非标准类型,比如Michael Bridgen的last-value-cache plug-inopen in new window

Direct exchanges

直接交换通过确切的routing key匹配,因此由"video"绑定的队列仅接收具有该routing key的消息。

Topic exchanges

Topic exchanges使用点分隔的单词和通配符匹配routing key:(*匹配单个单词)和#(匹配零个或多个单词)。

类似usa.news,usa.weather,norway.news以及norway.weather这样的routing keys,可以用*.news(所有news),usa.#(所有USA中的项目),或者usa.weather(所有USA天气项目)来绑定。

exchange.declare(exchange_name, type, passive, durable, auto_delete, internal)

通过名称声明一个exchange

查看amqp:Channel.exchange_declare.open in new window

关键字参数

  • passive - passive意味着不会创建exchange,但您可以使用它来检查exchange是否已经存在。
  • **durable - **持久的exchange(即,它们在broker重启后仍然存在)。
  • auto_delete – 这意味着当没有队列使用它时,broker将删除该exchange。

queue.declare(queue_name, passive, durable, exclusive, auto_delete)

通过名称声明一个队列

查看amqp:Channel.queue_declareopen in new window

Exclusive(独占)队列只能由当前连接使用。Exclusive 也意味着auto_delete

queue.bind(queue_name, exchange_name, routing_key)

使用一个routing key将一个队列绑定到exchange

未绑定的队列不会收到消息,因此这是必要的。

See amqp:Channel.queue_bindopen in new window

queue.delete(name, if_unused=False, if_empty=False)

删除一个队列及其绑定

See amqp:Channel.queue_deleteopen in new window

exchange.delete(name, if_unused=False)

删除一个exchange

See amqp:Channel.exchange_deleteopen in new window

Note:

声明并不一定意味着“创建”。当您声明时,您断言了该实体存在并且它是可操作的。没有关于谁应该首先创建exchange/queue/binding的规则,无论是消费者还是生产者。通常第一个需要它的将创建它。

Hands-on with the API:API实践

Celery附带了一个名为celery amqp的用于访问AMQP API的命令行工具,用于类似创建/删除队列或者exchanges,青储队列或者发送消息等管理员任务。对于非AMQP的brokers也是支持的,但不同的实现可能不会实现所有命令。

您可以直接在celery amqp的参数中编写命令,或者只是不带参数开始以在 shell 模式下启动它:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

这里的1>是命令提示符。其中数字1代表目前已经执行的命令数量。输入help获取可用的命令列表。支持自动补全,因此可以开始输入命令,然后按Tab键以显示可能匹配的列表。

Let’s create a queue you can send messages to:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

这创建了一个名为testexchange的exchange以及一个名为testqueue的队列。并且使用routing key testkey将队列绑定到exchange。

从现在开始,所有发送到textexchange并且带着testkey的消息将会移动到这个队列中。你可以使用basic.publish命令发送消息:

4> basic.publish 'This is a message!' testexchange testkey
ok.

现在消息已发送,您可以再次检索它。此处您可以使用basic.get命令,以同步方式轮询队列中的新消息(这对于维护任务是可以的,但对于服务你可能想要使用basic.consume替代)

从队列中弹出一条消息:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP 使用确认来表示消息已被成功接收和处理。如果消息没有被确认并且消费者通道被关闭,消息将被传递给另一个消费者。

请注意上述结构中的delivery_tag,一个通道内的所有消息有一个独一无二的delivery tag,这个tag用于确认消息。另外,连接之间的delivery tag不是唯一的,因此在另一个客户端的delivery tag 1 可能指向与此通道中不同的消息。

你可以使用basic.ack来确认消息:

6> basic.ack 1
ok.

完成测试后记得清理:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.

任务路由

定义队列

在 Celery 中,可用队列由task_queues设置定义

这是一个具有三个队列的示例配置;一个用于视频,一个用于图像,一个用于其他所有内容的默认队列:

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

这里的task_default_queue用于路由那些没有显示路由的任务。

默认的exchange, exchange type, 以及 routing key将被作为任务的默认路由值,并且作为task_queues中的默认值。

支持多重绑定,这里是一个将多个routing key绑定到一个队列中的示例:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

CELERY_QUEUES = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)

指定任务目标

任务的目的地由以下(按顺序)决定:

  1. Task.apply_async()中的路由参数。
  2. Taskopen in new window中定义的路由相关选项。
  3. task_routes中定义的Routers

最好不要对这些设置进行硬编码,而是使用Routersopen in new window将其保留为配置选项;这是最灵活的方法,但仍然可以将合理的默认值设置为任务属性。

Routers

路由器是决定任务的路由选项的方法。

定义一个新路由器所需要做的就是定义一个带有签名的函数(name, args, kwargs, options, task=None, **kw):

def route_task(name, args, kwargs, options, task=None, **kw):
        if name == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}

If you return the queue key, it’ll expand with the defined settings of that queue in task_queues:

{'queue': 'video', 'routing_key': 'video.compress'}

变成

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

将router类添加到task_routes来安装它们

task_routes = (route_task,)

Router方法也可以通过名称来添加:

task_routes = ('myapp.routers.route_task',)

对于简单的任务名称 -> 路由映射,如上面的路由器示例,您可以简单地将一个 dict 放入task_routes中以获得相同的行为:

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

然后将按顺序遍历routers,它将在第一个返回true的router处停止,并将其用作任务的最终路由。

您还可以按顺序定义多个routers:

task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
    },
]

The routers will then be visited in turn, and the first to return a value will be chosen.

If you're using Redis or RabbitMQ you can also specify the queue's default priority in the route.

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
        'priority': 10,
    },
}

Similarly, calling apply_async on a task will override that default priority.

task.apply_async(priority=0)

优先级和集群响应能力

需要注意的是,由于worker的预读,如果一堆任务同时提交,它们可能一开始就没有优先顺序。禁用预读将防止此问题,但可能会导致小型、快速任务的性能不理想。在大多数情况下,简单地将worker_prefetch_multiplier减少到 1 是一种更简单、更简洁的方式来提高系统的响应能力,而无需完全禁用预读。

请注意,使用 redis 最为borker时,优先级值是反向排序的:0 是最高优先级。


Broadcast:广播

Celery 还可以支持广播路由。这是一个示例exchange broadcast_tasks,它将任务副本传递给与其连接的所有worker:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

Now the tasks.reload_cache task will be sent to every worker consuming from this queue.

这是另一个示例,这次有一个celery beat调度:

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

Broadcast & Result:

注意:Celery result没有定义当两个任务有相同ID的时候会发生什么。如果相同ID的任务被分不到了多个worker,那么状态历史不会被保存。

这种情况使用task.ignore_result属性是个不错的选择