配置和默认配置

示例配置

一个包含基本配置的示例:

### Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

## List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

### Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

新的小写设置

4.0新增加了小写设置。下面是对照表。

旧版配置格式在6.0之后将不再支持,尽快升级。Celery提供了升级命令,大概这儿open in new window能看到。

Setting nameReplace with
CELERY_ACCEPT_CONTENTaccept_content
CELERY_ENABLE_UTCenable_utc
CELERY_IMPORTSimports
CELERY_INCLUDEinclude
CELERY_TIMEZONEtimezone
CELERYBEAT_MAX_LOOP_INTERVALbeat_max_loop_interval
CELERYBEAT_SCHEDULEbeat_schedule
CELERYBEAT_SCHEDULERbeat_scheduler
CELERYBEAT_SCHEDULE_FILENAMEbeat_schedule_filename
CELERYBEAT_SYNC_EVERYbeat_sync_every
BROKER_URLbroker_url
BROKER_TRANSPORTbroker_transport
BROKER_TRANSPORT_OPTIONSbroker_transport_options
BROKER_CONNECTION_TIMEOUTbroker_connection_timeout
BROKER_CONNECTION_RETRYbroker_connection_retry
BROKER_CONNECTION_MAX_RETRIESbroker_connection_max_retries
BROKER_FAILOVER_STRATEGYbroker_failover_strategy
BROKER_HEARTBEATbroker_heartbeat
BROKER_LOGIN_METHODbroker_login_method
BROKER_POOL_LIMITbroker_pool_limit
BROKER_USE_SSLbroker_use_ssl
CELERY_CACHE_BACKENDcache_backend
CELERY_CACHE_BACKEND_OPTIONScache_backend_options
CASSANDRA_COLUMN_FAMILY((20220807005220-jh8rb2w "cassandra_table"))
CASSANDRA_ENTRY_TTLcassandra_entry_ttl
CASSANDRA_KEYSPACEcassandra_keyspace
CASSANDRA_PORTcassandra_port
CASSANDRA_READ_CONSISTENCYcassandra_read_consistency
CASSANDRA_SERVERScassandra_servers
CASSANDRA_WRITE_CONSISTENCYcassandra_write_consistency
CASSANDRA_OPTIONScassandra_options
S3_ACCESS_KEY_IDs3_access_key_id
S3_SECRET_ACCESS_KEYs3_secret_access_key
S3_BUCKETs3_bucket
S3_BASE_PATHs3_base_path
S3_ENDPOINT_URLs3_endpoint_url
S3_REGIONs3_region
CELERY_COUCHBASE_BACKEND_SETTINGScouchbase_backend_settings
CELERY_ARANGODB_BACKEND_SETTINGSarangodb_backend_settings
CELERY_MONGODB_BACKEND_SETTINGSmongodb_backend_settings
CELERY_EVENT_QUEUE_EXPIRESevent_queue_expires
CELERY_EVENT_QUEUE_TTLevent_queue_ttl
CELERY_EVENT_QUEUE_PREFIXevent_queue_prefix
CELERY_EVENT_SERIALIZERevent_serializer
CELERY_REDIS_DBredis_db
CELERY_REDIS_HOSTredis_host
CELERY_REDIS_MAX_CONNECTIONSredis_max_connections
CELERY_REDIS_USERNAMEredis_username
CELERY_REDIS_PASSWORDredis_password
CELERY_REDIS_PORTredis_port
CELERY_REDIS_BACKEND_USE_SSLredis_backend_use_ssl
CELERY_RESULT_BACKENDresult_backend
CELERY_MAX_CACHED_RESULTSresult_cache_max
CELERY_MESSAGE_COMPRESSIONresult_compression
CELERY_RESULT_EXCHANGEresult_exchange
CELERY_RESULT_EXCHANGE_TYPEresult_exchange_type
CELERY_RESULT_EXPIRESresult_expires
CELERY_RESULT_PERSISTENTresult_persistent
CELERY_RESULT_SERIALIZERresult_serializer
CELERY_RESULT_DBURIUse result_backend instead.
CELERY_RESULT_ENGINE_OPTIONSdatabase_engine_options
[...]_DB_SHORT_LIVED_SESSIONSdatabase_short_lived_sessions
CELERY_RESULT_DB_TABLE_NAMESdatabase_db_names
CELERY_SECURITY_CERTIFICATEsecurity_certificate
CELERY_SECURITY_CERT_STOREsecurity_cert_store
CELERY_SECURITY_KEYsecurity_key
CELERY_ACKS_LATEtask_acks_late
CELERY_ACKS_ON_FAILURE_OR_TIMEOUTtask_acks_on_failure_or_timeout
CELERY_ALWAYS_EAGERtask_always_eager
CELERY_ANNOTATIONStask_annotations
CELERY_COMPRESSIONtask_compression
CELERY_CREATE_MISSING_QUEUEStask_create_missing_queues
CELERY_DEFAULT_DELIVERY_MODEtask_default_delivery_mode
CELERY_DEFAULT_EXCHANGEtask_default_exchange
CELERY_DEFAULT_EXCHANGE_TYPEtask_default_exchange_type
CELERY_DEFAULT_QUEUEtask_default_queue
CELERY_DEFAULT_RATE_LIMITtask_default_rate_limit
CELERY_DEFAULT_ROUTING_KEYtask_default_routing_key
CELERY_EAGER_PROPAGATEStask_eager_propagates
CELERY_IGNORE_RESULTtask_ignore_result
CELERY_PUBLISH_RETRYtask_publish_retry
CELERY_PUBLISH_RETRY_POLICYtask_publish_retry_policy
CELERY_QUEUEStask_queues
CELERY_ROUTEStask_routes
CELERY_SEND_SENT_EVENTtask_send_sent_event
CELERY_SERIALIZERtask_serializer
CELERYD_SOFT_TIME_LIMITtask_soft_time_limit
CELERY_TASK_TRACK_STARTEDtask_track_started
CELERY_TASK_REJECT_ON_WORKER_LOSTtask_reject_on_worker_lost
CELERYD_TIME_LIMITtask_time_limit
CELERYD_AGENTworker_agent
CELERYD_AUTOSCALERworker_autoscaler
CELERYD_CONCURRENCYworker_concurrency
CELERYD_CONSUMERworker_consumer
CELERY_WORKER_DIRECTworker_direct
CELERY_DISABLE_RATE_LIMITSworker_disable_rate_limits
CELERY_ENABLE_REMOTE_CONTROLworker_enable_remote_control
CELERYD_HIJACK_ROOT_LOGGERworker_hijack_root_logger
CELERYD_LOG_COLORworker_log_color
CELERYD_LOG_FORMATworker_log_format
CELERYD_WORKER_LOST_WAITworker_lost_wait
CELERYD_MAX_TASKS_PER_CHILDworker_max_tasks_per_child
CELERYD_POOLworker_pool
CELERYD_POOL_PUTLOCKSworker_pool_putlocks
CELERYD_POOL_RESTARTSworker_pool_restarts
CELERYD_PREFETCH_MULTIPLIERworker_prefetch_multiplier
CELERYD_REDIRECT_STDOUTSworker_redirect_stdouts
CELERYD_REDIRECT_STDOUTS_LEVELworker_redirect_stdouts_level
CELERY_SEND_EVENTSworker_send_task_events
CELERYD_STATE_DBworker_state_db
CELERYD_TASK_LOG_FORMATworker_task_log_format
CELERYD_TIMERworker_timer
CELERYD_TIMER_PRECISIONworker_timer_precision

