测试
任务及单元测试
要在单元测试中测试任务行为,首选方法是mocking。
Eager mode:
task_always_eager 设置启用的eager模式根据定义不适合单元测试。
使用eager模式进行测试时,只是在worker中模拟发生了什么,这和真实情况会存在许多差异。
请注意,eager模式下执行的任务默认情况下不会将结果写入后端。 如果要启用此功能,请查看 task_store_eager_result。
Celery 任务很像 web 视图,因为它应该只定义如何在被称为任务的上下文中执行操作。
这意味着任务最好只处理序列化、消息头、重试等事情,而实际逻辑在其他地方实现。
假设我们有这样一个任务:
from .models import Product
@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
price = Decimal(price) # json serializes this to string.
# models are passed by id, not serialized.
product = Product.objects.get(product_pk)
try:
product.order(quantity, price)
except OperationalError as exc:
raise self.retry(exc=exc)
绑定的任务意味着任务的第一个参数将始终是任务实例(self)。 这意味着您确实将 self 参数作为第一个参数,并且可以使用 Task 类方法和属性。
你可以像下面的示例使用mocking给这个任务写单元测试:
from pytest import raises
from celery.exceptions import Retry
## for python 2: use mock.patch from `pip install mock`.
from unittest.mock import patch
from proj.models import Product
from proj.tasks import send_order
class test_send_order:
@patch('proj.tasks.Product.order') # < patching Product in module above
def test_success(self, product_order):
product = Product.objects.create(
name='Foo',
)
send_order(product.pk, 3, Decimal(30.3))
product_order.assert_called_with(3, Decimal(30.3))
@patch('proj.tasks.Product.order')
@patch('proj.tasks.send_order.retry')
def test_failure(self, send_order_retry, product_order):
product = Product.objects.create(
name='Foo',
)
# Set a side effect on the patched methods
# so that they raise the errors we want.
send_order_retry.side_effect = Retry()
product_order.side_effect = OperationalError()
with raises(Retry):
send_order(product.pk, 3, Decimal(30.6))
pytest
4.0新增特性
Celery还提供了一个pytest插件,它添加了可以在集成(或单元)测试套件中使用的fixture。
Celery also makes a pytest plugin available that adds fixtures that you can use in your integration (or unit) test suites.
Enabling
默认这些插件是禁用状态,启用的话可以使用:
pip install celery[pytest]
pip install pytest-celery
- 或者添加
PYTEST_PLUGINS=celery.contrib.pytest
的环境变量 - or add
pytest_plugins = ("celery.contrib.pytest", )
to your root conftest.py
Marks
celery
- Set test app configuration:配置测试应用
celery
mark允许你为一个测试用例覆盖配置:
@pytest.mark.celery(result_backend='redis://')
def test_something():
...
或者一个类中的所有测试用例
@pytest.mark.celery(result_backend='redis://')
class test_something:
def test_one(self):
...
def test_two(self):
...
Fixtures
Function scope:作用域
celery_app
- 用于测试的Celery app
fixture返回一个可以用于测试的Celery app
示例:
def test_create_task(celery_app, celery_worker):
@celery_app.task
def mul(x, y):
return x * y
assert mul.delay(4, 4).get(timeout=10) == 16
celery_worker
- Embed live worker.
这个fixture启动一个Celery worker示例,您可以使用它进行集成测试。这个worker在一个 separate thread 启动,并且在测试返回后关闭。
默认情况下,fixture 最多会等待 10 秒让worker完成未完成的任务,如果超过时间限制,则会引发异常。 可以通过在 celery_worker_parameters()
fixture返回的字典中设置 shutdown_timeout
来自定义超时。
The timeout can be customized by setting the shutdown_timeout
key in the dictionary returned by the celery_worker_parameters()
fixture.
示例:
## Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'amqp://',
'result_backend': 'redis://'
}
def test_add(celery_worker):
mytask.delay()
## If you wish to override some setting in one test cases
## only - you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
...
默认情况下Heartbeats是关闭的,这意味着测试worker不会为worker-online
,worker-offline
和worker-heartbeat
发送时间。修改 celery_worker_parameters()
fixture来开启heartbeats:
## Put this in your conftest.py
@pytest.fixture(scope="session")
def celery_worker_parameters():
return {"without_heartbeat": False}
...
Session scope:会话范围
celery_config
- Override to setup Celery test app configuration.
您可以重新定义这个fixture以配置Celery 测试应用。
你的fixture返回的配置将用于配置 celery_app()
和 celery_session_app()
fixtures。
示例:
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'amqp://',
'result_backend': 'rpc',
}
celery_parameters
- Override to setup Celery test app parameters.
你可以重新定义这个fixture来改变测试Celery app的 __init__
参数。和celery_config()
区别很大的是,这些会在Celery实例化的时候直接传递。
你的fixture返回的配置将用于配置 celery_app()
和 celery_session_app()
fixtures。
示例:
@pytest.fixture(scope='session')
def celery_parameters():
return {
'task_cls': my.package.MyCustomTaskClass,
'strict_typing': False,
}
celery_worker_parameters
- Override to setup Celery test app parameters.
你可以重新定义这个fixture来改变测试Celery app的 __init__
参数。这些会在WorkController实例化的时候直接传递。
你的fixture返回的配置将用于配置 celery_app()
和 celery_session_app()
fixtures。
示例:
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
'queues': ('high-prio', 'low-prio'),
'exclude_queues': ('celery'),
}
celery_enable_logging
- Override to enable logging in embedded workers
This is a fixture you can override to enable logging in embedded workers.
示例:
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
celery_includes
- Add additional imports for embedded workers.
You can override fixture to include modules when an embedded worker starts.
您可以让它返回要导入的模块名称列表,可以是任务模块、注册信号的模块等。
示例:
@pytest.fixture(scope='session')
def celery_includes():
return [
'proj.tests.tasks',
'proj.tests.celery_signal_handlers',
]
celery_worker_pool
- Override the pool used for embedded workers.
You can override fixture to configure the execution pool used for embedded workers.
示例:
@pytest.fixture(scope='session')
def celery_worker_pool():
return 'prefork'
Warning:
不能使用 gevent/eventlet 池,除非您的整个测试套件在启用了猴子补丁的情况下运行。
celery_session_worker
- Embedded worker that lives throughout the session.¶
This fixture starts a worker that lives throughout the testing session (it won't be started/stopped for every test).
示例:
## Add this to your conftest.py
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'amqp://',
'result_backend': 'rpc',
}
## Do this in your tests.
def test_add_task(celery_session_worker):
assert add.delay(2, 2) == 4
Warning:
将临时worker和会话混合在一起不是一个好主意
celery_session_app
- 用于测试的Celery app (会话范围).
This can be used by other session scoped fixtures when they need to refer to a Celery app instance.
use_celery_app_trap
- Raise exception on falling back to default app.
This is a fixture you can override in your conftest.py
, to enable the "app trap": if something tries to access the default or current_app, an exception is raised.
示例:
@pytest.fixture(scope='session')
def use_celery_app_trap():
return True
If a test wants to access the default app, you would have to mark it using the depends_on_current_app
fixture:
@pytest.mark.usefixtures('depends_on_current_app')
def test_something():
something()