测试

任务及单元测试

要在单元测试中测试任务行为,首选方法是mocking。


Eager mode:

task_always_eager 设置启用的eager模式根据定义不适合单元测试。

使用eager模式进行测试时,只是在worker中模拟发生了什么,这和真实情况会存在许多差异。

请注意,eager模式下执行的任务默认情况下不会将结果写入后端。 如果要启用此功能,请查看 task_store_eager_result。


Celery 任务很像 web 视图,因为它应该只定义如何在被称为任务的上下文中执行操作。

这意味着任务最好只处理序列化、消息头、重试等事情,而实际逻辑在其他地方实现。

假设我们有这样一个任务:

from .models import Product


@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
    price = Decimal(price)  # json serializes this to string.

    # models are passed by id, not serialized.
    product = Product.objects.get(product_pk)

    try:
        product.order(quantity, price)
    except OperationalError as exc:
        raise self.retry(exc=exc)

绑定的任务意味着任务的第一个参数将始终是任务实例(self)。 这意味着您确实将 self 参数作为第一个参数,并且可以使用 Task 类方法和属性。

你可以像下面的示例使用mocking给这个任务写单元测试:

from pytest import raises

from celery.exceptions import Retry

## for python 2: use mock.patch from `pip install mock`.
from unittest.mock import patch

from proj.models import Product
from proj.tasks import send_order

class test_send_order:

    @patch('proj.tasks.Product.order')  # < patching Product in module above
    def test_success(self, product_order):
        product = Product.objects.create(
            name='Foo',
        )
        send_order(product.pk, 3, Decimal(30.3))
        product_order.assert_called_with(3, Decimal(30.3))

    @patch('proj.tasks.Product.order')
    @patch('proj.tasks.send_order.retry')
    def test_failure(self, send_order_retry, product_order):
        product = Product.objects.create(
            name='Foo',
        )

        # Set a side effect on the patched methods
        # so that they raise the errors we want.
        send_order_retry.side_effect = Retry()
        product_order.side_effect = OperationalError()

        with raises(Retry):
            send_order(product.pk, 3, Decimal(30.6))

pytest

4.0新增特性

Celery还提供了一个pytestopen in new window插件,它添加了可以在集成(或单元)测试套件中使用的fixture。

Celery also makes a pytestopen in new window plugin available that adds fixtures that you can use in your integration (or unit) test suites.

Enabling

默认这些插件是禁用状态,启用的话可以使用:

  • pip install celery[pytest]
  • pip install pytest-celery
  • 或者添加PYTEST_PLUGINS=celery.contrib.pytest的环境变量
  • or add pytest_plugins = ("celery.contrib.pytest", ) to your root conftest.py

Marks

celery - Set test app configuration:配置测试应用

celery mark允许你为一个测试用例覆盖配置:

@pytest.mark.celery(result_backend='redis://')
def test_something():
    ...

或者一个类中的所有测试用例

@pytest.mark.celery(result_backend='redis://')
class test_something:

    def test_one(self):
        ...

    def test_two(self):
        ...

Fixtures

Function scope:作用域

celery_app - 用于测试的Celery app

fixture返回一个可以用于测试的Celery app

示例:

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16

celery_worker - Embed live worker.

这个fixture启动一个Celery worker示例,您可以使用它进行集成测试。这个worker在一个 separate thread 启动,并且在测试返回后关闭。

默认情况下,fixture 最多会等待 10 秒让worker完成未完成的任务,如果超过时间限制,则会引发异常。 可以通过在 celery_worker_parameters() fixture返回的字典中设置 shutdown_timeout 来自定义超时。

The timeout can be customized by setting the shutdown_timeout key in the dictionary returned by the celery_worker_parameters() fixture.

示例:

## Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'redis://'
    }

def test_add(celery_worker):
    mytask.delay()


## If you wish to override some setting in one test cases
## only - you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
    ...

默认情况下Heartbeats是关闭的,这意味着测试worker不会为worker-online,worker-offlineworker-heartbeat发送时间。修改 celery_worker_parameters()fixture来开启heartbeats:

## Put this in your conftest.py
@pytest.fixture(scope="session")
def celery_worker_parameters():
    return {"without_heartbeat": False}
    ...

Session scope:会话范围

celery_config- Override to setup Celery test app configuration.

您可以重新定义这个fixture以配置Celery 测试应用。

你的fixture返回的配置将用于配置 celery_app()celery_session_app() fixtures。

示例:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }

celery_parameters - Override to setup Celery test app parameters.

你可以重新定义这个fixture来改变测试Celery app的 __init__ 参数。和celery_config()区别很大的是,这些会在Celeryopen in new window实例化的时候直接传递。

你的fixture返回的配置将用于配置 celery_app()celery_session_app() fixtures。

示例:

@pytest.fixture(scope='session')
def celery_parameters():
    return {
        'task_cls':  my.package.MyCustomTaskClass,
        'strict_typing': False,
    }

celery_worker_parameters - Override to setup Celery test app parameters.

你可以重新定义这个fixture来改变测试Celery app的 __init__ 参数。这些会在WorkControlleropen in new window实例化的时候直接传递。

你的fixture返回的配置将用于配置 celery_app()celery_session_app() fixtures。

示例:

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {
        'queues':  ('high-prio', 'low-prio'),
        'exclude_queues': ('celery'),
    }

celery_enable_logging - Override to enable logging in embedded workers

This is a fixture you can override to enable logging in embedded workers.

示例:

@pytest.fixture(scope='session')
def celery_enable_logging():
    return True

celery_includes - Add additional imports for embedded workers.

You can override fixture to include modules when an embedded worker starts.

您可以让它返回要导入的模块名称列表,可以是任务模块、注册信号的模块等。

示例:

@pytest.fixture(scope='session')
def celery_includes():
    return [
        'proj.tests.tasks',
        'proj.tests.celery_signal_handlers',
    ]

celery_worker_pool - Override the pool used for embedded workers.

You can override fixture to configure the execution pool used for embedded workers.

示例:

@pytest.fixture(scope='session')
def celery_worker_pool():
    return 'prefork'

Warning:

不能使用 gevent/eventlet 池,除非您的整个测试套件在启用了猴子补丁的情况下运行。

celery_session_worker - Embedded worker that lives throughout the session.¶

This fixture starts a worker that lives throughout the testing session (it won't be started/stopped for every test).

示例:

## Add this to your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }

## Do this in your tests.
def test_add_task(celery_session_worker):
    assert add.delay(2, 2) == 4

Warning:

将临时worker和会话混合在一起不是一个好主意

celery_session_app - 用于测试的Celery app (会话范围).

This can be used by other session scoped fixtures when they need to refer to a Celery app instance.

use_celery_app_trap - Raise exception on falling back to default app.

This is a fixture you can override in your conftest.py, to enable the "app trap": if something tries to access the default or current_app, an exception is raised.

示例:

@pytest.fixture(scope='session')
def use_celery_app_trap():
    return True

If a test wants to access the default app, you would have to mark it using the depends_on_current_app fixture:

@pytest.mark.usefixtures('depends_on_current_app')
def test_something():
    something()