配置和默认配置
示例配置
一个包含基本配置的示例:
### Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'
## List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)
### Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'
task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
新的小写设置
4.0新增加了小写设置。下面是对照表。
旧版配置格式在6.0之后将不再支持,尽快升级。Celery提供了升级命令,大概这儿能看到。
Setting name | Replace with |
---|---|
CELERY_ACCEPT_CONTENT | accept_content |
CELERY_ENABLE_UTC | enable_utc |
CELERY_IMPORTS | imports |
CELERY_INCLUDE | include |
CELERY_TIMEZONE | timezone |
CELERYBEAT_MAX_LOOP_INTERVAL | beat_max_loop_interval |
CELERYBEAT_SCHEDULE | beat_schedule |
CELERYBEAT_SCHEDULER | beat_scheduler |
CELERYBEAT_SCHEDULE_FILENAME | beat_schedule_filename |
CELERYBEAT_SYNC_EVERY | beat_sync_every |
BROKER_URL | broker_url |
BROKER_TRANSPORT | broker_transport |
BROKER_TRANSPORT_OPTIONS | broker_transport_options |
BROKER_CONNECTION_TIMEOUT | broker_connection_timeout |
BROKER_CONNECTION_RETRY | broker_connection_retry |
BROKER_CONNECTION_MAX_RETRIES | broker_connection_max_retries |
BROKER_FAILOVER_STRATEGY | broker_failover_strategy |
BROKER_HEARTBEAT | broker_heartbeat |
BROKER_LOGIN_METHOD | broker_login_method |
BROKER_POOL_LIMIT | broker_pool_limit |
BROKER_USE_SSL | broker_use_ssl |
CELERY_CACHE_BACKEND | cache_backend |
CELERY_CACHE_BACKEND_OPTIONS | cache_backend_options |
CASSANDRA_COLUMN_FAMILY | ((20220807005220-jh8rb2w "cassandra_table")) |
CASSANDRA_ENTRY_TTL | cassandra_entry_ttl |
CASSANDRA_KEYSPACE | cassandra_keyspace |
CASSANDRA_PORT | cassandra_port |
CASSANDRA_READ_CONSISTENCY | cassandra_read_consistency |
CASSANDRA_SERVERS | cassandra_servers |
CASSANDRA_WRITE_CONSISTENCY | cassandra_write_consistency |
CASSANDRA_OPTIONS | cassandra_options |
S3_ACCESS_KEY_ID | s3_access_key_id |
S3_SECRET_ACCESS_KEY | s3_secret_access_key |
S3_BUCKET | s3_bucket |
S3_BASE_PATH | s3_base_path |
S3_ENDPOINT_URL | s3_endpoint_url |
S3_REGION | s3_region |
CELERY_COUCHBASE_BACKEND_SETTINGS | couchbase_backend_settings |
CELERY_ARANGODB_BACKEND_SETTINGS | arangodb_backend_settings |
CELERY_MONGODB_BACKEND_SETTINGS | mongodb_backend_settings |
CELERY_EVENT_QUEUE_EXPIRES | event_queue_expires |
CELERY_EVENT_QUEUE_TTL | event_queue_ttl |
CELERY_EVENT_QUEUE_PREFIX | event_queue_prefix |
CELERY_EVENT_SERIALIZER | event_serializer |
CELERY_REDIS_DB | redis_db |
CELERY_REDIS_HOST | redis_host |
CELERY_REDIS_MAX_CONNECTIONS | redis_max_connections |
CELERY_REDIS_USERNAME | redis_username |
CELERY_REDIS_PASSWORD | redis_password |
CELERY_REDIS_PORT | redis_port |
CELERY_REDIS_BACKEND_USE_SSL | redis_backend_use_ssl |
CELERY_RESULT_BACKEND | result_backend |
CELERY_MAX_CACHED_RESULTS | result_cache_max |
CELERY_MESSAGE_COMPRESSION | result_compression |
CELERY_RESULT_EXCHANGE | result_exchange |
CELERY_RESULT_EXCHANGE_TYPE | result_exchange_type |
CELERY_RESULT_EXPIRES | result_expires |
CELERY_RESULT_PERSISTENT | result_persistent |
CELERY_RESULT_SERIALIZER | result_serializer |
CELERY_RESULT_DBURI | Use result_backend instead. |
CELERY_RESULT_ENGINE_OPTIONS | database_engine_options |
[...]_DB_SHORT_LIVED_SESSIONS | database_short_lived_sessions |
CELERY_RESULT_DB_TABLE_NAMES | database_db_names |
CELERY_SECURITY_CERTIFICATE | security_certificate |
CELERY_SECURITY_CERT_STORE | security_cert_store |
CELERY_SECURITY_KEY | security_key |
CELERY_ACKS_LATE | task_acks_late |
CELERY_ACKS_ON_FAILURE_OR_TIMEOUT | task_acks_on_failure_or_timeout |
CELERY_ALWAYS_EAGER | task_always_eager |
CELERY_ANNOTATIONS | task_annotations |
CELERY_COMPRESSION | task_compression |
CELERY_CREATE_MISSING_QUEUES | task_create_missing_queues |
CELERY_DEFAULT_DELIVERY_MODE | task_default_delivery_mode |
CELERY_DEFAULT_EXCHANGE | task_default_exchange |
CELERY_DEFAULT_EXCHANGE_TYPE | task_default_exchange_type |
CELERY_DEFAULT_QUEUE | task_default_queue |
CELERY_DEFAULT_RATE_LIMIT | task_default_rate_limit |
CELERY_DEFAULT_ROUTING_KEY | task_default_routing_key |
CELERY_EAGER_PROPAGATES | task_eager_propagates |
CELERY_IGNORE_RESULT | task_ignore_result |
CELERY_PUBLISH_RETRY | task_publish_retry |
CELERY_PUBLISH_RETRY_POLICY | task_publish_retry_policy |
CELERY_QUEUES | task_queues |
CELERY_ROUTES | task_routes |
CELERY_SEND_SENT_EVENT | task_send_sent_event |
CELERY_SERIALIZER | task_serializer |
CELERYD_SOFT_TIME_LIMIT | task_soft_time_limit |
CELERY_TASK_TRACK_STARTED | task_track_started |
CELERY_TASK_REJECT_ON_WORKER_LOST | task_reject_on_worker_lost |
CELERYD_TIME_LIMIT | task_time_limit |
CELERYD_AGENT | worker_agent |
CELERYD_AUTOSCALER | worker_autoscaler |
CELERYD_CONCURRENCY | worker_concurrency |
CELERYD_CONSUMER | worker_consumer |
CELERY_WORKER_DIRECT | worker_direct |
CELERY_DISABLE_RATE_LIMITS | worker_disable_rate_limits |
CELERY_ENABLE_REMOTE_CONTROL | worker_enable_remote_control |
CELERYD_HIJACK_ROOT_LOGGER | worker_hijack_root_logger |
CELERYD_LOG_COLOR | worker_log_color |
CELERYD_LOG_FORMAT | worker_log_format |
CELERYD_WORKER_LOST_WAIT | worker_lost_wait |
CELERYD_MAX_TASKS_PER_CHILD | worker_max_tasks_per_child |
CELERYD_POOL | worker_pool |
CELERYD_POOL_PUTLOCKS | worker_pool_putlocks |
CELERYD_POOL_RESTARTS | worker_pool_restarts |
CELERYD_PREFETCH_MULTIPLIER | worker_prefetch_multiplier |
CELERYD_REDIRECT_STDOUTS | worker_redirect_stdouts |
CELERYD_REDIRECT_STDOUTS_LEVEL | worker_redirect_stdouts_level |
CELERY_SEND_EVENTS | worker_send_task_events |
CELERYD_STATE_DB | worker_state_db |
CELERYD_TASK_LOG_FORMAT | worker_task_log_format |
CELERYD_TIMER | worker_timer |
CELERYD_TIMER_PRECISION | worker_timer_precision |
配置指令
一般配置
accept_content
默认:{'json'} (set, list, or tuple)
Content-types/serializers可接受的格式白名单
白名单之外的格式的消息将会被丢弃,同时伴随一个错误。
默认只有json是开启的,可以添加任意content type,包括pickle和yaml。这种情况下确保只有可信的客户端才能访问broker。更多信息参考:Security
示例:
## using serializer name
accept_content = ['json']
## or the actual content-type (MIME)
accept_content = ['application/json']
result_accept_content
默认:None(可以是set, list, tuple)
4.3新增加配置
结果后端可接受的 content-types/serializers 格式白名单。
白名单之外的格式的消息将会被丢弃,同时伴随一个错误。
默认情况下和accept_content一样,也可以指定不同的序列化程序。通常,如果使用签名消息,并且结果以未签名的形式存储在结果后端,则需要执行此操作。
示例:
## using serializer name
result_accept_content = ['json']
## or the actual content-type (MIME)
result_accept_content = ['application/json']
时间和日期设置
enable_utc
2.5增加配置
默认:3.0之后默认开启
开启后消息中的日期和时间会被转成UTC时间
Note that workers running Celery versions below 2.5 will assume a local timezone for all messages, so only enable if all workers have been upgraded.
timezone
2.5新增配置
默认:"UTC"
设置Celery使用的时区。所有 pytz 库中的时区都可用。
If not set the UTC timezone is used. For backwards compatibility there’s also a enable_utc setting, and when this is set to false the system local timezone is used instead.
任务设置
task_annotations
2.5新增配置
默认:None
该设置可以用来重写任何从配置中读取的task属性。该设置可以是一个词典,也可以是一个注释对象列表,用于筛选任务并返回要更改的属性映射。
更改tasks.add
任务中的rate_limit
属性:
task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
或者修改所有任务:
task_annotations = {'*': {'rate_limit': '10/s'}}
还可以修改方法,比如on_failure
:
def my_on_failure(self, exc, task_id, args, kwargs, einfo):
print('Oh no! Task failed: {0!r}'.format(exc))
task_annotations = {'*': {'on_failure': my_on_failure}}
如果您需要更高的灵活性,则可以使用对象而不是字典来选择要批注的任务:
class MyAnnotate:
def annotate(self, task):
if task.name.startswith('tasks.'):
return {'rate_limit': '10/s'}
task_annotations = (MyAnnotate(), {other,})
task_compression
默认:None
任务消息使用的压缩方式。可以是gzip
,bzip2
(如果可用),或者任何其他压缩格式 registered in the Kombu compression registry。(kombu是python的一个消息库)
默认情况下发送不压缩的消息。
task_protocol
默认:2(4.0开始)
设置消息协议的版本。支持的协议版本为1和2。4.x+以及3.1.24支持协议2
task_serializer
默认:"json" (4.0开始,早期版本为pickle)
指定要使用的默认的序列化方式(值类型为字符串)。可以是json
,pickle
,yaml
,msgpack
或者其他在kombu.serialization.registry
注册的自定义序列化方法。
See also:
task_publish_retry
2.2新增配置
默认: Enabled.
决定在连接中断或其他连接错误的情况下是否重试发布任务消息。另请参阅 task_publish_retry_policy.
task_publish_retry_policy
2.2新增配置
默认: 查看 Message Sending Retry.
定义在连接中断或其他连接错误的情况下重试发布任务消息时的默认策略。
任务执行设置
task_always_eager
默认:Disabled
如果为True
,所有任务将在本地阻塞执行直到返回。apply_async()
和Task.delay()
将返回一个EagerResult实例,that emulates the API and behavior of AsyncResult``, except the result is already evaluated.
就是说,任务不会发送到队列中,而是本地执行。
task_eager_propagates
默认: Disabled.
如果为 True
, 紧急任务 (通过 task.apply() 执行,或者开启了task_always_eager), will propagate exceptions.
It's the same as always running apply()
with throw=True
.
task_store_eager_result
5.1新增配置
默认: Disabled.
如果为True
并且task_always_eager为 True
以及task_ignore_result为 False
,那么 eagerly(紧急的) 执行的任务结果会被保存到后端。
默认情况下,即使task_always_eager为 True
以及task_ignore_result为 False
,那么 eagerly(紧急的) 执行的任务结果也不会被保存到后端。
task_remote_tracebacks
默认: Disabled.
如果开启,任务结果将包括引发错误的堆栈信息。
这个需要 tblib 库,可以通过下面的命令安装。
pip install celery[tblib]
task_ignore_result
默认: Disabled.
是否存储任务返回值,如果你希望存储错误信息而不仅仅是成功的返回值,可以设置task_store_errors_even_if_ignored
task_store_errors_even_if_ignored
默认: Disabled.
设置后worker会存储所有任务的错误信息,即使是开启了Task.ignore_result``
task_track_started
默认: Disabled.
如果设置为 True
,则任务被执行时会报告一个started
的状态。默认是False
。任务总是处于挂起、已完成或等待重试状态。当有长时间运行的任务,并且需要报告当前正在运行的任务时,拥有“已启动”状态会很有用。
task_time_limit
默认:不限制
任务硬执行时间限制,单位为秒,超时后任务会被终止并替换为新的任务。
task_soft_time_limit
默认: 不限制
任务的软超时时长,以秒为单位。
任务超时会受到 SoftTimeLimitExceeded`` 错误,在硬限制超时到来前可以捕捉到它来执行清理任务:
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
task_acks_late
默认: Disabled.
Late ack means the task messages will be acknowledged after the task has been executed, not just before (the default behavior).
See also:
task_acks_on_failure_or_timeout
默认: Enabled
When enabled messages for all tasks will be acknowledged even if they fail or time out.
Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if
task_acks_late is enabled.
task_reject_on_worker_lost
默认: Disabled.
即使开启 task_acks_late, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL
/INT
, etc).
Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning:
开启这个选项可能导致消息陷入循环,确保知道自己在做什么。
task_default_rate_limit
默认: 不限制.
全局的默认任务速率限制
该值用于没有自定义速率限制的任务
See also:
worker_disable_rate_limits设置可以关闭所有限速
任务结果后端设置
result_backend
默认:没有默认后端
后端用来存储任务结果,可以是下面的任意一个:
- rpc:RPC 后端设置
- database:数据库后端设置
- redis:Redis后端设置
- cache:Cache后端设置
- mongodb:MongoDB 后端设置
- cassandra:Cassandra后端设置
- elasticsearch:Elasticsearch后端设置
- ironcache:IronCache后端设置
- couchbase:Couchbase后端设置
- arangodb:ArangoDB后端设置
- couchdb:CouchDB后端设置
- cosmosdbsql(实验性):CosmosDB后端设置(实验性
- filesystem:File-system后端设置
- consul:Consul 存储后端设置
- azureblockblob:Azure Block Blob后端设置
- s3:S3后端设置
result_backend_always_retry
默认:False
If enable, backend will try to retry on the event of recoverable exceptions instead of propagating the exception.It will use an exponential backoff sleep time between 2 retries.
result_backend_max_sleep_between_retries_ms
默认: 10000
This specifies the maximum sleep time between two backend operation retry.
result_backend_base_sleep_between_retries_ms
默认: 10
This specifies the base amount of sleep time between two backend operation retry.
result_backend_max_retries
默认: Inf
这是可恢复异常情况下的最大重试次数。
result_backend_transport_options
默认: {}
(空字典).
A dict of additional options passed to the underlying transport.
See your transport user manual for supported options (如果有的话).
Example setting the visibility timeout (supported by Redis and SQS transports):
result_backend_transport_options = {'visibility_timeout': 18000} # 5 hours
result_serializer
默认: json
从 4.0 (早期版本: pickle).
结果序列化格式
查看Serializers:序列化获取更多序列化信息
result_compression
默认: 不压缩
给任务结果用的可选的压缩方法。和task_compression设置的选项相同。
result_extended
默认: False
将任务的扩展属性 (name, args, kwargs, worker, retries, queue, delivery_info) 写入后端。
result_expires
默认: 一天后过期
Time (in seconds, or a timedelta`` object) for when after stored task tombstones will be deleted.
A built-in periodic task will delete the results after this time (celery.backend_cleanup
), assuming that celery beat
is enabled. The task runs daily at 4am.
A value of None
or 0 means results will never expire (depending on backend specifications).
Note:
For the moment this only works with the AMQP, database, cache, Couchbase, and Redis backends.
When using the database backend,
celery beat
must be running for the results to be expired.
result_cache_max
默认: Disabled
Enables client caching of results.
This can be useful for the old deprecated 'amqp' backend where the result is unavailable as soon as one result instance consumes it.
This is the total number of results to cache before older results are evicted. A value of 0 or None means no limit, and a value of -1
will disable the cache.
result_chord_join_timeout
默认: 3.0
The timeout in seconds (int/float) when joining a group’s results within a chord.
result_chord_retry_interval
默认: 1.0
重试定时任务的时间间隔
override_backends
默认: Disabled
实现后端的类的路径
允许覆盖后端实现。如果您需要存储有关已执行任务的其他元数据、覆盖重试策略等,则此功能非常有用。
示例:
override_backends = {"db": "custom_module.backend.class"}
数据库后端设置
示例
要使用数据库作为后端,需要在result_backend设置中配置一个db+prefix
的URL:
result_backend = 'db+scheme://user:password@host:port/dbname'
示例:
## sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'
## mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'
## postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'
## oracle
result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'
Please see Supported Databases for a table of supported databases,and Connection String for more information about connectionstrings (this is the part of the URI that comes after the db+prefix
).
database_engine_options
默认:{} 空字典
使用该设置可以指定其他SQLAlchemy数据库引擎选项:
## echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}
database_short_lived_sessions
默认:Disabled
默认关闭,如果启用,它们可以显著降低性能,尤其是在处理大量任务的系统上。此选项对于由于缓存的数据库连接因不活动而过时而遇到错误的低流量情况非常有用,例如,可以通过启用短命会话来修复间歇性错误,如(OperationalError)(2006,“MySQL server has gone away”)。此选项仅影响数据库后端。
database_table_schemas
默认:{} 空字典
在使用SQLAlchemy 作为结果后端时,Celery会自动创建两张表来存储任务的元数据。这个设置允许你自定义表结构:
## use custom schema for the database result backend.
database_table_schemas = {
'task': 'celery',
'group': 'celery',
}
SQLAlchemy不是个数据库,是一个类似于ORM的玩意儿。
database_table_names
默认:{} 空字典
在使用SQLAlchemy 作为结果后端时,Celery会自动创建两张表来存储任务的元数据。这个设置允许你自定义表的名字:
## use custom table names for the database result backend.
database_table_names = {
'task': 'myapp_taskmeta',
'group': 'myapp_groupmeta',
}
RPC 后端设置
result_persistent
Default: Disabled by default (transient messages).
If set to True
, result messages will be persistent. This means the
messages won’t be lost after a broker restart.
Example configuration
result_backend = 'rpc://'
result_persistent = False
Please note : using this backend could trigger the raise of celery.backends.rpc.BacklogLimitExceeded
if the task tombstone is too old .
E.g.
for i in range(10000):
r = debug_task.delay()
print(r.state) # this would raise celery.backends.rpc.BacklogLimitExceeded
Cache后端设置
Note:
Cache 后端支持pylibmc
和python-memcached
两个库。优先使用pylibmc
。
使用单个Membercached server:
result_backend = 'cache+memcached://127.0.0.1:11211/'
多个:
result_backend = """
cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()
The “memory” backend stores the cache in memory only:
result_backend = 'cache'
cache_backend = 'memory'
cache_backend_options
默认:{} 空字典
可以使用这个选项设置pylibmc
库的选项:
cache_backend_options = {
'binary': True,
'behaviors': {'tcp_nodelay': True},
}
cache_backend
不再使用该设置,该设置的功能已集成到result_backend中。
Note:
The django-celery-results - Using the Django ORM/Cache as a result backend library uses cache_backend
for choosing django caches.
MongoDB 后端设置
Note:
需要pymongo
库
mongodb_backend_settings
这是个字典,支持的keys如下:
database
要链接的库的名称,默认是`celery`.
taskmeta_collection
存储任务元数据的`collection`的名称,默认是`celery_taskmeta`.
max_pool_size
Passed as max_pool_size to PyMongo’s Connection or MongoClient constructor. It is the maximum number of TCP connections to keep open to MongoDB at a given time. If there are more open connections than max_pool_size, sockets will be closed when they are released. Defaults to 10.
options
Additional keyword arguments to pass to the mongodb connection constructor. See the `pymongo` docs to see a list of arguments supported.
Example configuration
result_backend = 'mongodb://localhost:27017/'
mongodb_backend_settings = {
'database': 'mydb',
'taskmeta_collection': 'my_taskmeta_collection',
}
Redis后端设置
配置后端URL
注意:使用Redis作为后端需要安装
redis
库,使用pip install celery[redis]
来安装。查看 Bundles 获取更多信息。
几种配置方式:
## 格式
result_backend = 'redis://username:password@host:port/db'
## 两个示例,含义相同。不指定具体地址和 dbnumber 的时候会使用localhost和db0
result_backend = 'redis://localhost/0'
result_backend = 'redis://'
## TLS连接格式
result_backend = 'rediss://username:password@host:port/db?ssl_cert_reqs=required'
## socket连接格式
result_backend = 'socket:///path/to/redis.sock'
注意:
ssl_cert_reqs
应该是required
,optional
或者none
中的一个。(为了向后兼容也可能是CERT_REQUIRED
,CERT_OPTIONAL
,CERT_NONE
)
URL中的字段定义如下:
username
5.1.0新加入字段
只支持Redis>=6.0以及py-redis>=3.4.0
使用旧版本时可以省略该字段:result_backend = 'redis://:password@host:port/db'
password
: 密码host
: 域名或IP地址port
: Redis端口,默认6379db
: db number,默认是0
当使用TLS连接时,你可以在broker_use_ssl 中传递所有的值。Paths to certificates must be URL encoded, and ssl_cert_reqs
is required.示例:
result_backend = 'rediss://:password@host:port/db?\
ssl_cert_reqs=required\
&ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem\ # /var/ssl/myca.pem
&ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem\ # /var/ssl/redis-server-cert.pem
&ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem' # /var/ssl/private/worker-key.pem
这个地方有个歧义,不知道这句话是说
ssl_cert_reqs
这个参数是必须的还是说这个参数必须被设置从required
。不过这一段下边依然有上边提到的注意事项,推测是说这个参数必须有,而不是必须设置为required
注意:
ssl_cert_reqs
应该是required
,optional
或者none
中的一个。(为了向后兼容也可能是CERT_REQUIRED
,CERT_OPTIONAL
,CERT_NONE
)
redis_backend_health_check_interval
默认:未配置
健康检查间隔,也就是 x 秒检查一次是否存活。如果检查时碰到了超时或者连接错误,会重试一次。
redis_backend_use_ssl
默认:关闭
Redis后端支持SSL。该值需要以字典形式提供。可用的参数和broker_use_ssl下的 redis 小节相同
redis_max_connections
默认:无限制
最大连接数,超过之后会报一个ConnectionError
错误
redis_socket_connect_timeout
4.0.1新增加
默认:None
连接Socket的超时时间 (int/float)。
redis_socket_timeout
默认:120.0 Seconds。
读写操作时Socket的超时时间 (int/float),Redis作为后端时使用的配置。
redis_retry_on_timeout
4.4.1新增加
默认:False
上一个配置超时之后是否重试,Redis作为后端时使用的配置。使用Unix socket连接的Redis不要设置该值。
redis_socket_keepalive
4.4.1新增加
默认:False
是否保持TCP连接。Redis作为后端时使用的配置。
Cassandra后端设置
S3后端设置
Azure Block Blob后端设置
Elasticsearch后端设置
To use Elasticsearch as the result backend you simply need to configure the result_backend setting with the correct URL.
Example configuration
result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
elasticsearch_retry_on_timeout
Default: False
超时是否在不同个节点上重试
elasticsearch_max_retries
Default: 3.
抛出异常之前重试的次数
elasticsearch_timeout
Default: 10.0 seconds.
Elasticsearch后端使用的全局超时时间
elasticsearch_save_meta_as_text
Default: True
Should meta saved as text or as native json. Result is always serialized as text.
AWS DynamoDB后端设置
IronCache后端设置
Couchbase后端设置
ArangoDB后端设置
CosmosDB后端设置(实验性
CouchDB后端设置
File-system后端设置
This backend can be configured using a file URL, for example:
CELERY_RESULT_BACKEND = 'file:///var/celery/results'
The configured directory needs to be shared and writable by all servers using the backend.
If you're trying Celery on a single system you can simply use the backend without any further configuration. For larger clusters you could use NFS, GlusterFS, CIFS, HDFS (using FUSE), or any other file-system.
Consul 存储后端设置
消息路由
task_queues
默认:None
多数用户可能更应该使用Automatic routing:自动路由,而不是这个设置。
如果想要配置高级路由,这个设置是worker从kombu.Queue对象取的一个列表。
该设置可以使用 -Q
选项覆盖,此列表中的各个队列可以使用-X
选项通过名称排除。
查看Basics获取更多信息。
The default is a queue/exchange/binding
key of celery
, with exchange type direct
.
另请参阅task_routes
task_routes
默认:None
路由器列表,或用于将任务路由到队列的单个路由。到达目的之前会按照顺序查询路由列表。
一条路由可以被指定为:
- 带有签名的函数
(name, args, kwargs, options, task=None, **kwargs)
- 带有路由功能(信息)的字符串
- 包含路由规范的字典:会被转成一个
celery.routes.MapRoute
实例 - A list of (pattern, route) tuples:会被转成一个
celery.routes.MapRoute
实例
示例:
task_routes = {
'celery.ping': 'default',
'mytasks.add': 'cpu-bound',
'feed.tasks.*': 'feeds', # <-- glob pattern
re.compile(r'(image|video)\.tasks\..*'): 'media', # <-- regex
'video.encode': {
'queue': 'video',
'exchange': 'media',
'routing_key': 'media.video.encode',
},
}
task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default'})
Where myapp.tasks.route_task
could be:
def route_task(self, name, args, kwargs, options, task=None, **kw):
if task == 'celery.ping':
return {'queue': 'default'}
route_task可以返回字符串或字典。然后,字符串表示它是task_queues中的队列名称,dict表示它是自定义路由.
发送任务时会按照顺序查找路由,第一个不返回None的便是要使用的路由。消息中(任务设置)的选项会和路由选项合并,其中任务设置优先级较高。
比如一个apply_async()
有以下参数:
Task.apply_async(immediate=False, exchange='video',
routing_key='video.compress')
路由返回值为:
{'immediate': True, 'exchange': 'urgent'}
那么最终合并后的选项为:
immediate=False, exchange='video', routing_key='video.compress'
(默认的消息选项在Task类中定义)
task_routes中定义的值在合并时优先级高于在task_queues中定义的值,比如下面的设置:
task_queues = {
'cpubound': {
'exchange': 'cpubound',
'routing_key': 'cpubound',
},
}
task_routes = {
'tasks.add': {
'queue': 'cpubound',
'routing_key': 'tasks.add',
'serializer': 'json',
},
}
最终合并到tasks.add
中会变成:
{'exchange': 'cpubound',
'routing_key': 'tasks.add',
'serializer': 'json'}
Routers查看更多信息
task_queue_max_priority
brokers: RabbitMQ
默认:None
RabbitMQ消息优先级查看更多信息。
task_default_priority
brokers: RabbitMQ
默认:None
RabbitMQ消息优先级查看更多信息。
task_inherit_parent_priority
brokers: RabbitMQ
默认:False
开启后,子任务会继承父任务的优先级
## 任务链中的最后一个任务的优先级也将设置为5。
chain = celery.chain(add.s(2) | add.s(2).set(priority=5) | add.s(3))
当使用delay()
或apply_async()
从父任务调用子任务时,优先级继承也起作用。
查看RabbitMQ消息优先级获取更多信息
worker_direct
默认:Disabled
启用此选项后,每个worker都有一个专用队列,从而可以将任务路由到特定的workers。
每个worker的队列名称都是自动生成的,名称基于hostname
,使用.dq
后缀,using the C.dq
exchange.
比如一个主机名为w1@example.com
的节点上的worker,其队列名将会是:
w1@example.com.dq
然后你就可以指定主机名作为routing_key
,exchange为C.dq
来将任务路由过去:
task_routes = {
'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}
task_create_missing_queues
默认:Enabled
如果开启,任何未在task_queues中定义的指定队列都将自动创建。详情查看:Automatic routing:自动路由.
task_default_queue
默认:"celery"
如果消息没有路由或者没有指定自定义队列,那么.apply_async
会使用默认队列
队列必须在task_queues列出。If task_queues isn't specified then it's automatically created containing one queue entry, where this name is used as the name of that queue.
另请参阅:
更改默认队列的名称
task_default_exchange
默认:Uses the value set for task_default_queue
Name of the default exchange to use when no custom exchange is specified for a key in the task_queues setting.
task_default_exchange_type
默认:"direct"
Default exchange type used when no custom exchange type is specified for a key in the task_queues setting.
task_default_routing_key
默认: Uses the value set for task_default_queue
The default routing key used when no custom routing key is specified for a key in the task_queues setting
task_default_delivery_mode
默认:"persistent"
Can be transient (messages not written to disk) or persistent (written to disk).
Broker设置
broker_url
默认:"amqp://"
默认的broker URL。这个URL的格式需要遵循:
transport://userid:password@hostname:port/virtual_host
其中只有scheme部分(transport://)是必须的,其他部分都是可选的。根据scheme有不同的默认值。
transport部分是要用的broker,默认是amqp,(使用librabbitmq
,如果没有安装则使用pyamqp
),其他的选择包括:redis://
, sqs://
, and qpid://
.
或者你也可以使用自己的实现(自己写一个连接器):
broker_url = 'proj.transports.MyTransport://localhost'
或者可以指定多个:
broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'
或者像这样:
broker_url = [
'transport://userid:password@localhost:port//',
'transport://userid:password@hostname:port//'
]
这些稍后会用在broker_failover_strategy.
在Kombu文档中的URLs部分获取更多信息
broker_read_url / broker_write_url
默认:从broker_url获取
这个设置可以设置不同的连接URL用于生产和消费,用来取代broker_url。
示例:
broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'
他们也都可以设置一个列表用来做主备切换。配置和broker_url一样
broker_failover_strategy
默认:"round-robin"
broker连接对象的故障转移策略,默认是轮询。If supplied,may map to a key in kombu.connection.failover_strategies
, or be a reference to any method that yields a single item from a supplied list.
示例:
## Random failover strategy
def random_failover_strategy(servers):
it = list(servers) # don't modify callers list
shuffle = random.shuffle
for _ in repeat(None):
shuffle(it)
yield it[0]
broker_failover_strategy = random_failover_strategy
broker_heartbeat
transports supported: pyamqp
默认:120.0(由服务器协商)
Note:该值仅由worker使用,clients目前不使用heartbeat。
仅使用TCP/IP并不总是能够及时检测连接丢失,因此AMQP定义了一种称为心跳的东西,client和broker都使用它来检测连接是否关闭。
如果heartbeat的值为10s,则会以broker_heartbeat_checkrate设置的速率进行心跳动作,这个值默认是2,意思是在broker_heartbeat
设置的时间内执行broker_heartbeat_checkrate
次心跳动作。
broker_heartbeat_checkrate
transports supported:pyamqp
默认:2.0
这个值配合broker_heartbeat一起使用,定义出心跳间隔
broker_use_ssl
只支持redis
以及pyamqp
默认:关闭
根据连接器可用选项有所不同
pyamqp
该值设置为True
时使用默认的SSL设置。使用字典来自定义策略。The format used is Python’s ssl.wrap_socket()
options.
注意SSL socket通常有自己专用的端口。
提供客户端证书并根据自定义证书颁发机构验证服务器证书的示例:
import ssl
broker_use_ssl = {
'keyfile': '/var/ssl/private/worker-key.pem',
'certfile': '/var/ssl/amqp-server-cert.pem',
'ca_certs': '/var/ssl/myca.pem',
'cert_reqs': ssl.CERT_REQUIRED
}
注意:小心使用
broker_use_ssl=True
。默认配置下的证书可能不好使,参考Python的ssl module security considerations 这个文档。
redis
和上边一样,但Key不同,Redis这边包括以下四个Keys:
ssl_cert_reqs
:必选,SSLContext.verify_mode
中的一个值:ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
ssl_ca_certs
:可选,CA证书的路径ssl_certfile
:可选,client 证书的路径ssl_keyfile
:可选,client key的路径
注意:
ssl_cert_reqs
应该是required
,optional
或者none
中的一个。(为了向后兼容也可能是CERT_REQUIRED
,CERT_OPTIONAL
,CERT_NONE
)
broker_pool_limit
2.3新增配置
默认:10
连接池中的最大连接数。
2.5之后连接池默认处于开启状态,默认限制为10个连接,根据需要调整。
如果设置为None
或者0
将不使用连接池,每次连接都会建立新的连接。
broker_connection_timeout
默认:4.0
连接AMQP的默认超时时间,这个设置在使用gevent
的时候是关闭的。
Note:
这个设置只是用于连接broker的超时设置。不能用于发送任务,那种情况可以查看broker_transport_options
broker_connection_retry
默认:Enabled
连接broker失败时是否自动重试。并且在broker_connection_max_retries设定的重试次数用完之前,每次重试间隔都会增加。
broker_connection_max_retries
默认:100
最大重试次数,如果设置为0
或者None
,会一直重试
broker_login_method
默认:"AMQPLAIN"
设置amqp登录方式
broker_transport_options
2.2新增
默认:{} 空字典
A dict of additional options passed to the underlying transport.See your transport user manual for supported options (if any).Example setting the visibility timeout (supported by Redis and SQS transports):
broker_transport_options = {'visibility_timeout': 18000} # 5 hours
Example setting the producer connection maximum number of retries (so producers won’t retry forever if the broker isn’t available at the first task execution):
broker_transport_options = {'max_retries': 5}
Worker
imports
默认:[] 空列表
worker启动时要导入的一系列模块。
它不仅用于指定要导入的任务模块,还用于导入信号处理程序和其他远程控制命令等。
模块将按原始顺序导入。
include
默认:[] 空列表
和imports作用相同,可以同时存在,导入顺序在imports之后。
worker_deduplicate_successful_tasks
5.1之后新增
默认:False
执行每个任务之前,让每个worker检查是否是重复任务
仅当Message Broker重新传递具有相同标识符的任务(已启用延迟确认)并且其在结果后端中的状态为成功时,才会发生重复数据删除。
To avoid overflowing the result backend with queries, a local cache of successfully executed tasks is checked before querying the result backend in case the task was already successfully executed by the same worker that received the task.
This cache can be made persistent by setting the worker_state_db setting.
如果结果后端不是持久的(例如,RPC后端),则忽略此设置。
worker_concurrency
默认:CPU核心数量
执行任务的工作线程数量。如果任务主要是I/O任务,则可以尝试适当调大。如果是CPU负载型,建议保持默认。
worker_prefetch_multiplier
默认:4
预读消息系数,可以理解为每个worker每次预读几个消息。一般不用改。请注意第一个启动的worker会接收到更多任务。
要关闭预读,把该值设置为 1 。设置为 0 表示worker会尽量取更多的消息。
更多关于预读的信息,参考:Prefetch Limits
注意:
定时任务、ETA任务不受此配置限制
worker_lost_wait
默认:10S
某些情况下worker意外终止可能导致任务完成前返回一个错误结果。这个配置指定了在抛出一个 WorkerLostError 错误之前等待时间。
worker_max_tasks_per_child
Maximum number of tasks a pool worker process can execute before
it's replaced with a new one. Default is no limit.
worker_max_memory_per_child
默认:无限制。类型:int(KB)
worker在被新工作进程替换之前可能消耗的最大驻留内存量(以KB为单位)。如果单个任务导致worker超过此限制,则该任务将完成,并且该工作人员将在之后被替换。
示例:
worker_max_memory_per_child = 12000 # 12MB
worker_disable_rate_limits
默认:关闭(任务限速默认开启)
关闭所有任务限速,即时设置了明确的限速。
worker_state_db
默认:None
用来持久存储任务状态的文件名,可以是相对路径或者绝对路径,根据Python版本不同可能会增加.db
后缀。
该设置还可以通过celery worker --statedb
进行配置
worker_timer_precision
默认:1.0s
设置ETA计划程序在重新检查计划之间可以休眠的最长时间(秒)。
将此值设置为1秒意味着调度程序的精度将为1秒。如果需要接近毫秒的精度,可以将其设置为0.1。
这是一个时间精度的配置,可能用到的时候才能理解吧。
worker_enable_remote_control
默认:开启
是否开启workers的远程控制
worker_proc_alive_timeout
默认:4.0
等待一个新的worker process启动的超时时间。
worker_cancel_long_running_tasks_on_connection_loss
5.1新增配置。
默认:关闭
Kill all long-running tasks with late acknowledgment enabled on connection loss.
Tasks which have not been acknowledged before the connection loss cannot do so anymore since their channel is gone and the task is redelivered back to the queue. This is why tasks with late acknowledged enabled must be idempotent as they may be executed more than once. In this case, the task is being executed twice per connection loss (and sometimes in parallel in other workers).
When turning this option on, those tasks which have not been completed are cancelled and their execution is terminated. Tasks which have completed in any way before the connection loss are recorded as such in the result backend as long as task_ignore_result is not enabled.
Warning:
6.0之后,该选项会被默认设置为True
Events
worker_send_task_events
默认:关闭
发送 task-related 事件以便任务可以被类似于 flower
之类的工具监控。Sets the default value for the workers
-E``argument.
task_send_sent_event
2.2新增配置
默认:关闭
开启之后,会为每个任务发送一个task-send事件,以便于任务被worker消费之前可以被追踪。
event_queue_ttl
支持:amqp
默认:5s
当消息被发送到一个监控客户端的时间队列时该消息的过期时间。
比如,如果这个值设置为10,那么这个消息会在交付给这个队列10s之后被删除。
event_queue_expires
支持:amqp
默认60.0s
Expiry time in seconds (int/float) for when after a monitor clients
event queue will be deleted (x-expires
).
event_queue_prefix
默认:"celeryev"
事件接收器队列的名称前缀。
event_exchange
默认:"celeryev"
Name of the event exchange.
警告:
实验性质的选项,谨慎使用。
event_serializer
默认:"json"
发送时间消息时使用的序列化格式
更多参考:Serializers:序列化
远程控制命令
提示:
要关闭远程控制命令,可以查看worker_enable_remote_control设置
control_queue_ttl
默认:300
远程控制命令队列中的消息过期之前的时间(秒)。
如果使用默认的300秒,这意味着如果发送了一个远程控制命令,并且在300秒内没有工作人员拾取该命令,则该命令将被丢弃。
这个设置还适用于 remote control reply queues.
control_queue_expires
默认:10
未被使用的remote control command 对列被删除的时间。
这个设置还适用于 remote control reply queues.
control_exchange
默认:"celery"
Name of the control command exchange.
警告:
实验性质,小心使用
日志
worker_hijack_root_logger
2.2新增加配置
默认:开启
简单说就是如果想要使用自定义的日志收集器就设置成False。
worker_log_color
默认:如果日志输出到终端,则默认开启
日志输出是不是有颜色。。
worker_log_format
默认:
"[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
The format to use for log messages,更多信息参考Python的logging模块。
worker_task_log_format
默认:
"[%(asctime)s: %(levelname)s/%(processName)s]
%(task_name)s[%(task_id)s]: %(message)s"
The format to use for log messages logged in tasks,更多信息参考Python的logging模块。
worker_redirect_stdouts
默认:开启
开启后stdout
和stderr
会被重定向到当前的日志记录器
Used by celery worker and celery beat .
worker_redirect_stdouts_level
默认:WARNING
The log level output to stdout and stderr is logged as.Can be one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
安全
security_key
默认:None
2.5新增
当使用Message Signing:消息签名时,用来指定签名消息用的包含私钥的文件的路径,可以是相对路径或绝对路径。
security_certificate
默认:None
2.5新增
当使用Message Signing:消息签名时,用来指定签名消息用的 X.509 证书文件的路径,可以是相对路径或绝对路径。
security_cert_store
默认:None
2.5新增
给Message Signing:消息签名使用的包含 X.509 证书的文件夹,也可以使用通配符,(比如 /etc/certs/*.pem
)
难以理解这配置。。。
security_digest
默认:sha256
4.3新增
当Message Signing:消息签名启用时,签名消息时使用的加密算法。参考链接:Message digests (Hashing)
。。没啥必要改,也没啥必要看
自定义组件类(高级
worker_pool
默认:"prefork" (celery.concurrency.prefork:TaskPool)
worker 使用的 pool class 的名称。
worker_autoscaler
2.2新增
默认:"celery.worker.autoscale:Autoscaler"
autoscaler使用的类名
worker_consumer
默认:"celery.worker.consumer:Consumer"
worker使用的 consumer 类的类名
worker_timer
默认:"kombu.asynchronous.hub.timer:Timer"
Name of the ETA scheduler class used by the worker. Default is or set by the pool implementation.
Beat Settings (celery beat)
beat_schedule
默认: {} (empty mapping).
The periodic task schedule used by beat. See Entries.
beat_scheduler
默认: "celery.beat:PersistentScheduler".
The default scheduler class. May be set to "django_celery_beat.schedulers:DatabaseScheduler" for instance, if used alongside django-celery-beat extension.
Can also be set via the celery beat -S argument.
beat_schedule_filename
默认: "celerybeat-schedule".
Name of the file used by PersistentScheduler to store the last run times of periodic tasks. Can be a relative or absolute path, but be aware that the suffix .db may be appended to the file name (depending on Python version).
Can also be set via the celery beat --schedule argument.
beat_sync_every
默认: 0.
The number of periodic tasks that can be called before another database sync is issued. A value of 0 (default) means sync based on timing - default of 3 minutes as determined by scheduler.sync_every. If set to 1, beat will call sync after every task message sent.
beat_max_loop_interval
默认: 0.
The maximum number of seconds beat can sleep between checking the schedule.
The default for this value is scheduler specific. For the default Celery beat scheduler the value is 300 (5 minutes), but for the django-celery-beat database scheduler it’s 5 seconds because the schedule may be changed externally, and so it must take changes to the schedule into account.
Also when running Celery beat embedded (-B) on Jython as a thread the max interval is overridden and set to 1 so that it’s possible to shut down in a timely manner.