[Перевод] Асинхронные задания в Django с Celery
Перевод статьи подготовлен в преддверии старта курса «Web-разработчик на Python».
Если в вашем приложении есть какой-то длительный процесс, вы можете обрабатывать его не в стандартном потоке запросов/ответов, а в фоновом режиме.
К примеру, в вашем приложении пользователь должен отправить картинку-миниатюру (которую, скорее всего, нужно будет отредактировать) и подтвердить адрес электронной почты. Если ваше приложение обрабатывает изображение, а потом отправляет письмо для подтверждения в обработчике запросов, то конечному пользователю придется зачем-то ждать завершения выполнения обеих задач перед тем, как перезагрузить или закрыть страницу. Вместо этого, вы можете передать эти операции в очередь задач и оставить на обработку отдельному процессу, чтобы немедленно отправить пользователю ответ. В таком случае, конечный пользователь сможет заниматься другими делами на стороне клиента во время выполнения обработки в фоновом режиме. Ваше приложение в таком случае также сможет свободно отвечать на запросы других пользователей и клиентов.
Сегодня мы поговорим о процессе настройки и конфигурирования Celery и Redis для обработки длительных процессов в приложении на Django, чтобы решать такие задачи. Также мы воспользуемся Docker и Docker Compose, чтобы связать все части вместе, и рассмотрим, как тестировать задания Celery с помощью модульных и интеграционных тестов.
К концу этого руководства мы научимся:
- Интегрировать Celery в Django, чтобы создавать фоновые задания.
- Упаковывать Django, Celery и Redis с помощью Docker.
- Запускать процессы в фоновом режиме с помощью отдельного рабочего процесса.
- Сохранять логи Celery в файл.
- Настраивать Flower для мониторинга и администрирования заданий и воркеров (worker) Celery.
- Тестировать задания Celery с помощью модульных и интеграционных тестов.
Фоновые задачи
Для улучшения пользовательского опыта, продолжительные процессы должны выполняться в фоновом режиме вне обычного потока HTTP-запросов/ответов.
Например:
- Отправка писем для подтверждения;
- Веб-скейпинг и краулинг;
- Анализ данных;
- Обработка изображений;
- Генерация отчетов.
При создании приложения, старайтесь отделять задачи, которые должны выполняться в течение жизненного цикла запроса/ответа, например CRUD-операции, от задач, которые должны выполняться в фоновом режиме.
Рабочий процесс
Наша цель – разработать приложение на Django, которое для обработки продолжительных процессов вне цикла запрос/ответ использует Celery.
- Конечный пользователь генерирует новое задание, отправляя POST-запрос на сервер.
- В этом представлении задание добавляется в очередь, а id задания отправляется обратно на клиент.
- С помощью AJAX клиент продолжает опрашивать сервер, чтобы проверить состояние задания, в том время как само задание выполняется в фоновом режиме.
Создание проекта
Клонируйте проект из репозитория django-celery и выполните checkout по тегу v1 в ветке master:
$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master
Поскольку в общей сложности нам нужно работать с тремя процессами (Django, Redis, воркер), мы используем Docker для упрощения работы, соединив их так, чтобы мы могли все запустить одной командой в одном окне терминала.
Из корня проекта создайте образы и запустите Docker-контейнеры:
$ docker-compose up -d --build
Когда сборка завершится, перейдите на localhost:1337:
Убедитесь в том, что тесты проходят успешно:
$ docker-compose exec web python -m pytest
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item
tests/test_tasks.py . [100%]
========================================= 1 passed in 0.47s =========================================
Давайте взглянем на структуру проекта перед тем, как двигаться дальше:
├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── pytest.ini
├── requirements.txt
├── static
│ ├── bulma.min.css
│ ├── jquery-3.4.1.min.js
│ ├── main.css
│ └── main.js
├── tasks
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── templates
│ │ └── home.html
│ └── views.py
└── tests
├── __init__.py
└── test_tasks.py
Запуск задания
Обработчик событий в project/static/main.js
подписан на нажатие на кнопку. По клику на сервер отправляет AJAX POST-запрос с соответствующим типом задания: 1
, 2
или 3
.
$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
На стороне сервера уже настроено представление для обработки запроса в project/tasks/views.py
:
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
А теперь начинается самое интересное: привязываем Celery!
Настройка Celery
Начнем с того, что добавим Celery и Redis в файл project/requirements.txt
:
celery==4.4.1
Django==3.0.4
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Celery использует брокер сообщений — RabbitMQ, Redis или AWS Simple Queue Service (SQS) – чтобы упростить коммуникацию между воркером Celery и веб-приложением. Сообщения направляются к брокеру, а после обрабатываются воркером. После этого результаты отправляются на бэкенд.
Redis будет одновременно и брокером и бэкендом. Добавьте Redis и воркера Celery в файл docker-compose.yml
следующим образом:
version: '3.7'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:5-alpine
Обратите внимание на celery worker --app=core --loglevel=info
:
celery worker
используется для запуска воркера Celery;--app=core
используется для запускаcore
приложения Celery (которое мы коротко определим);--loglevel=info
определяет уровень логирования информации.
В модуль настроек проекта добавьте следующее, чтобы Celery использовала Redis в качестве брокера и бэкенда:
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Затем создайте файл sample_tasks.py
в project/tasks
:
# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
Здесь, с помощью декоратора shared_task мы определили новую функцию-задание Celery, которая называется create_task
.
Помните о том, что само задание не будет выполняться из процесса Django, оно будет выполнено воркером Celery.
А теперь добавьте файл celery.py
в "project/core"
:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Что тут происходит?
- Для начала нужно установить значение по умолчанию для среды
DJANGO_SETTINGS_MODULE
, чтобы Celery знала, как найти проект Django. - Затем мы создали экземпляр Celery с именем
core
и поместили в переменнуюapp
. - Затем мы загрузили значения конфигурации Celery из объекта настроек из
django.conf
. Мы использовали namespace=«CELERY» для предотвращения коллизий с другими настройками Django. Таким образом, все настройки конфигурации для Celery должны начинаться с префиксаCELERY_
. - Наконец,
app.autodiscover_tasks()
говорит Celery искать задания из приложений, определенных вsettings.INSTALLED_APPS
.
Измените project/core/__init__.py
, чтобы приложение Celery автоматически импортировалось при запуске Django:
from .celery import app as celery_app
__all__ = ("celery_app",)
Запуск задания
Обновите представление, чтобы начать выполнение задания и отправить id:
@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
Не забудьте импортировать задание:
from tasks.sample_tasks import create_task
Соберите образы и разверните новые контейнеры:
$ docker-compose up -d --build
Для запуска нового задания, выполните:
$ curl -F type=0 http://localhost:1337/tasks/
Вы увидите что-то вроде этого:
{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
Статус задания
Вернитесь к обработчику событий на стороне клиента:
$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Когда от AJAX-запроса вернется ответ, мы будем слать getStatus()
с id задания каждую секунду:
function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
Если ответ положительный, то новая строка будет добавлена к таблице DOM. Обновите представление get_status
, чтобы вернуть статус:
@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
Импортируйте AsyncResult:
from celery.result import AsyncResult
Обновите контейнеры:
$ docker-compose up -d --build
Запустите новое задание:
$ curl -F type=1 http://localhost:1337/tasks/
Затем извлеките task_id
из ответа и вызовите обновленный get_status
, чтобы увидеть статус:
$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": true
}
Ту же информацию вы можете посмотреть в браузере:
Логи Celery
Обновите сервис celery
в docker-compose.yml
так, чтобы логи Celery отправились в отдельный файл:
celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
Добавьте новую директорию в “project” и назовите ее “logs”. Затем добавьте в этот новый каталог положите файл celery.log
.
Обновите:
$ docker-compose up -d --build
Вы должны видеть, как файл с логами локально заполняется после настройки volume:
[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
/usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
succeeded in 10.027462100006233s: True
Панель мониторинга Flower
Flower – это легкий веб-инструмент для мониторинга Celery в режиме реального времени. Вы можете отслеживать запущенные задания, увеличивать или уменьшать пул воркеров, отображать графики и статистику, например.
Добавьте его в requirements.txt
:
celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Затем добавьте новый сервис в docker-compose.yml
:
dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
И протестируйте:
$ docker-compose up -d --build
Перейдите на localhost:5555 для просмотра панели мониторинга. Вы должны увидеть одного воркера:
Запустите еще несколько заданий, чтобы протестировать панель мониторинга:
Попробуйте добавить больше воркеров и посмотреть, как это скажется на производительности:
$ docker-compose up -d --build --scale celery=3
Тесты
Давайте начнем с самого простого теста:
def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
Добавьте тест-кейс выше в project/tests/test_tasks.py
и допишите следующий импорт:
from tasks import sample_tasks
Запустите этот тест:
$ docker-compose exec web python -m pytest -k "test_task and not test_home"
Выполнение данного теста займет около минуты:
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py . [100%]
============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================
Стоит отметить, что в assert’ах выше мы использовали метод .run
вместо .delay
для непосредственного запуска задач, без использования воркера Celery.
Хотите использовать заглушки(mock), чтобы ускорить процесс?
@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
Импортируйте:
from unittest.mock import patch, call
Протестируйте:
$ docker-compose exec web python -m pytest -k "test_mock_task"
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected
tests/test_tasks.py . [100%]
================================== 1 passed, 2 deselected in 1.13s ==================================
Видите? Теперь гораздо быстрее!
Как насчет полноценного интеграционного тестирования?
def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
Помните, что этот тест использует того же брокера и бэкенд, что и в разработке. Вы можете создать новый экземпляр приложения Celery для тестирования:
app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
Добавьте импорт:
import json
И убедитесь в том, что тесты прошли успешно.
Заключение
Сегодня мы познакомились с базовой настройкой Celery для выполнения долгосрочных заданий в приложении на Django. Вы должны отправлять в очередь обработки любые процессы, которые могут замедлить работу кода на стороне пользователя.
Также Celery можно использовать для выполнения повторяющихся задач и декомпозиции сложных ресурсоемких задач, чтобы распределить вычислительную нагрузку на несколько машин и уменьшить время выполнения и нагрузку на машину, которая обрабатывает запросы клиента.
Весь код вы можете найти в этом репозитории.
→ Успеть на курс