监控和管理指南

介绍

有一些工具可以用来监控和检查Celery集群。

本文档介绍了其中的一些,以及与监控相关的功能,如事件和广播命令。

Workers

命令行管理工具(inspect/control)

celery也可以用来检查和管理worker节点(以及有限的任务管理功能

列出所有可用命令:

$ celery --help

子命令帮助:

$ celery <command> --help

Commandsopen in new window

  • shell: 进入一个Python shell。

    The locals will include the celery variable: this is the current app. Also all known tasks will be automatically added to locals (unless the --without-tasksopen in new window flag is set).

    如果安装了IPython,bpython或者常规python,则按照顺序使用。也可以强制指定:--ipythonopen in new window, --bpythonopen in new window, or --pythonopen in new window.

  • status: 列出集群中的活动节点

    $ celery -A proj status
    
  • result:显示一个任务的结果

    $ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
    

    请注意,只要任务不使用自定义结果后端,就可以省略任务的名称。

  • purge:清除所有已配置的任务队列中的消息

    这个命令会清除所有CELERY_QUEUES设置中配置的队列中的消息:

    Warning:

    该操作无法撤回,消息会被永久清除!

    $ celery -A proj purge
    

    也可以使用-Q选项指定要清除的队列:

    $ celery -A proj purge -Q celery,foo,bar
    

    排除某个队列使用-X选项:

    $ celery -A proj purge -X celery
    
  • inspect active:列出活动任务

    $ celery -A proj inspect active
    

    当前所有正在执行的任务

  • inspect scheduled :列出计划的ETA任务

    $ celery -A proj inspect scheduled
    

    这些任务是worker在设置了ETA或倒计时参数时保留的任务。

  • inspect reserved :列出保留任务

    $ celery -A proj inspect reserved
    

    这将列出worker预读的所有任务,和当前正在等待执行的任务(不包括ETA)。

  • inspect revoked :列出已撤消任务的历史记录

    $ celery -A proj inspect revoked
    
  • inspect registered : 列出注册任务

    $ celery -A proj inspect registered
    
  • inspect stats : 展示worker统计数据(查看Statistics

    $ celery -A proj inspect stats
    
  • inspect query_task : 通过任务ID查看任务信息

    Any worker having a task in this set of ids reserved/active will respond with status and information.

    $ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
    

    也可以查询多个任务:

    $ celery -A proj inspect query_task id1 id2 ... idN
    
  • control enable_events :启用事件

    $ celery -A proj control enable_events
    
  • control disable_events : 关闭事件

    $ celery -A proj control disable_events
    
  • migrate :将任务从一个broker迁移到另一个broker(实验性)

    $ celery -A proj migrate redis://localhost amqp://localhost
    

    此命令会将一个broker上的所有任务迁移到另一个broker。由于此命令是新的和实验性的,因此在继续操作之前,您应该确保备份了数据。

Note:

所有检查和控制命令都支持--timeout选项,这是一个以秒为单位的数字。如果由于延迟没有收到响应,可以增加该数值。

Specifying destination nodes:指定目标节点

默认情况下,检查和控制命令会操作所有worker。你可以通过--destination指定单个或多个worker:

$ celery -A proj inspect -d w1@e.com,w2@e.com reserved

$ celery -A proj control -d w1@e.com,w2@e.com enable_events

Flower:实时web监控

Flower是Celery的一个基于web的监控和管理工具。它正在积极开发中,但已经是一个必备工具。作为Celery的推荐监控,它淘汰了Django Admin、celerymon和基于ncurses的监控。

Features:特性

  • 使用Celery事件进行实时监控

    • 任务进度和历史
    • 展示任务详情(参数、启动时间、运行时间等等)
    • 图表和统计数据
  • 远程控制

    • 查看worker状态和统计数据
    • 关闭和重启worker 实例
    • 控制worker pool size和自动缩放设置
    • 查看和修改worker实例使用的队列
    • 查看当前运行的任务
    • 查看计划任务(ETA/countdown)
    • 查看保留和撤消的任务
    • 应用时间和速率限制
    • 配置查看器
    • 撤消或终止任务
  • HTTP API

    • 列出worker
    • 关闭worker
    • 重启worker池
    • 扩大worker池
    • 缩小worker池
    • 自动缩放worker池
    • 开始从一个队列中消费
    • 停止从一个队列中消费
    • 列出任务
    • 列出任务类型
    • 获取一个任务信息
    • 执行一个任务
    • 通过名称执行任务
    • 获取任务结果
    • 修改一个任务的软、硬时间限制
    • 修改任务速率
    • 撤销任务
  • OpenID认证

截图:

dashboard

monitor

更多截图open in new window

Usage:用法

可以使用pip来安装Flower:

$ pip install flower

下面的命令可以启动一个web-server:

$ celery -A proj flower

默认访问地址是http://localhost:5555open in new window,可以使用--portopen in new window参数修改端口号:

$ celery -A proj flower --port=5555

可以通过--brokeropen in new window参数传递Broker URL:

$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

然后就可以在浏览器中打开flower:

$ open http://localhost:5555

Flower有许多特性,官方文档open in new window有更详细的信息。

celery events: Curses Monitor

2.0新增特性

celery events是一个简单点curses监控,可以显示任务和worker历史。可以检查任务结果和回溯任务,也支持一些管理命令比如限制速率和关闭workers。此监视器是作为概念验证启动的,您可能希望使用Flower。

启动:

$ celery -A proj events

你应该可以看到一个类似的屏幕:

celeryevshotsm

celery events还可以用来启动快照相机(查看Snapshots

$ celery -A proj events --camera=<camera-class> --frequency=1.0

包含一个工具可以将时间dump到stdout:

$ celery -A proj events --dump

要获得完整的选项列表,请使用**--help**:

$ celery events --help

RabbitMQ

管理Celery集群很重要的是要知道如何监控RabbitMQ。

RabbitMQ附带rabbitmqctlopen in new window命令,你可以用它获取队列、交换机、绑定、队列长度,每个队列的内存使用以及管理用户、虚拟主机及其权限。

Note:

本示例使用的是默认的虚拟主机"/",如果你是用的是自定义虚拟主机,需要使用-p选项比如:rabbitmqctl list_queues -p my_vhost …

Inspecting queues:检查队列

查找队列中的任务数量:

$ rabbitmqctl list_queues name messages messages_ready \
                          messages_unacknowledged

此处,messages_ready是准备传递的消息数(已发送但未接收),messages_unacknowledged是worker已接收但尚未确认的消息数,表示正在处理或已保留)。messages是就绪消息和未确认消息的总和。

正在从当前队列消费的worker的数量:

$ rabbitmqctl list_queues name consumers

给某个队列分配的内存量:

$ rabbitmqctl list_queues name memory

Tip:-q选项可以让输出更容易解析。

Redis

如果使用Redis,可以用redis-cli命令列出队列长度

检查队列

查找队列中的任务数量:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

默认队列名是*celery。*获取所有可用的队列:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*

队列键只有在其中有任务时才存在,因此如果键不存在,则只意味着该队列中没有消息。这是因为在Redis中,一个没有元素的列表会被自动删除,因此它不会显示在keys命令输出中,并且该列表的llen返回0。

此外,如果您将Redis用于其他目的,keys命令的输出将包括存储在数据库中的无关值。推荐的办法是给Celery一个专用的DATABASE_NUMBER。您还可以使用DATABASE_NUMBER将Celery应用程序彼此分开(虚拟主机),但这不会影响Flower等使用的监控事件,因为Redis pub/sub命令是全局的,而不是基于数据库的。

Munin

这是一个已知的Munin插件列表,在维护Celery集群时可能会很有用。

Munin是Linux下的一个监控工具。可以在官网open in new window获取更多信息。

Events:事件

worker有能力在任何事件发生时发送消息。这些事件会被诸如Flower、celery events这样的工具捕获用来监控集群。

Snapshots:快照

2.1新增特性

即便是一个worker也可以产生巨量的时间,因此存储所有事件可能会很贵。

通过一系列事件描述该时间段内的集群状态,通过对该状态进行定期快照,您可以保留所有历史记录,但仍只是定期将其写入磁盘。

要拍摄快照,您需要一个 Camera 类,通过它您可以定义每次捕获状态时应该发生的情况; 您可以将其写入数据库,通过电子邮件或其他方式发送。

然后使用 celery events与相机拍摄快照,例如,如果您想使用 myapp.Camera 每 2 秒捕获一次状态,您可以使用以下参数运行celery events

$ celery -A proj events -c myapp.Camera --frequency=2.0

Custom Camera

如果你需要捕获事件或者定期对这些事件做点什么,Cameras会很有用。对于事件的实时处理,你应该直接使用app.events.Receiver,就像在Real-time processing中那样。

下面是一个示例camera,将快照转储到屏幕上

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):
    clear_after = True  # clear after flush (incl, state.event_count).

    def on_shutter(self, state):
        if not state.event_count:
            # No new events since last snapshot.
            return
        print('Workers: {0}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(
            state))