配置指令

一般配置

accept_content

默认:{'json'} (set, list, or tuple)

Content-types/serializers可接受的格式白名单

白名单之外的格式的消息将会被丢弃,同时伴随一个错误。

默认只有json是开启的,可以添加任意content type,包括pickle和yaml。这种情况下确保只有可信的客户端才能访问broker。更多信息参考:Securityopen in new window

示例:

## using serializer name
accept_content = ['json']

## or the actual content-type (MIME)
accept_content = ['application/json']

result_accept_content

默认:None(可以是set, list, tuple)

4.3新增加配置

结果后端可接受的 content-types/serializers 格式白名单。

白名单之外的格式的消息将会被丢弃,同时伴随一个错误。

默认情况下和accept_content一样,也可以指定不同的序列化程序。通常,如果使用签名消息,并且结果以未签名的形式存储在结果后端,则需要执行此操作。

示例:

## using serializer name
result_accept_content = ['json']

## or the actual content-type (MIME)
result_accept_content = ['application/json']

时间和日期设置

enable_utc

2.5增加配置

默认:3.0之后默认开启

开启后消息中的日期和时间会被转成UTC时间

Note that workers running Celery versions below 2.5 will assume a local timezone for all messages, so only enable if all workers have been upgraded.

timezone

2.5新增配置

默认:"UTC"

设置Celery使用的时区。所有 pytzopen in new window 库中的时区都可用。

If not set the UTC timezone is used. For backwards compatibility there’s also a enable_utc setting, and when this is set to false the system local timezone is used instead.

任务设置

task_annotations

2.5新增配置

默认:None

该设置可以用来重写任何从配置中读取的task属性。该设置可以是一个词典,也可以是一个注释对象列表,用于筛选任务并返回要更改的属性映射。

更改tasks.add任务中的rate_limit属性:

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

或者修改所有任务:

task_annotations = {'*': {'rate_limit': '10/s'}}

还可以修改方法,比如on_failure

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))

task_annotations = {'*': {'on_failure': my_on_failure}}

如果您需要更高的灵活性,则可以使用对象而不是字典来选择要批注的任务:

class MyAnnotate:

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})

task_compression

默认:None

任务消息使用的压缩方式。可以是gzip,bzip2(如果可用),或者任何其他压缩格式 registered in the Kombu compression registry。(kombu是python的一个消息库)

默认情况下发送不压缩的消息。

task_protocol

默认:2(4.0开始)

设置消息协议的版本。支持的协议版本为1和2。4.x+以及3.1.24支持协议2

task_serializer

默认:"json" (4.0开始,早期版本为pickle)

指定要使用的默认的序列化方式(值类型为字符串)。可以是json,pickle,yaml,msgpack或者其他在kombu.serialization.registry注册的自定义序列化方法。

See also:

Serializersopen in new window.

task_publish_retry

