This document describes the current stable version of Celery (4.3). For development docs, go here.

Celery - Distributed Task Queue

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It's a task queue with focus on real-time processing, while also supporting task scheduling.

Celery has a large and diverse community of users and contributors; you should come join us on IRC or our mailing-list.

Celery is open source and licensed under the BSD License.

Donations

This project relies on your generous donations.

If you use Celery to create a commercial product, please consider becoming our backer or our sponsor to ensure Celery's future.

Getting Started

Contents

Getting Started

Release:4.3
Date:Apr 02, 2019

Introduction to Celery

What's a Task Queue?

Task queues are used as a mechanism to distribute work across threads or machines.

A task queue's input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, and the broker then delivers that message to a worker.

A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.

Celery is written in Python, but the protocol can be implemented in any language. In addition to Python there's node-celery for Node.js, and a PHP client.

Language interoperability can also be achieved by exposing an HTTP endpoint and having a task that requests it (webhooks).

What do I need?

Celery requires a message transport to send and receive messages. The RabbitMQ and Redis broker transports are feature complete, but there's also support for a myriad of other experimental solutions, including using SQLite for local development.

Celery can run on a single machine, on multiple machines, or even across data centers.

Get Started

If this is the first time you're trying to use Celery, or if you haven't kept up with development in the 3.1 version and are coming from previous versions, then you should read our getting started tutorials:

Celery is…

  • Simple

    Celery is easy to use and maintain, and it doesn't need configuration files.

    It has an active, friendly community you can talk to for support, including a mailing-list and an IRC channel.

    Here's one of the simplest applications you can make:

    from celery import Celery
    
    app = Celery('hello', broker='amqp://guest@localhost//')
    
    @app.task
    def hello():
        return 'hello world'
    
  • Highly Available

    Workers and clients will automatically retry in the event of connection loss or failure, and some brokers support HA in way of Primary/Primary or Primary/Replica replication.

  • Fast

    A single Celery process can process millions of tasks a minute, with sub-millisecond round-trip latency (using RabbitMQ, librabbitmq, and optimized settings).

  • Flexible

    Almost every part of Celery can be extended or used on its own: custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, broker transports, and much more.

It supports

  • Result Stores

    • AMQP, Redis
    • Memcached,
    • SQLAlchemy, Django ORM
    • Apache Cassandra, Elasticsearch
  • Serialization

    • pickle, json, yaml, msgpack.
    • zlib, bzip2 compression.
    • Cryptographic message signing.
Features

  • Monitoring

    A stream of monitoring events is emitted by workers and is used by built-in and external tools to tell you what your cluster is doing – in real-time.

    Read more….

  • Work-flows

    Simple and complex work-flows can be composed using a set of powerful primitives we call the "canvas", including grouping, chaining, chunking, and more.

    Read more….

  • Time & Rate Limits

    You can control how many tasks can be executed per second/minute/hour, or how long a task can be allowed to run, and this can be set as a default, for a specific worker, or individually for each task type.

    Read more….

  • Scheduling

    You can specify the time to run a task in seconds or a datetime, or you can use periodic tasks for recurring events based on a simple interval, or Crontab expressions supporting minute, hour, day of week, day of month, and month of year.

    Read more….

  • Resource Leak Protection

    The --max-tasks-per-child option is used for user tasks leaking resources, like memory or file descriptors, that are simply out of your control.

    Read more….

  • User Components

    Each worker component can be customized, and additional components can be defined by the user. The worker is built up using “bootsteps” — a dependency graph enabling fine grained control of the worker’s internals.

Framework Integration

Celery is easy to integrate with web frameworks, some of them even have integration packages:

For Django see First steps with Django.

The integration packages aren’t strictly necessary, but they can make development easier, and sometimes they add important hooks like closing database connections at fork(2).

Installation

You can install Celery either via the Python Package Index (PyPI) or from source.

To install using pip:

$ pip install -U Celery
Bundles

Celery also defines a group of bundles that can be used to install Celery and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Multiple bundles can be specified by separating them by commas.

$ pip install "celery[librabbitmq]"

$ pip install "celery[librabbitmq,redis,auth,msgpack]"

The following bundles are available:

Serializers

celery[auth]: for using the auth security serializer.
celery[msgpack]: for using the msgpack serializer.
celery[yaml]: for using the yaml serializer.

Concurrency

celery[eventlet]: for using the eventlet pool.
celery[gevent]: for using the gevent pool.

Transports and Backends

celery[librabbitmq]: for using the librabbitmq C library.
celery[redis]: for using Redis as a message transport or as a result backend.
celery[sqs]: for using Amazon SQS as a message transport (experimental).
celery[tblib]: for using the task_remote_tracebacks feature.
celery[memcache]: for using Memcached as a result backend (using pylibmc).
celery[pymemcache]: for using Memcached as a result backend (pure-Python implementation).
celery[cassandra]: for using Apache Cassandra as a result backend with the DataStax driver.
celery[couchbase]: for using Couchbase as a result backend.
celery[arangodb]: for using ArangoDB as a result backend.
celery[elasticsearch]: for using Elasticsearch as a result backend.
celery[riak]: for using Riak as a result backend.
celery[dynamodb]: for using AWS DynamoDB as a result backend.
celery[zookeeper]: for using Zookeeper as a message transport.
celery[sqlalchemy]: for using SQLAlchemy as a result backend (supported).
celery[pyro]: for using the Pyro4 message transport (experimental).
celery[slmq]: for using the SoftLayer Message Queue transport (experimental).
celery[consul]: for using the Consul.io Key/Value store as a message transport or result backend (experimental).
celery[django]: specifies the lowest version possible for Django support. You should probably not use this in your requirements, it's here for informational purposes only.

Downloading and installing from source

Download the latest version of Celery from PyPI:

https://pypi.org/project/celery/

You can install it by doing the following:

$ tar xvfz celery-0.0.0.tar.gz
$ cd celery-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you're not currently using a virtualenv.

Using the development version
With pip

The Celery development version also requires the development versions of kombu, amqp, billiard, and vine.

You can install the latest snapshot of these using the following pip commands:

$ pip install https://github.com/celery/celery/zipball/master#egg=celery
$ pip install https://github.com/celery/billiard/zipball/master#egg=billiard
$ pip install https://github.com/celery/py-amqp/zipball/master#egg=amqp
$ pip install https://github.com/celery/kombu/zipball/master#egg=kombu
$ pip install https://github.com/celery/vine/zipball/master#egg=vine
With git

Please see the Contributing section.

Brokers

Release:4.3
Date:Apr 02, 2019

Celery supports several message transport alternatives.

Broker Instructions
Using RabbitMQ
Installation & Configuration

RabbitMQ is the default broker so it doesn't require any additional dependencies or initial configuration, other than the URL location of the broker instance you want to use:

broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'

For a description of broker URLs and a full list of the various broker configuration options available to Celery, see Broker Settings, and see below for setting up the username, password and vhost.

Installing the RabbitMQ Server

See Installing RabbitMQ over at RabbitMQ’s website. For macOS see Installing RabbitMQ on macOS.

Note

If you're getting nodedown errors after installing and using rabbitmqctl then this blog post can help you identify the source of the problem:

Setting up RabbitMQ

To use Celery we need to create a RabbitMQ user, a virtual host and allow that user access to that virtual host:

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

Substitute in appropriate values for myuser, mypassword and myvhost above.

See the RabbitMQ Admin Guide for more information about access control.

Installing RabbitMQ on macOS

The easiest way to install RabbitMQ on macOS is using Homebrew, the new and shiny package management system for macOS.

First, install Homebrew using the one-line command provided by the Homebrew documentation:

ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)"

Finally, we can install RabbitMQ using brew:

$ brew install rabbitmq

After you've installed RabbitMQ with brew you need to add the following to your path to be able to start and stop the broker: add it to the start-up file for your shell (e.g., .bash_profile or .profile).

PATH=$PATH:/usr/local/sbin
Configuring the system host name

If you're using a DHCP server that's giving you a random host name, you need to permanently configure the host name. This is because RabbitMQ uses the host name to communicate with nodes.

Use the scutil command to permanently set your host name:

$ sudo scutil --set HostName myhost.local

Then add that host name to /etc/hosts so it's possible to resolve it back into an IP address:

127.0.0.1       localhost myhost myhost.local

If you start the rabbitmq-server, your rabbit node should now be rabbit@myhost, as verified by rabbitmqctl:

$ sudo rabbitmqctl status
Status of node rabbit@myhost ...
[{running_applications,[{rabbit,"RabbitMQ","1.7.1"},
                    {mnesia,"MNESIA  CXC 138 12","4.4.12"},
                    {os_mon,"CPO  CXC 138 46","2.2.4"},
                    {sasl,"SASL  CXC 138 11","2.1.8"},
                    {stdlib,"ERTS  CXC 138 10","1.16.4"},
                    {kernel,"ERTS  CXC 138 10","2.13.4"}]},
{nodes,[rabbit@myhost]},
{running_nodes,[rabbit@myhost]}]
...done.

This is especially important if your DHCP server gives you a host name starting with an IP address (e.g., 23.10.112.31.comcast.net). In this case RabbitMQ will try to use rabbit@23: an illegal host name.

Starting/Stopping the RabbitMQ server

To start the server:

$ sudo rabbitmq-server

You can also run it in the background by adding the -detached option (note: only one dash):

$ sudo rabbitmq-server -detached

Never use kill (kill(1)) to stop the RabbitMQ server, but rather use the rabbitmqctl command:

$ sudo rabbitmqctl stop

When the server is running, you can continue reading Setting up RabbitMQ.

Using Redis
Installation

For the Redis support you have to install additional dependencies. You can install both Celery and these dependencies in one go using the celery[redis] bundle:

$ pip install -U "celery[redis]"
Configuration

Configuration is easy, just configure the location of your Redis database:

app.conf.broker_url = 'redis://localhost:6379/0'

Where the URL is in the format of:

redis://:password@hostname:port/db_number

All fields after the scheme are optional, and will default to localhost on port 6379, using database 0.
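For example, a URL spelling out every field might look like this (the password, host, port and database number here are placeholders, not values from the docs):

app.conf.broker_url = 'redis://:s3cr3t@redis.example.com:6380/1'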

If a Unix socket connection should be used, the URL needs to be in the format:

redis+socket:///path/to/redis.sock

Specifying a different database number when using a Unix socket is possible by adding the virtual_host parameter to the URL:

redis+socket:///path/to/redis.sock?virtual_host=db_number

It is also easy to connect directly to a list of Redis Sentinel nodes:

app.conf.broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381'
app.conf.broker_transport_options = { 'master_name': "cluster1" }
Visibility Timeout

The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker. Be sure to see Caveats below.

This option is set via the broker_transport_options setting:

app.conf.broker_transport_options = {'visibility_timeout': 3600}  # 1 hour.

The default visibility timeout for Redis is 1 hour.

Results

If you also want to store the state and return values of tasks in Redis, you should configure these settings:

app.conf.result_backend = 'redis://localhost:6379/0'

See Redis backend settings for a complete list of options supported by the Redis result backend.

If you are using Sentinel, you should specify the master_name using the result_backend_transport_options setting:

app.conf.result_backend_transport_options = {'master_name': "mymaster"}
Caveats
Fanout prefix

Broadcast messages will be seen by all virtual hosts by default.

You have to set a transport option to prefix the messages so that they will only be received by the active virtual host:

app.conf.broker_transport_options = {'fanout_prefix': True}

Note that you won't be able to communicate with workers running older versions or workers that don't have this setting enabled.

This setting will be the default in the future, so better to migrate sooner rather than later.

Fanout patterns

Workers will receive all task-related events by default.

To avoid this you must set the fanout_patterns fanout option so that the workers may only subscribe to worker-related events:

app.conf.broker_transport_options = {'fanout_patterns': True}

Note that this change is backward incompatible so all workers in the cluster must have this option enabled, or else they won’t be able to communicate.

This option will be enabled by default in the future.

Visibility timeout

If a task isn’t acknowledged within the Visibility Timeout the task will be redelivered to another worker and executed.

This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

So you have to increase the visibility timeout to match the time of the longest ETA you’re planning to use.

Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.

Periodic tasks won’t be affected by the visibility timeout, as this is a concept separate from ETA/countdown.

You can increase this timeout by configuring a transport option with the same name:

app.conf.broker_transport_options = {'visibility_timeout': 43200}

The value must be an int describing the number of seconds.

Key eviction

Redis may evict keys from the database in some situations

If you experience an error like:

InconsistencyError: Probably the key ('_kombu.binding.celery') has been
removed from the Redis database.

then you may want to configure the redis-server to not evict keys by setting the timeout parameter to 0 in the redis configuration file.

Using Amazon SQS
Installation

For the Amazon SQS support you have to install additional dependencies. You can install both Celery and these dependencies in one go using the celery[sqs] bundle:

$ pip install celery[sqs]
Configuration

You have to specify SQS in the broker URL:

broker_url = 'sqs://ABCDEFGHIJKLMNOPQRST:ZYXK7NiynGlTogH8Nj+P9nlE73sq3@'

where the URL format is:

sqs://aws_access_key_id:aws_secret_access_key@

Please note that you must remember to include the @ sign at the end and encode the password so it can always be parsed correctly. For example:

from kombu.utils.url import quote

aws_access_key = quote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = quote("ZYXK7NiynGlTogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)

The login credentials can also be set using the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, in that case the broker URL may only be sqs://.

If you are using IAM roles on instances, you can set the BROKER_URL to: sqs:// and kombu will attempt to retrieve access tokens from the instance metadata.

Options
Region

The default region is us-east-1 but you can select another region by configuring the broker_transport_options setting:

broker_transport_options = {'region': 'eu-west-1'}

See also

An overview of Amazon Web Services regions can be found here:

Visibility Timeout

The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker. Also see caveats below.

This option is set via the broker_transport_options setting:

broker_transport_options = {'visibility_timeout': 3600}  # 1 hour.

The default visibility timeout is 30 minutes.

Polling Interval

The polling interval decides the number of seconds to sleep between unsuccessful polls. This value can be either an int or a float. By default the value is one second: this means the worker will sleep for one second when there’s no more messages to read.

You must note that more frequent polling is also more expensive, so increasing the polling interval can save you money.

The polling interval can be set via the broker_transport_options setting:

broker_transport_options = {'polling_interval': 0.3}

Very frequent polling intervals can cause busy loops, resulting in the worker using a lot of CPU time. If you need sub-millisecond precision you should consider using another transport, like RabbitMQ, or Redis.

Queue Prefix

By default Celery won't assign any prefix to the queue names. If you have other services using SQS you can configure it to do so using the broker_transport_options setting:

broker_transport_options = {'queue_name_prefix': 'celery-'}
Caveats
  • If a task isn’t acknowledged within the visibility_timeout, the task will be redelivered to another worker and executed.

    This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

    So you have to increase the visibility timeout to match the time of the longest ETA you’re planning to use.

    Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.

    Periodic tasks won’t be affected by the visibility timeout, as it is a concept separate from ETA/countdown.

    The maximum visibility timeout supported by AWS as of this writing is 12 hours (43200 seconds):

    broker_transport_options = {'visibility_timeout': 43200}
    
  • SQS doesn’t yet support worker remote control commands.

  • SQS doesn’t yet support events, and so cannot be used with celery events, celerymon, or the Django Admin monitor.

Results

Multiple products in the Amazon Web Services family could be a good candidate to store or publish results with, but there’s no such result backend included at this point.

Warning

Don’t use the amqp result backend with SQS.

It will create one queue for every task, and the queues will not be collected. This could cost you money that would be better spent contributing an AWS result store backend back to Celery :)

Broker Overview

This is a comparison table of the different transports supported; more information can be found in the documentation for each individual transport (see Broker Instructions).

Name          Status        Monitoring    Remote Control
RabbitMQ      Stable        Yes           Yes
Redis         Stable        Yes           Yes
Amazon SQS    Stable        No            No
Zookeeper     Experimental  No            No

Experimental brokers may be functional but they don’t have dedicated maintainers.

Missing monitor support means that the transport doesn’t implement events, and as such Flower, celery events, celerymon and other event-based monitoring tools won’t work.

Remote control means the ability to inspect and manage workers at runtime using the celery inspect and celery control commands (and other tools using the remote control API).

First Steps with Celery

Celery is a task queue with batteries included. It's easy to use so that you can get started without learning the full complexity of the problem it solves. It's designed around best practices so that your product can scale and integrate with other languages, and it comes with the tools and support you need to run such a system in production.

In this tutorial you'll learn the absolute basics of using Celery.

Learn about:

  • Choosing and installing a message transport (broker).
  • Installing Celery and creating your first task.
  • Starting the worker and calling tasks.
  • Keeping track of tasks as they transition through different states, and inspecting return values.

Celery may seem daunting at first – but don't worry – this tutorial will get you started in no time. It's deliberately kept simple, so as to not confuse you with advanced features. After you have finished this tutorial, it's a good idea to browse the rest of the documentation. For example the Next Steps tutorial will showcase Celery's capabilities.

Choosing a Broker

Celery requires a solution to send and receive messages; usually this comes in the form of a separate service called a message broker.

There are several choices available, including:

RabbitMQ

RabbitMQ is feature-complete, stable, durable and easy to install. It's an excellent choice for a production environment. Detailed information about using RabbitMQ with Celery:

If you’re using Ubuntu or Debian install RabbitMQ by executing this command:

$ sudo apt-get install rabbitmq-server

Or, if you want to run it on Docker execute this:

$ docker run -d -p 5672:5672 rabbitmq

When the command completes, the broker will already be running in the background, ready to move messages for you: Starting rabbitmq-server: SUCCESS.

Don’t worry if you’re not running Ubuntu or Debian, you can go to this website to find similarly simple installation instructions for other platforms, including Microsoft Windows:

Redis

Redis is also feature-complete, but is more susceptible to data loss in the event of abrupt termination or power failures. Detailed information about using Redis:

Using Redis

If you want to run it on Docker execute this:

$ docker run -d -p 6379:6379 redis
Other brokers

In addition to the above, there are other experimental transport implementations to choose from, including Amazon SQS.

See Broker Overview for a full list.

Installing Celery

Celery is on the Python Package Index (PyPI), so it can be installed with standard Python tools like pip or easy_install:

$ pip install celery
Application

The first thing you need is a Celery instance. We call this the Celery application or just app for short. Since this instance is used as the entry-point for everything you want to do in Celery, like creating tasks and managing workers, it must be possible for other modules to import it.

In this tutorial we keep everything contained in a single module, but for larger projects you want to create a dedicated module.

Let’s create the file tasks.py:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

The first argument to Celery is the name of the current module. This is only needed so that names can be automatically generated when the tasks are defined in the __main__ module.

The second argument is the broker keyword argument, specifying the URL of the message broker you want to use. Here we are using RabbitMQ (also the default option).

See Choosing a Broker above for more choices – for RabbitMQ you can use amqp://localhost, or for Redis you can use redis://localhost.

You defined a single task, called add, returning the sum of two numbers.

Running the Celery worker server

You can now run the worker by executing our program with the worker argument:

$ celery -A tasks worker --loglevel=info

Note

See the Troubleshooting section if the worker doesn't start.

In production you'll want to run the worker in the background as a daemon. To do this you need to use the tools provided by your platform, or something like supervisord (see Daemonization for more information).

For a complete listing of the command-line options available, do:

$  celery worker --help

There are also several other commands available, and help is also available:

$ celery help
Calling the task

To call our task you can use the delay() method.

This is a handy shortcut to the apply_async() method that gives greater control of the task execution (see Calling Tasks):

>>> from tasks import add
>>> add.delay(4, 4)

The task has now been processed by the worker you started earlier. You can verify this by looking at the worker's console output.

Calling a task returns an AsyncResult instance. This can be used to check the state of the task, wait for the task to finish, or get its return value (or, if the task failed, to get the exception and traceback).

Results are not enabled by default. In order to do remote procedure calls or keep track of task results in a database, you will need to configure Celery to use a result backend. This is described in the next section.

Keeping Results

If you want to keep track of the tasks' states, Celery needs to store or send the states somewhere. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP), and more – or you can define your own.

For this example we use the rpc result backend, which sends states back as transient messages. The backend is specified via the backend argument to Celery (or via the result_backend setting if you choose to use a configuration module):

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

Or if you want to use Redis as the result backend, but still use RabbitMQ as the message broker (a popular combination):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

To read more about result backends please see Result Backends.

Now with the result backend configured, let's call the task again. This time you'll hold on to the AsyncResult instance returned when you call a task:

>>> result = add.delay(4, 4)

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

If the task raised an exception, you can also gain access to the original traceback:

>>> result.traceback

Warning

Backends use resources to store and transmit results. To ensure that resources are released, you must eventually call get() or forget() on EVERY AsyncResult instance returned after calling a task.

See celery.result for the complete result object reference.

Configuration

Celery, like a consumer appliance, doesn't need much configuration to operate. It has an input and an output. The input must be connected to a broker, and the output can be optionally connected to a result backend. However, if you look closely at the back, there's a lid revealing loads of sliders, dials, and buttons: this is the configuration.

The default configuration should be good enough for most use cases, but there are many options that can be configured to make Celery work exactly as needed. Reading about the available options is a good idea to familiarize yourself with what can be configured. You can read about the options in the Configuration and defaults reference.

The configuration can be set on the app directly or by using a dedicated configuration module. As an example you can configure the default serializer used for serializing task payloads by changing the task_serializer setting:

app.conf.task_serializer = 'json'

If you’re configuring many settings at once you can use update:

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

A dedicated configuration module is recommended for larger projects. Hard-coding periodic task intervals and task routing options is discouraged; it is much better to keep these in a centralized location. This is especially true for libraries, as it enables users to control how their tasks behave. A centralized configuration will also allow your SysAdmin to make simple changes in the event of system trouble.

You can tell your Celery instance to use a configuration module by calling the app.config_from_object() method:

app.config_from_object('celeryconfig')

This module is often called "celeryconfig", but you can use any module name.

In the above case, a module named celeryconfig.py must be available to load from the current directory or on the Python path. It could look something like this:

celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

To verify that your configuration file works properly and doesn't contain any syntax errors, you can try to import it:

$ python -m celeryconfig

For a complete reference of configuration options, see Configuration and defaults.

To demonstrate the power of configuration files, this is how you'd route a misbehaving task to a dedicated queue:

celeryconfig.py:

task_routes = {
    'tasks.add': 'low-priority',
}

Or instead of routing it you could rate limit the task, so that only 10 tasks of this type can be processed in a minute (10/m):

celeryconfig.py:

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

If you're using RabbitMQ or Redis as the broker then you can also direct the workers to set a new rate limit for the task at runtime:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully

See Routing Tasks to read more about task routing, the task_annotations setting for more about annotations, and the Monitoring and Management Guide for more about remote control commands and how to monitor what your workers are doing.

Where to go from here

If you want to learn more you should continue to the Next Steps tutorial, and after that you can read the User Guide.

Troubleshooting

There's also a troubleshooting section in the Frequently Asked Questions.

Worker doesn't start: Permission Error
  • If you're using Debian, Ubuntu, or other Debian-based distributions:

    Debian recently renamed the /dev/shm special file to /run/shm.

    A simple workaround is to create a symbolic link:

    # ln -s /run/shm /dev/shm
    
  • Others:

    If you provide any of the --pidfile, --logfile, or --statedb arguments, then you must make sure that they point to a file or directory that's writable and readable by the user starting the worker.

Result backend doesn't work or tasks are always in PENDING state

All tasks are PENDING by default, so the state would've been better named "unknown". Celery doesn't update the state when a task is sent, and any task with no history is assumed to be pending (you know the task id, after all).

  1. Make sure that the task doesn't have ignore_result enabled.

    Enabling this option will force the worker to skip updating states.

  2. Make sure the task_ignore_result setting isn't enabled.

  3. Make sure that you don't have any old workers still running.

    It's easy to start multiple workers by accident, so make sure that the previous worker is properly shut down before you start a new one.

    An old worker that isn't configured with the expected result backend may be running and is hijacking the tasks.

    The --pidfile argument can be set to an absolute path to make sure this doesn't happen.

  4. Make sure the client is configured with the right backend.

    If, for some reason, the client is configured to use a different backend than the worker, you won't be able to receive the result. Make sure the backend is configured correctly:

    >>> result = task.delay()
    >>> print(result.backend)
    

Next Steps

The First Steps with Celery guide is intentionally minimal. In this guide I'll demonstrate what Celery offers in more detail, including how to add Celery support for your application and library.

This document doesn't cover all of Celery's features and best practices, so it's recommended that you also read the User Guide.

Using Celery in your Application
Our Project

Project layout:

proj/__init__.py
    /celery.py
    /tasks.py
proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

In this module you created our Celery instance (sometimes referred to as the app). To use Celery within your project you simply import this instance.

  • The broker argument specifies the URL of the broker to use.

    See Choosing a Broker for more information.

  • The backend argument specifies the result backend to use.

    It's used to keep track of task state and results. While results are disabled by default, I use the RPC result backend here because I demonstrate how retrieving results works later; you may want to use a different backend for your application. They all have different strengths and weaknesses. If you don't need results, it's better to disable them. Results can also be disabled for individual tasks by setting the @task(ignore_result=True) option.

    See Keeping Results for more information.

  • The include argument is a list of modules to import when the worker starts. You need to add our tasks module here so that the worker is able to find our tasks.

proj/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
Starting the worker

The celery program can be used to start the worker (you need to run the worker in the directory above proj):

$ celery -A proj worker -l info

When the worker starts you should see a banner and some messages:

-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

- broker is the URL you specified in the broker argument in our celery module. You can also specify a different broker on the command-line by using the -b option.

- Concurrency is the number of prefork worker processes used to process your tasks concurrently. When all of these are busy doing work, new tasks will have to wait for one of the tasks to finish before it can be processed.

The default concurrency number is the number of CPUs on that machine (including cores). You can specify a custom number using the celery worker -c option. There's no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it. Experimentation has shown that adding more than twice the number of CPUs is rarely effective, and likely to degrade performance instead.

Including the default prefork pool, Celery also supports using Eventlet, Gevent, and running in a single thread (see Concurrency).

- Events is an option that, when enabled, causes Celery to send monitoring messages (events) for actions occurring in the worker. These can be used by monitor programs like celery events, and Flower – the real-time Celery monitor, which you can read about in the Monitoring and Management guide.

- Queues is the list of queues that the worker will consume tasks from. The worker can be told to consume from several queues at once, and this is used to route messages to specific workers as a means for Quality of Service, separation of concerns, and prioritization, all described in the Routing Guide.

You can get a complete list of command-line arguments by passing in the --help flag:

$ celery worker --help

Workers Guide中更详细地描述了这些选项。

Stopping the worker

要停止工作人员,只需按Control-c即可。 Workers Guide中详细列出了工人支持的信号列表。

In the background

In production you'll want to run the worker in the background, and this is described in detail in the daemonization tutorial.

The daemonization scripts use the celery multi command to start one or more workers in the background:

$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

You can restart it too:

$ celery  multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

or stop it:

$ celery multi stop w1 -A proj -l info

The stop command is asynchronous so it won't wait for the worker to shut down. You'll probably want to use the stopwait command instead, which ensures that all currently executing tasks are completed before exiting:

$ celery multi stopwait w1 -A proj -l info

Note

celery multi doesn't store information about workers, so you need to use the same command-line arguments when restarting. Only the same pidfile and logfile arguments must be used when stopping.

By default it'll create pid and log files in the current directory. To protect against multiple workers launching on top of each other, you're encouraged to put these in a dedicated directory:

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

With the multi command you can start multiple workers, and there's a powerful command-line syntax to specify arguments for different workers too, for example:

$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

For more examples see the multi module in the API reference.

About the --app argument

The --app argument specifies the Celery app instance to use; it must be in the form of module.path:attribute

But it also supports a shortcut form. If only a package name is specified, it'll try to search for the app instance, in the following order:

With --app=proj:

  1. an attribute named proj.app, or
  2. an attribute named proj.celery, or
  3. any attribute in the module proj where the value is a Celery application, or

If none of these are found it'll try a submodule named proj.celery:

  1. an attribute named proj.celery.app, or
  2. an attribute named proj.celery.celery, or
  3. Any attribute in the module proj.celery where the value is a Celery application.

This scheme mimics the practices used in the documentation – that is, proj:app for a single contained module, and proj.celery:app for larger projects.
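For instance, a minimal sketch (assuming the proj layout shown earlier) of making the first shortcut resolve by re-exporting the app instance:

# proj/__init__.py  -- hypothetical; re-export the app created in proj/celery.py
from .celery import app

# `celery -A proj worker` can now find the instance as the attribute proj.app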

Calling Tasks

You can call a task using the delay() method:

>>> add.delay(2, 2)

This method is actually a star-argument shortcut to another method called apply_async():

>>> add.apply_async((2, 2))

The latter enables you to specify execution options like the time to run (countdown), the queue it should be sent to, and so on:

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

In the above example the task will be sent to a queue named lopri and the task will execute, at the earliest, 10 seconds after the message was sent.

Applying the task directly will execute the task in the current process, so that no message is sent:

>>> add(2, 2)
4

These three methods – delay(), apply_async(), and applying (__call__) – make up the Celery calling API, which is also used for signatures.

For a more detailed overview of the Calling API, see the Calling User Guide.

Every task invocation will be given a unique identifier (a UUID) – this is the task id.

The delay and apply_async methods return an AsyncResult instance, which can be used to keep track of the task's execution state. But for this you need to enable a result backend so that the state can be stored somewhere.

Results are disabled by default because there's no result backend that suits every application; to choose one you need to consider the drawbacks of each individual backend. For many tasks keeping the return value isn't even very useful, so it's a sensible default to have. Also note that result backends aren't used for monitoring tasks and workers: for that Celery uses dedicated event messages (see Monitoring and Management Guide).

If you have a result backend configured you can retrieve the return value of a task:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

You can find the task's id by looking at the id attribute:

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

You can also inspect the exception and traceback if the task raised an exception; in fact result.get() will propagate any errors by default:

>>> res = add.delay(2)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/celery/celery/result.py", line 113, in get
    interval=interval)
File "/opt/devel/celery/celery/backends/rpc.py", line 138, in wait_for
    raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)

If you don't wish for the errors to propagate, you can disable that by passing the propagate argument:

>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)

In this case it'll return the exception instance raised instead, so to check whether the task succeeded or failed, you'll have to use the corresponding methods on the result instance:

>>> res.failed()
True

>>> res.successful()
False

So how does it know if the task has failed or not? It can find out by looking at the task's state:

>>> res.state
'FAILURE'

A task can only be in a single state, but it can progress through several states. The stages of a typical task can be:

PENDING -> STARTED -> SUCCESS

The started state is a special state that's only recorded if the task_track_started setting is enabled, or if the @task(track_started=True) option is set for the task.

The pending state is actually not a recorded state, but rather the default state for any task id that's unknown: you can see this from this example:

>>> from proj.celery import app

>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

If the task is retried the stages can become even more complex. To demonstrate, for a task that's retried two times the stages would be:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

To read more about task states you should see the States section in the tasks user guide.

Calling tasks is described in detail in the Calling Guide.

Canvas: Designing Work-flows

You just learned how to call a task using the tasks' delay method, and this is often all you need. But sometimes you may want to pass the signature of a task invocation to another process or as an argument to another function, and for this Celery uses something called signatures.

A signature wraps the arguments and execution options of a single task invocation in such a way that it can be passed to functions or even serialized and sent across the wire.

You can create a signature for the add task using the arguments (2, 2), and a countdown of 10 seconds like this:

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

There's also a shortcut using star arguments:

>>> add.s(2, 2)
tasks.add(2, 2)
And there's that calling API again…

Signature instances also support the calling API, meaning they have the delay and apply_async methods.

But there's a difference in that the signature may already have an argument signature specified. The add task takes two arguments, so a signature specifying two arguments would make a complete signature:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

But, you can also make incomplete signatures to create what we call partials:

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

s2 is now a partial signature that needs another argument to be complete, and this can be resolved when calling the signature:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

Here you added the argument 8 that was prepended to the existing argument 2, forming a complete signature of add(8, 2).

Keyword arguments can also be added later; these are then merged with any existing keyword arguments, but new arguments take precedence:

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

As stated, signatures support the calling API, meaning that:

  • sig.apply_async(args=(), kwargs={}, **options)

    Calls the signature with optional partial arguments and partial keyword arguments. Also supports partial execution options.

  • sig.delay(*args, **kwargs)

    Star argument version of apply_async. Any arguments will be prepended to the arguments in the signature, and keyword arguments will be merged with any existing keys.

So this all seems very useful, but what can you actually do with these? To get to that I must introduce the canvas primitives…

The Primitives

These primitives are signature objects themselves, so they can be combined in any number of ways to compose complex work-flows.

Note

These examples retrieve results, so to try them out you need to configure a result backend. The example project above already does that (see the backend argument to Celery).

Let’s look at some examples:

Groups

A group calls a list of tasks in parallel, and it returns a special result instance that lets you inspect the results as a group, and retrieve the return values in order.

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  • Partial group
>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains

Tasks can be linked together so that after one task returns the other is called:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

or a partial chain:

>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

Chains can also be written like this:

>>> (add.s(4, 4) | mul.s(8))().get()
64
Chords

A chord is a group with a callback:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90

A group chained to another task will be automatically converted to a chord:

>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90

Since these primitives are all of the signature type they can be combined almost however you want, for example:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

Be sure to read more about work-flows in the Canvas user guide.

Routing

Celery supports all of the routing facilities provided by AMQP, but it also supports simple routing where messages are sent to named queues.

The task_routes setting enables you to route tasks by name and keep everything centralized in one location:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

You can also specify the queue at runtime with the queue argument to apply_async:

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

You can then make a worker consume from this queue by specifying the celery worker -Q option:

$ celery -A proj worker -Q hipri

You may specify multiple queues by using a comma-separated list. For example, you can make the worker consume from both the default queue and the hipri queue, where the default queue is named celery for historical reasons:

$ celery -A proj worker -Q hipri,celery

The order of the queues doesn't matter as the worker will give equal weight to the queues.

To learn more about routing, including taking use of the full power of AMQP routing, see the Routing Guide.

Remote Control

If you're using RabbitMQ (AMQP), Redis, or Qpid as the broker then you can control and inspect the worker at runtime.

For example you can see what tasks the worker is currently working on:

$ celery -A proj inspect active

This is implemented by using broadcast messaging, so all remote control commands are received by every worker in the cluster.

You can also specify one or more workers to act on the request using the --destination option. This is a comma-separated list of worker host names:

$ celery -A proj inspect active --destination=celery@example.com

If a destination isn't provided then every worker will act and reply to the request.

The celery inspect command contains commands that don't change anything in the worker; it only returns information and statistics about what's going on inside the worker. For a list of inspect commands you can execute:

$ celery -A proj inspect --help

Then there's the celery control command, which contains commands that actually change things in the worker at runtime:

$ celery -A proj control --help

For example you can force workers to enable event messages (used for monitoring tasks and workers):

$ celery -A proj control enable_events

When events are enabled you can then start the event dumper to see what the workers are doing:

$ celery -A proj events --dump

or you can start the curses interface:

$ celery -A proj events

when you’re finished monitoring you can disable events again:

$ celery -A proj control disable_events

The celery status command also uses remote control commands and shows a list of online workers in the cluster:

$ celery -A proj status

You can read more about the celery command and monitoring in the Monitoring Guide.

Timezone

All times and dates, internally and in messages, use the UTC timezone.

When the worker receives a message, for example with a countdown set it converts that UTC time to local time. If you wish to use a different timezone than the system timezone then you must configure that using the timezone setting:

app.conf.timezone = 'Europe/London'
Optimization

The default configuration isn't optimized for throughput. By default it tries to walk the middle way between many short tasks and fewer long tasks, a compromise between throughput and fair scheduling.

If you have strict fair scheduling requirements, or want to optimize for throughput, then you should read the Optimizing Guide.

If you’re using RabbitMQ then you can install the librabbitmq module: this is an AMQP client implemented in C:

$ pip install librabbitmq
What to do now?

Now that you have read this document you should continue to the User Guide.

There's also an API reference if you're so inclined.

Resources

Getting Help
Mailing list

For discussions about the usage, development, and future of Celery, please join the celery-users mailing list.

IRC

Come chat with us on IRC. The #celery channel is located at the Freenode network.

Bug tracker

If you have any suggestions, bug reports, or annoyances please report them to our issue tracker at https://github.com/celery/celery/issues/

Contributing

Development of celery happens at GitHub: https://github.com/celery/celery

You’re highly encouraged to participate in the development of celery. If you don’t like GitHub (for some reason) you’re welcome to send regular patches.

Be sure to also read the Contributing to Celery section in the documentation.

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

User Guide

Release:4.3
Date:Apr 02, 2019

Application

The Celery library must be instantiated before use; this instance is called an application (or app for short).

The application is thread-safe so that multiple Celery applications with different configurations, components, and tasks can co-exist in the same process space.

Let’s create one now:

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

The last line shows the textual representation of the application: including the name of the app class (Celery), the name of the current main module (__main__), and the memory address of the object (0x100469fd0).

Main Name

Of these, only one is important, and that's the main module name. Let's look at why that is.

When you send a task message in Celery, that message won't contain any source code, but only the name of the task you want to execute. This works similarly to how host names work on the internet: every worker maintains a mapping of task names to their actual functions, called the task registry.

Whenever you define a task, that task will also be added to the local registry:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

There you see __main__ again; whenever Celery isn't able to detect what module the function belongs to, it uses the main module name to generate the beginning of the task name.

This is only a problem in a limited set of use cases:

  1. If the module that the task is defined in is run as a program.
  2. If the application is created in the Python shell (REPL).

For example here, where the tasks module is also used to start a worker with app.worker_main():

tasks.py:

from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

When this module is executed the tasks will be named starting with "__main__", but when the module is imported by another process, say to call a task, the tasks will be named starting with "tasks" (the real name of the module):

>>> from tasks import add
>>> add.name
tasks.add

You can specify another name for the main module:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

See also

Names

Configuration

There are several options you can set that'll change how Celery works. These options can be set directly on the app instance, or by using a dedicated configuration module.

The configuration is available as app.conf:

>>> app.conf.timezone
'Europe/London'

You can also set configuration values directly:

>>> app.conf.enable_utc = True

or update several keys at once by using the update method:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

The configuration object consists of multiple dictionaries that are consulted in order:

  1. Changes made at run-time.
  2. The configuration module (if any)
  3. The default configuration (celery.app.defaults).

You can even add new default sources by using the app.add_defaults() method.
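For example, a minimal sketch of adding a new default source (the settings in the dict are just examples):

app.add_defaults({
    'task_default_queue': 'default',   # consulted only if nothing above it overrides the key
    'task_acks_late': False,
})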

See also

Go to the Configuration reference for a complete listing of all the available settings, and their default values.

config_from_object

The app.config_from_object() method loads configuration from a configuration object.

This can be a configuration module, or any object with configuration attributes.

Note that any configuration that was previously set will be reset when config_from_object() is called. If you want to set additional configuration you should do so after.

Example 1: Using the name of a module

The app.config_from_object() method can take the fully qualified name of a Python module, or even the name of a Python attribute, for example: "celeryconfig", "myproj.config.celery", or "myproj.config:CeleryConfig":

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

The celeryconfig module may then look like this:

celeryconfig.py:

enable_utc = True
timezone = 'Europe/London'

The app will be able to use it as long as import celeryconfig is possible.

Example 2: Passing an actual module object

You can also pass an already imported module object, but this isn't always recommended.

Tip

Using the name of a module is recommended as this means the module doesn't need to be serialized when the prefork pool is used. If you're experiencing configuration problems or pickle errors, then please try using the name of a module instead.

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)
Example 3: Using a configuration class/object
from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')
config_from_envvar

The app.config_from_envvar() method takes the configuration module name from an environment variable.

For example – to load configuration from a module specified in the environment variable named CELERY_CONFIG_MODULE:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

You can then specify the configuration module to use via the environment:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
Censored configuration

If you ever want to print out the configuration, as debugging information or similar, you may also want to filter out sensitive information like passwords and API keys.

Celery comes with several utilities useful for presenting the configuration, one is humanize():

>>> app.conf.humanize(with_defaults=False, censored=True)

This method returns the configuration as a tabulated string. This will only contain changes to the configuration by default, but you can include the built-in default keys and values by enabling the with_defaults argument.

If you instead want the configuration as a dictionary, you can use the table() method:

>>> app.conf.table(with_defaults=False, censored=True)

Please note that Celery won't be able to remove all sensitive information, as it merely uses a regular expression to search for commonly named keys. If you add custom settings containing sensitive information, you should name the keys using a name that Celery identifies as secret.

A configuration setting will be censored if the name contains any of these substrings:

API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE
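As an illustration, a made-up custom setting named with one of these substrings should then come out masked (a sketch, not taken from the reference):

app.conf.myapp_api_secret = 'very-secret-value'   # hypothetical custom setting; the name contains SECRET
print(app.conf.humanize(censored=True))           # the value should appear censored in the output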

Laziness

The application instance is lazy, meaning it won't be evaluated until it's actually needed.

Creating a Celery instance will only do the following:

  1. Create a logical clock instance, used for events.
  2. Create the task registry.
  3. Set itself as the current app (but not if the set_as_current argument was disabled).
  4. Call the app.on_init() callback (does nothing by default).

The app.task() decorator doesn't create the task at the point when the task is defined; instead it defers the creation of the task to happen either when the task is used, or after the application has been finalized.

This example shows how the task isn't created until you use the task, or access an attribute (in this case repr()):

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

Finalization of the app happens either explicitly by calling app.finalize() – or implicitly by accessing the app.tasks attribute.

Finalizing the object will:

  1. Copy tasks that must be shared between apps

    Tasks are shared by default, but if the shared argument to the task decorator is disabled, then the task will be private to the app it’s bound to.

  2. Evaluate all pending task decorators.

  3. Make sure all tasks are bound to the current app.

    Tasks are bound to an app so that they can read default values from the configuration.

The “default app”

Celery didn’t always have applications, it used to be that there was only a module-based API, and for backwards compatibility the old API is still there until the release of Celery 5.0.

Celery always creates a special app - the “default app”, and this is used if no custom application has been instantiated.

The celery.task module is there to accommodate the old API, and shouldn’t be used if you use a custom app. You should always use the methods on the app instance, not the module based API.

For example, the old Task base class enables many compatibility features where some may be incompatible with newer features, such as task methods:

from celery.task import Task   # << OLD Task base class.

from celery import Task        # << NEW base class.

The new base class is recommended even if you use the old module-based API.

Breaking the chain

While it's possible to depend on the current app being set, the best practice is to always pass the app instance around to anything that needs it.

I call this the "app chain", since it creates a chain of instances depending on the app being passed.

The following example is considered bad practice:

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app

Instead it should take the app as an argument:

class Scheduler(object):

    def __init__(self, app):
        self.app = app

Internally Celery uses the celery.app.app_or_default() function so that everything also works in the module-based compatibility API:

from celery.app import app_or_default

class Scheduler(object):
    def __init__(self, app=None):
        self.app = app_or_default(app)

In development you can set the CELERY_TRACE_APP environment variable to raise an exception if the app chain breaks:

$ CELERY_TRACE_APP=1 celery worker -l info

Evolving the API

Celery has changed a lot since it was initially created in 2009.

For example, in the beginning it was possible to use any callable as a task:

def hello(to):
    return 'hello {0}'.format(to)

>>> from celery.execute import apply_async

>>> apply_async(hello, ('world!',))

or you could also create a Task class to set certain options, or override other behavior:

from celery.task import Task
from celery.registry import tasks

class Hello(Task):
    queue = 'hipri'

    def run(self, to):
        return 'hello {0}'.format(to)
tasks.register(Hello)

>>> Hello.delay('world!')

Later, it was decided that passing arbitrary callables was an anti-pattern, since it makes it very hard to use serializers other than pickle, and the feature was removed in 2.0, replaced by task decorators:

from celery.task import task

@task(queue='hipri')
def hello(to):
    return 'hello {0}'.format(to)
Abstract Tasks

All tasks created using the task() decorator will inherit from the application's base Task class.

You can specify a different base class using the base argument:

@app.task(base=OtherTask)
def add(x, y):
    return x + y

To create a custom task class you should inherit from the neutral base class: celery.Task.

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)

Tip

If you override the tasks __call__ method, then it’s very important that you also call super so that the base call method can set up the default request used when a task is called directly.

The neutral base class is special because it’s not bound to any specific app yet. Once a task is bound to an app it’ll read configuration to set default values, and so on.

To realize a base class you need to create a task using the app.task() decorator:

@app.task(base=DebugTask)
def add(x, y):
    return x + y

It’s even possible to change the default base class for an application by changing its app.Task() attribute:

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]

Tasks

Tasks are the building blocks of Celery applications.

A task is a class that can be created out of any callable. It performs dual roles in that it defines both what happens when a task is called (sends a message), and what happens when a worker receives that message.

Every task class has a unique name, and this name is referenced in messages so the worker can find the right function to execute.

A task message is not removed from the queue until that message has been acknowledged by a worker. A worker can reserve many messages in advance and even if the worker is killed – by power failure or some other reason – the message will be redelivered to another worker.

Ideally task functions should be idempotent: meaning the function won’t cause unintended effects even if called multiple times with the same arguments. Since the worker cannot detect if your tasks are idempotent, the default behavior is to acknowledge the message in advance, just before it’s executed, so that a task invocation that already started is never executed again.

If your task is idempotent you can set the acks_late option to have the worker acknowledge the message after the task returns instead. See also the FAQ entry Should I use retry or acks_late?.

Note that the worker will acknowledge the message if the child process executing the task is terminated (either by the task calling sys.exit(), or by signal) even when acks_late is enabled. This behavior is by purpose as…

  1. We don’t want to rerun tasks that forces the kernel to send a SIGSEGV (segmentation fault) or similar signals to the process.
  2. We assume that a system administrator deliberately killing the task does not want it to automatically restart.
  3. A task that allocates too much memory is in danger of triggering the kernel OOM killer, the same may happen again.
  4. A task that always fails when redelivered may cause a high-frequency message loop taking down the system.

If you really want a task to be redelivered in these scenarios you should consider enabling the task_reject_on_worker_lost setting.
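As a minimal sketch, an idempotent task opting in to late acknowledgement might look like this (the task body and helpers are made up, not from the docs):

@app.task(acks_late=True)            # acknowledge only after the task returns
def import_feed(url):
    data = fetch(url)                # fetch() is a hypothetical helper
    store(url, data)                 # store() is a hypothetical helper; overwriting is safe, so re-running is harmless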

Warning

A task that blocks indefinitely may eventually stop the worker instance from doing any other work.

If your task does I/O then make sure you add timeouts to these operations, like adding a timeout to a web request using the requests library:

connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))

Time limits are convenient for making sure all tasks return in a timely manner, but a time limit event will actually kill the process by force so only use them to detect cases where you haven’t used manual timeouts yet.

The default prefork pool scheduler is not friendly to long-running tasks, so if you have tasks that run for minutes/hours make sure you enable the -Ofair command-line argument to the celery worker. See Prefork pool prefetch settings for more information, and for the best performance route long-running and short-running tasks to dedicated workers (Automatic routing).

If your worker hangs then please investigate what tasks are running before submitting an issue, as most likely the hanging is caused by one or more tasks hanging on a network operation.

In this chapter you'll learn all about defining tasks, and this is the table of contents:

Basics

You can easily create a task from any callable by using the task() decorator:

from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

There are also many options that can be set for the task, and these can be specified as arguments to the decorator:

@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)
Bound tasks

A task being bound means the first argument to the task will always be the task instance (self), just like Python bound methods:

logger = get_task_logger(__name__)

@task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)

Bound tasks are needed for retries (using app.Task.retry()), for accessing information about the current task request, and for any additional functionality you add to custom task base classes.

Task inheritance

The base argument to the task decorator specifies the base class of the task:

import celery

class MyTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@task(base=MyTask)
def add(x, y):
    raise KeyError()
Names

Every task must have a unique name.

If no explicit name is provided the task decorator will generate one for you, and this name will be based on 1) the module the task is defined in, and 2) the name of the task function.

Example setting explicit name:

>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
...     return x + y

>>> add.name
'sum-of-two-numbers'

A best practice is to use the module name as a name-space; this way names won't collide if there's already a task with that name defined in another module.

>>> @app.task(name='tasks.add')
>>> def add(x, y):
...     return x + y

You can tell the name of the task by investigating its .name attribute:

>>> add.name
'tasks.add'

The name we specified here (tasks.add) is exactly the name that would've been automatically generated for us if the task was defined in a module named tasks.py:

tasks.py:

@app.task
def add(x, y):
    return x + y
>>> from tasks import add
>>> add.name
'tasks.add'
Automatic naming and relative imports

Relative imports and automatic name generation don't go well together, so if you're using relative imports you should set the name explicitly.

For example if the client imports the module "myapp.tasks" as ".tasks", and the worker imports the module as "myapp.tasks", the generated names won't match and a NotRegistered error will be raised by the worker.

This is also the case when using Django and using project.myapp-style naming in INSTALLED_APPS:

INSTALLED_APPS = ['project.myapp']

If you install the app under the name project.myapp then the tasks module will be imported as project.myapp.tasks, so you must make sure you always import the tasks using the same name:

>>> from project.myapp.tasks import mytask   # << GOOD

>>> from myapp.tasks import mytask    # << BAD!!!

The second example will cause the task to be named differently, since the worker and the client import the modules under different names:

>>> from project.myapp.tasks import mytask
>>> mytask.name
'project.myapp.tasks.mytask'

>>> from myapp.tasks import mytask
>>> mytask.name
'myapp.tasks.mytask'

For this reason you must be consistent in how you import modules, and that is also a Python best practice.

Similarly, you shouldn't use old-style relative imports:

from module import foo   # BAD!

from proj.module import foo  # GOOD!

New-style relative imports are fine and can be used:

from .module import foo  # GOOD!

If you want to use Celery with a project already using these patterns extensively and you don't have the time to refactor the existing code, then you can consider specifying the names explicitly instead of relying on the automatic naming:

@task(name='proj.tasks.add')
def add(x, y):
    return x + y
Changing the automatic naming behavior

New in version 4.0.

There are some cases when the default automatic naming isn't suitable. Consider having many tasks within many different modules:

project/
       /__init__.py
       /celery.py
       /moduleA/
               /__init__.py
               /tasks.py
       /moduleB/
               /__init__.py
               /tasks.py

Using the default automatic naming, each task will have a generated name like moduleA.tasks.taskA, moduleA.tasks.taskB, moduleB.tasks.test, and so on. You may want to get rid of having tasks in all task names. As pointed out above, you can explicitly give names for all tasks, or you can change the automatic naming behavior by overriding app.gen_task_name(). Continuing with the example, celery.py may contain:

from celery import Celery

class MyCelery(Celery):

    def gen_task_name(self, name, module):
        if module.endswith('.tasks'):
            module = module[:-6]
        return super(MyCelery, self).gen_task_name(name, module)

app = MyCelery('main')

So each task will have a name like moduleA.taskA, moduleA.taskB and moduleB.test.

Warning

Make sure that your app.gen_task_name() is a pure function: meaning that for the same input it must always return the same output.

Task Request

app.Task.request contains information and state related to the currently executing task.

The request defines the following attributes:

id:The unique id of the executing task.
group:The unique id of the task’s group, if this task is a member.
chord:The unique id of the chord this task belongs to (if the task is part of the header).
correlation_id:Custom ID used for things like de-duplication.
args:Positional arguments.
kwargs:Keyword arguments.
origin:Name of host that sent this task.
retries:How many times the current task has been retried. An integer starting at 0.
is_eager:Set to True if the task is executed locally in the client, not by a worker.
eta:The original ETA of the task (if any). This is in UTC time (depending on the enable_utc setting).
expires:The original expiry time of the task (if any). This is in UTC time (depending on the enable_utc setting).
hostname:Node name of the worker instance executing the task.
delivery_info:Additional message delivery information. This is a mapping containing the exchange and routing key used to deliver this task. Used by for example app.Task.retry() to resend the task to the same destination queue. Availability of keys in this dict depends on the message broker used.
reply-to:Name of queue to send replies back to (used with RPC result backend for example).
called_directly:This flag is set to true if the task wasn't executed by the worker.
timelimit:A tuple of the current (soft, hard) time limits active for this task (if any).
callbacks:A list of signatures to be called if this task returns successfully.
errbacks:A list of signatures to be called if this task fails.
utc:Set to true if the caller has UTC enabled (enable_utc).

New in version 3.1.

headers:Mapping of message headers sent with this task message (may be None).
reply_to:Where to send reply to (queue name).
correlation_id:Usually the same as the task id, often used in amqp to keep track of what a reply is for.

New in version 4.0.

root_id:The unique id of the first task in the workflow this task is part of (if any).
parent_id:The unique id of the task that called this task (if any).
chain:Reversed list of tasks that form a chain (if any). The last item in this list will be the next task to succeed the current task. If using version one of the task protocol the chain tasks will be in request.callbacks instead.
Example

An example task accessing information in the context is:

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
            self.request))

The bind argument means that the function will be a "bound method" so that you can access attributes and methods on the task type instance.

Logging

The worker will automatically set up logging for you, or you can configure logging manually.

A special logger is available named "celery.task"; you can inherit from this logger to automatically get the task name and unique id as part of the logs.

The best practice is to create a common logger for all of your tasks at the top of your module:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Celery uses the standard Python logger library, and the documentation can be found here.

You can also use print(), as anything written to standard out/-err will be redirected to the logging system (you can disable this, see worker_redirect_stdouts).

Note

The worker won't update the redirection if you create a logger instance somewhere in your task or task module.

If you want to redirect sys.stdout and sys.stderr to a custom logger you have to enable this manually, for example:

import sys

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    old_outs = sys.stdout, sys.stderr
    rlevel = self.app.conf.worker_redirect_stdouts_level
    try:
        self.app.log.redirect_stdouts_to_logger(logger, rlevel)
        print('Adding {0} + {1}'.format(x, y))
        return x + y
    finally:
        sys.stdout, sys.stderr = old_outs

Note

If a specific Celery logger you need is not emitting logs, you should check that the logger is propagating properly. In this example “celery.app.trace” is enabled so that “succeeded in” logs are emitted:

import celery
import logging

@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
    logger = logging.getLogger('celery')
    logger.propagate = True
    logger = logging.getLogger('celery.app.trace')
    logger.propagate = True
Argument checking

New in version 4.0.

Celery will verify the arguments passed when you call the task, just like Python does when calling a normal function:

>>> @app.task
... def add(x, y):
...     return x + y

# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

You can disable the argument checking for any task by setting its typing attribute to False:

>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

# Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
Hiding sensitive information in arguments

New in version 4.0.

When using task_protocol 2 or higher (the default since 4.0), you can override how positional arguments and keyword arguments are represented in logs and monitoring events using the argsrepr and kwargsrepr calling arguments:

>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

>>> charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

Warning

Sensitive information will still be accessible to anyone able to read your task message from the broker, or otherwise able intercept it.

For this reason you should probably encrypt your message if it contains sensitive information, or in this example with a credit card number the actual number could be stored encrypted in a secure store that you retrieve and decrypt in the task itself.

Retrying

app.Task.retry() can be used to re-execute the task, for example in the event of recoverable errors.

When you call retry it’ll send a new message, using the same task-id, and it’ll take care to make sure the message is delivered to the same queue as the originating task.

When a task is retried this is also recorded as a task state, so that you can track the progress of the task using the result instance (see States).

Here’s an example using retry:

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

Note

The app.Task.retry() call will raise an exception so any code after the retry won’t be reached. This is the Retry exception, it isn’t handled as an error but rather as a semi-predicate to signify to the worker that the task is to be retried, so that it can store the correct state when a result backend is enabled.

This is normal operation and always happens unless the throw argument to retry is set to False.

The bind argument to the task decorator will give access to self (the task type instance).

The exc argument is used to pass exception information that’s used in logs, and when storing task results. Both the exception and the traceback will be available in the task state (if a result backend is enabled).

If the task has a max_retries value the current exception will be re-raised if the max number of retries has been exceeded, but this won’t happen if:

  • An exc argument wasn’t given.

    In this case the MaxRetriesExceededError exception will be raised.

  • There’s no current exception

    If there’s no original exception to re-raise the exc argument will be used instead, so:

    self.retry(exc=Twitter.LoginError())
    

    will raise the exc argument given.

Using a custom retry delay

When a task is to be retried, it can wait for a given amount of time before doing so, and the default delay is defined by the default_retry_delay attribute. By default this is set to 3 minutes. Note that the unit for setting the delay is in seconds (int or float).

You can also provide the countdown argument to retry() to override this default.

@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        something_raising()
    except Exception as exc:
        # overrides the default delay to retry after 1 minute
        raise self.retry(exc=exc, countdown=60)
Automatic retry for known exceptions

New in version 4.0.

Sometimes you just want to retry a task whenever a particular exception is raised.

Fortunately, you can tell Celery to automatically retry a task using the autoretry_for argument in the task() decorator:

from twitter.exceptions import FailWhaleError

@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

If you want to specify custom arguments for an internal retry() call, pass the retry_kwargs argument to the task() decorator:

@app.task(autoretry_for=(FailWhaleError,),
          retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

This is provided as an alternative to manually handling the exceptions, and the example above will do the same as wrapping the task body in a try … except statement:

@app.task(bind=True)
def refresh_timeline(self, user):
    try:
        twitter.refresh_timeline(user)
    except FailWhaleError as exc:
        raise self.retry(exc=exc, max_retries=5)

If you want to automatically retry on any error, simply use:

@app.task(autoretry_for=(Exception,))
def x():
    ...

New in version 4.2.

If your tasks depend on another service, like making a request to an API, then it’s a good idea to use exponential backoff to avoid overwhelming the service with your requests. Fortunately, Celery’s automatic retry support makes it easy. Just specify the retry_backoff argument, like this:

from requests.exceptions import RequestException

@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
    ...

By default, this exponential backoff will also introduce random jitter to avoid having all the tasks run at the same moment. It will also cap the maximum backoff delay to 10 minutes. All these settings can be customized via options documented below.

Task.autoretry_for

A list/tuple of exception classes. If any of these exceptions are raised during the execution of the task, the task will automatically be retried. By default, no exceptions will be autoretried.

Task.retry_kwargs

A dictionary. Use this to customize how autoretries are executed. Note that if you use the exponential backoff options below, the countdown task option will be determined by Celery’s autoretry system, and any countdown included in this dictionary will be ignored.

Task.retry_backoff

A boolean, or a number. If this option is set to True, autoretries will be delayed following the rules of exponential backoff. The first retry will have a delay of 1 second, the second retry will have a delay of 2 seconds, the third will delay 4 seconds, the fourth will delay 8 seconds, and so on. (However, this delay value is modified by retry_jitter, if it is enabled.) If this option is set to a number, it is used as a delay factor. For example, if this option is set to 3, the first retry will delay 3 seconds, the second will delay 6 seconds, the third will delay 12 seconds, the fourth will delay 24 seconds, and so on. By default, this option is set to False, and autoretries will not be delayed.

Task.retry_backoff_max

A number. If retry_backoff is enabled, this option will set a maximum delay in seconds between task autoretries. By default, this option is set to 600, which is 10 minutes.

Task.retry_jitter

A boolean. Jitter is used to introduce randomness into exponential backoff delays, to prevent all tasks in the queue from being executed simultaneously. If this option is set to True, the delay value calculated by retry_backoff is treated as a maximum, and the actual delay value will be a random number between zero and that maximum. By default, this option is set to True.
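
As a hedged illustration of how these options combine (a sketch only: fetch_resource and the numbers used are hypothetical, and app is assumed to be an existing Celery instance):

from requests.exceptions import RequestException

@app.task(autoretry_for=(RequestException,),
          max_retries=10,           # give up after 10 automatic retries
          retry_backoff=5,          # wait 5 s, 10 s, 20 s, ... between retries
          retry_backoff_max=300,    # never wait more than 5 minutes
          retry_jitter=True)        # randomize each delay (the default)
def fetch_resource(url):
    ...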

List of Options

The task decorator can take a number of options that change the way the task behaves, for example you can set the rate limit for a task using the rate_limit option.

Any keyword argument passed to the task decorator will actually be set as an attribute of the resulting task class, and this is a list of the built-in attributes.

General
Task.name

The name the task is registered as.

You can set this name manually, or a name will be automatically generated using the module and class name.

See also Names.

Task.request

If the task is being executed this will contain information about the current request. Thread local storage is used.

See Task Request.

Task.max_retries

Only applies if the task calls self.retry or if the task is decorated with the autoretry_for argument.

The maximum number of attempted retries before giving up. If the number of retries exceeds this value a MaxRetriesExceededError exception will be raised.

Note

You have to call retry() manually, as it won’t automatically retry on exception.

The default is 3. A value of None will disable the retry limit and the task will retry forever until it succeeds.

Task.throws

Optional tuple of expected error classes that shouldn’t be regarded as an actual error.

Errors in this list will be reported as a failure to the result backend, but the worker won’t log the event as an error, and no traceback will be included.

Example:

@task(throws=(KeyError, HttpNotFound))
def get_foo():
    something()

Error types:

  • Expected errors (in Task.throws)

    Logged with severity INFO, traceback excluded.

  • Unexpected errors

    Logged with severity ERROR, with traceback included.

Task.default_retry_delay

Default time in seconds before a retry of the task should be executed. Can be either int or float. Default is a three minute delay.

Task.rate_limit

Set the rate limit for this task type (limits the number of tasks that can be run in a given time frame). Tasks will still complete when a rate limit is in effect, but it may take some time before it’s allowed to start.

If this is None no rate limit is in effect. If it is an integer or float, it is interpreted as “tasks per second”.

The rate limits can be specified in seconds, minutes or hours by appending “/s”, “/m” or “/h” to the value. Tasks will be evenly distributed over the specified time frame.

Example: “100/m” (hundred tasks a minute). This will enforce a minimum delay of 600ms between starting two tasks on the same worker instance.

Default is the task_default_rate_limit setting: if not specified, rate limiting is disabled for tasks by default.

Note that this is a per worker instance rate limit, and not a global rate limit. To enforce a global rate limit (e.g., for an API with a maximum number of requests per second), you must restrict to a given queue.
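
For instance, a short sketch of a per-worker rate limit (send_notification is a hypothetical task name):

@app.task(rate_limit='10/m')   # at most 10 executions per minute, per worker instance
def send_notification(user_id):
    ...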

Task.time_limit

The hard time limit, in seconds, for this task. When not set the workers default is used.

Task.soft_time_limit

The soft time limit for this task. When not set the workers default is used.

Task.ignore_result

Don’t store task state. Note that this means you can’t use AsyncResult to check if the task is ready, or get its return value.

Task.store_errors_even_if_ignored

If True, errors will be stored even if the task is configured to ignore results.

Task.serializer

A string identifying the default serialization method to use. Defaults to the task_serializer setting. Can be pickle, json, yaml, or any custom serialization methods that have been registered with kombu.serialization.registry.

Please see Serializers for more information.

Task.compression

A string identifying the default compression scheme to use.

Defaults to the task_compression setting. Can be gzip, or bzip2, or any custom compression schemes that have been registered with the kombu.compression registry.

Please see Compression for more information.

Task.backend

The result store backend to use for this task. An instance of one of the backend classes in celery.backends. Defaults to app.backend, defined by the result_backend setting.

Task.acks_late

If set to True messages for this task will be acknowledged after the task has been executed, not just before (the default behavior).

Note: This means the task may be executed multiple times should the worker crash in the middle of execution. Make sure your tasks are idempotent.

The global default can be overridden by the task_acks_late setting.

Task.track_started

If True the task will report its status as “started” when the task is executed by a worker. The default value is False as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried. Having a “started” status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The host name and process id of the worker executing the task will be available in the state meta-data (e.g., result.info['pid']).

The global default can be overridden by the task_track_started setting.

See also

The API reference for Task.

States

Celery can keep track of the tasks' current state. The state also contains the result of a successful task, or the exception and traceback information of a failed task.

There are several result backends to choose from, and they all have different strengths and weaknesses (see Result Backends).

During its lifetime a task will transition through several possible states, and each state may have arbitrary meta-data attached to it. When a task moves into a new state the previous state is forgotten, but some transitions can be deduced (e.g., a task now in the FAILED state is implied to have been in the STARTED state at some point).

There are also sets of states, like the set of FAILURE_STATES, and the set of READY_STATES.

The client uses membership in these sets to decide whether the exception should be re-raised (PROPAGATE_STATES), or whether the state can be cached (it can if the task is ready).

You can also define Custom states.

Result Backends

If you want to keep track of tasks or need the return values, then Celery must store or send the states somewhere so that they can be retrieved later. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), and Redis, or you can define your own.

No backend works well for every use case. You should read about the strengths and weaknesses of each backend, and choose the most appropriate for your needs.

Warning

Backends use resources to store and transmit results. To ensure that resources are released, you must eventually call get() or forget() on EVERY AsyncResult instance returned after calling a task.

RPC Result Backend (RabbitMQ/QPid)

The RPC result backend (rpc://) is special as it doesn’t actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once, and only by the client that initiated the task. Two different processes can’t wait for the same result.

Even with that limitation, it is an excellent choice if you need to receive state changes in real-time. Using messaging means the client doesn’t have to poll for new states.

The messages are transient (non-persistent) by default, so the results will disappear if the broker restarts. You can configure the result backend to send persistent messages using the result_persistent setting.
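
A minimal configuration sketch, assuming RabbitMQ as the broker (the broker URL is illustrative):

from celery import Celery

app = Celery('proj', broker='amqp://guest@localhost//', backend='rpc://')
# Result messages are transient by default; make them survive a broker restart:
app.conf.result_persistent = True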

Database Result Backend

Keeping state in the database can be convenient for many, especially for web applications with a database already in place, but it also comes with limitations.

  • Polling the database for new states is expensive, and so you should increase the polling intervals of operations, such as result.get().

  • Some databases use a default transaction isolation level that isn’t suitable for polling tables for changes.

    In MySQL the default transaction isolation level is REPEATABLE-READ: meaning the transaction won’t see changes made by other transactions until the current transaction is committed.

    Changing that to the READ-COMMITTED isolation level is recommended.
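
    A possible configuration sketch for a database result backend (the connection URL is illustrative, and passing the isolation level through database_engine_options assumes your database driver accepts that engine option):

    # The 'db+' prefix selects the SQLAlchemy result backend.
    app.conf.result_backend = 'db+mysql://scott:tiger@localhost/celery_results'
    # Ask SQLAlchemy for READ-COMMITTED transactions, as recommended above.
    app.conf.database_engine_options = {'isolation_level': 'READ COMMITTED'}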

Built-in States
PENDING

Task is waiting for execution or unknown. Any task id that’s not known is implied to be in the pending state.

STARTED

Task has been started. Not reported by default, to enable please see app.Task.track_started.

meta-data: pid and hostname of the worker process executing the task.
SUCCESS

Task has been successfully executed.

meta-data: result contains the return value of the task.
propagates: Yes
ready: Yes
FAILURE

Task execution resulted in failure.

meta-data: result contains the exception that occurred, and traceback contains the backtrace of the stack at the point when the exception was raised.
propagates: Yes
RETRY

Task is being retried.

meta-data: result contains the exception that caused the retry, and traceback contains the backtrace of the stack at the point when the exception was raised.
propagates: No
REVOKED

Task has been revoked.

propagates: Yes
Custom states

You can easily define your own states, all you need is a unique name. The name of the state is usually an uppercase string. As an example you could have a look at the abortable tasks which defines a custom ABORTED state.

Use update_state() to update a task’s state:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

Here I created the state “PROGRESS”, telling any application aware of this state that the task is currently in progress, and also where it is in the process by having current and total counts as part of the state meta-data. This can then be used to create progress bars for example.
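
A hedged sketch of how a client might consume this custom PROGRESS state (the file names and the one-second poll interval are illustrative):

import time

result = upload_files.delay(['a.txt', 'b.txt', 'c.txt'])
while not result.ready():
    if result.state == 'PROGRESS':
        meta = result.info   # the meta dict passed to update_state()
        print('{0}/{1} files uploaded'.format(meta['current'], meta['total']))
    time.sleep(1)
print(result.state)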

Creating pickleable exceptions

A rarely known Python fact is that exceptions must conform to some simple rules to support being serialized by the pickle module.

Tasks that raise exceptions that aren’t pickleable won’t work properly when Pickle is used as the serializer.

To make sure that your exceptions are pickleable the exception MUST provide the original arguments it was instantiated with in its .args attribute. The simplest way to ensure this is to have the exception call Exception.__init__.

Let’s look at some examples that work, and one that doesn’t:

# OK:
class HttpError(Exception):
    pass

# BAD:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code

# OK:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code
        Exception.__init__(self, status_code)  # <-- REQUIRED

So the rule is: For any exception that supports custom arguments *args, Exception.__init__(self, *args) must be used.

There’s no special support for keyword arguments, so if you want to preserve keyword arguments when the exception is unpickled you have to pass them as regular args:

class HttpError(Exception):

    def __init__(self, status_code, headers=None, body=None):
        self.status_code = status_code
        self.headers = headers
        self.body = body

        super(HttpError, self).__init__(status_code, headers, body)
Semipredicates

The worker wraps the task in a tracing function that records the final state of the task. There are a number of exceptions that can be used to signal this function to change how it treats the return of the task.

Ignore

The task may raise Ignore to force the worker to ignore the task. This means that no state will be recorded for the task, but the message is still acknowledged (removed from queue).

This can be used if you want to implement custom revoke-like functionality, or manually store the result of a task.

Example keeping revoked tasks in a Redis set:

from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if redis.ismember('tasks.revoked', self.request.id):
        raise Ignore()

Example that stores results manually:

from celery import states
from celery.exceptions import Ignore

@app.task(bind=True)
def get_tweets(self, user):
    timeline = twitter.get_timeline(user)
    if not self.request.called_directly:
        self.update_state(state=states.SUCCESS, meta=timeline)
    raise Ignore()
Reject

The task may raise Reject to reject the task message using AMQP’s basic_reject method. This won’t have any effect unless Task.acks_late is enabled.

Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example RabbitMQ supports the concept of Dead Letter Exchanges where a queue can be configured to use a dead letter exchange that rejected messages are redelivered to.

Reject can also be used to re-queue messages, but please be very careful when using this as it can easily result in an infinite message loop.

Example using reject when a task causes an out of memory condition:

import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
    file = get_file(path)
    try:
        renderer.render_scene(file)

    # if the file is too big to fit in memory
    # we reject it so that it's redelivered to the dead letter exchange
    # and we can manually inspect the situation.
    except MemoryError as exc:
        raise Reject(exc, requeue=False)
    except OSError as exc:
        if exc.errno == errno.ENOMEM:
            raise Reject(exc, requeue=False)

    # For any other error we retry after 10 seconds.
    except Exception as exc:
        raise self.retry(exc, countdown=10)

Example re-queuing the message:

from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def requeues(self):
    if not self.request.delivery_info['redelivered']:
        raise Reject('no reason', requeue=True)
    print('received two times')

Consult your broker documentation for more details about the basic_reject method.

Retry

The Retry exception is raised by the Task.retry method to tell the worker that the task is being retried.

Custom task classes

All tasks inherit from the app.Task class. The run() method becomes the task body.

As an example, the following code,

@app.task
def add(x, y):
    return x + y

will do roughly this behind the scenes:

class _AddTask(app.Task):

    def run(self, x, y):
        return x + y
add = app.tasks[_AddTask.name]
Instantiation

A task is not instantiated for every request, but is registered in the task registry as a global instance.

This means that the __init__ constructor will only be called once per process, and that the task class is semantically closer to an Actor.

If you have a task,

from celery import Task

class NaiveAuthenticateServer(Task):

    def __init__(self):
        self.users = {'george': 'password'}

    def run(self, username, password):
        try:
            return self.users[username] == password
        except KeyError:
            return False

And you route every request to the same process, then it will keep state between requests.

This can also be useful to cache resources. For example, a base Task class that caches a database connection:

from celery import Task

class DatabaseTask(Task):
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

that can be added to tasks like this:

@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        process_row(row)

The db attribute of the process_rows task will then always stay the same in each process.

Handlers
after_return(self, status, retval, task_id, args, kwargs, einfo)

Handler called after the task returns.

Parameters:
  • status – Current task state.
  • retval – Task return value/exception.
  • task_id – Unique id of the task.
  • args – Original arguments for the task that returned.
  • kwargs – Original keyword arguments for the task that returned.
Keyword Arguments:
  • einfo – ExceptionInfo instance, containing the traceback (if any).

The return value of this handler is ignored.

on_failure(self, exc, task_id, args, kwargs, einfo)

This is run by the worker when the task fails.

Parameters:
  • exc – The exception raised by the task.
  • task_id – Unique id of the failed task.
  • args – Original arguments for the task that failed.
  • kwargs – Original keyword arguments for the task that failed.
Keyword Arguments:
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

on_retry(self, exc, task_id, args, kwargs, einfo)

This is run by the worker when the task is to be retried.

Parameters:
  • exc – The exception sent to retry().
  • task_id – Unique id of the retried task.
  • args – Original arguments for the retried task.
  • kwargs – Original keyword arguments for the retried task.
Keyword Arguments:
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

on_success(self, retval, task_id, args, kwargs)

Run by the worker if the task executes successfully.

Parameters:
  • retval – The return value of the task.
  • task_id – Unique id of the executed task.
  • args – Original arguments for the executed task.
  • kwargs – Original keyword arguments for the executed task.

The return value of this handler is ignored.

Requests and custom requests

Upon receiving a message to run a task, the worker creates a request to represent such demand.

Custom task classes may override which request class to use by changing the attribute celery.app.task.Task.Request. You may either assign the custom request class itself, or its fully qualified name.

The request has several responsibilities. Custom request classes should cover them all – they are responsible for actually running and tracing the task. We strongly recommend inheriting from celery.worker.request.Request.

When using the pre-forking worker, the methods on_timeout() and on_failure() are executed in the main worker process. An application may leverage such facility to detect failures which are not detected using celery.app.task.Task.on_failure().

As an example, the following custom request detects and logs hard time limits, and other failures.

import logging
from celery.worker.request import Request

logger = logging.getLogger('my.package')

class MyRequest(Request):
    'A minimal custom request to log failures and hard time limits.'

    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super(MyRequest, self).on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )
        logger.warning(
            'Failure detected for task %s',
            self.task.name
        )

class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

@app.task(base=MyTask)
def some_longrunning_task():
    # use your imagination
    ...
How it works

Here come the technical details. This part isn’t something you need to know, but you may be interested.

All defined tasks are listed in a registry. The registry contains a list of task names and their task classes. You can investigate this registry yourself:

>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
    <@task: celery.chord_unlock>,
 'celery.backend_cleanup':
    <@task: celery.backend_cleanup>,
 'celery.chord':
    <@task: celery.chord>}

This is the list of tasks built into Celery. Note that tasks will only be registered when the module they’re defined in is imported.

The default loader imports any modules listed in the imports setting.

The app.task() decorator is responsible for registering your task in the application’s task registry.

When tasks are sent, no actual function code is sent with it, just the name of the task to execute. When the worker then receives the message it can look up the name in its task registry to find the execution code.

This means that your workers should always be updated with the same software as the client. This is a drawback, but the alternative is a technical challenge that’s yet to be solved.

Tips and Best Practices
Ignore results you don’t want

If you don’t care about the results of a task, be sure to set the ignore_result option, as storing results wastes time and resources.

@app.task(ignore_result=True)
def mytask():
    something()

Results can even be disabled globally using the task_ignore_result setting.

Results can be enabled/disabled on a per-execution basis, by passing the ignore_result boolean parameter, when calling apply_async or delay.

@app.task
def mytask(x, y):
    return x + y

# No result will be stored
result = mytask.apply_async((1, 2), ignore_result=True)
print(result.get())  # -> None

# Result will be stored
result = mytask.apply_async((1, 2), ignore_result=False)
print(result.get())  # -> 3

By default tasks will not ignore results (ignore_result=False) when a result backend is configured.

The option precedence order is the following:

  1. Global task_ignore_result
  2. ignore_result option
  3. Task execution option ignore_result
More optimization tips

You find additional optimization tips in the Optimizing Guide.

Avoid launching synchronous subtasks

Having a task wait for the result of another task is really inefficient, and may even cause a deadlock if the worker pool is exhausted.

Make your design asynchronous instead, for example by using callbacks.

Bad:

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

Good:

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

Here I instead created a chain of tasks by linking together different signature()’s. You can read about chains and other powerful constructs at Canvas: Designing Work-flows.

By default Celery will not allow you to run subtasks synchronously within a task, but in rare or extreme cases you might need to do so. WARNING: enabling subtasks to run synchronously is not recommended!

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get(disable_sync_subtasks=False)
    info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)
Performance and Strategies
Granularity

The task granularity is the amount of computation needed by each subtask. In general it is better to split the problem up into many small tasks rather than have a few long running tasks.

With smaller tasks you can process more tasks in parallel and the tasks won’t run long enough to block the worker from processing other waiting tasks.

However, executing a task does have overhead. A message needs to be sent, data may not be local, etc. So if the tasks are too fine-grained the overhead added probably removes any benefit.

See also

The book Art of Concurrency has a section dedicated to the topic of task granularity [AOC1].

[AOC1]Breshears, Clay. Section 2.2.1, “The Art of Concurrency”. O’Reilly Media, Inc. May 15, 2009. ISBN-13 978-0-596-52153-0.
Data locality

The worker processing the task should be as close to the data as possible. The best would be to have a copy in memory, the worst would be a full transfer from another continent.

If the data is far away, you could try to run another worker at location, or if that’s not possible - cache often used data, or preload data you know is going to be used.

The easiest way to share data between workers is to use a distributed cache system, like memcached.

See also

The paper Distributed Computing Economics by Jim Gray is an excellent introduction to the topic of data locality.

State

Since Celery is a distributed system, you can’t know which process, or on what machine the task will be executed. You can’t even know if the task will run in a timely manner.

The ancient async sayings tells us that “asserting the world is the responsibility of the task”. What this means is that the world view may have changed since the task was requested, so the task is responsible for making sure the world is how it should be; If you have a task that re-indexes a search engine, and the search engine should only be re-indexed at maximum every 5 minutes, then it must be the tasks responsibility to assert that, not the callers.
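
A minimal sketch of this idea, assuming a Django cache is available and reindex_everything() is a hypothetical helper; the task itself asserts that at least five minutes have passed since the last re-index:

from django.core.cache import cache

@app.task
def reindex_search():
    # cache.add() only sets the key if it doesn't already exist, so it acts
    # as a simple "re-indexed recently" marker with a five-minute lifetime.
    if not cache.add('search-reindex-marker', 'set', timeout=5 * 60):
        return 'skipped: re-indexed less than 5 minutes ago'
    reindex_everything()
    return 'reindexed'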

Another gotcha is Django model objects. They shouldn’t be passed on as arguments to tasks. It’s almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions.

Imagine the following scenario where you have an article and a task that automatically expands some abbreviations in it:

class Article(models.Model):
    title = models.CharField()
    body = models.TextField()

@app.task
def expand_abbreviations(article):
    article.body = article.body.replace('MyCorp', 'My Corporation')
    article.save()

First, an author creates an article and saves it, then the author clicks on a button that initiates the abbreviation task:

>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)

Now, the queue is very busy, so the task won’t be run for another 2 minutes. In the meantime another author makes changes to the article, so when the task is finally run, the body of the article is reverted to the old version because the task had the old body in its argument.

Fixing the race condition is easy, just use the article id instead, and re-fetch the article in the task body:

@app.task
def expand_abbreviations(article_id):
    article = Article.objects.get(id=article_id)
    article.body = article.body.replace('MyCorp', 'My Corporation')
    article.save()
>>> expand_abbreviations.delay(article_id)

This approach may even have performance benefits, as sending large messages can be expensive.

Database transactions

Let’s have a look at another example:

from django.db import transaction

@transaction.commit_on_success
def create_article(request):
    article = Article.objects.create()
    expand_abbreviations.delay(article.pk)

This is a Django view creating an article object in the database, then passing the primary key to a task. It uses the commit_on_success decorator, which will commit the transaction when the view returns, or roll back if the view raises an exception.

There’s a race condition if the task starts executing before the transaction has been committed: the database object doesn’t exist yet!

The solution is to use the on_commit callback to launch your Celery task once all transactions have been committed successfully.

from django.db.transaction import on_commit

def create_article(request):
    article = Article.objects.create()
    on_commit(lambda: expand_abbreviations.delay(article.pk))

Note

on_commit is available in Django 1.9 and above; if you are using an earlier version then the django-transaction-hooks library adds support for this.

Example

Let’s take a real world example: a blog where comments posted need to be filtered for spam. When the comment is created, the spam filter runs in the background, so the user doesn’t have to wait for it to finish.

I have a Django blog application allowing comments on blog posts. I’ll describe parts of the models/views and tasks for this application.

blog/models.py

The comment model looks like this:

from django.db import models
from django.utils.translation import ugettext_lazy as _


class Comment(models.Model):
    name = models.CharField(_('name'), max_length=64)
    email_address = models.EmailField(_('email address'))
    homepage = models.URLField(_('home page'),
                               blank=True, verify_exists=False)
    comment = models.TextField(_('comment'))
    pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_now_add=True)
    is_spam = models.BooleanField(_('spam?'),
                                  default=False, editable=False)

    class Meta:
        verbose_name = _('comment')
        verbose_name_plural = _('comments')

In the view where the comment is posted, I first write the comment to the database, then I launch the spam filter task in the background.

blog/views.py
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response

from blog import tasks
from blog.models import Comment


class CommentForm(forms.ModelForm):

    class Meta:
        model = Comment


def add_comment(request, slug, template_name='comments/create.html'):
    post = get_object_or_404(Entry, slug=slug)
    remote_addr = request.META.get('REMOTE_ADDR')

    if request.method == 'POST':
        form = CommentForm(request.POST, request.FILES)
        if form.is_valid():
            comment = form.save()
            # Check spam asynchronously.
            tasks.spam_filter.delay(comment_id=comment.id,
                                    remote_addr=remote_addr)
            return HttpResponseRedirect(post.get_absolute_url())
    else:
        form = CommentForm()

    context = RequestContext(request, {'form': form})
    return render_to_response(template_name, context_instance=context)

To filter spam in comments I use Akismet, the service used to filter spam in comments posted to the free blog platform Wordpress. Akismet is free for personal use, but for commercial use you need to pay. You have to sign up to their service to get an API key.

To make API calls to Akismet I use the akismet.py library written by Michael Foord.

blog/tasks.py
from celery import Celery

from akismet import Akismet

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site

from blog.models import Comment


app = Celery(broker='amqp://')


@app.task
def spam_filter(comment_id, remote_addr=None):
    logger = spam_filter.get_logger()
    logger.info('Running spam filter for comment %s', comment_id)

    comment = Comment.objects.get(pk=comment_id)
    current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(current_domain))
    if not akismet.verify_key():
        raise ImproperlyConfigured('Invalid AKISMET_KEY')


    is_spam = akismet.comment_check(user_ip=remote_addr,
                        comment_content=comment.comment,
                        comment_author=comment.name,
                        comment_author_email=comment.email_address)
    if is_spam:
        comment.is_spam = True
        comment.save()

    return is_spam

Calling Tasks

Basics

This document describes Celery’s uniform “Calling API” used by task instances and the canvas.

The API defines a standard set of execution options, as well as three methods:

  • apply_async(args[, kwargs[, …]])

    Sends a task message.

  • delay(*args, **kwargs)

    Shortcut to send a task message, but doesn’t support execution options.

  • calling (__call__)

    Applying an object supporting the calling API (e.g., add(2, 2)) means that the task will not be executed by a worker, but in the current process instead (a message won’t be sent).

Quick Cheat Sheet

  • T.delay(arg, kwarg=value)
    Star arguments shortcut to .apply_async (.delay(*args, **kwargs) calls .apply_async(args, kwargs)).
  • T.apply_async((arg,), {'kwarg': value})
  • T.apply_async(countdown=10)
    executes in 10 seconds from now.
  • T.apply_async(eta=now + timedelta(seconds=10))
    executes in 10 seconds from now, specified using eta
  • T.apply_async(countdown=60, expires=120)
    executes in one minute from now, but expires after 2 minutes.
  • T.apply_async(expires=now + timedelta(days=2))
    expires in 2 days, set using datetime.
Example

The delay() method is convenient as it looks like calling a regular function:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

Using apply_async() instead you have to write:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

So delay is clearly convenient, but if you want to set additional execution options you have to use apply_async.

The rest of this document will go into the task execution options in detail. All examples use a task called add, returning the sum of two arguments:

@app.task
def add(x, y):
    return x + y

There’s another way…

You’ll learn more about this later while reading about the Canvas, but signatures are objects used to pass around the signature of a task invocation (for example to send it over the network), and they also support the Calling API:

task.s(arg1, arg2, kwarg1='x', kwarg2='y').apply_async()
Linking (callbacks/errbacks)

Celery supports linking tasks together so that one task follows another. The callback task will be applied with the result of the parent task as a partial argument:

add.apply_async((2, 2), link=add.s(16))

Here the result of the first task (4) will be sent to a new task that adds 16 to the previous result, forming the expression (2 + 2) + 16 = 20.

You can also cause a callback to be applied if the task raises an exception (errback), but this behaves differently from a regular callback in that it will be passed the id of the parent task, not the result. This is because it may not always be possible to serialize the exception raised, so the error callback requires a result backend to be enabled, and the task must retrieve the result of the task from it.

This is an example error callback:

@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, exc, result.traceback))

It can be added to the task using the link_error execution option:

add.apply_async((2, 2), link_error=error_handler.s())

In addition, both the link and link_error options can be expressed as a list:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

The callbacks/errbacks will then be called in order, and all callbacks will be called with the return value of the parent task as a partial argument.

On message

Celery supports catching all state changes by setting the on_message callback.

For example, for long-running tasks that send task progress you can do something like this:

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)


def on_raw_message(body):
    print(body)

r = hello.apply_async((4, 6))
print(r.get(on_message=on_raw_message, propagate=False))

Will generate output like this:

{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 50},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 90},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': 'hello world: 10',
 'children': [],
 'status': 'SUCCESS',
 'traceback': None}
hello world: 10
ETA and Countdown

The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed. countdown is a shortcut to set the ETA by seconds into the future.

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
4

The task is guaranteed to be executed at some time after the specified date and time, but not necessarily at that exact time. Possible reasons for broken deadlines may include many items waiting in the queue, or heavy network latency. To make sure your tasks are executed in a timely manner you should monitor the queue for congestion. Use Munin, or similar tools, to receive alerts, so appropriate action can be taken to ease the workload. See Munin.

While countdown is an integer, eta must be a datetime object, specifying an exact date and time (including millisecond precision, and timezone information):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
Expiration

The expires argument defines an optional expiry time, either as seconds after task publish, or a specific date and time using datetime:

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10),
...                 expires=datetime.now() + timedelta(days=1))

When a worker receives an expired task it will mark the task as REVOKED (TaskRevokedError).
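
A sketch of what this can look like from the caller’s side, assuming a result backend is enabled (the revoked state surfaces as TaskRevokedError when calling result.get()):

from celery.exceptions import TaskRevokedError

result = add.apply_async((10, 10), expires=60)
try:
    print(result.get())
except TaskRevokedError:
    print('the task expired before a worker could execute it')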

Message Sending Retry

Celery will automatically retry sending messages in the event of connection failure, and retry behavior can be configured, like how often to retry, or a maximum number of retries, or disabled all together.

To disable retry you can set the retry execution option to False:

add.apply_async((2, 2), retry=False)
Retry Policy

A retry policy is a mapping that controls how retries behave, and can contain the following keys:

  • max_retries

    Maximum number of retries before giving up, in this case the exception that caused the retry to fail will be raised.

    A value of None means it will retry forever.

    The default is to retry 3 times.

  • interval_start

    Defines the number of seconds (float or integer) to wait between retries. Default is 0 (the first retry will be instantaneous).

  • interval_step

    On each consecutive retry this number will be added to the retry delay (float or integer). Default is 0.2.

  • interval_max

    Maximum number of seconds (float or integer) to wait between retries. Default is 0.2.

For example, the default policy correlates to:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

the maximum time spent retrying will be 0.4 seconds (the first retry is instantaneous, the second waits 0.2 seconds, and the third waits another 0.2 seconds because of the interval_max cap). It’s set relatively short by default because a connection failure could lead to a retry pile effect if the broker connection is down – for example, many web server processes waiting to retry, blocking other incoming requests.

Connection Error Handling

When you send a task and the message transport connection is lost, or the connection cannot be initiated, an OperationalError error will be raised:

>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 503, in apply_async
    **options
  File "celery/app/base.py", line 662, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/backends/rpc.py", line 275, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
    channel = self._channel = channel()
  File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
    self.sock.connect(sa)
  kombu.exceptions.OperationalError: [Errno 61] Connection refused

If you have retries enabled this will only happen after retries have been exhausted, or when retries are disabled, immediately.

You can handle this error too:

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception('Sending task raised: %r', exc)
Serializers

Data transferred between clients and workers needs to be serialized, so every message in Celery has a content_type header that describes the serialization method used to encode it.

The default serializer is JSON, but you can change this using the task_serializer setting, or for each individual task, or even per message.

There’s built-in support for JSON, pickle, YAML and msgpack, and you can also add your own custom serializers by registering them into the Kombu serializer registry.

See also

Message Serialization in the Kombu user guide.

Each option has its advantages and disadvantages.

json – JSON is supported in many programming languages, is now a standard part of Python (since 2.6), and is fairly fast to decode using the modern Python libraries, such as simplejson.

The primary disadvantage to JSON is that it limits you to the following data types: strings, Unicode, floats, Boolean, dictionaries, and lists. Decimals and dates are notably missing.

Binary data will be transferred using Base64 encoding, increasing the size of the transferred data by 34% compared to an encoding format where native binary types are supported.

However, if your data fits inside the above constraints and you need cross-language support, the default setting of JSON is probably your best choice.

See http://json.org for more information.

pickle – If you have no desire to support any language other than Python, then using the pickle encoding will gain you the support of all built-in Python data types (except class instances), smaller messages when sending binary files, and a slight speedup over JSON processing.

See pickle for more information.

yaml – YAML has many of the same characteristics as json, except that it natively supports more data types (including dates, recursive references, etc.).

However, the Python libraries for YAML are a good bit slower than the libraries for JSON.

If you need a more expressive set of data types and need to maintain cross-language compatibility, then YAML may be a better fit than the above.

See http://yaml.org/ for more information.

msgpack – msgpack is a binary serialization format that’s closer to JSON in features. It’s very young however, and support should be considered experimental at this point.

See http://msgpack.org/ for more information.

The encoding used is available as a message header, so the worker knows how to deserialize any task. If you use a custom serializer, this serializer must be available for the worker.
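
A hedged sketch of registering a custom serializer with Kombu ('myjson' is a hypothetical name; the same registration must also run in the worker process, and the worker’s accept_content setting must include it):

import json
from kombu.serialization import register

register('myjson', json.dumps, json.loads,
         content_type='application/x-myjson',
         content_encoding='utf-8')

# It can then be selected like any built-in serializer:
# add.apply_async((2, 2), serializer='myjson')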

The following order is used to decide the serializer used when sending a task:

  1. The serializer execution option.
  2. The Task.serializer attribute
  3. The task_serializer setting.

Example setting a custom serializer for a single task invocation:

>>> add.apply_async((10, 10), serializer='json')
Compression

Celery can compress messages using the following builtin schemes:

  • brotli

    brotli is optimized for the web, in particular small text documents. It is most effective for serving static content such as fonts and html pages.

    To use it, install Celery with:

    $ pip install celery[brotli]
    
  • bzip2

    bzip2 creates smaller files than gzip, but compression and decompression speeds are noticeably slower than those of gzip.

    To use it, please ensure your Python executable was compiled with bzip2 support.

    If you get the following ImportError:

    >>> import bz2
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'bz2'
    

    it means that you should recompile your Python version with bzip2 support.

  • gzip

    gzip is suitable for systems that require a small memory footprint, making it ideal for systems with limited memory. It is often used to generate files with the “.tar.gz” extension.

    To use it, please ensure your Python executable was compiled with gzip support.

    If you get the following ImportError:

    >>> import gzip
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'gzip'
    

    it means that you should recompile your Python version with gzip support.

  • lzma

    lzma provides a good compression ratio and executes with fast compression and decompression speeds at the cost of higher memory usage.

    To use it, please ensure your Python executable was compiled with lzma support and that your Python version is 3.3 and above.

    If you get the following ImportError:

    >>> import lzma
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'lzma'
    

    it means that you should recompile your Python version with lzma support.

    Alternatively, you can also install a backport using:

    $ pip install celery[lzma]
    
  • zlib

    zlib is an abstraction of the Deflate algorithm in library form that includes support both for the gzip file format and a lightweight stream format in its API. It is a crucial component of many software systems - Linux kernel and Git VCS just to name a few.

    To use it, please ensure your Python executable was compiled with zlib support.

    If you get the following ImportError:

    >>> import zlib
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'zlib'
    

    it means that you should recompile your Python version with zlib support.

  • zstd

    zstd targets real-time compression scenarios at zlib-level and better compression ratios. It’s backed by a very fast entropy stage, provided by the Huff0 and FSE library.

    To use it, install Celery with:

    $ pip install celery[zstd]
    

You can also create your own compression schemes and register them in the kombu compression registry.

The following order is used to decide the compression scheme used when sending a task:

  1. The compression execution option.
  2. The Task.compression attribute.
  3. The task_compression setting.

Example specifying the compression used when calling a task:

>>> add.apply_async((2, 2), compression='zlib')
Connections

You can handle the connection manually by creating a publisher:

numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        for args in numbers:
            res = add.apply_async(args, publisher=publisher)
            results.append(res)
print([res.get() for res in results])

though this particular example is much better expressed as a group instead:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]
Routing options

Celery can route tasks to different queues.

Simple routing (name <-> name) is accomplished using the queue option:

add.apply_async(queue='priority.high')

You can then assign workers to the priority.high queue by using the worker’s -Q argument:

$ celery -A proj worker -l info -Q celery,priority.high

See also

Hard-coding queue names in code isn’t recommended; the best practice is to use configuration routers (task_routes).

To find out more about routing, please see Routing Tasks.

Results options

You can enable or disable result storage using the task_ignore_result setting or by using the ignore_result option:

>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None

>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3

If you’d like to store additional meta-data about the task in the result backend, set the result_extended setting to True.

See also

For more information on tasks, please see Tasks.

Advanced Options

These options are for advanced users who want to take advantage of AMQP’s full routing capabilities. Interested parties may read the routing guide.

  • exchange

    Name of exchange (or a kombu.entity.Exchange) to send the message to.

  • routing_key

    Routing key used to determine the destination queue.

  • priority

    A number between 0 and 255, where 255 is the highest priority.

    Supported by: RabbitMQ, Redis (priority reversed, 0 is highest).
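
    For example, a short sketch of a high-priority call (on RabbitMQ the queue must be declared with a maximum priority for this option to take effect):

    add.apply_async((2, 2), priority=200)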

Canvas: Designing Work-flows

Signatures

New in version 2.0.

You just learned how to call a task using the task’s delay method in the calling guide, and this is often all you need, but sometimes you may want to pass the signature of a task invocation to another process or as an argument to another function.

A signature() wraps the arguments, keyword arguments, and execution options of a single task invocation in a way such that it can be passed to functions or even serialized and sent across the wire.

  • You can create a signature for the add task using its name like this:

    >>> from celery import signature
    >>> signature('tasks.add', args=(2, 2), countdown=10)
    tasks.add(2, 2)
    

    This task has a signature of arity 2 (two arguments): (2, 2), and sets the countdown execution option to 10.

  • or you can create one using the task’s signature method:

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    
  • There’s also a shortcut using star arguments:

    >>> add.s(2, 2)
    tasks.add(2, 2)
    
  • Keyword arguments are also supported:

    >>> add.s(2, 2, debug=True)
    tasks.add(2, 2, debug=True)
    
  • From any signature instance you can inspect the different fields:

    >>> s = add.signature((2, 2), {'debug': True}, countdown=10)
    >>> s.args
    (2, 2)
    >>> s.kwargs
    {'debug': True}
    >>> s.options
    {'countdown': 10}
    
  • It supports the “Calling API” of delay, apply_async, etc., including being called directly (__call__).

    Calling the signature will execute the task inline in the current process:

    >>> add(2, 2)
    4
    >>> add.s(2, 2)()
    4
    

    delay is our beloved shortcut to apply_async taking star-arguments:

    >>> result = add.delay(2, 2)
    >>> result.get()
    4
    

    apply_async takes the same arguments as the app.Task.apply_async() method:

    >>> add.apply_async(args, kwargs, **options)
    >>> add.signature(args, kwargs, **options).apply_async()
    
    >>> add.apply_async((2, 2), countdown=1)
    >>> add.signature((2, 2), countdown=1).apply_async()
    
  • You can’t define options with s(), but a chaining set call takes care of that:

    >>> add.s(2, 2).set(countdown=1)
    proj.tasks.add(2, 2)
    
Partials

With a signature, you can execute the task in a worker:

>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)

Or you can call it directly in the current process:

>>> add.s(2, 2)()
4

Specifying additional args, kwargs, or options to apply_async/delay creates partials:

  • Any arguments added will be prepended to the args in the signature:

    >>> partial = add.s(2)          # incomplete signature
    >>> partial.delay(4)            # 4 + 2
    >>> partial.apply_async((4,))  # same
    
  • Any keyword arguments added will be merged with the kwargs in the signature, with the new keyword arguments taking precedence:

    >>> s = add.s(2, 2)
    >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
    >>> s.apply_async(kwargs={'debug': True})  # same
    
  • Any options added will be merged with the options in the signature, with the new options taking precedence:

    >>> s = add.signature((2, 2), countdown=10)
    >>> s.apply_async(countdown=1)  # countdown is now 1
    

You can also clone signatures to create derivatives:

>>> s = add.s(2)
proj.tasks.add(2)

>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)
Immutability

New in version 3.0.

Partials are meant to be used with callbacks: any tasks linked, or chord callbacks will be applied with the result of the parent task. Sometimes you want to specify a callback that doesn’t take additional arguments, and in that case you can set the signature to be immutable:

>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))

The .si() shortcut can also be used to create immutable signatures:

>>> add.apply_async((2, 2), link=reset_buffers.si())

When a signature is immutable, only the execution options can be set, so it’s not possible to call the signature with partial args/kwargs.
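
A small sketch combining the two (reset_buffers is the hypothetical task linked above, and the countdown value is illustrative): an immutable signature ignores partial arguments, but execution options can still be attached with set():

>>> sig = reset_buffers.si().set(countdown=10)
>>> add.apply_async((2, 2), link=sig)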

Note

In this tutorial I sometimes use the prefix operator ~ with signatures. You probably shouldn’t use it in your production code, but it’s a handy shortcut when experimenting in the Python shell:

>>> ~sig

>>> # is the same as
>>> sig.delay().get()
Callbacks

New in version 3.0.

Callbacks can be added to any task using the link argument to apply_async:

add.apply_async((2, 2), link=other_task.s())

The callback will only be applied if the task exited successfully, and it will be applied with the return value of the parent task as an argument.

As I mentioned earlier, any arguments you add to a signature will be prepended to the arguments specified by the signature itself!

If you have the signature:

>>> sig = add.s(10)

then sig.delay(result) becomes:

>>> add.apply_async(args=(result, 10))

Now let’s call our add task with a callback using partial arguments:

>>> add.apply_async((2, 2), link=add.s(8))

As expected, this will first launch one task calculating 2 + 2, then another task calculating 4 + 8.

The Primitives

New in version 3.0.

Overview

  • group

    The group primitive is a signature that takes a list of tasks that should be applied in parallel.

  • chain

    The chain primitive lets us link together signatures so that one is called after the other, essentially forming a chain of callbacks.

  • chord

    A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

  • map

    The map primitive works like the built-in map function, but creates a temporary task where a list of arguments is applied to the task. For example, task.map([1, 2]) – results in a single task being called, applying the arguments in order to the task function so that the result is:

    res = [task(1), task(2)]
    
  • starmap

    Works exactly like map except the arguments are applied as *args. For example add.starmap([(2, 2), (4, 4)]) results in a single task calling:

    res = [add(2, 2), add(4, 4)]
    
  • chunks

    Chunking splits a long list of arguments into parts, for example the operation:

    >>> items = zip(xrange(1000), xrange(1000))  # 1000 items
    >>> add.chunks(items, 10)
    

    will split the list of items into chunks of 10, resulting in 100 tasks (each processing 10 items in sequence).
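
    A hedged sketch of actually running the chunked operation above (reusing items; the exact output depends on your tasks):

    >>> res = add.chunks(items, 10)()          # apply the chunks inline
    >>> res.get()                              # -> a list of 100 result lists

    >>> add.chunks(items, 10).apply_async()    # or send them to a worker
    >>> g = add.chunks(items, 10).group()      # or convert them into a group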

The primitives are also signature objects themselves, so that they can be combined in any number of ways to compose complex work-flows.

Here’s some examples:

  • Simple chain

    Here’s a simple chain, the first task executes passing its return value to the next task in the chain, and so on.

    >>> from celery import chain
    
    >>> # 2 + 2 + 4 + 8
    >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
    >>> res.get()
    16
    

    This can also be written using pipes:

    >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
    16
    
  • Immutable signatures

    Signatures can be partial so arguments can be added to the existing arguments, but you may not always want that, for example if you don’t want the result of the previous task in a chain.

    In that case you can mark the signature as immutable, so that the arguments cannot be changed:

    >>> add.signature((2, 2), immutable=True)
    

    There’s also a .si() shortcut for this, and this is the preferred way of creating signatures:

    >>> add.si(2, 2)
    

    Now you can create a chain of independent tasks instead:

    >>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
    >>> res.get()
    16
    
    >>> res.parent.get()
    8
    
    >>> res.parent.parent.get()
    4
    
  • Simple group

    You can easily create a group of tasks to execute in parallel:

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in xrange(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    
  • Simple chord

    The chord primitive enables us to add a callback to be called when all of the tasks in a group have finished executing. This is often required for algorithms that aren’t embarrassingly parallel:

    >>> from celery import chord
    >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
    >>> res.get()
    90
    

    The above example creates 10 tasks that all start in parallel, and when all of them are complete the return values are combined into a list and sent to the xsum task.

    The body of a chord can also be immutable, so that the return value of the group isn’t passed on to the callback:

    >>> chord((import_contact.s(c) for c in contacts),
    ...       notify_complete.si(import_id)).apply_async()
    

    Note the use of .si above; this creates an immutable signature, meaning any new arguments passed (including the return value of the previous task) will be ignored.

  • Blow your mind by combining

    Chains can be partial too:

    >>> c1 = (add.s(4) | mul.s(8))
    
    # (16 + 4) * 8
    >>> res = c1(16)
    >>> res.get()
    160
    

    this means that you can combine chains:

    # ((4 + 16) * 2 + 4) * 8
    >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
    
    >>> res = c2()
    >>> res.get()
    352
    

    Chaining a group together with another task will automatically upgrade it to be a chord:

    >>> c3 = (group(add.s(i, i) for i in xrange(10)) | xsum.s())
    >>> res = c3()
    >>> res.get()
    90
    

    Groups and chords accept partial arguments too, so in a chain the return value of the previous task is forwarded to all tasks in the group:

    >>> new_user_workflow = (create_user.s() | group(
    ...                      import_contacts.s(),
    ...                      send_welcome_email.s()))
    ... new_user_workflow.delay(username='artv',
    ...                         first='Art',
    ...                         last='Vandelay',
    ...                         email='art@vandelay.com')
    

    If you don’t want to forward arguments to the group then you can make the signatures in the group immutable:

    >>> res = (add.s(4, 4) | group(add.si(i, i) for i in xrange(10)))()
    >>> res.get()
    <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
        bc01831b-9486-4e51-b046-480d7c9b78de,
        2650a1b8-32bf-4771-a645-b0a35dcc791b,
        dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
        59f92e0a-23ea-41ce-9fad-8645a0e7759c,
        26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
        2d10a5f4-37f0-41b2-96ac-a973b1df024d,
        e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
        104b2be0-7b75-44eb-ac8e-f9220bdfa140,
        c5c551a5-0386-4973-aa37-b65cbeb2624b,
        83f72d71-4b71-428e-b604-6f16599a9f37]>
    
    >>> res.parent.get()
    8
    
Chains

New in version 3.0.

Tasks can be linked together: the linked task is called when the task returns successfully:

>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4

The linked task will be applied with the result of its parent task as the first argument. In the above case where the result was 4, this will result in mul(4, 16).

The results will keep track of any subtasks called by the original task, and this can be accessed from the result instance:

>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

>>> res.children[0].get()
64

The result instance also has a collect() method that treats the result as a graph, enabling you to iterate over the results:

>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
 (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

By default collect() will raise an IncompleteStream exception if the graph isn’t fully formed (one of the tasks hasn’t completed yet), but you can get an intermediate representation of the graph too:

>>> for result, value in res.collect(intermediate=True):
...     ...

You can link together as many tasks as you like, and signatures can be linked too:

>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())

You can also add error callbacks using the on_error method:

>>> add.s(2, 2).on_error(log_error.s()).delay()

This will result in the following .apply_async call when the signature is applied:

>>> add.apply_async((2, 2), link_error=log_error.s())

The worker won’t actually call the errback as a task, but will instead call the errback function directly so that the raw request, exception and traceback objects can be passed to it.

Here’s an example errback:

from __future__ import print_function

import os

from proj.celery import app

@app.task
def log_error(request, exc, traceback):
    with open(os.path.join('/var/errors', request.id), 'a') as fh:
        print('--\n\n{0} {1} {2}'.format(
            request.id, exc, traceback), file=fh)

To make it even easier to link tasks together there’s a special signature called chain that lets you chain tasks together:

>>> from celery import chain
>>> from proj.tasks import add, mul

>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

Calling the chain will call the tasks in the current process and return the result of the last task in the chain:

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640

It also sets parent attributes so that you can work your way up the chain to get intermediate results:

>>> res.parent.get()
64

>>> res.parent.parent.get()
8

>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>

Chains can also be made using the | (pipe) operator:

>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
Graphs

In addition you can work with the result graph as a DependencyGraph:

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
    463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)

You can even convert these graphs to dot format:

>>> with open('graph.dot', 'w') as fh:
...     res.parent.parent.graph.to_dot(fh)

and create images:

$ dot -Tpng graph.dot -o graph.png
_images/result_graph.png
Groups

New in version 3.0.

A group can be used to execute several tasks in parallel.

The group function takes a list of signatures:

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))

If you call the group, the tasks will be applied one after another in the current process, and a GroupResult instance is returned that can be used to keep track of the results, or tell how many tasks are ready and so on:

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]

Group also supports iterators:

>>> group(add.s(i, i) for i in range(100))()

A group is a signature object, so it can be used in combination with other signatures.

Group Results

The group task returns a special result too, this result works just like normal task results, except that it works on the group as a whole:

>>> from celery import group
>>> from tasks import add

>>> job = group([
...             add.s(2, 2),
...             add.s(4, 4),
...             add.s(8, 8),
...             add.s(16, 16),
...             add.s(32, 32),
... ])

>>> result = job.apply_async()

>>> result.ready()  # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]

The GroupResult takes a list of AsyncResult instances and operates on them as if they were a single task.

It supports the following operations (a short usage sketch follows this list):

  • successful()

    Return True if all of the subtasks finished successfully (e.g., didn't raise an exception).

  • failed()

    Return True if any of the subtasks failed.

  • waiting()

    Return True if any of the subtasks isn’t ready yet.

  • ready()

    Return True if all of the subtasks are ready.

  • completed_count()

    Return the number of completed subtasks.

  • revoke()

    Revoke all of the subtasks.

  • join()

    Gather the results of all subtasks and return them in the same order as they were called (as a list).
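
A minimal usage sketch of these operations, reusing the add task from the example above (this assumes a running worker and that all subtasks have already finished):

>>> res = group(add.s(i, i) for i in range(4)).apply_async()
>>> res.ready()            # all subtasks have delivered a result
True
>>> res.successful()       # none of them raised
True
>>> res.completed_count()
4
>>> res.join()             # results in call order
[0, 2, 4, 6]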

Chords

New in version 2.3.

Note

Tasks used within a chord must not ignore their results. If the result backend is disabled for any task (header or body) in your chord you should read “Important Notes.” Chords are not currently supported with the RPC result backend.

A chord is a task that only executes after all of the tasks in a group have finished executing.

Let’s calculate the sum of the expression 1 + 1 + 2 + 2 + 3 + 3 ... n + n with n up to one hundred.

First you need two tasks, add() and tsum() (sum() is already a standard function):

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

Now you can use a chord to calculate each addition step in parallel, and then get the sum of the resulting numbers:

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in range(100))(tsum.s()).get()
9900

This is obviously a very contrived example, the overhead of messaging and synchronization makes this a lot slower than its Python counterpart:

>>> sum(i + i for i in range(100))

The synchronization step is costly, so you should avoid using chords as much as possible. Still, the chord is a powerful primitive to have in your toolbox as synchronization is a required step for many parallel algorithms.

Let’s break the chord expression down:

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900

Remember, the callback can only be executed after all of the tasks in the header have returned. Each step in the header is executed as a task, in parallel, possibly on different nodes. The callback is then applied with the return value of each task in the header. The task id returned by chord() is the id of the callback, so you can wait for it to complete and get the final return value (but remember to never have a task wait for other tasks)

Error handling

So what happens if one of the tasks raises an exception?

The chord callback result will transition to the failure state, and the error is set to the ChordError exception:

>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "*/celery/result.py", line 120, in get
    interval=interval)
  File "*/celery/backends/amqp.py", line 150, in wait_for
    raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
    raised ValueError('something something',)

While the traceback may be different depending on the result backend used, you can see that the error description includes the id of the task that failed and a string representation of the original exception. You can also find the original traceback in result.traceback.

Note that the rest of the tasks will still execute, so the third task (add.s(8, 8)) is still executed even though the middle task failed. Also the ChordError only shows the task that failed first (in time): it doesn’t respect the ordering of the header group.

To perform an action when a chord fails you can therefore attach an errback to the chord callback:

@app.task
def on_chord_error(request, exc, traceback):
    print('Task {0!r} raised error: {1!r}'.format(request.id, exc))

>>> c = (group(add.s(i, i) for i in range(10)) |
...      xsum.s().on_error(on_chord_error.s())).delay()
Important Notes

Tasks used within a chord must not ignore their results. In practice this means that you must enable a result_backend in order to use chords. Additionally, if task_ignore_result is set to True in your configuration, be sure that the individual tasks to be used within the chord are defined with ignore_result=False. This applies to both Task subclasses and decorated tasks.

Example Task subclass:

class MyTask(Task):
    ignore_result = False

Example decorated task:

@app.task(ignore_result=False)
def another_task(project):
    do_something()

By default the synchronization step is implemented by having a recurring task poll the completion of the group every second, calling the signature when ready.

Example implementation:

from celery import maybe_signature

@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
    if group.ready():
        return maybe_signature(callback).delay(group.join())
    raise self.retry(countdown=interval, max_retries=max_retries)

This is used by all result backends except Redis and Memcached: they increment a counter after each task in the header, then apply the callback when the counter exceeds the number of tasks in the set.

The Redis and Memcached approach is a much better solution, but not easily implemented in other backends (suggestions welcome!).

Note

Chords don’t properly work with Redis before version 2.2; you’ll need to upgrade to at least redis-server 2.2 to use them.

Note

If you’re using chords with the Redis result backend and also overriding the Task.after_return() method, you need to make sure to call the super method or else the chord callback won’t be applied.

def after_return(self, *args, **kwargs):
    do_something()
    super(MyTask, self).after_return(*args, **kwargs)
Map & Starmap

map and starmap are built-in tasks that call the task for every element in a sequence.

They differ from group in that

  • only one task message is sent
  • the operation is sequential.

For example using map:

>>> from proj.tasks import add, xsum

>>> ~xsum.map([range(10), range(100)])
[45, 4950]

is the same as having a task doing:

@app.task
def temp():
    return [xsum(range(10)), xsum(range(100))]

and using starmap:

>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

is the same as having a task doing:

@app.task
def temp():
    return [add(i, i) for i in range(10)]

Both map and starmap are signature objects, so they can be used as other signatures and combined in groups etc., for example to call the starmap after 10 seconds:

>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
Chunks

Chunking lets you divide an iterable of work into pieces, so that if you have one million objects, you can create 10 tasks with hundred thousand objects each.

Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you’re avoiding the overhead of messaging it may considerably increase performance.

To create a chunks signature you can use app.Task.chunks():

>>> add.chunks(zip(range(100), range(100)), 10)

As with group the act of sending the messages for the chunks will happen in the current process when called:

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

while calling .apply_async will create a dedicated task so that the individual tasks are applied in a worker instead:

>>> add.chunks(zip(range(100), range(100)), 10).apply_async()

You can also convert chunks to a group:

>>> group = add.chunks(zip(range(100), range(100)), 10).group()

and with the group skew the countdown of each task by increments of one:

>>> group.skew(start=1, stop=10)()

This means that the first task will have a countdown of one second, the second task a countdown of two seconds, and so on.

Workers Guide

Starting the worker

You can start the worker in the foreground by executing the command:

$ celery -A proj worker -l info

For a full list of available command-line options see worker, or simply do:

$ celery worker --help

You can start multiple workers on the same machine, but be sure to name each individual worker by specifying a node name with the --hostname argument:

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

The hostname argument can expand the following variables:

  • %h: Hostname, including domain name.
  • %n: Hostname only.
  • %d: Domain name only.

If the current hostname is george.example.com, these will expand to:

Variable Template Result
%h worker1@%h worker1@george.example.com
%n worker1@%n worker1@george
%d worker1@%d worker1@example.com

Note for supervisor users

The % sign must be escaped by adding a second one: %%h.

Stopping the worker

Shutdown should be accomplished using the TERM signal.

When shutdown is initiated the worker will finish all currently executing tasks before it actually terminates. If those tasks are important, you should wait for them to finish before doing anything drastic, like sending the KILL signal.

If the worker won't shut down after a considerate amount of time, because it's stuck in an infinite loop or similar, you can use the KILL signal to force terminate the worker: but be aware that currently executing tasks will be lost (i.e., unless the tasks have the acks_late option set).

Also, since processes can't override the KILL signal, the worker will not be able to reap its children; be sure to do so manually. This command usually does the trick:

$ pkill -9 -f 'celery worker'

If you don't have the pkill command on your system, you can use the slightly longer version:

$ ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
Restarting the worker

To restart the worker you should send the TERM signal and start a new instance. The easiest way to manage workers for development is by using celery multi:

$ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

For production deployments you should be using init-scripts or a process supervision system (see Daemonization).

Other than stopping, then starting the worker to restart, you can also restart the worker using the HUP signal. Note that the worker will be responsible for restarting itself, so this is prone to problems and isn't recommended in production:

$ kill -HUP $pid

Note

Restarting by HUP only works if the worker is running in the background as a daemon (it doesn't have a controlling terminal).

HUP is disabled on macOS because of a limitation on that platform.

Process Signals

The worker's main process overrides the following signals:

TERM Warm shutdown, wait for tasks to complete.
QUIT Cold shutdown, terminate as soon as possible.
USR1 Dump traceback for all active threads.
USR2 Remote debug, see celery.contrib.rdb.
Variables in file paths

The file path arguments for --logfile, --pidfile, and --statedb can contain variables that the worker will expand:

Node name replacements
  • %p: Full node name.
  • %h: Hostname, including domain name.
  • %n: Hostname only.
  • %d: Domain name only.
  • %i: Prefork pool process index or 0 if MainProcess.
  • %I: Prefork pool process index with separator.

For example, if the current hostname is george@foo.example.com, these will expand to:

  • --logfile=%p.log -> george@foo.example.com.log
  • --logfile=%h.log -> foo.example.com.log
  • --logfile=%n.log -> george.log
  • --logfile=%d.log -> example.com.log
Prefork pool process index

The prefork pool process index specifiers will expand into a different filename depending on the process that'll eventually need to open the file.

This can be used to specify one log file per child process.

Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. That is, the number is the process index not the process count or pid.

  • %i - Pool process index or 0 if MainProcess.

    Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:

    • worker1-0.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)
  • %I - Pool process index with separator.

    Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:

    • worker1.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)
Concurrency

By default multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine.
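
If you prefer configuration over command-line flags, the same value can be set with the worker_concurrency setting; a minimal sketch:

app.conf.worker_concurrency = 10  # same effect as passing --concurrency=10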

Number of processes (multiprocessing/prefork pool)

More pool processes are usually better, but there's a cut-off point where adding more pool processes affects performance in negative ways. There's even some evidence to support that having multiple worker instances running may perform better than having a single worker; for example, 3 workers with 10 pool processes each. You need to experiment to find the number that works best for you, as this varies based on application, work load, task run times and other factors.

Remote control

New in version 2.0.

pool support:prefork, eventlet, gevent, blocking:solo (see note)
broker support:amqp, redis

Workers have the ability to be remote controlled using a high-priority broadcast message queue. The commands can be directed to all, or a specific list of workers.

Commands can also have replies. The client can then wait for and collect those replies. Since there's no central authority to know how many workers are available in the cluster, there's also no way to estimate how many workers may send a reply, so the client has a configurable timeout: the deadline in seconds for replies to arrive in. This timeout defaults to one second. If the worker doesn't reply within the deadline it doesn't necessarily mean the worker didn't reply, or worse, is dead, but may simply be caused by network latency or the worker being slow at processing commands, so adjust the timeout accordingly.

In addition to timeouts, the client can specify the maximum number of replies to wait for. If a destination is specified, this limit is set to the number of destination hosts.
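
For example, a sketch that combines both knobs in a broadcast() call (the ping command is used here only for illustration; the timeout and limit values are arbitrary):

>>> app.control.broadcast('ping', reply=True,
...                       timeout=5,   # wait at most 5 seconds for replies
...                       limit=2)     # stop collecting after two replies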

Note

The solo pool supports remote control commands, but any task executing will block any waiting control command, so it is of limited use if the worker is very busy. In that case you must increase the timeout waiting for replies in the client.

The broadcast() function

This is the client function used to send commands to the workers. Some remote control commands also have higher-level interfaces that use broadcast() in the background, like rate_limit() and ping().

Sending the rate_limit command and keyword arguments:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

This will send the command asynchronously, without waiting for a reply. To request a reply you have to use the reply argument:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

Using the destination argument you can specify a list of workers to receive the command:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]

Of course, using the higher-level interface to set rate limits is much more convenient, but there are commands that can only be requested using broadcast().

Commands
revoke: Revoking tasks
pool support:all, terminate only supported by prefork
broker support:amqp, redis
command:celery -A proj control revoke <task_id>

All worker nodes keep a memory of revoked task ids, either in-memory or persistent on disk (see Persistent revokes).

When a worker receives a revoke request it will skip executing the task, but it won’t terminate an already executing task unless the terminate option is set.

Note

The terminate option is a last resort for administrators when a task is stuck. It’s not for terminating the task, it’s for terminating the process that’s executing the task, and that process may have already started processing another task at the point when the signal is sent, so for this reason you must never call this programmatically.

If terminate is set the worker child process processing the task will be terminated. The default signal sent is TERM, but you can specify this using the signal argument. Signal can be the uppercase name of any signal defined in the signal module in the Python Standard Library.

Terminating a task also revokes it.

Example

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')
Revoking multiple tasks

New in version 3.1.

The revoke method also accepts a list argument, where it will revoke several tasks at once.

Example

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

The GroupResult.revoke method takes advantage of this since version 3.1.

Persistent revokes

Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.

The list of revoked tasks is in-memory so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker:

$ celery -A proj worker -l info --statedb=/var/run/celery/worker.state

or if you use celery multi you want to create one file per worker instance so use the %n format to expand the current node name:

$ celery multi start 2 -l info --statedb=/var/run/celery/%n.state

See also Variables in file paths

Note that remote control commands must be working for revokes to work. Remote control commands are only supported by the RabbitMQ (amqp) and Redis at this point.

Time Limits

New in version 2.0.

pool support:prefork/gevent

A single task can potentially run forever, if you have lots of tasks waiting for some event that’ll never happen you’ll block the worker from processing new tasks indefinitely. The best way to defend against this scenario happening is enabling time limits.

The time limit (--time-limit) is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new process. You can also enable a soft time limit (--soft-time-limit), this raises an exception the task can catch to clean up before the hard time limit kills it:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

Time limits can also be set using the task_time_limit / task_soft_time_limit settings.

Note

Time limits don’t currently work on platforms that don’t support the SIGUSR1 signal.

Changing time limits at run-time

New in version 2.3.

broker support:amqp, redis

There’s a remote control command that enables you to change both soft and hard time limits for a task — named time_limit.

Example changing the time limit for the tasks.crawl_the_web task to have a soft time limit of one minute, and a hard time limit of two minutes:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

Only tasks that start executing after the time limit change will be affected.

Rate Limits
Changing rate-limits at run-time

Example changing the rate limit for the myapp.mytask task to execute at most 200 tasks of that type every minute:

>>> app.control.rate_limit('myapp.mytask', '200/m')

The above doesn’t specify a destination, so the change request will affect all worker instances in the cluster. If you only want to affect a specific list of workers you can include the destination argument:

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

Warning

This won’t affect workers with the worker_disable_rate_limits setting enabled.

Max tasks per child setting

New in version 2.0.

pool support:prefork

With this option you can configure the maximum number of tasks a worker can execute before it’s replaced by a new process.

This is useful if you have memory leaks you have no control over for example from closed source C extensions.

The option can be set using the workers --max-tasks-per-child argument or using the worker_max_tasks_per_child setting.

Max memory per child setting

New in version 4.0.

pool support:prefork

With this option you can configure the maximum amount of resident memory a worker may consume before it's replaced by a new process.

This is useful if you have memory leaks you have no control over for example from closed source C extensions.

The option can be set using the workers --max-memory-per-child argument or using the worker_max_memory_per_child setting.
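
Both per-child limits can also be set in configuration rather than on the command line; a minimal sketch (the numbers are arbitrary):

app.conf.worker_max_tasks_per_child = 100     # recycle a child after 100 tasks
app.conf.worker_max_memory_per_child = 12000  # recycle when resident memory exceeds 12000 KiB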

Autoscaling

New in version 2.2.

pool support:prefork, gevent

The autoscaler component is used to dynamically resize the pool based on load:

  • The autoscaler adds more pool processes when there is work to do,
    • and starts removing processes when the workload is low.

It’s enabled by the --autoscale option, which needs two numbers: the maximum and minimum number of pool processes:

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

You can also define your own rules for the autoscaler by subclassing Autoscaler. Some ideas for metrics include load average or the amount of memory available. You can specify a custom autoscaler with the worker_autoscaler setting.
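
A rough sketch of what a custom autoscaler could look like. This assumes subclassing celery.worker.autoscale.Autoscaler and overriding its _maybe_scale hook; the exact hook name may differ between versions, and the load threshold as well as the myapp.autoscalers module path are made up for illustration:

import os

from celery.worker.autoscale import Autoscaler

class LoadAwareAutoscaler(Autoscaler):
    """Hypothetical autoscaler that skips scaling while system load is high."""

    def _maybe_scale(self, req=None):
        # os.getloadavg() is Unix-only; index 0 is the 1-minute load average.
        if os.getloadavg()[0] > 8.0:
            return False  # don't resize the pool this tick
        return super(LoadAwareAutoscaler, self)._maybe_scale(req=req)

The worker is then pointed at it with the worker_autoscaler setting:

app.conf.worker_autoscaler = 'myapp.autoscalers:LoadAwareAutoscaler'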

Queues

A worker instance can consume from any number of queues. By default it will consume from all queues defined in the task_queues setting (that if not specified falls back to the default queue named celery).

You can specify what queues to consume from at start-up, by giving a comma separated list of queues to the -Q option:

$ celery -A proj worker -l info -Q foo,bar,baz

If the queue name is defined in task_queues it will use that configuration, but if it’s not defined in the list of queues Celery will automatically generate a new queue for you (depending on the task_create_missing_queues option).

You can also tell the worker to start and stop consuming from a queue at run-time using the remote control commands add_consumer and cancel_consumer.

Queues: Adding consumers

The add_consumer control command will tell one or more workers to start consuming from a queue. This operation is idempotent.

To tell all workers in the cluster to start consuming from a queue named “foo” you can use the celery control program:

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

If you want to specify a specific worker you can use the --destination argument:

$ celery -A proj control add_consumer foo -d celery@worker1.local

The same can be accomplished dynamically using the app.control.add_consumer() method:

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

So far we've only shown examples using automatic queues. If you need more control you can also specify the exchange, routing_key and even other options:

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])
Queues: Canceling consumers

You can cancel a consumer by queue name using the cancel_consumer control command.

To force all workers in the cluster to cancel consuming from a queue you can use the celery control program:

$ celery -A proj control cancel_consumer foo

The --destination argument can be used to specify a worker, or a list of workers, to act on the command:

$ celery -A proj control cancel_consumer foo -d celery@worker1.local

You can also cancel consumers programmatically using the app.control.cancel_consumer() method:

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]
Queues: List of active queues

You can get a list of queues that a worker consumes from by using the active_queues control command:

$ celery -A proj inspect active_queues
[...]

Like all other remote control commands this also supports the --destination argument used to specify the workers that should reply to the request:

$ celery -A proj inspect active_queues -d celery@worker1.local
[...]

This can also be done programmatically by using the app.control.inspect.active_queues() method:

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]
Inspecting workers

app.control.inspect lets you inspect running workers. It uses remote control commands under the hood.

You can also use the celery command to inspect workers, and it supports the same commands as the app.control interface.

>>> # Inspect all nodes.
>>> i = app.control.inspect()

>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')
Dump of registered tasks

You can get a list of tasks registered in the worker using the registered():

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]
Dump of currently executing tasks

You can get a list of active tasks using active():

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]
Dump of scheduled (ETA) tasks

You can get a list of tasks waiting to be scheduled by using scheduled():

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

Note

These are tasks with an ETA/countdown argument, not periodic tasks.

Dump of reserved tasks

Reserved tasks are tasks that have been received, but are still waiting to be executed.

You can get a list of these using reserved():

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]
Statistics

The remote control command inspect stats (or stats()) will give you a long list of useful (or not so useful) statistics about the worker (a programmatic sketch follows the field list below):

$ celery -A proj inspect stats

The output will include the following fields:

  • broker

    Section for broker information.

    • connect_timeout

      Timeout in seconds (int/float) for establishing a new connection.

    • heartbeat

      Current heartbeat value (set by client).

    • hostname

      Node name of the remote broker.

    • insist

      No longer used.

    • login_method

      Login method used to connect to the broker.

    • port

      Port of the remote broker.

    • ssl

      SSL enabled/disabled.

    • transport

      Name of transport used (e.g., amqp or redis)

    • transport_options

      Options passed to transport.

    • uri_prefix

      Some transports expect the host name to be a URL.

      redis+socket:///tmp/redis.sock
      

      In this example the URI-prefix will be redis.

    • userid

      User id used to connect to the broker with.

    • virtual_host

      Virtual host used.

  • clock

    Value of the workers logical clock. This is a positive integer and should be increasing every time you receive statistics.

  • pid

    Process id of the worker instance (Main process).

  • pool

    Pool-specific section.

    • max-concurrency

      Max number of processes/threads/green threads.

    • max-tasks-per-child

      Max number of tasks a thread may execute before being recycled.

    • processes

      List of PIDs (or thread-id’s).

    • put-guarded-by-semaphore

      Internal

    • timeouts

      Default values for time limits.

    • writes

      Specific to the prefork pool, this shows the distribution of writes to each process in the pool when using async I/O.

  • prefetch_count

    Current prefetch count value for the task consumer.

  • rusage

    System usage statistics. The fields available may be different on your platform.

    From getrusage(2):

    • stime

      Time spent in operating system code on behalf of this process.

    • utime

      Time spent executing user instructions.

    • maxrss

      The maximum resident size used by this process (in kilobytes).

    • idrss

      Amount of non-shared memory used for data (in kilobytes times ticks of execution)

    • isrss

      Amount of non-shared memory used for stack space (in kilobytes times ticks of execution)

    • ixrss

      Amount of memory shared with other processes (in kilobytes times ticks of execution).

    • inblock

      Number of times the file system had to read from the disk on behalf of this process.

    • oublock

      Number of times the file system has to write to disk on behalf of this process.

    • majflt

      Number of page faults that were serviced by doing I/O.

    • minflt

      Number of page faults that were serviced without doing I/O.

    • msgrcv

      Number of IPC messages received.

    • msgsnd

      Number of IPC messages sent.

    • nvcsw

      Number of times this process voluntarily invoked a context switch.

    • nivcsw

      Number of times an involuntary context switch took place.

    • nsignals

      Number of signals received.

    • nswap

      The number of times this process was swapped entirely out of memory.

  • total

    Map of task names and the total number of tasks with that type the worker has accepted since start-up.
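
The same information is available programmatically through the inspect API, as in the earlier inspect examples; a short sketch (the node name and the returned value are illustrative):

>>> stats = app.control.inspect().stats()
>>> stats['worker1.example.com']['pool']['max-concurrency']
10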

Additional Commands
Remote shutdown

This command will gracefully shut down the worker remotely:

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')
Ping

This command requests a ping from alive workers. The workers reply with the string ‘pong’, and that’s just about it. It will use the default one second timeout for replies unless you specify a custom timeout:

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping() also supports the destination argument, so you can specify the workers to ping:

>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]
Enable/disable events

You can enable/disable events by using the enable_events, disable_events commands. This is useful to temporarily monitor a worker using celery events/celerymon.

>>> app.control.enable_events()
>>> app.control.disable_events()
Writing your own remote control commands

There are two types of remote control commands:

  • Inspect command

    Does not have side effects, will usually just return some value found in the worker, like the list of currently registered tasks, the list of active tasks, etc.

  • Control command

    Performs side effects, like adding a new queue to consume from.

Remote control commands are registered in the control panel and they take a single argument: the current ControlDispatch instance. From there you have access to the active Consumer if needed.

Here’s an example control command that increments the task prefetch count:

from celery.worker.control import control_command

@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}

Make sure you add this code to a module that is imported by the worker: this could be the same module as where your Celery app is defined, or you can add the module to the imports setting.

Restart the worker so that the control command is registered, and now you can call your command using the celery control utility:

$ celery -A proj control increase_prefetch_count 3

You can also add actions to the celery inspect program, for example one that reads the current prefetch count:

from celery.worker.control import inspect_command

@inspect_command
def current_prefetch_count(state):
    return {'prefetch_count': state.consumer.qos.value}

After restarting the worker you can now query this value using the celery inspect program:

$ celery -A proj inspect current_prefetch_count

Daemonization

Most Linux distributions these days use systemd for managing the lifecycle of system and user services.

You can check if your Linux distribution uses systemd by typing:

$ systemd --version
systemd 237
+PAM +AUDIT +SELINUX +IMA +APPARMOR +SMACK +SYSVINIT +UTMP +LIBCRYPTSETUP +GCRYPT +GNUTLS +ACL +XZ +LZ4 +SECCOMP +BLKID +ELFUTILS +KMOD -IDN2 +IDN -PCRE2 default-hierarchy=hybrid

If you have output similar to the above, please refer to our systemd documentation for guidance.

However, the init.d script should still work in those Linux distributions as well, since systemd provides the systemd-sysv compatibility layer which generates services automatically from the init.d scripts we provide.

If you package Celery for multiple Linux distributions where some don't support systemd, or for other Unix systems as well, you may want to refer to our init.d documentation.

Generic init-scripts

See the extra/generic-init.d/ directory in the Celery distribution.

This directory contains generic bash init-scripts for the celery worker program, these should run on Linux, FreeBSD, OpenBSD, and other Unix-like platforms.

Init-script: celeryd
Usage:/etc/init.d/celeryd {start|stop|restart|status}
Configuration file:
 /etc/default/celeryd

To configure this script to run the worker properly you probably need to at least tell it where to change directory to when it starts (to find the module containing your app, or your configuration module).

The daemonization script is configured by the file /etc/default/celeryd. This is a shell (sh) script where you can add environment variables like the configuration options below. To add real environment variables affecting the worker you must also export them (e.g., export DISPLAY=":0")

Superuser privileges required

The init-scripts can only be used by root, and the shell configuration file must also be owned by root.

Unprivileged users don’t need to use the init-script, instead they can use the celery multi utility (or celery worker --detach):

$ celery multi start worker1 \
    -A proj \
    --pidfile="$HOME/run/celery/%n.pid" \
    --logfile="$HOME/log/celery/%n%I.log"

$ celery multi restart worker1 \
    -A proj \
    --logfile="$HOME/log/celery/%n%I.log" \
    --pidfile="$HOME/run/celery/%n.pid

$ celery multi stopwait worker1 --pidfile="$HOME/run/celery/%n.pid"
Example configuration

This is an example configuration for a Python project.

/etc/default/celeryd:

# Names of nodes to start
#   most people will only start one node:
CELERYD_NODES="worker1"
#   but you can also start multiple and configure settings
#   for each in CELERYD_OPTS
#CELERYD_NODES="worker1 worker2 worker3"
#   alternatively, you can specify the number of nodes to start:
#CELERYD_NODES=10

# Absolute or relative path to the 'celery' command:
CELERY_BIN="/usr/local/bin/celery"
#CELERY_BIN="/virtualenvs/def/bin/celery"

# App instance to use
# comment out this line if you don't use an app
CELERY_APP="proj"
# or fully qualified:
#CELERY_APP="proj.tasks:app"

# Where to chdir at start.
CELERYD_CHDIR="/opt/Myproject/"

# Extra command-line arguments to the worker
CELERYD_OPTS="--time-limit=300 --concurrency=8"
# Configure node-specific settings by appending node name to arguments:
#CELERYD_OPTS="--time-limit=300 -c 8 -c:worker2 4 -c:worker3 2 -Ofair:worker1"

# Set logging level to DEBUG
#CELERYD_LOG_LEVEL="DEBUG"

# %n will be replaced with the first part of the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
#   You need to create this user manually (or you can choose
#   a user/group combination that already exists (e.g., nobody).
CELERYD_USER="celery"
CELERYD_GROUP="celery"

# If enabled pid and log directories will be created if missing,
# and owned by the userid/group configured.
CELERY_CREATE_DIRS=1
Using a login shell

You can inherit the environment of the CELERYD_USER by using a login shell:

CELERYD_SU_ARGS="-l"

Note that this isn’t recommended, and that you should only use this option when absolutely necessary.

Example Django configuration

Django users now use the exact same template as above, but make sure that the module that defines your Celery app instance also sets a default value for DJANGO_SETTINGS_MODULE, as shown in the example Django project in First steps with Django.

Available options
  • CELERY_APP

    App instance to use (value for --app argument).

  • CELERY_BIN

    Absolute or relative path to the celery program. Examples:

    • celery
    • /usr/local/bin/celery
    • /virtualenvs/proj/bin/celery
    • /virtualenvs/proj/bin/python -m celery
  • CELERYD_NODES

    List of node names to start (separated by space).

  • CELERYD_OPTS

    Additional command-line arguments for the worker, see celery worker --help for a list. This also supports the extended syntax used by multi to configure settings for individual nodes. See celery multi --help for some multi-node configuration examples.

  • CELERYD_CHDIR

    Path to change directory to at start. Default is to stay in the current directory.

  • CELERYD_PID_FILE

    Full path to the PID file. Default is /var/run/celery/%n.pid

  • CELERYD_LOG_FILE

    Full path to the worker log file. Default is /var/log/celery/%n%I.log. Note: Using %I is important when using the prefork pool, as having multiple processes share the same log file will lead to race conditions.

  • CELERYD_LOG_LEVEL

    Worker log level. Default is INFO.

  • CELERYD_USER

    User to run the worker as. Default is current user.

  • CELERYD_GROUP

    Group to run worker as. Default is current user.

  • CELERY_CREATE_DIRS

    Always create directories (log directory and pid file directory). Default is to only create directories when no custom logfile/pidfile set.

  • CELERY_CREATE_RUNDIR

    Always create pidfile directory. By default only enabled when no custom pidfile location is set.

  • CELERY_CREATE_LOGDIR

    Always create logfile directory. By default only enabled when no custom logfile location is set.

Init-script: celerybeat
Usage:/etc/init.d/celerybeat {start|stop|restart}
Configuration file:
 /etc/default/celerybeat or /etc/default/celeryd.
Example configuration

This is an example configuration for a Python project:

/etc/default/celerybeat:

# Absolute or relative path to the 'celery' command:
CELERY_BIN="/usr/local/bin/celery"
#CELERY_BIN="/virtualenvs/def/bin/celery"

# App instance to use
# comment out this line if you don't use an app
CELERY_APP="proj"
# or fully qualified:
#CELERY_APP="proj.tasks:app"

# Where to chdir at start.
CELERYBEAT_CHDIR="/opt/Myproject/"

# Extra arguments to celerybeat
CELERYBEAT_OPTS="--schedule=/var/run/celery/celerybeat-schedule"
Example Django configuration

You should use the same template as above, but make sure the DJANGO_SETTINGS_MODULE variable is set (and exported), and that CELERYD_CHDIR is set to the project's directory:

export DJANGO_SETTINGS_MODULE="settings"

CELERYD_CHDIR="/opt/MyProject"
Available options
  • CELERY_APP

    App instance to use (value for --app argument).

  • CELERYBEAT_OPTS

    Additional arguments to celery beat, see celery beat --help for a list of available options.

  • CELERYBEAT_PID_FILE

    Full path to the PID file. Default is /var/run/celeryd.pid.

  • CELERYBEAT_LOG_FILE

    Full path to the log file. Default is /var/log/celeryd.log.

  • CELERYBEAT_LOG_LEVEL

    Log level to use. Default is INFO.

  • CELERYBEAT_USER

    User to run beat as. Default is the current user.

  • CELERYBEAT_GROUP

    Group to run beat as. Default is the current user.

  • CELERY_CREATE_DIRS

    Always create directories (log directory and pid file directory). Default is to only create directories when no custom logfile/pidfile set.

  • CELERY_CREATE_RUNDIR

    Always create pidfile directory. By default only enabled when no custom pidfile location set.

  • CELERY_CREATE_LOGDIR

    Always create logfile directory. By default only enabled when no custom logfile location is set.

Troubleshooting

If you can't get the init-scripts to work, you should try running them in verbose mode:

# sh -x /etc/init.d/celeryd start

This can reveal hints as to why the service won’t start.

If the worker starts with “OK” but exits almost immediately afterwards and there's no evidence in the log file, then there's probably an error, but as the daemon's standard outputs are already closed you'll not be able to see them anywhere. For this situation you can use the C_FAKEFORK environment variable to skip the daemonization step:

# C_FAKEFORK=1 sh -x /etc/init.d/celeryd start

and now you should be able to see the errors.

Commonly such errors are caused by insufficient permissions to read from, or write to a file, and also by syntax errors in configuration modules, user modules, third-party libraries, or even from Celery itself (if you’ve found a bug you should report it).

Usage systemd
Usage:systemctl {start|stop|restart|status} celery.service
Configuration file:
 /etc/conf.d/celery
Service file: celery.service

This is an example systemd file:

/etc/systemd/system/celery.service:

[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking
User=celery
Group=celery
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/celery
ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \
  -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
  --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \
  --pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \
  -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
  --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'

[Install]
WantedBy=multi-user.target

Once you’ve put that file in /etc/systemd/system, you should run systemctl daemon-reload in order that Systemd acknowledges that file. You should also run that command each time you modify it.

To configure user, group, chdir change settings: User, Group, and WorkingDirectory defined in /etc/systemd/system/celery.service.

You can also use systemd-tmpfiles in order to create working directories (for logs and pid).

file:/etc/tmpfiles.d/celery.conf
d /var/run/celery 0755 celery celery -
d /var/log/celery 0755 celery celery -
Example configuration

This is an example configuration for a Python project:

/etc/conf.d/celery:

# Name of nodes to start
# here we have a single node
CELERYD_NODES="w1"
# or we could have three nodes:
#CELERYD_NODES="w1 w2 w3"

# Absolute or relative path to the 'celery' command:
CELERY_BIN="/usr/local/bin/celery"
#CELERY_BIN="/virtualenvs/def/bin/celery"

# App instance to use
# comment out this line if you don't use an app
CELERY_APP="proj"
# or fully qualified:
#CELERY_APP="proj.tasks:app"

# How to call manage.py
CELERYD_MULTI="multi"

# Extra command-line arguments to the worker
CELERYD_OPTS="--time-limit=300 --concurrency=8"

# - %n will be replaced with the first part of the nodename.
# - %I will be replaced with the current child process index
#   and is important when using the prefork pool to avoid race conditions.
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="INFO"

# you may wish to add these options for Celery Beat
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"
Service file: celerybeat.service

This is an example systemd file for Celery Beat:

/etc/systemd/system/celerybeat.service:

[Unit]
Description=Celery Beat Service
After=network.target

[Service]
Type=simple
User=celery
Group=celery
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/celery
ExecStart=/bin/sh -c '${CELERY_BIN} beat  \
  -A ${CELERY_APP} --pidfile=${CELERYBEAT_PID_FILE} \
  --logfile=${CELERYBEAT_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL}'

[Install]
WantedBy=multi-user.target
Running the worker with superuser privileges (root)

Running the worker with superuser privileges is a very dangerous practice. There should always be a workaround to avoid running as root. Celery may run arbitrary code in messages serialized with pickle - this is dangerous, especially when run as root.

By default Celery won’t run workers as root. The associated error message may not be visible in the logs but may be seen if C_FAKEFORK is used.

To force Celery to run workers as root use C_FORCE_ROOT.

When running as root without C_FORCE_ROOT the worker will appear to start with “OK” but exit immediately after with no apparent errors. This problem may appear when running the project in a new development or production environment (inadvertently) as root.

Periodic Tasks

Introduction

celery beat is a scheduler; it kicks off tasks at regular intervals, which are then executed by available worker nodes in the cluster.

By default the entries are taken from the beat_schedule setting, but custom stores can also be used, like storing the entries in a SQL database.

You have to ensure only a single scheduler is running for a schedule at a time, otherwise you'd end up with duplicate tasks. Using a centralized approach means the schedule doesn't have to be synchronized, and the service can operate without using locks.

Time Zones

The periodic task schedules use the UTC time zone by default, but you can change the time zone used with the timezone setting.

An example time zone could be Europe/London:

timezone = 'Europe/London'

This setting must be added to your app, either by configuring it directly using (app.conf.timezone = 'Europe/London'), or by adding it to your configuration module if you have set one up using app.config_from_object. See Configuration for more information about configuration options.

The default scheduler (storing the schedule in the celerybeat-schedule file) will automatically detect that the time zone has changed, and so will reset the schedule itself, but other schedulers may not be so smart (e.g., the Django database scheduler, see below), and in that case you'll have to reset the schedule manually.

Django Users

Celery recommends and is compatible with the new USE_TZ setting introduced in Django 1.4.

For Django users the time zone specified in the TIME_ZONE setting will be used, or you can specify a custom time zone for Celery alone using the timezone setting.

The database scheduler won't reset when timezone related settings change, so you must do this manually:

$ python manage.py shell
>>> from djcelery.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

Django-Celery only supports Celery 4.0 and below; for Celery 4.0 and above, do the following:

$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)
Entries

To call a task periodically you have to add an entry to the beat schedule list.

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

on_after_configure处理程序中设置它们意味着在使用test.s()时我们不会在模块级别评估应用程序。

add_periodic_task()函数会将条目添加到幕后的beat_schedule设置中,同样的设置也可用于手动设置周期性任务:

Example: Run the tasks.add task every 30 seconds.

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

Note

If you're wondering where these settings should go then please see Configuration. You can either set these options on your app directly or you can keep a separate module for configuration.

If you want to use a single item tuple for args, don't forget that the constructor is a comma, and not a pair of parentheses.

Using a timedelta for the schedule means the task will be sent in 30 second intervals (the first task will be sent 30 seconds after celery beat starts, and then every 30 seconds after the last run).
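
The same 30-second entry written with a timedelta instead of a plain float (equivalent behaviour):

from datetime import timedelta

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16),
    },
}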

A Crontab like schedule also exists, see the section on Crontab schedules.

Like with cron, the tasks may overlap if the first task doesn’t complete before the next. If that’s a concern you should use a locking strategy to ensure only one instance can run at a time (see for example Ensuring a task is only executed one at a time).
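
As an illustration only, here's a rough locking sketch using the redis-py client; the lock name and timeout are arbitrary, and the referenced section describes a cache-based variant:

import redis

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')
locker = redis.Redis()

@app.task
def single_instance_add(x, y):
    # Only one invocation may hold the lock at a time; late arrivals give up.
    lock = locker.lock('single_instance_add-lock', timeout=300)
    if not lock.acquire(blocking=False):
        return None
    try:
        return x + y
    finally:
        lock.release()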

Available Fields
  • task

    The name of the task to execute.

  • schedule

    The frequency of execution.

    This can be the number of seconds as an integer, a timedelta, or a crontab. You can also define your own custom schedule types, by extending the interface of schedule (see the sketch after this list).

  • args

    Positional arguments (list or tuple).

  • kwargs

    Keyword arguments (dict).

  • options

    Execution options (dict).

    This can be any argument supported by apply_async()exchange, routing_key, expires, and so on.

  • relative

    If relative is true timedelta schedules are scheduled “by the clock.” This means the frequency is rounded to the nearest second, minute, hour or day depending on the period of the timedelta.

    By default relative is false, the frequency isn’t rounded and will be relative to the time when celery beat was started.
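
A bare-bones custom schedule, as hinted at under the schedule field above. This assumes subclassing celery.schedules.schedule and overriding is_due; the business-hours window is an arbitrary example:

from celery.schedules import schedule

class business_hours(schedule):
    """Hypothetical schedule: run as usual, but only between 09:00 and 17:00."""

    def is_due(self, last_run_at):
        due, next_check = super(business_hours, self).is_due(last_run_at)
        if not 9 <= self.now().hour < 17:
            # Outside the window: not due, check again in a minute.
            return False, 60.0
        return due, next_check

# Used in beat_schedule like any other schedule, e.g.:
#   'schedule': business_hours(run_every=30.0)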

Crontab schedules

If you want more control over when the task is executed, for example, a particular time of day or day of the week, you can use the crontab schedule type:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

The syntax of these Crontab expressions is very flexible.

Some examples:

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,
hour='0,3,6,9,12,15,18,21')
Same as previous.
crontab(minute='*/15') Execute every 15 minutes.
crontab(day_of_week='sunday') Execute every minute (!) on Sundays.
crontab(minute='*',
hour='*', day_of_week='sun')
Same as previous.
crontab(minute='*/10',
hour='3,17,22', day_of_week='thu,fri')
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') Execute every hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0, day_of_month='2') Execute on the second day of every month.
crontab(0, 0,
day_of_month='2-30/2')
Execute on every even numbered day.
crontab(0, 0,
day_of_month='1-7,15-21')
Execute on the first and third weeks of the month.
crontab(0, 0, day_of_month='11',
month_of_year='5')
Execute on the eleventh of May every year.
crontab(0, 0,
month_of_year='*/3')
Execute every day on the first month of every quarter.

See celery.schedules.crontab for more documentation.

Solar schedules

If you have a task that should be executed according to sunrise, sunset, dawn or dusk, you can use the solar schedule type:

from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

The arguments are simply: solar(event, latitude, longitude)

Be sure to use the correct sign for latitude and longitude:

Sign Argument Meaning
+ latitude North
- latitude South
+ longitude East
- longitude West

Possible event types are:

Event Meaning
dawn_astronomical Execute at the moment after which the sky is no longer completely dark. This is when the sun is 18 degrees below the horizon.
dawn_nautical Execute when there’s enough sunlight for the horizon and some objects to be distinguishable; formally, when the sun is 12 degrees below the horizon.
dawn_civil Execute when there’s enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the Sun is 6 degrees below the horizon.
sunrise Execute when the upper edge of the sun appears over the eastern horizon in the morning.
solar_noon Execute when the sun is highest above the horizon on that day.
sunset Execute when the trailing edge of the sun disappears over the western horizon in the evening.
dusk_civil Execute at the end of civil twilight, when objects are still distinguishable and some stars and planets are visible. Formally, when the sun is 6 degrees below the horizon.
dusk_nautical Execute when the sun is 12 degrees below the horizon. Objects are no longer distinguishable, and the horizon is no longer visible to the naked eye.
dusk_astronomical Execute at the moment after which the sky becomes completely dark; formally, when the sun is 18 degrees below the horizon.

All solar events are calculated using UTC, and are therefore unaffected by your timezone setting.

In polar regions, the sun may not rise or set every day. The scheduler is able to handle these cases (i.e., a sunrise event won’t run on a day when the sun doesn’t rise). The one exception is solar_noon, which is formally defined as the moment the sun transits the celestial meridian, and will occur every day even if the sun is below the horizon.

Twilight is defined as the period between dawn and sunrise; and between sunset and dusk. You can schedule an event according to “twilight” depending on your definition of twilight (civil, nautical, or astronomical), and whether you want the event to take place at the beginning or end of twilight, using the appropriate event from the list above.

See celery.schedules.solar for more documentation.

Starting the Scheduler

To start the celery beat service:

$ celery -A proj beat

You can also embed beat inside the worker by enabling the worker's -B option; this is convenient if you'll never run more than one worker node, but it's not commonly used and for that reason isn't recommended for production use:

$ celery -A proj worker -B

Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

Note

To daemonize beat see Daemonization.

Using custom scheduler classes

Custom scheduler classes can be specified on the command-line (the --scheduler argument).

The default scheduler is celery.beat.PersistentScheduler, which simply keeps track of the last run times in a local shelve database file.

There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

To install and use this extension:

  1. Use pip to install the package:

    $ pip install django-celery-beat
    
  2. Add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:

    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    

    Note that there is no dash in the module name, only underscores.

  3. Apply Django database migrations so that the necessary tables are created:

    $ python manage.py migrate
    
  4. Start the celery beat service using the django_celery_beat.schedulers:DatabaseScheduler scheduler:

    $ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    Note: You may also add this as a settings option directly.

  5. Visit the Django-Admin interface to set up some periodic tasks.

Routing Tasks

Note

Alternate routing concepts like topic and fanout are not available for all transports; please consult the transport comparison table.

Basics
Automatic routing

The simplest way to do routing is to use the task_create_missing_queues setting (on by default).

With this setting on, a named queue that’s not already defined in task_queues will be created automatically. This makes it easy to perform simple routing tasks.

Say you have two servers, x, and y that handle regular tasks, and one server z, that only handles feed related tasks. You can use this configuration:

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

With this route enabled import feed tasks will be routed to the “feeds” queue, while all other tasks will be routed to the default queue (named “celery” for historical reasons).

Alternatively, you can use glob pattern matching, or even regular expressions, to match all tasks in the feed.tasks name-space:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

If the order of matching patterns is important you should specify the router in items format instead:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

Note

The task_routes setting can either be a dictionary, or a list of router objects, so in this case we need to specify the setting as a tuple containing a list.

After installing the router, you can start server z to only process the feeds queue like this:

user@z:/$ celery -A proj worker -Q feeds

You can specify as many queues as you want, so you can make this server process the default queue as well:

user@z:/$ celery -A proj worker -Q feeds,celery
Changing the name of the default queue

You can change the name of the default queue by using the following configuration:

app.conf.task_default_queue = 'default'
How the queues are defined

The point with this feature is to hide the complex AMQP protocol for users with only basic needs. However – you may still be interested in how these queues are declared.

A queue named “video” will be created with the following settings:

{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

The non-AMQP backends like Redis or SQS don’t support exchanges, so they require the exchange to have the same name as the queue. Using this design ensures it will work for them as well.

Manual routing

Say you have two servers, x, and y that handle regular tasks, and one server z, that only handles feed related tasks, you can use this configuration:

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
task_default_exchange = 'tasks'
task_default_exchange_type = 'topic'
task_default_routing_key = 'task.default'

task_queues is a list of Queue instances. If you don’t set the exchange or exchange type values for a key, these will be taken from the task_default_exchange and task_default_exchange_type settings.

To route a task to the feed_tasks queue, you can add an entry in the task_routes setting:

task_routes = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

You can also override this using the routing_key argument to Task.apply_async(), or send_task():

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

To make server z consume from the feed queue exclusively you can start it with the celery worker -Q option:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

Servers x and y must be configured to consume from the default queue:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

If you want, you can even have your feed processing worker handle regular tasks as well, maybe in times when there’s a lot of work to do:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

If you want to add another queue that's bound to a different exchange, just specify a custom exchange and exchange type:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

If you’re confused about these terms, you should read up on AMQP.

See also

In addition to the AMQP Primer below, there's Rabbits and Warrens, an excellent blog post describing queues and exchanges. There's also the CloudAMQP tutorial. For users of RabbitMQ, the RabbitMQ FAQ could be a useful source of information.

Special Routing Options
RabbitMQ Message Priorities
supported transports:
 RabbitMQ

New in version 4.0.

Queues can be configured to support priorities by setting the x-max-priority argument:

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]

A default value for all queues can be set using the task_queue_max_priority setting:

app.conf.task_queue_max_priority = 10

A default priority for all tasks can also be specified using the task_default_priority setting:

app.conf.task_default_priority = 5
Redis Message Priorities
supported transports:
 Redis

While the Celery Redis transport does honor the priority field, Redis itself has no notion of priorities. Please read this note before attempting to implement priorities with Redis as you may experience some unexpected behavior.

The priority support is implemented by creating n lists for each queue. This means that even though there are 10 (0-9) priority levels, these are consolidated into 4 levels by default to save resources. This means that a queue named celery will really be split into 4 queues:

['celery0', 'celery3', 'celery6', 'celery9']

If you want more priority levels you can set the priority_steps transport option:

app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
}

That said, note that this will never be as good as priorities implemented at the server level, and may be approximate at best. But it may still be good enough for your application.

AMQP Primer
Messages

A message consists of headers and a body. Celery uses headers to store the content type of the message and its content encoding. The content type is usually the serialization format used to serialize the message. The body contains the name of the task to execute, the task id (UUID), the arguments to apply it with and some additional meta-data – like the number of retries or an ETA.

This is an example task message represented as a Python dictionary:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}
Producers, consumers, and brokers

The client sending messages is typically called a publisher, or a producer, while the entity receiving messages is called a consumer.

The broker is the message server, routing messages from producers to consumers.

You’re likely to see these terms used a lot in AMQP related material.

Exchanges, queues, and routing keys
  1. Messages are sent to exchanges.
  2. An exchange routes messages to one or more queues. Several exchange types exist, providing different ways to do routing, or implementing different messaging scenarios.
  3. The message waits in the queue until someone consumes it.
  4. The message is deleted from the queue when it has been acknowledged.

The steps required to send and receive messages are:

  1. Create an exchange
  2. Create a queue
  3. Bind the queue to the exchange.

Celery automatically creates the entities necessary for the queues in task_queues to work (except if the queue’s auto_declare setting is set to False).

Here's an example queue configuration with three queues: one for video, one for images, and one default queue for everything else:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
Exchange types

The exchange type defines how the messages are routed through the exchange. The exchange types defined in the standard are direct, topic, fanout and headers. Also non-standard exchange types are available as plug-ins to RabbitMQ, like the last-value-cache plug-in by Michael Bridgen.

Direct exchanges

Direct exchanges match by exact routing keys, so a queue bound by the routing key video only receives messages with that routing key.

Topic exchanges

Topic exchanges match routing keys using dot-separated words, and the wild-card characters: * (matches a single word), and # (matches zero or more words).

With routing keys like usa.news, usa.weather, norway.news, and norway.weather, bindings could be *.news (all news), usa.# (all items in the USA), or usa.weather (all USA weather items).
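
As a hedged sketch (the queue and exchange names are illustrative), the bindings above could be declared like this:

from kombu import Exchange, Queue

news_exchange = Exchange('news', type='topic')

app.conf.task_queues = (
    # Receives usa.news, usa.weather, and anything else under 'usa.'
    Queue('usa_items', news_exchange, routing_key='usa.#'),
    # Receives usa.news, norway.news, and any other '<country>.news' key
    Queue('all_news',  news_exchange, routing_key='*.news'),
)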

Hands-on with the API

Celery comes with a tool called celery amqp that's used for command line access to the AMQP API, enabling access to administration tasks like creating/deleting queues and exchanges, purging queues or sending messages. It can also be used for non-AMQP brokers, but different implementations may not support all commands.

You can write commands directly in the arguments to celery amqp, or just start with no arguments to start it in shell-mode:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

Here 1> is the prompt. The number 1 is the number of commands you have executed so far. Type help for a list of available commands. It also supports auto-completion, so you can start typing a command and then hit the tab key to show a list of possible matches.

Let’s create a queue you can send messages to:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

This created the direct exchange testexchange, and a queue named testqueue. The queue is bound to the exchange using the routing key testkey.

From now on all messages sent to the exchange testexchange with routing key testkey will be moved to this queue. You can send a message by using the basic.publish command:

4> basic.publish 'This is a message!' testexchange testkey
ok.

Now that the message is sent you can retrieve it again. You can use the basic.get command here, which polls for new messages on the queue in a synchronous manner (this is OK for maintenance tasks, but for services you want to use basic.consume instead).

Pop a message off the queue:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP uses acknowledgment to signify that a message has been received and processed successfully. If the message hasn't been acknowledged and the consumer channel is closed, the message will be delivered to another consumer.

Note the delivery tag listed in the structure above: within a connection channel, every received message has a unique delivery tag, and this tag is used to acknowledge the message. Also note that delivery tags aren't unique across connections, so in another client the delivery tag 1 might point to a different message than in this channel.

You can acknowledge the message you received using basic.ack:

6> basic.ack 1
ok.

To clean up after our test session you should delete the entities you created:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.
Routing Tasks
Defining queues

In Celery available queues are defined by the task_queues setting.

Here's an example queue configuration with three queues: one for video, one for images, and one default queue for everything else:

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

Here, the task_default_queue will be used to route tasks that don't have an explicit route.

The default exchange, exchange type, and routing key will be used as the default routing values for tasks, and as the default values for entries in task_queues.

Multiple bindings to a single queue are also supported. Here’s an example of two routing keys that are both bound to the same queue:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)
Specifying task destination

The destination for a task is decided by the following (in order):

  1. The routing arguments to Task.apply_async().
  2. Routing related attributes defined on the Task itself.
  3. The Routers defined in task_routes.

It’s considered best practice to not hard-code these settings, but rather leave that as configuration options by using Routers; This is the most flexible approach, but sensible defaults can still be set as task attributes.

Routers

A router is a function that decides the routing options for a task.

All you need to define a new router is to define a function with the signature (name, args, kwargs, options, task=None, **kw):

def route_task(name, args, kwargs, options, task=None, **kw):
        if name == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}

If you return the queue key, it’ll expand with the defined settings of that queue in task_queues:

{'queue': 'video', 'routing_key': 'video.compress'}

becomes –>

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

You install router classes by adding them to the task_routes setting:

task_routes = (route_task,)

Router functions can also be added by name:

task_routes = ('myapp.routers.route_task',)

For simple task name -> route mappings like the router example above, you can simply drop a dict into task_routes to get the same behavior:

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

The routers will then be traversed in order; traversal stops at the first router returning a true value, which is used as the final route for the task.

You can also have multiple routers defined in a sequence:

task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
        },
    },
]

The routers will then be visited in turn, and the first to return a value will be chosen.

If you’re using Redis or RabbitMQ you can also specify the queue’s default priority in the route.

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
        'priority': 10,
    },
}

Similarly, passing a priority argument to apply_async when calling a task will override that default priority:

task.apply_async(priority=0)

Priority Order and Cluster Responsiveness

It is important to note that, due to worker prefetching, if a bunch of tasks are submitted at the same time they may be out of priority order at first. Disabling worker prefetching will prevent this issue, but may cause less than ideal performance for small, fast tasks. In most cases, simply reducing worker_prefetch_multiplier to 1 is an easier and cleaner way to increase the responsiveness of your system without the costs of disabling prefetching entirely.
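
A minimal configuration sketch of that suggestion (assuming an existing app object):

# Fetch only one message per worker process at a time, so high-priority
# tasks are picked up sooner.
app.conf.worker_prefetch_multiplier = 1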

Note that priority values are sorted in reverse: 0 is the highest priority.

Broadcast

Celery can also support broadcast routing. Here is an example exchange broadcast_tasks that delivers copies of tasks to all workers connected to it:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

Now the tasks.reload_cache task will be sent to every worker consuming from this queue.

Here is another example of broadcast routing, this time with a celery beat schedule:

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

Broadcast & Results

Note that the Celery result backend doesn't define what happens if two tasks have the same task_id. If the same task is distributed to more than one worker, the state history may not be preserved.

It’s a good idea to set the task.ignore_result attribute in this case.
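
A minimal sketch, assuming the broadcast setup above and an existing app object:

@app.task(ignore_result=True)
def reload_cache():
    # Application-specific refresh logic would go here. With ignore_result=True,
    # the workers that all receive the same broadcast message won't try to
    # store (and overwrite) a result under the same task_id.
    pass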

Monitoring and Management Guide

Introduction

There are several tools available to monitor and inspect Celery clusters.

This document describes some of these, as well as features related to monitoring, like events and broadcast commands.

Workers
Management Command-line Utilities (inspect/control)

celery can also be used to inspect and manage worker nodes (and to some degree tasks).

To list all the commands available do:

$ celery help

or to get help for a specific command do:

$ celery <command> --help
Commands
  • shell: Drop into a Python shell.

    The locals will include the celery variable: this is the current app. Also all known tasks will be automatically added to locals (unless the --without-tasks flag is set).

    Uses IPython, bpython, or regular python in that order if installed. You can force an implementation using --ipython, --bpython, or --python.

  • status: List active nodes in this cluster

    $ celery -A proj status
    
  • result: Show the result of a task

    $ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
    

    Note that you can omit the name of the task as long as the task doesn’t use a custom result backend.

  • purge: Purge messages from all configured task queues.

    This command will remove all messages from queues configured in the task_queues setting:

    Warning

    There’s no undo for this operation, and messages will be permanently deleted!

    $ celery -A proj purge
    

    You can also specify the queues to purge using the -Q option:

    $ celery -A proj purge -Q celery,foo,bar
    

    and exclude queues from being purged using the -X option:

    $ celery -A proj purge -X celery
    
  • inspect active: List active tasks

    $ celery -A proj inspect active
    

    These are all the tasks that are currently being executed.

  • inspect scheduled: List scheduled ETA tasks

    $ celery -A proj inspect scheduled
    

    These are tasks reserved by the worker when they have an eta or countdown argument set.

  • inspect reserved: List reserved tasks

    $ celery -A proj inspect reserved
    

    This will list all tasks that have been prefetched by the worker, and are currently waiting to be executed (doesn't include tasks with an ETA value set).

  • inspect revoked: List history of revoked tasks

    $ celery -A proj inspect revoked
    
  • inspect registered: List registered tasks

    $ celery -A proj inspect registered
    
  • inspect stats: Show worker statistics (see Statistics)

    $ celery -A proj inspect stats
    
  • inspect query_task: Show information about task(s) by id.

    Any worker having a task in this set of ids reserved/active will respond with status and information.

    $ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
    

    You can also query for information about multiple tasks:

    $ celery -A proj inspect query_task id1 id2 ... idN
    
  • control enable_events: Enable events

    $ celery -A proj control enable_events
    
  • control disable_events: Disable events

    $ celery -A proj control disable_events
    
  • migrate: Migrate tasks from one broker to another (EXPERIMENTAL).

    $ celery -A proj migrate redis://localhost amqp://localhost
    

    This command will migrate all the tasks on one broker to another. As this command is new and experimental you should be sure to have a backup of the data before proceeding.

Note

All inspect and control commands support a --timeout argument; this is the number of seconds to wait for responses. You may have to increase this timeout if you're not getting a response due to latency.

Specifying destination nodes

By default the inspect and control commands operate on all workers. You can specify a single worker, or a list of workers, by using the --destination argument:

$ celery -A proj inspect -d w1@e.com,w2@e.com reserved

$ celery -A proj control -d w1@e.com,w2@e.com enable_events
Flower: Real-time Celery web-monitor

Flower is a real-time web based monitoring and administration tool for Celery. It's under active development, but is already an essential tool. As Celery's recommended monitor, it obsoletes the Django-Admin monitor, celerymon, and the ncurses based monitor.

Flower is pronounced like “flow”, but you can also use the botanical version if you prefer.

Features
  • Real-time monitoring using Celery Events

    • Task progress and history
    • Ability to show task details (arguments, start time, run-time, and more)
    • Graphs and statistics
  • Remote Control

    • View worker status and statistics
    • Shutdown and restart worker instances
    • Control worker pool size and autoscale settings
    • View and modify the queues a worker instance consumes from
    • View currently running tasks
    • View scheduled tasks (ETA/countdown)
    • View reserved and revoked tasks
    • Apply time and rate limits
    • Configuration viewer
    • Revoke or terminate tasks
  • HTTP API

    • List workers
    • Shut down a worker
    • Restart worker’s pool
    • Grow worker’s pool
    • Shrink worker’s pool
    • Autoscale worker pool
    • Start consuming from a queue
    • Stop consuming from a queue
    • List tasks
    • List (seen) task types
    • Get a task info
    • Execute a task
    • Execute a task by name
    • Get a task result
    • Change soft and hard time limits for a task
    • Change rate limit for a task
    • Revoke a task
  • OpenID authentication

Screenshots

(Screenshots: the Flower dashboard and task monitor. More screenshots are available in the Flower documentation.)

Usage

You can use pip to install Flower:

$ pip install flower

Running the flower command will start a web-server that you can visit:

$ celery -A proj flower

The default URL is http://localhost:5555, but you can change the port using the --port argument:

$ celery -A proj flower --port=5555

The broker URL can also be passed through the --broker argument:

$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

Then, you can visit Flower in your web browser:

$ open http://localhost:5555

Flower has many more features than are detailed here, including authorization options. Check out the official documentation for more information.

celery events: Curses Monitor

New in version 2.0.

celery events is a simple curses monitor displaying task and worker history. You can inspect the result and traceback of tasks, and it also supports some management commands like rate limiting and shutting down workers. This monitor was started as a proof of concept, and you probably want to use Flower instead.

Starting:

$ celery -A proj events

You should see a screen like:

(Screenshot: the celery events curses monitor.)

celery events is also used to start snapshot cameras (see Snapshots):

$ celery -A proj events --camera=<camera-class> --frequency=1.0

and it includes a tool to dump events to stdout:

$ celery -A proj events --dump

For a complete list of options use --help:

$ celery events --help
RabbitMQ

To manage a Celery cluster it is important to know how RabbitMQ can be monitored.

RabbitMQ ships with the rabbitmqctl(1) command; with it you can list queues, exchanges, bindings, queue lengths, and the memory usage of each queue, as well as manage users, virtual hosts, and their permissions.

Note

The default virtual host ("/") is used in these examples. If you use a custom virtual host you have to add the -p argument to the command, for example: rabbitmqctl list_queues -p my_vhost

Inspecting queues

Finding the number of tasks in a queue:

$ rabbitmqctl list_queues name messages messages_ready \
                          messages_unacknowledged

Here messages_ready is the number of messages ready for delivery (sent but not received), and messages_unacknowledged is the number of messages that have been received by a worker but not acknowledged yet (meaning the task is in progress, or has been reserved). messages is the sum of ready and unacknowledged messages.

Finding the number of workers currently consuming from a queue:

$ rabbitmqctl list_queues name consumers

Finding the amount of memory allocated to a queue:

$ rabbitmqctl list_queues name memory
Tip: Adding the -q option to rabbitmqctl(1) makes the output easier to parse.
Redis

If you’re using Redis as the broker, you can monitor the Celery cluster using the redis-cli(1) command to list lengths of queues.

Inspecting queues

Finding the number of tasks in a queue:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

The default queue is named celery. To get all available queues, invoke:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*

Note

Queue keys only exist when there are tasks in them, so if a key doesn't exist it simply means there are no messages in that queue. This is because in Redis a list with no elements in it is automatically removed, and hence it won't show up in the keys command output, and llen for that list returns 0.

Also, if you're using Redis for other purposes, the output of the keys command will include unrelated values stored in the database. The recommended way around this is to use a dedicated DATABASE_NUMBER for Celery. You can also use database numbers to separate Celery applications from each other (virtual hosts), but this won't affect the monitoring events used by, for example, Flower, as Redis pub/sub commands are global rather than database based.
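
For example, a hedged sketch of pointing Celery at dedicated Redis databases (the database numbers here are arbitrary choices; an existing app object is assumed):

app.conf.broker_url = 'redis://localhost:6379/1'        # broker messages in DB 1
app.conf.result_backend = 'redis://localhost:6379/2'    # results kept separate in DB 2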

Munin

This is a list of known Munin plug-ins that can be useful when maintaining a Celery cluster.

Events

The worker has the ability to send a message whenever some event happens. These events are then captured by tools like Flower and celery events to monitor the cluster.

Snapshots

New in version 2.1.

Even a single worker can produce a huge amount of events, so storing the history of all events on disk may be very expensive.

A sequence of events describes the cluster state in that time period; by taking periodic snapshots of this state you can keep all history, but still only periodically write it to disk.

To take snapshots you need a Camera class; with this you can define what should happen every time the state is captured. You can write it to a database, send it by email, or something else entirely.

celery events is then used to take snapshots with the camera. For example, if you want to capture state every 2 seconds using the camera myapp.Camera, you run celery events with the following arguments:

$ celery -A proj events -c myapp.Camera --frequency=2.0
Custom Camera

Cameras can be useful if you need to capture events and do something with those events at an interval. For real-time event processing you should use app.events.Receiver directly, like in Real-time processing.

Here is an example camera, dumping the snapshot to screen:

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):
    clear_after = True  # clear after flush (incl. state.event_count).

    def on_shutter(self, state):
        if not state.event_count:
            # No new events since last snapshot.
            return
        print('Workers: {0}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(
            state))

See the API reference for celery.events.state to read more about state objects.

Now you can use this cam with celery events by specifying it with the -c option:

$ celery -A proj events -c myapp.DumpCam --frequency=2.0

Or you can use it programmatically like this:

from celery import Celery
from myapp import DumpCam

def main(app, freq=1.0):
    state = app.events.State()
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        with DumpCam(state, freq=freq):
            recv.capture(limit=None, timeout=None)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    main(app)
Real-time processing

To process events in real-time you need the following:

  • An event consumer (this is the Receiver)

  • A set of handlers called when events come in.

    You can have different handlers for each event type, or a catch-all handler can be used (‘*’)

  • State (optional)

    app.events.State is a convenient in-memory representation of tasks and workers in the cluster that’s updated as events come in.

    It encapsulates solutions for many common things, like checking if a worker is still alive (by verifying heartbeats), merging event fields together as events come in, making sure time-stamps are in sync, and so on.

Combining these you can easily process events in real-time:

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Note

The wakeup argument to capture sends a signal to all workers to force them to send a heartbeat. This way you can immediately see workers when the monitor starts.

You can listen to specific events by specifying the handlers:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
Event Reference

This list contains the events sent by the worker, and their arguments.

Task Events
task-sent
signature:task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)

Sent when a task message is published and the task_send_sent_event setting is enabled.
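
A minimal sketch of enabling this setting (assuming an existing app object):

# Clients will now emit task-sent events when publishing task messages.
app.conf.task_send_sent_event = True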

task-received
signature:task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)

Sent when the worker receives a task.

task-started
signature:task-started(uuid, hostname, timestamp, pid)

Sent just before the worker executes the task.

task-succeeded
signature:task-succeeded(uuid, result, runtime, hostname, timestamp)

Sent if the task executed successfully.

Run-time is the time it took to execute the task using the pool. (Starting from when the task is sent to the worker pool, and ending when the pool result handler callback is called.)

task-failed
signature:task-failed(uuid, exception, traceback, hostname, timestamp)

Sent if the execution of the task failed.

task-rejected
signature:task-rejected(uuid, requeued)

The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.

task-revoked
signature:task-revoked(uuid, terminated, signum, expired)

Sent if the task has been revoked (Note that this is likely to be sent by more than one worker).

  • terminated is set to true if the task process was terminated,
    and the signum field set to the signal used.
  • expired is set to true if the task expired.
task-retried
signature:task-retried(uuid, exception, traceback, hostname, timestamp)

Sent if the task failed, but will be retried in the future.

Worker Events
worker-online
signature:worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

The worker has connected to the broker and is online.

  • hostname: Nodename of the worker.
  • timestamp: Event time-stamp.
  • freq: Heartbeat frequency in seconds (float).
  • sw_ident: Name of worker software (e.g., py-celery).
  • sw_ver: Software version (e.g., 2.2.0).
  • sw_sys: Operating System (e.g., Linux/Darwin).
worker-heartbeat
signature:worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)

Sent every minute; if the worker hasn't sent a heartbeat in 2 minutes, it is considered to be offline.

  • hostname: Nodename of the worker.
  • timestamp: Event time-stamp.
  • freq: Heartbeat frequency in seconds (float).
  • sw_ident: Name of worker software (e.g., py-celery).
  • sw_ver: Software version (e.g., 2.2.0).
  • sw_sys: Operating System (e.g., Linux/Darwin).
  • active: Number of currently executing tasks.
  • processed: Total number of tasks processed by this worker.
worker-offline
signature:worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

The worker has disconnected from the broker.

Security

Introduction

While Celery is written with security in mind, it should be treated as an unsafe component.

Depending on your Security Policy, there are various steps you can take to make your Celery installation more secure.

Areas of Concern
Broker

It's imperative that the broker is guarded from unwanted access, especially if accessible to the public. By default, workers trust that the data they get from the broker hasn't been tampered with. See Message Signing for information on how to make broker connections more trustworthy.

The first line of defense should be to put a firewall in front of the broker, allowing only white-listed machines to access it.

Keep in mind that both firewall misconfiguration and temporarily disabling the firewall are common in the real world. A solid security policy includes monitoring of firewall equipment to detect if it has been disabled, be it accidentally or on purpose.

In other words, one shouldn’t blindly trust the firewall either.

If your broker supports fine-grained access control, like RabbitMQ, this is something you should look at enabling. See for example http://www.rabbitmq.com/access-control.html.

If supported by your broker backend, you can enable end-to-end SSL encryption and authentication using broker_use_ssl.

Client

In Celery, “client” refers to anything that sends messages to the broker, for example web-servers that apply tasks.

Having the broker properly secured doesn’t matter if arbitrary messages can be sent through a client.

[Need more text here]

Worker

The default permissions of tasks running inside a worker are the same as the privileges of the worker itself. This applies to resources such as memory, file-systems, and devices.

An exception to this rule is when using the multiprocessing based task pool, which is currently the default. In this case, the task will have access to any memory copied as a result of the fork() call, and access to memory contents written by parent tasks in the same worker child process.

Limiting access to memory contents can be done by launching every task in a subprocess (fork() + execve()).

Limiting file-system and device access can be accomplished by using chroot, jail, sandboxing, virtual machines, or other mechanisms as enabled by the platform or additional software.

Note also that any task executed in the worker will have the same network access as the machine on which it’s running. If the worker is located on an internal network it’s recommended to add firewall rules for outbound traffic.

Serializers

The default serializer is JSON since version 4.0, but since it only supports a restricted set of types you may want to consider using pickle for serialization instead.

The pickle serializer is convenient as it can serialize almost any Python object, even functions with some work, but for the same reasons pickle is inherently insecure [*], and should be avoided whenever clients are untrusted or unauthenticated.

You can disable untrusted content by specifying a white-list of accepted content-types in the accept_content setting:

New in version 3.0.18.

Note

This setting was first supported in version 3.0.18. If you’re running an earlier version it will simply be ignored, so make sure you’re running a version that supports it.

accept_content = ['json']

This accepts a list of serializer names and content-types, so you could also specify the content type for json:

accept_content = ['application/json']

Celery also comes with a special auth serializer that validates communication between Celery clients and workers, making sure that messages originate from trusted sources. Using Public-key cryptography the auth serializer can verify the authenticity of senders; to enable this, see Message Signing for more information.

Message Signing

Celery can use the cryptography library to sign messages using Public-key cryptography, where messages sent by clients are signed using a private key and then later verified by the worker using a public certificate.

Optimally certificates should be signed by an official Certificate Authority, but they can also be self-signed.

To enable this you should configure the task_serializer setting to use the auth serializer. To enforce that workers only accept signed messages, you should set accept_content to ['auth']. For additional signing of the event protocol, set event_serializer to auth. Also required is configuring the paths used to locate private keys and certificates on the file-system: the security_key, security_certificate, and security_cert_store settings respectively. You can tweak the signing algorithm with security_digest.

With these configured it’s also necessary to call the celery.setup_security() function. Note that this will also disable all insecure serializers so that the worker won’t accept messages with untrusted content types.

This is an example configuration using the auth serializer, with the private key and certificate files located in /etc/ssl.

app = Celery()
app.conf.update(
    security_key='/etc/ssl/private/worker.key',
    security_certificate='/etc/ssl/certs/worker.pem',
    security_cert_store='/etc/ssl/certs/*.pem',
    security_digest='sha256',
    task_serializer='auth',
    event_serializer='auth',
    accept_content=['auth'],
)
app.setup_security()

Note

While relative paths aren’t disallowed, using absolute paths is recommended for these files.

Also note that the auth serializer won’t encrypt the contents of a message, so if needed this will have to be enabled separately.

Intrusion Detection

The most important part when defending your systems against intruders is being able to detect if the system has been compromised.

Logs

Logs are usually the first place to look for evidence of security breaches, but they’re useless if they can be tampered with.

A good solution is to set up centralized logging with a dedicated logging server. Access to it should be restricted. In addition to having all of the logs in a single place, if configured correctly, it can make it harder for intruders to tamper with your logs.

This should be fairly easy to set up using syslog (see also syslog-ng and rsyslog). Celery uses the logging library, and already has support for using syslog.
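
As a hedged sketch (the syslog server address below is an assumption), worker logs could be forwarded to a central syslog host by attaching a handler in the after_setup_logger signal:

import logging
from logging.handlers import SysLogHandler

from celery.signals import after_setup_logger

@after_setup_logger.connect
def add_syslog_handler(logger=None, **kwargs):
    # 'logs.example.com' is a placeholder for your central logging server.
    handler = SysLogHandler(address=('logs.example.com', 514))
    handler.setFormatter(logging.Formatter('celery: %(levelname)s %(message)s'))
    logger.addHandler(handler)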

A tip for the paranoid is to send logs using UDP and cut the transmit part of the logging server’s network cable :-)

Tripwire

Tripwire is a (now commercial) data integrity tool, with several open source implementations, used to keep cryptographic hashes of files in the file-system, so that administrators can be alerted when they change. This way when the damage is done and your system has been compromised you can tell exactly what files intruders have changed (password files, logs, back-doors, root-kits, and so on). Often this is the only way you’ll be able to detect an intrusion.

Some open source implementations include:

Also, the ZFS file-system comes with built-in integrity checks that can be used.

Footnotes

[*]https://blog.nelhage.com/2011/03/exploiting-pickle/

Optimizing

Introduction

The default configuration makes a lot of compromises. It’s not optimal for any single case, but works well enough for most situations.

There are optimizations that can be applied based on specific use cases.

Optimizations can apply to different properties of the running environment, be it the time tasks take to execute, the amount of memory used, or responsiveness at times of high load.

Ensuring Operations

In the book Programming Pearls, Jon Bentley presents the concept of back-of-the-envelope calculations by asking the question:

❝ How much water flows out of the Mississippi River in a day?

The point of this exercise [*] is to show that there’s a limit to how much data a system can process in a timely manner. Back of the envelope calculations can be used as a means to plan for this ahead of time.

In Celery, if a task takes 10 minutes to complete and there are 10 new tasks coming in every minute, the queue will never be empty. This is why it's very important that you monitor queue lengths!

A way to do this is by using Munin. You should set up alerts that'll notify you as soon as any queue has reached an unacceptable size. This way you can take appropriate action like adding new worker nodes, or revoking unnecessary tasks.

General Settings
librabbitmq

If you’re using RabbitMQ (AMQP) as the broker then you can install the librabbitmq module to use an optimized client written in C:

$ pip install librabbitmq

The ‘amqp’ transport will automatically use the librabbitmq module if it’s installed, or you can also specify the transport you want directly by using the pyamqp:// or librabbitmq:// prefixes.

Broker Connection Pools

The broker connection pool is enabled by default since version 2.5.

You can tweak the broker_pool_limit setting to minimize contention, and the value should be based on the number of active threads/green-threads using broker connections.
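
A minimal sketch (the value 10 is an assumption to be tuned against your own thread count; an existing app object is assumed):

# Roughly: the number of threads/green threads publishing messages.
app.conf.broker_pool_limit = 10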

Using Transient Queues

Queues created by Celery are persistent by default. This means that the broker will write messages to disk to ensure that the tasks will be executed even if the broker is restarted.

But in some cases it’s fine that the message is lost, so not all tasks require durability. You can create a transient queue for these tasks to improve performance:

from kombu import Exchange, Queue

task_queues = (
    Queue('celery', routing_key='celery'),
    Queue('transient', Exchange('transient', delivery_mode=1),
          routing_key='transient', durable=False),
)

or by using task_routes:

task_routes = {
    'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}
}

The delivery_mode changes how the messages to this queue are delivered. A value of one means that the message won’t be written to disk, and a value of two (default) means that the message can be written to disk.

To direct a task to your new transient queue you can specify the queue argument (or use the task_routes setting):

task.apply_async(args, queue='transient')

For more information see the routing guide.

Worker Settings
Prefetch Limits

Prefetch is a term inherited from AMQP that’s often misunderstood by users.

The prefetch limit is a limit for the number of tasks (messages) a worker can reserve for itself. If it is zero, the worker will keep consuming messages, not respecting that there may be other available worker nodes that may be able to process them sooner [†], or that the messages may not even fit in memory.

The workers’ default prefetch count is the worker_prefetch_multiplier setting multiplied by the number of concurrency slots [‡] (processes/threads/green-threads).
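
A short worked example with assumed numbers:

app.conf.worker_prefetch_multiplier = 4   # the default value
# With `celery -A proj worker -c 8`, the prefetch count is 4 * 8 = 32 messages.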

If you have many tasks with a long duration you want the multiplier value to be one: meaning it’ll only reserve one task per worker process at a time.

However, if you have many short-running tasks, and throughput/round-trip latency is important to you, this number should be large. The worker is able to process more tasks per second if the messages have already been prefetched and are available in memory. You may have to experiment to find the value that works best for you; values like 64 or 128 might make sense in these circumstances.

If you have a combination of long- and short-running tasks, the best option is to use two worker nodes that are configured separately, and route the tasks according to the run-time (see Routing Tasks).

Reserve one task at a time

The task message is only deleted from the queue after the task is acknowledged, so if the worker crashes before acknowledging the task, it can be redelivered to another worker (or the same after recovery).

When using the default of early acknowledgment, having a prefetch multiplier setting of one means the worker will reserve at most one extra task for every worker process: in other words, if the worker is started with -c 10, the worker may reserve at most 20 tasks (10 acknowledged tasks executing, and 10 unacknowledged reserved tasks) at any time.

Often users ask if disabling "prefetching of tasks" is possible, but what they really mean by that is to have a worker only reserve as many tasks as there are worker processes (10 unacknowledged tasks for -c 10).

That's possible, but not without also enabling late acknowledgment. Using this option over the default behavior means a task that's already started executing will be retried in the event of a power failure or the worker instance being killed abruptly, so this also means the task must be idempotent.

You can enable this behavior by using the following configuration options:

task_acks_late = True
worker_prefetch_multiplier = 1
Prefork pool prefetch settings

The prefork pool will asynchronously send as many tasks to the processes as it can and this means that the processes are, in effect, prefetching tasks.

This benefits performance but it also means that tasks may be stuck waiting for long running tasks to complete:

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3

The worker will send tasks to the process as long as the pipe buffer is writable. The pipe buffer size varies based on the operating system: some may have a buffer as small as 64KB but on recent Linux versions the buffer size is 1MB (can only be changed system wide).

You can disable this prefetching behavior by enabling the -O fair worker option:

$ celery -A proj worker -l info -O fair

With this option enabled the worker will only write to processes that are available for work, disabling the prefetch behavior:

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send T3 to process B
# B executes T3

<- T3 complete sent by process B
<- T1 complete sent by process A

Footnotes

[*]The chapter is available to read for free here: The back of the envelope. The book is a classic text. Highly recommended.
[†]RabbitMQ and other brokers deliver messages round-robin, so this doesn’t apply to an active system. If there’s no prefetch limit and you restart the cluster, there will be timing delays between nodes starting. If there are 3 offline nodes and one active node, all messages will be delivered to the active node.
[‡]This is the concurrency setting; worker_concurrency or the celery worker -c option.

Debugging

Debugging Tasks Remotely (using pdb)
Basics

celery.contrib.rdb is an extended version of pdb that enables remote debugging of processes that don't have terminal access.

Example usage:

from celery import task
from celery.contrib import rdb

@task()
def add(x, y):
    result = x + y
    rdb.set_trace()  # <- set break-point
    return result

set_trace() sets a break-point at the current location and creates a socket you can telnet into to remotely debug your task.

The debugger may be started by multiple processes at the same time, so rather than using a fixed port the debugger will search for an available port, starting from the base port (6900 by default). The base port can be changed using the environment variable CELERY_RDB_PORT.

By default the debugger will only be available from the local host; to enable access from the outside you have to set the environment variable CELERY_RDB_HOST.

When the worker encounters your break-point it’ll log the following information:

[INFO/MainProcess] Received task:
    tasks.add[d7261c71-4962-47e5-b342-2448bedd20e8]
[WARNING/PoolWorker-1] Remote Debugger:6900:
    Please telnet 127.0.0.1 6900.  Type `exit` in session to continue.
[2011-01-18 14:25:44,119: WARNING/PoolWorker-1] Remote Debugger:6900:
    Waiting for client...

If you telnet the port specified you’ll be presented with a pdb shell:

$ telnet localhost 6900
Connected to localhost.
Escape character is '^]'.
> /opt/devel/demoapp/tasks.py(128)add()
-> return result
(Pdb)

Enter help to get a list of available commands. It may be a good idea to read the Python Debugger Manual if you have never used pdb before.

To demonstrate, we’ll read the value of the result variable, change it and continue execution of the task:

(Pdb) result
4
(Pdb) result = 'hello from rdb'
(Pdb) continue
Connection closed by foreign host.

The result of our vandalism can be seen in the worker logs:

[2011-01-18 14:35:36,599: INFO/MainProcess] Task
    tasks.add[d7261c71-4962-47e5-b342-2448bedd20e8] succeeded
    in 61.481s: 'hello from rdb'
Tips
Enabling the break-point signal

If the environment variable CELERY_RDBSIG is set, the worker will open up an rdb instance whenever the SIGUSR2 signal is sent. This is the case for both main and worker processes.

For example starting the worker with:

$ CELERY_RDBSIG=1 celery worker -l info

You can start an rdb session for any of the worker processes by executing:

$ kill -USR2 <pid>

Concurrency

Release:4.3
Date:Apr 02, 2019
Concurrency with Eventlet
Introduction

The Eventlet homepage describes it as a concurrent networking library for Python that allows you to change how you run your code, not how you write it.

  • It uses epoll(4) or libevent for highly scalable non-blocking I/O.
  • Coroutines ensure that the developer uses a blocking style of programming that’s similar to threading, but provide the benefits of non-blocking I/O.
  • The event dispatch is implicit: meaning you can easily use Eventlet from the Python interpreter, or as a small part of a larger application.

Celery supports Eventlet as an alternative execution pool implementation, and in some cases it is superior to prefork. However, you need to ensure one task doesn't block the event loop too long. Generally, CPU-bound operations don't go well with Eventlet. Also note that some libraries, usually with C extensions, cannot be monkeypatched and therefore cannot benefit from using Eventlet. Please refer to their documentation if you are not sure. For example, pylibmc does not allow cooperation with Eventlet, but psycopg2 does, even though both of them are libraries with C extensions.

The prefork pool can make use of multiple processes, but the number is often limited to a few processes per CPU. With Eventlet you can efficiently spawn hundreds, or thousands, of green threads. In an informal test with a feed hub system the Eventlet pool could fetch and process hundreds of feeds every second, while the prefork pool spent 14 seconds processing 100 feeds. Note that this is one of the applications async I/O is especially good at (asynchronous HTTP requests). You may want a mix of both Eventlet and prefork workers, and route tasks according to compatibility or what works best.

Enabling Eventlet

You can enable the Eventlet pool by using the celery worker -P option:

$ celery -A proj worker -P eventlet -c 1000
Examples

See the Eventlet examples directory in the Celery distribution for some examples making use of Eventlet support.

Signals

Signals allow decoupled applications to receive notifications when certain actions occur elsewhere in the application.

Celery ships with many signals that your application can hook into to augment behavior of certain actions.

Basics

Several kinds of events trigger signals; you can connect to these signals to perform actions as they trigger.

Example connecting to the after_task_publish signal:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # Information about the task is located in the headers for task messages
    # using task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

Some signals also have a sender you can filter by.