[Перевод] Асинхронные задачи с FastAPI и Celery

Если в приложении есть длительные процессы, то вместо того, чтобы блокировать основной поток, вы должны обрабатывать их в фоновом режиме.

Допустим, веб-приложение требует, чтобы пользователи отправляли фотографию (размер которой, вероятно, потребуется изменить) и подтверждали свой адрес электронной почты при регистрации. Если приложение обработало изображение и отправило электронное письмо с подтверждением непосредственно в обработчике запроса, то конечному пользователю пришлось бы ждать, пока они оба завершат обработку. Вместо этого можно передать эти процессы в очередь задач и позволить отдельному рабочему процессу разобраться с ними. В это время пользователь может выполнять другие операции на стороне клиента, пока происходит обработка. Приложение также может свободно отвечать на запросы других пользователей и клиентов.

Чтобы достичь этого, мы расскажем вам о процессе настройки Celery и Redis для обработки длительно выполняющихся процессов в приложении FastAPI. Мы также будем использовать Docker и Docker Compose, чтобы связать все воедино. Наконец, мы рассмотрим, как протестировать задачи Celery с помощью модульных (unit) и интеграционных тестов.

Цель

К концу этого урока вы сможете:

  • Интегрируйте Celery в приложение Fast API и создадите задачи.

  • Поместите в контейнер FastAPI, Celery и Redis с помощью Docker.

  • Запустите процессы в фоновом режиме с помощью отдельного рабочего процесса.

  • Сохраните логи Celery в файл.

  • Настройте Flower для мониторинга и администрирования задач Celery.

  • Протестируйте задачу Celery как с помощью модульных, так и с помощью интеграционных тестов.

Фоновые задачи

Для улучшения пользовательского взаимодействия, длительно выполняющиеся процессы должны выполняться вне обычного потока HTTP-запросов/ответов, т.е в фоновом режиме.

Примеры:

  1. Запуск моделей машинного обучения

  2. Отправка электронных писем с подтверждением

  3. Веб-скраперы

  4. Анализ данных

  5. Обработка изображений

  6. Создание отчетов

При создании приложения постарайтесь отличать задачи, которые должны выполняться в течение жизненного цикла запроса / ответа, например CRUD-операции, от тех, которые должны выполняться в фоновом режиме.

Стоит отметить, что вы можете использовать класс BackgroundTasks из FastAPI, который поставляется непосредственно из Starlette, для выполнения задач в фоновом режиме.

Например:

from fastapi import BackgroundTasks


def send_email(email, message):
    pass


@app.get("/")
async def ping(background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email, "email@address.com", "Hi!")
    return {"message": "pong!"}

Итак, когда вам следует использовать Celery вместо BackgroundTasks?

  1. Задачи с интенсивным использованием процессора: Celery следует использовать для задач, которые выполняют тяжелые фоновые вычисления. В то время как, BackgroundTasks выполняется в том же цикле событий, который обрабатывает запросы вашего приложения.

  2. Очередь задач: Если вам требуется очередь для управления задачами и воркерами, вам следует использовать Celery. Часто вам захочется получить статус задания, а затем выполнить какое-либо действие на основе статуса — например, отправить электронное письмо с ошибкой, запустить другую фоновую задачу или повторить попытку выполнения задачи. Celery справится со всем этим за вас.

Принцип работы

Наша цель — разработать быстрое API-приложение, которое работает совместно с Celery для обработки длительных процессов вне обычного цикла запроса / ответа.

  1. Конечный пользователь запускает новую задачу с помощью POST-запроса на стороне сервера.

  2. В обработчике POST-запроса задача добавляется в очередь, и идентификатор задачи отправляется обратно на клиентскую сторону.

  3. Используя AJAX, клиент продолжает опрашивать сервер, чтобы проверить статус задачи, в то время как сама задача выполняется в фоновом режиме.

c816e843c25177aef678060c63643a5f.png

Настройка проекта

Склонируйте базовый проект из репозитория fastapi-celery, а затем перейдите на тег v1 в master ветке:

git clone https://github.com/testdrivenio/fastapi-celery --branch v1 --single-branch
cd fastapi-celery
git checkout v1 -b master

Поскольку нам нужно будет управлять тремя процессами (FastAPI, Redis, Celery worker), мы будем использовать Docker для упрощения нашего рабочего процесса. Подключим их таким образом, чтобы все они могли запускаться из одного окна терминала с помощью одной команды.

Из корневого каталога проекта создайте образы и запустите контейнеры Docker:

docker-compose up -d --build

Как только сборка будет завершена, перейдите к http://localhost:8004:

5314e7cf88174c499382227ac3571f9b.png

Убедитесь, что тесты также прошли успешно:

$ docker-compose exec web python -m pytest

=============================== test session starts ================================
platform linux -- Python 3.11.2, pytest-7.2.2, pluggy-1.0.0
rootdir: /usr/src/app
plugins: anyio-3.6.2
collected 1 item

tests/test_tasks.py .                                                        [100%]

================================ 1 passed in 0.20s =================================

Прежде чем двигаться дальше, взгляните на структуру проекта:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── main.py
    ├── requirements.txt
    ├── static
    │   ├── main.css
    │   └── main.js
    ├── templates
    │   ├── _base.html
    │   ├── footer.html
    │   └── home.html
    └── tests
        ├── __init__.py
        ├── conftest.py
        └── test_tasks.py

Запуск задачи

Обработчик события onclick в project/templates/home.html настроен так, что прослушивает нажатие кнопки:

onclick вызывает handleClick, расположенный в project/static/main.js, который отправляет AJAX POST-запрос на сервер с соответствующим типом задачи: 1, 2 или 3.

function handleClick(type) {
  fetch('/tasks', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ type: type }),
  })
  .then(response => response.json())
  .then(res => getStatus(res.data.task_id));
}

На стороне сервера маршрут уже настроен для обработки запроса в project/main.py:

@app.post("/tasks", status_code=201)
def run_task(payload = Body(...)):
    task_type = payload["type"]
    return JSONResponse(task_type)

Теперь начинается самое интересное — настройка Celery!

Установка Celery

Начните с добавления Celery и Redis в requirements.txt:

aiofiles==23.1.0
celery==5.2.7
fastapi==0.95.0
Jinja2==3.1.2
pytest==7.2.2
redis==4.5.4
requests==2.28.2
uvicorn==0.21.1
httpx==0.23.3

В Celery используется брокер сообщений — RabbitMQ,  Redis или AWS Simple Queue Service (SQS) — для облегчения взаимодействия между воркером Celery и веб-приложением. Сообщения добавляются к брокеру, которые затем обрабатываются воркерами. После выполнения результаты отправляются на бэкенд (Redis backend).

Redis будет использоваться как в качестве брокера, так и в качестве бэкенда (Redis backend). Добавьте Redis и worker Celery в файл docker-compose.yml следующим образом:

version: '3.8'

services:

  web:
    build: ./project
    ports:
      - 8004:8000
    command: uvicorn main:app --host 0.0.0.0 --reload
    volumes:
      - ./project:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  worker:
    build: ./project
    command: celery -A worker.celery worker --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:7

Опишем команду celery -A worker.celery worker --loglevel=info:

  1. celery worker используется для запуска воркера Celery

  2. -A worker.celery запускает приложение Celery (которое мы вскоре определим)

  3. --loglevel=info устанавливает уровень журналирования на info

Создайте новый файл с именем worker.py в «project»:

import os
import time

from celery import Celery


celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")


@celery.task(name="create_task")
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True

Здесь мы создали новый экземпляр Celery и, используя декоратор задачи, определили новую вызываемую функцию задачи Celery create_task.

Имейте в виду, что сама задача будет выполняться воркером Celery.

Запуск задачи

Обновите эндпоинты в main.py для запуска задачи и ответа с идентификатором задачи:

@app.post("/tasks", status_code=201)
def run_task(payload = Body(...)):
    task_type = payload["type"]
    task = create_task.delay(int(task_type))
    return JSONResponse({"task_id": task.id})

