Django’s Tasks framework
A web application often needs to do more than turn HTTP requests into HTTP responses. For some functionality, it may be beneficial to run code outside the request-response cycle.
That’s where background Tasks come in.
Background Tasks offload work so that it runs outside the request-response cycle, elsewhere and potentially at a later time. This keeps requests fast, reduces latency, and improves the user experience. For example, a user shouldn’t have to wait for an email to send before their page finishes loading.
Django’s new Tasks framework makes it easy to define and enqueue such work. It does not provide a worker mechanism to run Tasks. The actual execution must be handled by infrastructure outside Django, such as a separate process or service.
Background Task fundamentals
When work needs to be done in the background, Django creates a Task, which is stored in the Queue Store. This Task contains all the metadata needed to execute it, as well as a unique identifier for Django to retrieve the result later.
A Worker watches the Queue Store for new Tasks to run. When a new Task is added, a Worker claims the Task, executes it, and saves the status and result back to the Queue Store. Workers run outside the request-response lifecycle.
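The flow described above can be pictured with a small in-memory model. This is only a sketch: ToyTask, ToyQueueStore, and run_worker_once() are hypothetical names invented for illustration, and none of this is Django's implementation.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyTask:
    """Hypothetical stored Task: the callable, its arguments, an id, and a status."""

    func: Callable[..., Any]
    kwargs: dict
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    status: str = "READY"
    return_value: Any = None


class ToyQueueStore:
    """Hypothetical in-memory Queue Store keyed by Task id."""

    def __init__(self):
        self._tasks = {}

    def enqueue(self, func, **kwargs):
        task = ToyTask(func=func, kwargs=kwargs)
        self._tasks[task.id] = task
        return task.id

    def claim_next(self):
        # Hand out the first Task nobody has started yet and mark it as claimed.
        for task in self._tasks.values():
            if task.status == "READY":
                task.status = "RUNNING"
                return task
        return None

    def get(self, task_id):
        return self._tasks[task_id]


def run_worker_once(store):
    """Claim one Task, execute it, and save status and result back to the store."""
    task = store.claim_next()
    if task is None:
        return False
    try:
        task.return_value = task.func(**task.kwargs)
        task.status = "SUCCESSFUL"
    except Exception:
        task.status = "FAILED"
    return True


def add(a, b):
    return a + b


store = ToyQueueStore()
task_id = store.enqueue(add, a=2, b=3)  # request side: enqueue and keep the id
run_worker_once(store)  # worker side: happens outside the request
print(store.get(task_id).status, store.get(task_id).return_value)
```

The request side only ever holds the id; everything else is looked up from the store, which is why the result can be retrieved later from a different request.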
Configuring a Task backend
The Task backend determines how and where Tasks are stored for execution and how they are executed. Different Task backends have different characteristics and configuration options, which may impact the performance and reliability of your application. Django comes with a number of built-in backends. Django does not provide a generic way to execute Tasks, only enqueue them.
Task backends are configured using the TASKS setting in your settings file. Whilst most applications will only need a single backend, multiple backends are supported.
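As a sketch of what a multi-backend configuration might look like, the settings fragment below defines two aliases using the two built-in backends described in this document. The alias name "reserve" is an assumption chosen for illustration; any alias name could be used.

```python
# settings.py (sketch): two Task backends under different aliases.
TASKS = {
    # Used whenever no other backend is requested.
    "default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"},
    # A second, illustrative alias; "reserve" is an assumed name.
    "reserve": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"},
}
```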
Immediate execution
This is the default backend if another is not specified in your settings file. The ImmediateBackend runs enqueued Tasks immediately, rather than in the background. This allows background Task functionality to be added to an application gradually, before the required infrastructure is available.
To use it, set BACKEND to "django.tasks.backends.immediate.ImmediateBackend":
TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}
The ImmediateBackend may also be useful in tests, where it removes the need to run a real background worker.
Dummy backend
The DummyBackend doesn’t execute enqueued Tasks at all, instead storing results for later use. Task results will forever remain in the READY state.
This backend is not intended for use in production; it is provided as a convenience for development and testing.
To use it, set BACKEND to "django.tasks.backends.dummy.DummyBackend":
TASKS = {"default": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"}}
The results for enqueued Tasks can be retrieved from the backend’s results attribute:
>>> from django.tasks import default_task_backend
>>> my_task.enqueue()
>>> len(default_task_backend.results)
1
Stored results can be cleared using the clear() method:
>>> default_task_backend.clear()
>>> len(default_task_backend.results)
0
Using a custom backend
While Django includes support for a number of Task backends out-of-the-box, sometimes you might want to customize the Task backend. To use an external Task backend with Django, use the Python import path as the BACKEND of the TASKS setting, like so:
TASKS = {
    "default": {
        "BACKEND": "path.to.backend",
    }
}
A Task backend is a class that inherits BaseTaskBackend. At a minimum, it must implement BaseTaskBackend.enqueue(). If you’re building your own backend, you can use the built-in Task backends as reference implementations. You’ll find the code in the django/tasks/backends/ directory of the Django source.
Asynchronous support
Django has developing support for asynchronous Task backends.
BaseTaskBackend has async variants of all base methods. By convention, the asynchronous versions of all methods are prefixed with the letter a (for example, aenqueue() is the async variant of enqueue()). The arguments for both variants are the same.
Retrieving backends
Backends can be retrieved using the task_backends connection handler:
from django.tasks import task_backends
task_backends["default"] # The default backend
task_backends["reserve"] # Another backend
The “default” backend is available as default_task_backend:
from django.tasks import default_task_backend
Defining Tasks
Tasks are defined using the django.tasks.task() decorator on a module-level function:
from django.core.mail import send_mail
from django.tasks import task


@task
def email_users(emails, subject, message):
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )
The return value of the decorator is a Task instance.
Task attributes can be customized via the @task decorator arguments:
from django.core.mail import send_mail
from django.tasks import task


@task(priority=2, queue_name="emails")
def email_users(emails, subject, message):
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )
By convention, Tasks are defined in a tasks.py file; however, this is not enforced.
Task context
Sometimes, the running Task may need to know context about how it was enqueued, and how it is being executed. This can be accessed by taking a context argument, which is an instance of TaskContext.
To receive the Task context as an argument to your Task function, pass takes_context when defining it:
import logging

from django.core.mail import send_mail
from django.tasks import task

logger = logging.getLogger(__name__)


@task(takes_context=True)
def email_users(context, emails, subject, message):
    logger.debug(
        f"Attempt {context.attempt} to send user email. Task result id: {context.task_result.id}."
    )
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )
Modifying Tasks
Before enqueueing Tasks, it may be necessary to modify certain parameters of the Task, for example, to give it a higher priority than it would normally have.
A Task instance cannot be modified directly. Instead, a modified instance can be created with the using() method, leaving the original as-is. For example:
>>> email_users.priority
0
>>> email_users.using(priority=10).priority
10
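One way to picture this copy-on-modify behaviour is with a frozen dataclass. The sketch below uses a hypothetical ToyTask class with only two attributes; it illustrates the pattern and is not Django's Task class.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyTask:
    """Hypothetical immutable Task with only the attributes needed here."""

    priority: int = 0
    queue_name: str = "default"

    def using(self, **changes):
        # Build a new instance with the requested changes; self is untouched.
        return replace(self, **changes)


email_users = ToyTask()
urgent = email_users.using(priority=10)

print(email_users.priority, urgent.priority)

try:
    email_users.priority = 10  # direct modification is rejected
except AttributeError as exc:  # FrozenInstanceError subclasses AttributeError
    print("cannot modify:", type(exc).__name__)
```

Because the original is never changed, a module-level Task can be shared safely while individual call sites create their own variants.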
Enqueueing Tasks
To add the Task to the queue store, so it will be executed, call the enqueue() method on it. If the Task takes arguments, these can be passed as-is. For example:
result = email_users.enqueue(
    emails=["user@example.com"],
    subject="You have a message",
    message="Hello there!",
)
This returns a TaskResult, which can be used to retrieve the result of the Task once it has finished executing.
To enqueue Tasks in an async context, aenqueue() is available as an async variant of enqueue().
Because both Task arguments and return values are serialized to JSON, they must be JSON-serializable:
>>> process_data.enqueue(datetime.now())
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable
Arguments must also be able to round-trip through a json.dumps() / json.loads() cycle without changing type. For example, consider this Task:
@task()
def double_dictionary(key):
    return {key: key * 2}
With the ImmediateBackend configured as the default backend:
>>> result = double_dictionary.enqueue((1, 2, 3))
>>> result.status
FAILED
>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: unhashable type: 'list'
The double_dictionary Task fails because after the JSON round-trip the tuple (1, 2, 3) becomes the list [1, 2, 3], which cannot be used as a dictionary key.
In general, complex objects such as model instances, or built-in types like datetime and tuple, cannot be used in Tasks without additional conversion.
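The round-trip requirement can be checked with the standard library alone. The sketch below defines a hypothetical helper, round_trips(), which is not part of Django and is not how Django validates arguments; it simply shows which values survive json.dumps() / json.loads() unchanged, and one possible conversion for a datetime.

```python
import json
from datetime import datetime, timezone


def round_trips(value):
    """Return True if value comes back from JSON with the same type and value."""
    try:
        restored = json.loads(json.dumps(value))
    except TypeError:
        # json.dumps() cannot serialize this value at all.
        return False
    return type(restored) is type(value) and restored == value


list_ok = round_trips([1, 2, 3])  # lists survive unchanged
tuple_ok = round_trips((1, 2, 3))  # tuples come back as lists
datetime_ok = round_trips(datetime.now(timezone.utc))  # not serializable

# One possible conversion: pass an ISO 8601 string, and parse it in the Task.
when = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
as_text = when.isoformat()
text_ok = round_trips(as_text)
restored_when = datetime.fromisoformat(as_text)

print(list_ok, tuple_ok, datetime_ok, text_ok, restored_when == when)
```

For model instances, a common approach is to pass the primary key and load the object again inside the Task, as the Thing example in the Transactions section does with thing_num.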
Transactions
For most backends, Tasks are run in a separate process, using a different database connection. If a Task is enqueued inside a transaction that has not yet committed, a worker could start processing it before the objects it relies on are visible to that worker.
Consider this simplified example:
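from django.db import transaction
from django.tasks import task

@task
def my_task(thing_num):
    Thing.objects.get(num=thing_num)

with transaction.atomic():
    Thing.objects.create(num=1)
    # Risky: a worker may pick this up before the transaction commits.
    my_task.enqueue(thing_num=1)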
To prevent the scenario where my_task runs before the Thing is committed to the database, use transaction.on_commit(), binding all arguments to enqueue() via functools.partial():
from functools import partial

from django.db import transaction

with transaction.atomic():
    Thing.objects.create(num=1)
    transaction.on_commit(partial(my_task.enqueue, thing_num=1))
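transaction.on_commit() takes a callable with no arguments, so the arguments have to be bound before the callback runs. The sketch below shows why functools.partial() is a safer way to do that than a lambda when callbacks are registered in a loop. It uses plain lists and a hypothetical fake_enqueue() function in place of Django's transaction machinery and a real Task.

```python
from functools import partial

enqueued = []  # stands in for the queue store
callbacks = []  # stands in for the callbacks registered with on_commit()


def fake_enqueue(thing_num):
    """Hypothetical stand-in for my_task.enqueue."""
    enqueued.append(thing_num)


# Lambdas look up num when they run, so each one sees the loop's final value.
for num in (1, 2, 3):
    callbacks.append(lambda: fake_enqueue(thing_num=num))
for callback in callbacks:  # simulate the commit
    callback()
with_lambda = list(enqueued)

enqueued.clear()
callbacks.clear()

# partial() captures the value of num at registration time.
for num in (1, 2, 3):
    callbacks.append(partial(fake_enqueue, thing_num=num))
before_commit = list(enqueued)  # nothing has been enqueued yet
for callback in callbacks:  # simulate the commit
    callback()
with_partial = list(enqueued)

print(with_lambda, before_commit, with_partial)
```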
Task results
When enqueueing a Task, you receive a TaskResult. However, it’s often useful to retrieve the result from somewhere else (for example, another request or another Task).
Each TaskResult has a unique id, which can be used to identify and retrieve the result once the code which enqueued the Task has finished.
The get_result() method can retrieve a result based on its id:
# Later, somewhere else...
result = email_users.get_result(result_id)
To retrieve a TaskResult, regardless of which kind of Task it was from, use the get_result() method on the backend:
from django.tasks import default_task_backend
result = default_task_backend.get_result(result_id)
To retrieve results in an async context, aget_result() is available as an async variant of get_result() on both the backend and Task.
Some backends, such as the built-in ImmediateBackend, do not support get_result(). Calling get_result() on these backends will raise NotImplementedError.
Updating results
A TaskResult contains the status of a Task’s execution at the point it was retrieved. If the Task finishes after get_result() is called, the TaskResult will not update automatically.
To refresh the values, call the django.tasks.TaskResult.refresh() method:
>>> result.status
RUNNING
>>> result.refresh() # or await result.arefresh()
>>> result.status
SUCCESSFUL
Return values
If your Task function returns something, it can be retrieved from the django.tasks.TaskResult.return_value attribute:
>>> result.status
SUCCESSFUL
>>> result.return_value
42
If the Task has not finished executing, or has failed, ValueError is raised:
>>> result.status
RUNNING
>>> result.return_value
Traceback (most recent call last):
...
ValueError: Task has not finished yet
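Since a TaskResult is a snapshot, code that needs the return value may have to poll. The helper below is a minimal sketch of that idea, relying only on the refresh(), status, and return_value members described above. The function name wait_for_return_value(), its parameters, and the FakeResult class are hypothetical, and the set of statuses that count as finished is passed in by the caller rather than assumed.

```python
import time


def wait_for_return_value(result, finished_statuses, timeout=5.0, poll_interval=0.1):
    """Refresh result until its status is in finished_statuses, then return its value.

    Raises TimeoutError if the status never reaches a finished state in time.
    Any exception raised by result.return_value (for example, for a failed
    Task) is passed through to the caller.
    """
    deadline = time.monotonic() + timeout
    while result.status not in finished_statuses:
        if time.monotonic() >= deadline:
            raise TimeoutError("Task did not finish in time")
        time.sleep(poll_interval)
        result.refresh()
    return result.return_value


class FakeResult:
    """Hypothetical stand-in that becomes SUCCESSFUL after two refreshes."""

    def __init__(self):
        self.status = "RUNNING"
        self.refresh_count = 0

    def refresh(self):
        self.refresh_count += 1
        if self.refresh_count >= 2:
            self.status = "SUCCESSFUL"

    @property
    def return_value(self):
        if self.status != "SUCCESSFUL":
            raise ValueError("Task has not finished yet")
        return 42


fake = FakeResult()
value = wait_for_return_value(fake, {"SUCCESSFUL", "FAILED"}, poll_interval=0.01)
print(value, fake.refresh_count)
```

Polling inside a request handler would reintroduce the waiting that background Tasks are meant to avoid, so a helper like this is better suited to scripts, tests, or other Tasks.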
Errors
If the Task doesn’t succeed, and instead raises an exception, either as part of the Task or as part of running it, the exception and traceback are saved to the django.tasks.TaskResult.errors list.
Each entry in errors is a TaskError containing information about the error raised during the execution:
>>> result.errors[0].exception_class
<class 'ValueError'>
Note that this is just the type of exception, and contains no other values. The traceback information is reduced to a string, which you can use to help with debugging:
>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable
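Because the traceback is a plain string, summarizing errors comes down to ordinary string handling. The sketch below defines a hypothetical helper, summarize_errors(), that uses only the errors, exception_class, and traceback attributes described above. It is exercised here with stand-in objects rather than a real TaskResult.

```python
from types import SimpleNamespace


def summarize_errors(result):
    """Return (exception class name, last traceback line) for each entry in result.errors."""
    summaries = []
    for error in result.errors:
        traceback_lines = error.traceback.strip().splitlines()
        last_line = traceback_lines[-1] if traceback_lines else ""
        summaries.append((error.exception_class.__name__, last_line))
    return summaries


# Stand-in objects shaped like the result and error described in this section.
fake_result = SimpleNamespace(
    errors=[
        SimpleNamespace(
            exception_class=TypeError,
            traceback=(
                "Traceback (most recent call last):\n"
                "  ...\n"
                "TypeError: unhashable type: 'list'\n"
            ),
        )
    ]
)

summary = summarize_errors(fake_result)
print(summary)
```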