2.2新增配置

默认: Enabled.

决定在连接中断或其他连接错误的情况下是否重试发布任务消息。另请参阅 task_publish_retry_policy.

task_publish_retry_policy

2.2新增配置

默认: 查看 Message Sending Retry.

定义在连接中断或其他连接错误的情况下重试发布任务消息时的默认策略。

任务执行设置

task_always_eager

默认:Disabled

如果为True,所有任务将在本地阻塞执行直到返回。apply_async()Task.delay()将返回一个EagerResultopen in new window实例,that emulates the API and behavior of AsyncResult``, except the result is already evaluated.

就是说,任务不会发送到队列中,而是本地执行。

task_eager_propagates

默认: Disabled.

如果为 True, 紧急任务 (通过 task.apply() 执行,或者开启了task_always_eager), will propagate exceptions.

It's the same as always running apply() with throw=True.

task_store_eager_result

5.1新增配置

默认: Disabled.

如果为True 并且task_always_eager为 True以及task_ignore_result为 False,那么 eagerly(紧急的) 执行的任务结果会被保存到后端。

默认情况下,即使task_always_eager为 True以及task_ignore_result为 False,那么 eagerly(紧急的) 执行的任务结果也不会被保存到后端。

task_remote_tracebacks

默认: Disabled.

如果开启,任务结果将包括引发错误的堆栈信息。

这个需要 tblibopen in new window 库,可以通过下面的命令安装。

pip install celery[tblib]

task_ignore_result

默认: Disabled.

是否存储任务返回值,如果你希望存储错误信息而不仅仅是成功的返回值,可以设置task_store_errors_even_if_ignored

task_store_errors_even_if_ignored

默认: Disabled.

设置后worker会存储所有任务的错误信息,即使是开启了Task.ignore_result``

task_track_started

默认: Disabled.

如果设置为 True ,则任务被执行时会报告一个started的状态。默认是False。任务总是处于挂起、已完成或等待重试状态。当有长时间运行的任务,并且需要报告当前正在运行的任务时,拥有“已启动”状态会很有用。

task_time_limit

默认:不限制

任务硬执行时间限制,单位为秒,超时后任务会被终止并替换为新的任务。

task_soft_time_limit

默认: 不限制

任务的软超时时长,以秒为单位。

任务超时会受到 SoftTimeLimitExceeded`` 错误,在硬限制超时到来前可以捕捉到它来执行清理任务:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()

task_acks_late

默认: Disabled.

Late ack means the task messages will be acknowledged after the task has been executed, not just before (the default behavior).

See also:

FAQ: Should I use retry or acks_late?open in new window.

task_acks_on_failure_or_timeout

默认: Enabled

When enabled messages for all tasks will be acknowledged even if they fail or time out.

Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if
task_acks_late is enabled.

task_reject_on_worker_lost

默认: Disabled.

即使开启 task_acks_late, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning:

开启这个选项可能导致消息陷入循环,确保知道自己在做什么。

task_default_rate_limit

默认: 不限制.

全局的默认任务速率限制

该值用于没有自定义速率限制的任务

See also:

worker_disable_rate_limits设置可以关闭所有限速

任务结果后端设置

result_backend

默认:没有默认后端