Не забудьте импортировать задачу:

from worker import create_task

Запустите новые контейнеры:

docker-compose up -d --build

Чтобы запустить новую задачу, выполните:

curl http://localhost:8004/tasks -H "Content-Type: application/json" --data '{"type": 0}'

Вы должны увидеть что-то вроде:

{
  "task_id": "14049663-6257-4a1f-81e5-563c714e90af"
}

Статус задачи

Вернемся к handleClick функции на стороне клиента:

function handleClick(type) {
  fetch('/tasks', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ type: type }),
  })
  .then(response => response.json())
  .then(res => getStatus(res.data.task_id));
}

Когда возвращается ответ от исходного AJAX-запроса, мы продолжаем вызывать getStatus() с идентификатором задачи каждую секунду:

function getStatus(taskID) {
  fetch(`/tasks/${taskID}`, {
    method: 'GET',
    headers: {
      'Content-Type': 'application/json'
    },
  })
  .then(response => response.json())
  .then(res => {
    const html = `
      
        ${taskID}
        ${res.data.task_status}
        ${res.data.task_result}
      `;
    document.getElementById('tasks').prepend(html);
    const newRow = document.getElementById('table').insertRow();
    newRow.innerHTML = html;
    const taskStatus = res.data.task_status;
    if (taskStatus === 'finished' || taskStatus === 'failed') return false;
    setTimeout(function() {
      getStatus(res.data.task_id);
    }, 1000);
  })
  .catch(err => console.log(err));
}

Если ответ успешен, в DOM таблицу добавляется новая строка.

Обновите эндпоинт get_status, чтобы вернуть статус:

@app.get("/tasks/{task_id}")
def get_status(task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JSONResponse(result)

Импортируйте AsyncResult:

from celery.result import AsyncResult

Обновите контейнеры:

docker-compose up -d --build

Запустите новую задачу:

curl http://localhost:8004/tasks -H "Content-Type: application/json" --data '{"type": 1}'

Затем возьмите task_id из ответа и вызовите обновленный эндпоинт, чтобы просмотреть состояние:

$ curl http://localhost:8004/tasks/f3ae36f1-58b8-4c2b-bf5b-739c80e9d7ff

{
  "task_id": "455234e0-f0ea-4a39-bbe9-e3947e248503",
  "task_result": true,
  "task_status": "SUCCESS"
}

Также проверьте это в браузере:

ec6c918197ce4a8081ae8e1e7e6146dc.png

Логирование Celery

Обновите сервис worker в docker-compose.yml, чтобы логи Celery сохранялись в файл:

worker:
  build: ./project
  command: celery -A worker.celery worker --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - CELERY_BROKER_URL=redis://redis:6379/0
    - CELERY_RESULT_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Добавьте новый каталог в «project» под названием «logs». Затем добавьте новый файл с именем celery.log в этот созданный каталог.

Обновите контейнеры:

docker-compose up -d --build

Вы должны увидеть, что файл логов заполняется локально, поскольку мы пробросили директорию проекта:

[2023-04-05 16:10:33,257: INFO/MainProcess] Connected to redis://redis:6379/0
[2023-04-05 16:10:33,262: INFO/MainProcess] mingle: searching for neighbors
[2023-04-05 16:10:34,271: INFO/MainProcess] mingle: all alone
[2023-04-05 16:10:34,283: INFO/MainProcess] celery@6ea5007507db ready.
[2023-04-05 16:11:49,400: INFO/MainProcess]
  Task create_task[7f0022ec-bcc8-4eff-b825-bde60d15f824] received
[2023-04-05 16:11:59,418: INFO/ForkPoolWorker-7]
  Task create_task[7f0022ec-bcc8-4eff-b825-bde60d15f824]
  succeeded in 10.015363933052868s: True

Панель управления Flower

Flower — это легкий веб-инструмент мониторинга Celery в режиме реального времени. Вы можете отслеживать текущие задачи, увеличивать или уменьшать пул воркеров, просматривать графики и ряд статистических данных.

Обновите файл requirements.txt:

aiofiles==23.1.0
celery==5.2.7
fastapi==0.95.0
flower==1.2.0
Jinja2==3.1.2
pytest==7.2.2
redis==4.5.4
requests==2.28.2
uvicorn==0.21.1
httpx==0.23.3

Затем добавьте новый сервис в docker-compose.yml:

dashboard:
  build: ./project
  command: celery --broker=redis://redis:6379/0 flower --port=5555
  ports:
    - 5556:5555
  environment:
    - CELERY_BROKER_URL=redis://redis:6379/0
    - CELERY_RESULT_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - worker

Перезапустите контейнеры:

docker-compose up -d --build

Перейдите в http://localhost:5556 для просмотра информационной панели. Вы должны увидеть одного воркера, готового к работе:

679f85ad38010be4398173e77058142e.png

Запустите еще несколько задач, чтобы полностью протестировать панель мониторинга:

078d29b85c9aa810b55ecfe560ca3e30.png

Попробуйте добавить еще несколько воркеров, чтобы посмотреть, как это повлияет на ситуацию:

docker-compose up -d --build --scale worker=3

Тесты

Давайте начнем с самого простого теста:

def test_task():
    assert create_task.run(1)
    assert create_task.run(2)
    assert create_task.run(3)

Добавьте приведенный выше пример в project/tests/test_tasks.py , а затем добавьте следующий импорт:

from worker import create_task

Запустите этот тест отдельно:

docker-compose exec web python -m pytest -k "test_task and not test_home"

Выполнение должно занять около одной минуты:

=============================== test session starts ================================
platform linux -- Python 3.11.2, pytest-7.2.2, pluggy-1.0.0
rootdir: /usr/src/app, configfile: pytest.ini
plugins: anyio-3.6.2
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py .                                                        [100%]
==================== 1 passed, 1 deselected in 60.07s (0:01:00) ====================

Стоит отметить, что в приведенных выше утверждениях мы использовали .run метод (а не .delay). Это нужно, чтобы запустить задачи напрямую, без участия воркера Celery.

Хотите замокать .run метод, чтобы ускорить процесс?

@patch("worker.create_task.run")
def test_mock_task(mock_run):
    assert create_task.run(1)
    create_task.run.assert_called_once_with(1)

    assert create_task.run(2)
    assert create_task.run.call_count == 2

    assert create_task.run(3)
    assert create_task.run.call_count == 3

Добавьте импорт:

from unittest.mock import patch

Протестируем:

$ docker-compose exec web python -m pytest -k "test_mock_task"

=============================== test session starts ================================
platform linux -- Python 3.11.2, pytest-7.2.2, pluggy-1.0.0
rootdir: /usr/src/app, configfile: pytest.ini
plugins: anyio-3.6.2
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py .                                                        [100%]
========================= 1 passed, 1 deselected in 0.10s ==========================

Намного быстрее!

Как насчет полного интеграционного теста?

def test_task_status(test_app):
    response = test_app.post(
        "/tasks",
        data=json.dumps({"type": 1})
    )
    content = response.json()
    task_id = content["task_id"]
    assert task_id

    response = test_app.get(f"tasks/{task_id}")
    content = response.json()
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = test_app.get(f"tasks/{task_id}")
        content = response.json()
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Имейте в виду, что в этом тесте используется тот же брокер и веб-приложение, что и при версии для разработки. Возможно, вы захотите создать новый экземпляр Celery для тестирования.

Добавте импорт:

import json

Убедитесь, что тест пройден успешно.

Заключение

Это было базовое руководство по настройке Celery для выполнения длительных задач в приложении FastAPI. Это позволит очереди обрабатывать любые процессы, которые могут блокировать или замедлять работу пользовательского кода.

Celery можно использовать для выполнения повторяющихся задач. Также можно разделить сложные, ресурсоемкие задачи, чтобы вычислительную нагрузку распределить на несколько машин. Это позволит сократить (1) время и (2) нагрузку на машину, обрабатывающую клиентские запросы.

Код из репозитория.

© Habrahabr.ru