celery.events.stateopen in new window处获取更多关于state对象的内容。

现在你可以和celery events一起使用这个cam通过-copen in new window选项

$ celery -A proj events -c myapp.DumpCam --frequency=2.0

或者以编程的方式使用:

from celery import Celery
from myapp import DumpCam

def main(app, freq=1.0):
    state = app.events.State()
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        with DumpCam(state, freq=freq):
            recv.capture(limit=None, timeout=None)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    main(app)

Real-time processing

要实时处理事件你需要:

  • 一个事件消费者(这是接收器)

  • 事件传入时调用的一组处理程序

    您可以为每种事件类型设置不同的处理程序,也可以使用通用处理程序(‘*’)

  • State(可选的

    app.events.State是集群中任务和worker的便捷内存表示,随着事件的到来而更新。

    它封装了许多常见问题的解决方案,例如检查worker是否还活着(通过验证心跳)、在事件进入时将事件字段合并在一起、确保时间戳同步等等。

结合这些功能,您可以轻松地实时处理事件:

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Note:

The wakeup argument to capture sends a signal to all workers to force them to send a heartbeat. This way you can immediately see workers when the monitor starts.

capture 的wakeup参数向所有worker发送一个信号,强制他们发送心跳。 这样,您可以在监视器启动时立即看到工作人员。

您可以通过指定处理程序来监听特定事件

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

事件引用

这个列表包含worker发送的事件以及它们的参数。

任务事件

task-send

signature

task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)