后端用来存储任务结果,可以是下面的任意一个:

  • rpc:RPC 后端设置
  • database:数据库后端设置
  • redis:Redis后端设置
  • cache:Cache后端设置
  • mongodb:MongoDB 后端设置
  • cassandra:Cassandra后端设置
  • elasticsearch:Elasticsearch后端设置
  • ironcache:IronCache后端设置
  • couchbase:Couchbase后端设置
  • arangodb:ArangoDB后端设置
  • couchdb:CouchDB后端设置
  • cosmosdbsql(实验性):CosmosDB后端设置(实验性
  • filesystem:File-system后端设置
  • consul:Consul 存储后端设置
  • azureblockblob:Azure Block Blob后端设置
  • s3:S3后端设置

result_backend_always_retry

默认:False

If enable, backend will try to retry on the event of recoverable exceptions instead of propagating the exception.It will use an exponential backoff sleep time between 2 retries.

result_backend_max_sleep_between_retries_ms

默认: 10000

This specifies the maximum sleep time between two backend operation retry.

result_backend_base_sleep_between_retries_ms

默认: 10

This specifies the base amount of sleep time between two backend operation retry.

result_backend_max_retries

默认: Inf

这是可恢复异常情况下的最大重试次数。

result_backend_transport_options

默认: {} (空字典).

A dict of additional options passed to the underlying transport.

See your transport user manual for supported options (如果有的话).

Example setting the visibility timeout (supported by Redis and SQS transports):

result_backend_transport_options = {'visibility_timeout': 18000}  # 5 hours

result_serializer

默认: json 从 4.0 (早期版本: pickle).

结果序列化格式

查看Serializers:序列化获取更多序列化信息

result_compression

默认: 不压缩

给任务结果用的可选的压缩方法。和task_compression设置的选项相同。

result_extended

默认: False

将任务的扩展属性 (name, args, kwargs, worker, retries, queue, delivery_info) 写入后端。

result_expires

默认: 一天后过期

Time (in seconds, or a timedelta`` object) for when after stored task tombstones will be deleted.

A built-in periodic task will delete the results after this time (celery.backend_cleanup), assuming that celery beat is enabled. The task runs daily at 4am.

A value of None or 0 means results will never expire (depending on backend specifications).

Note:

For the moment this only works with the AMQP, database, cache, Couchbase, and Redis backends.

When using the database backend, celery beat must be running for the results to be expired.

result_cache_max

默认: Disabled

Enables client caching of results.

This can be useful for the old deprecated 'amqp' backend where the result is unavailable as soon as one result instance consumes it.

This is the total number of results to cache before older results are evicted. A value of 0 or None means no limit, and a value of -1 will disable the cache.

result_chord_join_timeout

默认: 3.0

The timeout in seconds (int/float) when joining a group’s results within a chord.

result_chord_retry_interval

默认: 1.0

重试定时任务的时间间隔

override_backends

默认: Disabled

实现后端的类的路径

允许覆盖后端实现。如果您需要存储有关已执行任务的其他元数据、覆盖重试策略等,则此功能非常有用。

示例:

override_backends = {"db": "custom_module.backend.class"}

数据库后端设置

示例

要使用数据库作为后端,需要在result_backend设置中配置一个db+prefix的URL:

result_backend = 'db+scheme://user:password@host:port/dbname'

示例:

## sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

## mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

## postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

## oracle
result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'

Please see Supported Databasesopen in new window for a table of supported databases,and Connection Stringopen in new window for more information about connectionstrings (this is the part of the URI that comes after the db+prefix).

database_engine_options

默认:{} 空字典

使用该设置可以指定其他SQLAlchemy数据库引擎选项:

## echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}

database_short_lived_sessions

默认:Disabled

默认关闭,如果启用,它们可以显著降低性能,尤其是在处理大量任务的系统上。此选项对于由于缓存的数据库连接因不活动而过时而遇到错误的低流量情况非常有用,例如,可以通过启用短命会话来修复间歇性错误,如(OperationalError)(2006,“MySQL server has gone away”)。此选项仅影响数据库后端。

database_table_schemas

默认:{} 空字典

在使用SQLAlchemy 作为结果后端时,Celery会自动创建两张表来存储任务的元数据。这个设置允许你自定义表结构:

## use custom schema for the database result backend.
database_table_schemas = {
    'task': 'celery',
    'group': 'celery',
}

SQLAlchemy不是个数据库,是一个类似于ORM的玩意儿。

database_table_names

默认:{} 空字典

在使用SQLAlchemy 作为结果后端时,Celery会自动创建两张表来存储任务的元数据。这个设置允许你自定义表的名字:

## use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}

RPC 后端设置

result_persistent

Default: Disabled by default (transient messages).

If set to True, result messages will be persistent. This means the
messages won’t be lost after a broker restart.

Example configuration
result_backend = 'rpc://'
result_persistent = False

Please note : using this backend could trigger the raise of celery.backends.rpc.BacklogLimitExceeded if the task tombstone is too old .

E.g.

for i in range(10000):
    r = debug_task.delay()

print(r.state)  # this would raise celery.backends.rpc.BacklogLimitExceeded

Cache后端设置

Note:
Cache 后端支持pylibmcpython-memcached两个库。优先使用pylibmc

使用单个Membercached server:

result_backend = 'cache+memcached://127.0.0.1:11211/'

多个:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

The “memory” backend stores the cache in memory only:

result_backend = 'cache'
cache_backend = 'memory'

cache_backend_options

默认:{} 空字典

可以使用这个选项设置pylibmc库的选项:

cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}

cache_backend

不再使用该设置,该设置的功能已集成到result_backend中。

Note:
The django-celery-results - Using the Django ORM/Cache as a result backendopen in new window library uses cache_backend for choosing django caches.

MongoDB 后端设置

Note:
需要pymongo

mongodb_backend_settings

这是个字典,支持的keys如下:

  • database

      要链接的库的名称,默认是`celery`.
    
  • taskmeta_collection

      存储任务元数据的`collection`的名称,默认是`celery_taskmeta`.
    
  • max_pool_size

      Passed as max_pool_size to PyMongo’s Connection or MongoClient constructor. It is the maximum number of TCP connections to keep open to MongoDB at a given time. If there are more open connections than max_pool_size, sockets will be closed when they are released. Defaults to 10.
    
  • options

      Additional keyword arguments to pass to the mongodb connection constructor.  See the `pymongo` docs to see a list of arguments supported.
    
Example configuration
result_backend = 'mongodb://localhost:27017/'
mongodb_backend_settings = {
    'database': 'mydb',
    'taskmeta_collection': 'my_taskmeta_collection',
}

Redis后端设置

配置后端URL

注意:使用Redis作为后端需要安装 redis库,使用pip install celery[redis]来安装。查看 Bundlesopen in new window 获取更多信息。

几种配置方式:

## 格式
result_backend = 'redis://username:password@host:port/db'

## 两个示例,含义相同。不指定具体地址和 dbnumber 的时候会使用localhost和db0
result_backend = 'redis://localhost/0'
result_backend = 'redis://'

## TLS连接格式
result_backend = 'rediss://username:password@host:port/db?ssl_cert_reqs=required'

## socket连接格式
result_backend = 'socket:///path/to/redis.sock'

注意:ssl_cert_reqs应该是required,optional或者none中的一个。(为了向后兼容也可能是CERT_REQUIRED, CERT_OPTIONAL, CERT_NONE

URL中的字段定义如下:

  1. username

5.1.0新加入字段

只支持Redis>=6.0以及py-redis>=3.4.0

使用旧版本时可以省略该字段:result_backend = 'redis://:password@host:port/db'

  1. password : 密码
  2. host : 域名或IP地址
  3. port : Redis端口,默认6379
  4. db : db number,默认是0

当使用TLS连接时,你可以在broker_use_ssl 中传递所有的值。Paths to certificates must be URL encoded, and ssl_cert_reqs is required.示例:

result_backend = 'rediss://:password@host:port/db?\
    ssl_cert_reqs=required\
    &ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem\                  # /var/ssl/myca.pem
    &ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem\     # /var/ssl/redis-server-cert.pem
    &ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem'   # /var/ssl/private/worker-key.pem

这个地方有个歧义,不知道这句话是说ssl_cert_reqs这个参数是必须的还是说这个参数必须被设置从required 。不过这一段下边依然有上边提到的注意事项,推测是说这个参数必须有,而不是必须设置为required

注意:ssl_cert_reqs应该是required,optional或者none中的一个。(为了向后兼容也可能是CERT_REQUIRED, CERT_OPTIONAL, CERT_NONE

redis_backend_health_check_interval

默认:未配置

健康检查间隔,也就是 x 秒检查一次是否存活。如果检查时碰到了超时或者连接错误,会重试一次。

redis_backend_use_ssl

默认:关闭

Redis后端支持SSL。该值需要以字典形式提供。可用的参数和broker_use_ssl下的 redis 小节相同

redis_max_connections

默认:无限制

最大连接数,超过之后会报一个ConnectionError错误

redis_socket_connect_timeout

4.0.1新增加

默认:None

连接Socket的超时时间 (int/float)。

redis_socket_timeout

默认:120.0 Seconds。

读写操作时Socket的超时时间 (int/float),Redis作为后端时使用的配置。

redis_retry_on_timeout

4.4.1新增加

默认:False

上一个配置超时之后是否重试,Redis作为后端时使用的配置。使用Unix socket连接的Redis不要设置该值。

redis_socket_keepalive

4.4.1新增加

默认:False

是否保持TCP连接。Redis作为后端时使用的配置。

Cassandra后端设置

S3后端设置

Azure Block Blob后端设置

Elasticsearch后端设置

To use Elasticsearch as the result backend you simply need to configure the result_backend setting with the correct URL.
Example configuration

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'

elasticsearch_retry_on_timeout

Default: False

超时是否在不同个节点上重试

elasticsearch_max_retries

Default: 3.

抛出异常之前重试的次数

elasticsearch_timeout

Default: 10.0 seconds.

Elasticsearch后端使用的全局超时时间

elasticsearch_save_meta_as_text

Default: True

Should meta saved as text or as native json. Result is always serialized as text.

AWS DynamoDB后端设置

IronCache后端设置

Couchbase后端设置

ArangoDB后端设置

CosmosDB后端设置(实验性

CouchDB后端设置

File-system后端设置

This backend can be configured using a file URL, for example:

CELERY_RESULT_BACKEND = 'file:///var/celery/results'

The configured directory needs to be shared and writable by all servers using the backend.

If you're trying Celery on a single system you can simply use the backend without any further configuration. For larger clusters you could use NFS, GlusterFS, CIFS, HDFS (using FUSE), or any other file-system.

Consul 存储后端设置

消息路由

task_queues

默认:None

多数用户可能更应该使用Automatic routing:自动路由,而不是这个设置。

如果想要配置高级路由,这个设置是worker从kombu.Queueopen in new window对象取的一个列表。

该设置可以使用 -Q选项覆盖,此列表中的各个队列可以使用-X选项通过名称排除。

查看Basics获取更多信息。

The default is a queue/exchange/binding key of celery, with exchange type direct.

另请参阅task_routes

task_routes

默认:None

路由器列表,或用于将任务路由到队列的单个路由。到达目的之前会按照顺序查询路由列表。

一条路由可以被指定为:

  • 带有签名的函数(name, args, kwargs, options, task=None, **kwargs)
  • 带有路由功能(信息)的字符串
  • 包含路由规范的字典:会被转成一个celery.routes.MapRoute实例
  • A list of (pattern, route) tuples:会被转成一个celery.routes.MapRoute实例

示例:

task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media',
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default'})

Where myapp.tasks.route_task could be:

def route_task(self, name, args, kwargs, options, task=None, **kw):
    if task == 'celery.ping':
        return {'queue': 'default'}

route_task可以返回字符串或字典。然后,字符串表示它是task_queues中的队列名称,dict表示它是自定义路由.

发送任务时会按照顺序查找路由,第一个不返回None的便是要使用的路由。消息中(任务设置)的选项会和路由选项合并,其中任务设置优先级较高。

比如一个apply_async()有以下参数:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')

路由返回值为:

{'immediate': True, 'exchange': 'urgent'}

那么最终合并后的选项为:

immediate=False, exchange='video', routing_key='video.compress'

(默认的消息选项在Taskopen in new window类中定义)

task_routes中定义的值在合并时优先级高于在task_queues中定义的值,比如下面的设置:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}

最终合并到tasks.add中会变成:

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}

Routers查看更多信息

task_queue_max_priority

brokers: RabbitMQ

默认:None

RabbitMQ消息优先级查看更多信息。

task_default_priority

brokers: RabbitMQ

默认:None

RabbitMQ消息优先级查看更多信息。

task_inherit_parent_priority

brokers: RabbitMQ

默认:False

开启后,子任务会继承父任务的优先级

## 任务链中的最后一个任务的优先级也将设置为5。
chain = celery.chain(add.s(2) | add.s(2).set(priority=5) | add.s(3))

当使用delay()apply_async()从父任务调用子任务时,优先级继承也起作用。

查看RabbitMQ消息优先级获取更多信息

worker_direct

默认:Disabled

启用此选项后,每个worker都有一个专用队列,从而可以将任务路由到特定的workers。

每个worker的队列名称都是自动生成的,名称基于hostname,使用.dq后缀,using the C.dq exchange.

比如一个主机名为w1@example.com的节点上的worker,其队列名将会是:

w1@example.com.dq

然后你就可以指定主机名作为routing_key,exchange为C.dq来将任务路由过去:

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}