当一个任务消息被分布,并且task_send_sent_event处于开启状态时发送(一个事件)。

task-received

signature

task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)

当worker收到任务时发送。

task-started

signature

task-started(uuid, hostname, timestamp, pid)

在worker执行任务之前发送。

task-succeeded

signature

task-succeeded(uuid, result, runtime, hostname, timestamp)

任务执行成功后发送

Run-time is the time it took to execute the task using the pool. (Starting from the task is sent to the worker pool, and ending when the pool result handler callback is called).

task-failed

signature

task-failed(uuid, exception, traceback, hostname, timestamp)

在任务执行失败时发送。

task-rejected

signature

task-rejected(uuid, requeued)

worker拒绝了任务,可能会重新排队或移动到死信队列。

task-revoked

signature

task-revoked(uuid, terminated, signum, expired)

如果任务已撤消,则发送(请注意,这可能是由多个worker发送的)。

  • 如果任务进程已终止,则将Terminated设置为True,并将signum字段设置为所使用的信号。

  • 如果任务过期,则expired设置为true。

task-retried

signature

task-retried(uuid, exception, traceback, hostname, timestamp)

如果任务失败,则发送,但将在以后重试。

Worker Events

worker-online

signature

worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

worker已经连接到broker并处于联机状态

  • hostname:worker的Nodename
  • timestamp:事件的时间戳
  • freq:心跳频率(以秒为单位,float类型
  • sw_ident:Name of worker software (e.g., py-celery).
  • *sw_ver: *软件版本(e.g., 2.2.0).
  • sw_sys:操作系统(e.g., Linux/Darwin).

worker-heartbeat

signature

worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)
  • hostname:worker的Nodename
  • timestamp:事件的时间戳
  • freq:心跳频率(以秒为单位,float类型
  • sw_ident:Name of worker software (e.g., py-celery).
  • *sw_ver: *软件版本(e.g., 2.2.0).
  • sw_sys:操作系统(e.g., Linux/Darwin).
  • sw_sys:当前执行任务数
  • processed:该worker处理的任务总数

worker-offline

signature

worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

worker已经从broker断开连接。