task_create_missing_queues

默认:Enabled

如果开启,任何未在task_queues中定义的指定队列都将自动创建。详情查看:Automatic routing:自动路由.

task_default_queue

默认:"celery"

如果消息没有路由或者没有指定自定义队列,那么.apply_async会使用默认队列

队列必须在task_queues列出。If task_queues isn't specified then it's automatically created containing one queue entry, where this name is used as the name of that queue.

另请参阅:
更改默认队列的名称

task_default_exchange

默认:Uses the value set for task_default_queue

Name of the default exchange to use when no custom exchange is specified for a key in the task_queues setting.

task_default_exchange_type

默认:"direct"

Default exchange type used when no custom exchange type is specified for a key in the task_queues setting.

task_default_routing_key

默认: Uses the value set for task_default_queue

The default routing key used when no custom routing key is specified for a key in the task_queues setting

task_default_delivery_mode

默认:"persistent"

Can be transient (messages not written to disk) or persistent (written to disk).

Broker设置

broker_url

默认:"amqp://"

默认的broker URL。这个URL的格式需要遵循:

transport://userid:password@hostname:port/virtual_host

其中只有scheme部分(transport://)是必须的,其他部分都是可选的。根据scheme有不同的默认值。

transport部分是要用的broker,默认是amqp,(使用librabbitmq,如果没有安装则使用pyamqp),其他的选择包括:redis://, sqs://, and qpid://.

或者你也可以使用自己的实现(自己写一个连接器):

broker_url = 'proj.transports.MyTransport://localhost'

或者可以指定多个:

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

或者像这样:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

这些稍后会用在broker_failover_strategy.

在Kombu文档中的URLsopen in new window部分获取更多信息

broker_read_url / broker_write_url

默认:从broker_url获取

这个设置可以设置不同的连接URL用于生产和消费,用来取代broker_url。

示例:

broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'

他们也都可以设置一个列表用来做主备切换。配置和broker_url一样

broker_failover_strategy

默认:"round-robin"

broker连接对象的故障转移策略,默认是轮询。If supplied,may map to a key in kombu.connection.failover_strategies, or be a reference to any method that yields a single item from a supplied list.

示例:

## Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy

broker_heartbeat

transports supported: pyamqp

默认:120.0(由服务器协商)

Note:该值仅由worker使用,clients目前不使用heartbeat。

仅使用TCP/IP并不总是能够及时检测连接丢失,因此AMQP定义了一种称为心跳的东西,client和broker都使用它来检测连接是否关闭。

如果heartbeat的值为10s,则会以broker_heartbeat_checkrate设置的速率进行心跳动作,这个值默认是2,意思是在broker_heartbeat设置的时间内执行broker_heartbeat_checkrate次心跳动作。

broker_heartbeat_checkrate

transports supported:pyamqp

默认:2.0

这个值配合broker_heartbeat一起使用,定义出心跳间隔

broker_use_ssl

只支持redis以及pyamqp

默认:关闭

根据连接器可用选项有所不同

pyamqp

该值设置为True时使用默认的SSL设置。使用字典来自定义策略。The format used is Python’s ssl.wrap_socket() options.

注意SSL socket通常有自己专用的端口。

提供客户端证书并根据自定义证书颁发机构验证服务器证书的示例:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

注意:小心使用broker_use_ssl=True。默认配置下的证书可能不好使,参考Python的ssl module security considerationsopen in new window 这个文档。

redis

和上边一样,但Key不同,Redis这边包括以下四个Keys:

  • ssl_cert_reqs:必选,SSLContext.verify_mode中的一个值:

    • ssl.CERT_NONE
    • ssl.CERT_OPTIONAL
    • ssl.CERT_REQUIRED
  • ssl_ca_certs:可选,CA证书的路径

  • ssl_certfile:可选,client 证书的路径

  • ssl_keyfile:可选,client key的路径

注意:ssl_cert_reqs应该是required,optional或者none中的一个。(为了向后兼容也可能是CERT_REQUIRED, CERT_OPTIONAL, CERT_NONE

broker_pool_limit

2.3新增配置

默认:10

连接池中的最大连接数。

2.5之后连接池默认处于开启状态,默认限制为10个连接,根据需要调整。

如果设置为None或者0将不使用连接池,每次连接都会建立新的连接。

broker_connection_timeout

默认:4.0

连接AMQP的默认超时时间,这个设置在使用gevent的时候是关闭的。

Note:
这个设置只是用于连接broker的超时设置。不能用于发送任务,那种情况可以查看broker_transport_options

broker_connection_retry

默认:Enabled

连接broker失败时是否自动重试。并且在broker_connection_max_retries设定的重试次数用完之前,每次重试间隔都会增加。

broker_connection_max_retries

默认:100

最大重试次数,如果设置为0或者None,会一直重试

broker_login_method

默认:"AMQPLAIN"

设置amqp登录方式

broker_transport_options

2.2新增

默认:{} 空字典

A dict of additional options passed to the underlying transport.See your transport user manual for supported options (if any).Example setting the visibility timeout (supported by Redis and SQS transports):

broker_transport_options = {'visibility_timeout': 18000}  # 5 hours

Example setting the producer connection maximum number of retries (so producers won’t retry forever if the broker isn’t available at the first task execution):

broker_transport_options = {'max_retries': 5}

Worker

imports

默认:[] 空列表

worker启动时要导入的一系列模块。

它不仅用于指定要导入的任务模块,还用于导入信号处理程序和其他远程控制命令等。

模块将按原始顺序导入。

include

默认:[] 空列表

和imports作用相同,可以同时存在,导入顺序在imports之后。

worker_deduplicate_successful_tasks

5.1之后新增

默认:False

执行每个任务之前,让每个worker检查是否是重复任务

仅当Message Broker重新传递具有相同标识符的任务(已启用延迟确认)并且其在结果后端中的状态为成功时,才会发生重复数据删除。

To avoid overflowing the result backend with queries, a local cache of successfully executed tasks is checked before querying the result backend in case the task was already successfully executed by the same worker that received the task.

This cache can be made persistent by setting the worker_state_db setting.

如果结果后端不是持久的(例如,RPC后端),则忽略此设置。

worker_concurrency

默认:CPU核心数量

执行任务的工作线程数量。如果任务主要是I/O任务,则可以尝试适当调大。如果是CPU负载型,建议保持默认。

worker_prefetch_multiplier

默认:4

预读消息系数,可以理解为每个worker每次预读几个消息。一般不用改。请注意第一个启动的worker会接收到更多任务。

要关闭预读,把该值设置为 1 。设置为 0 表示worker会尽量取更多的消息。

更多关于预读的信息,参考:Prefetch Limits

注意:
定时任务、ETA任务不受此配置限制

worker_lost_wait

默认:10S

某些情况下worker意外终止可能导致任务完成前返回一个错误结果。这个配置指定了在抛出一个 WorkerLostErroropen in new window 错误之前等待时间。

worker_max_tasks_per_child

Maximum number of tasks a pool worker process can execute before
it's replaced with a new one. Default is no limit.

worker_max_memory_per_child

默认:无限制。类型:int(KB)

worker在被新工作进程替换之前可能消耗的最大驻留内存量(以KB为单位)。如果单个任务导致worker超过此限制,则该任务将完成,并且该工作人员将在之后被替换。

示例:

worker_max_memory_per_child = 12000  # 12MB

worker_disable_rate_limits

默认:关闭(任务限速默认开启)

关闭所有任务限速,即时设置了明确的限速。

worker_state_db

默认:None

用来持久存储任务状态的文件名,可以是相对路径或者绝对路径,根据Python版本不同可能会增加.db后缀。

该设置还可以通过celery worker --statedb进行配置

worker_timer_precision

默认:1.0s

设置ETA计划程序在重新检查计划之间可以休眠的最长时间(秒)。

将此值设置为1秒意味着调度程序的精度将为1秒。如果需要接近毫秒的精度,可以将其设置为0.1。

这是一个时间精度的配置,可能用到的时候才能理解吧。

worker_enable_remote_control

默认:开启

是否开启workers的远程控制

worker_proc_alive_timeout

默认:4.0

等待一个新的worker process启动的超时时间。

worker_cancel_long_running_tasks_on_connection_loss

5.1新增配置。

默认:关闭

Kill all long-running tasks with late acknowledgment enabled on connection loss.

Tasks which have not been acknowledged before the connection loss cannot do so anymore since their channel is gone and the task is redelivered back to the queue. This is why tasks with late acknowledged enabled must be idempotent as they may be executed more than once. In this case, the task is being executed twice per connection loss (and sometimes in parallel in other workers).

When turning this option on, those tasks which have not been completed are cancelled and their execution is terminated. Tasks which have completed in any way before the connection loss are recorded as such in the result backend as long as task_ignore_result is not enabled.

Warning:
6.0之后,该选项会被默认设置为True

Events

worker_send_task_events

默认:关闭

发送 task-related 事件以便任务可以被类似于 flower之类的工具监控。Sets the default value for the workers
-E``argument.

task_send_sent_event

2.2新增配置

默认:关闭

开启之后,会为每个任务发送一个task-send事件,以便于任务被worker消费之前可以被追踪。

event_queue_ttl

支持:amqp

默认:5s

当消息被发送到一个监控客户端的时间队列时该消息的过期时间。

比如,如果这个值设置为10,那么这个消息会在交付给这个队列10s之后被删除。

event_queue_expires

支持:amqp

默认60.0s

Expiry time in seconds (int/float) for when after a monitor clients
event queue will be deleted (x-expires).

event_queue_prefix

默认:"celeryev"

事件接收器队列的名称前缀。

event_exchange

默认:"celeryev"

Name of the event exchange.

警告:
实验性质的选项,谨慎使用。

event_serializer

默认:"json"

发送时间消息时使用的序列化格式

更多参考:Serializers:序列化

远程控制命令

提示:
要关闭远程控制命令,可以查看worker_enable_remote_control设置

control_queue_ttl

默认:300

远程控制命令队列中的消息过期之前的时间(秒)。

如果使用默认的300秒,这意味着如果发送了一个远程控制命令,并且在300秒内没有工作人员拾取该命令,则该命令将被丢弃。

这个设置还适用于 remote control reply queues.

control_queue_expires

默认:10

未被使用的remote control command 对列被删除的时间。

这个设置还适用于 remote control reply queues.

control_exchange

默认:"celery"

Name of the control command exchange.

警告:
实验性质,小心使用

日志

worker_hijack_root_logger

2.2新增加配置

默认:开启

简单说就是如果想要使用自定义的日志收集器就设置成False。

worker_log_color

默认:如果日志输出到终端,则默认开启

日志输出是不是有颜色。。

worker_log_format

默认:

"[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"

The format to use for log messages,更多信息参考Python的loggingopen in new window模块。

worker_task_log_format

默认:

"[%(asctime)s: %(levelname)s/%(processName)s]
%(task_name)s[%(task_id)s]: %(message)s"

The format to use for log messages logged in tasks,更多信息参考Python的loggingopen in new window模块。

worker_redirect_stdouts

默认:开启

开启后stdoutstderr会被重定向到当前的日志记录器

Used by celery worker and celery beat .

worker_redirect_stdouts_level

默认:WARNING

The log level output to stdout and stderr is logged as.Can be one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.

安全

security_key

默认:None

2.5新增

当使用Message Signing:消息签名时,用来指定签名消息用的包含私钥的文件的路径,可以是相对路径或绝对路径。

security_certificate

默认:None

2.5新增

当使用Message Signing:消息签名时,用来指定签名消息用的 X.509 证书文件的路径,可以是相对路径或绝对路径。

security_cert_store

默认:None

2.5新增

给Message Signing:消息签名使用的包含 X.509 证书的文件夹,也可以使用通配符,(比如 /etc/certs/*.pem

难以理解这配置。。。

security_digest

默认:sha256

4.3新增

当Message Signing:消息签名启用时,签名消息时使用的加密算法。参考链接:Message digests (Hashing)open in new window

。。没啥必要改,也没啥必要看

自定义组件类(高级

worker_pool

默认:"prefork" (celery.concurrency.prefork:TaskPool)

worker 使用的 pool class 的名称。

worker_autoscaler

2.2新增

默认:"celery.worker.autoscale:Autoscaler"

autoscaler使用的类名

worker_consumer

默认:"celery.worker.consumer:Consumer"

worker使用的 consumer 类的类名

worker_timer

默认:"kombu.asynchronous.hub.timer:Timer"

Name of the ETA scheduler class used by the worker. Default is or set by the pool implementation.

Beat Settings (celery beat)

beat_schedule

默认: {} (empty mapping).

The periodic task schedule used by beatopen in new window. See Entriesopen in new window.

beat_scheduler

默认: "celery.beat:PersistentScheduler".

The default scheduler class. May be set to "django_celery_beat.schedulers:DatabaseScheduler" for instance, if used alongside django-celery-beatopen in new window extension.

Can also be set via the celery beat -Sopen in new window argument.

beat_schedule_filename

默认: "celerybeat-schedule".

Name of the file used by PersistentScheduler to store the last run times of periodic tasks. Can be a relative or absolute path, but be aware that the suffix .db may be appended to the file name (depending on Python version).

Can also be set via the celery beat --scheduleopen in new window argument.

beat_sync_every

默认: 0.

The number of periodic tasks that can be called before another database sync is issued. A value of 0 (default) means sync based on timing - default of 3 minutes as determined by scheduler.sync_every. If set to 1, beat will call sync after every task message sent.

beat_max_loop_interval

默认: 0.

The maximum number of seconds beat open in new windowcan sleep between checking the schedule.

The default for this value is scheduler specific. For the default Celery beat scheduler the value is 300 (5 minutes), but for the django-celery-beatopen in new window database scheduler it’s 5 seconds because the schedule may be changed externally, and so it must take changes to the schedule into account.

Also when running Celery beat embedded (-Bopen in new window) on Jython as a thread the max interval is overridden and set to 1 so that it’s possible to shut down in a timely manner.