FastStream — новый убийца Celery?

FastStream — это относительно новая блестящая игрушка в руках Python’истов, которая создана специально для работы с брокерами сообщений.

В Python сложилось устойчивое убеждение, что если мы работаем с MQ — то нам нужен Celery, но он слегка устарел. Именно поэтому люди пытаются выкинуть «деда» и затащить вместо него любой новый многообещающий MQ-инструмент. Кроме того, культ Celery настолько силен в умах, что практически все новые библиотеки для работы с MQ пытаются стать его «убийцей» и заменой.

Однако, это не совсем верно. Существует огромный пласт проектов, которым нужен не фреймворк для менеджмента задач, а просто «голый» функционал Kafka/RabbitMQ/NATS/whatever для межсервисного взаимодействия. И все эти проекты вынуждены довольствоваться «сырыми» python-клиентами к своим брокерам, а всю обвязку вокруг этих клиентов писать самостоятельно. FastStream целится как раз в эту нишу.

В рамках статьи я хочу убедить вас, что не Celery мы едины, и для альтернативных инструментов найдется место под солнцем. А также рассмотрим фичи FastStream, которые он привносит в застоявшийся мир MQ-инструментов.

Кто я?

Привет! Я — Никита — создатель Propan и ныне core-developer FastStream. Возможно, вы уже читали мои предыдущие статьи о развитии проекта или даже видели выступление на Podlodka Python Crew #3.

Прошлый раз мы виделись почти год назад, когда FastStream только-только вышел. Поэтому сейчас я хотел бы суммаризировать и опубликовать единым списком преимущества, которые должны убедить вас использовать попробовать FastStream. Проект не стоит на месте и мне есть о чем вам рассказать.

Что FastStream за фреймворк?

FastStream — это python-фреймворк для работы с брокерами сообщений. Он был создан для максимального упрощения разработки event-driven систем.

Если просто — это очень толстый клиент для брокеров, который позволяет вам писать меньше инфраструктурного кода и сконцентрироваться на бизнес-логике ваших приложений. Поэтому гораздо уместнее будет сравнение с aiokafka/aio-pika или Kombu (чьим логическим продолжением и является FastStream). Однако, мы предоставляем дополнительный функционал, напрямую не связанный с кодом, но крайне необходимый для современных систем: автодокументирование проекта, удобство тестирования и Observability из коробки.

Давайте же, наконец, разбираться, зачем FastStream нужен именно вам?

Базовый API

В первую очередь — это очень простой API. На данный момент FastStream поддерживает Kafka, RabbitMQ, NATS и Redis в качестве брокера сообщений (список постоянно растет, на текущий момент у нас 8 Issues от комьюнити на поддержку новых брокеров).

Фреймворк предоставляет как унифицированный способ взаимодействия с брокерами, так и специфичные для каждого из них методы. Типичный эндпоинт для сообщений с использованием FastStream выглядит следующим образом:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("test-queue")  # название очереди RMQ
async def handle(msg: str):
    print(msg)

Запускается это все тоже донельзя просто:

faststream run main:app

Данный синтаксис работает для всех поддерживаемых брокеров и отличается разве что импортами.

Основное преимущество FastStream перед «сырыми» клиентами в данном аспекте — это декларативный синтаксис. Мы переходим от написания императивщины («подключись к X», «создай очередь», «создай потребителя», «привяжи к потребителю функцию») к декларативному «мне нужен подписчик для X». Также все вопросы сериализации/десериализации сообщений FastStream берет на себя — он приводит тело сообщения к запрашиваемому в аннотации типу с помощью pydantic.

Как видите, процесс создания потребителей максимально упрощен относительно привычных MQ библиотек и напоминает другой очень популярный Python фреймворк.

Дальше предлагаю разобраться с тем, какие преимущества мы получаем помимо «сахара над сахаром».

Вдохновение для FastStream

Как я уже сказал, FastStream не стремится быть копией/заменой Celery, а кроме него у нас были только «сырые» клиенты. Получается, среди MQ инструментов вдохновения мы не найдем.

Но мы нашли его среди популярных HTTP-инструментов! А именно — FastAPI. Этот фреймворк в свое время показал, каким требованиям должен отвечать современный инструмент. Давайте же их перечислим:

  • Интуитивно понятный API

  • Сериализация данных на основе аннотации типов

  • Автоматическая документация

  • Удобное тестирование

Ничего из этого у нас не было в MQ мире до этого. Теперь есть!

И если с первыми двумя пунктами мы уже разобрались, то давайте посмотрим на остальные фичи.

Автоматическое документирование

Документация в текущих реалиях крайне важна для проекта. Как вы передадите в смежную команду контракты, которые ожидает ваш сервис? Как будете взаимодействовать с тестировщиками? На что будет смотреть новый член команды при онбординге?

Для HTTP сервисов решение существует давно — OpenAPI + SwaggerUI. А что делать с асинхронными сервисами? Конечно, вы можете написать документацию самостоятельно в том или ином виде, но как тогда поддерживать ее актульность коду?

FastStream решает эту проблему абсолютно прозрачным и привычным способом — он генерирует AsyncAPI спецификацию из кода вашего сервиса. Вы можете разослать полученный файл, разместить его в своей Wiki или просто захостить его HTML представление прямо рядом с сервисом.

Так, для следующего сервиса из одного подписчика и продюсера:

from pydantic import BaseModel
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

class InputData(BaseModel):
    data: bytes

class Prediction(BaseModel):
    result: float

@broker.subscriber("test-subject")
@broker.publisher("out-subject")
async def handle_prediction(
    msg: InputData
) -> Prediction:
    predict = model.predict(msg)
    return predict

Мы получим следующую страничку, где видны:

  • адрес брокера, с которым работает система

  • input очередь и ожидаемый формат данных

  • output очередь и формат предоставляемых данных

AsyncAPI service schema

AsyncAPI service schema

Тестирование сервиса

Вопрос с тестированием асинхронных сервисов стоит особенно остро. Безусловно, мы можем тестировать бизнес-логику (если достаточно качественно декомпозировали код), но что делать с контрактами? Поднимать брокер каждый раз в CI и тестировать на реальной внешней зависимости?

С HTTP клиентами все просто — мы мокаем транспорт и отправляем вместо реального запроса его эмуляцию «в памяти». Почему бы не сделать то же самое для брокеров сообщений? Тем более, что Kombu уже имел такой функционал. Жаль, что про старика все забыли.

Так, тесты на сервис из первого примера будут выглядеть следующим образом:

import pytest
from pydantic import ValidationError
from faststream.rabbit import TestRabbitBroker

@pytest.mark.asyncio()
async def test_correct():
    async with TestRabbitBroker(broker) as br:
        await br.publish("Hi!", "test-queue")

@pytest.mark.asyncio()
async def test_invalid():
    async with TestRabbitBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish(1, "test-queue")

При этом все взаимодействи будет происходить «в памяти», что обеспечивает простоту и воспроизводимость тестирования в CI.

Но если вы хотите переиспользовать те же тесты для реального брокера, то нет ничего проще:

async with TestRabbitBroker(broker, with_real=True) as br:

Observability

Мониторинг системы — это чрезвычайно важный аспект эксплуатации ваших сервисов. Для того, чтобы спать спокойно, пока код работает, вам нужны

  • Логи

  • Метрики

  • Трейсы

  • Healthcheck’и

На тевущий момент FastStream имеет встроенный функционал для интеграции с любой системой логирования, а также готовое решение для OpenTelemetry, с помощью которого вы можете построить сквозные трейсы по всей вашей системе.

О логировании вы можете прочитать в нашей документации, как и о трейсинге. Я также настоятельно рекомендую заглянуть вот в этот репозиторий (спасибо нашему контрибутору Роману), чтобы посмотреть на готовый пример сервисов и инфраструктуры для того, чтобы получить вот такую красивую картинку ваших трейсов:

38e4d9fb6614c337f7f4a4f5c0ff13ff.png

Работа над поддержкой Prometheus метрик и удобного API для k8s проб (healthcheck’ов) уже ведется и сейчас это приоритетная задача, так что эпопея с Observability скоро должна завершиться.

Другие фичи

Не хочу пересказывать всю документацию, но тезисно обозначу фичи, что у нас есть:

  • Декомпозиция приложения через Router’ы

  • Гибкая система Middlewares

  • Своя система внедрения зависимостей на базе Depends и Context

  • CLI с хотрелоадом (а как иначе разрабатывать-то?)

  • Возможность отключить pydantic и использовать собственный формат сообщений (msgpack, protobuf, avro, etc)

  • Тесная интеграция с FastAPI

  • Интеграция с taskiq для плановой публикации сообщений

В общем, у нас есть все для вашей комфортной разработки.

Заключение

В заключение хотелось бы сказать, что большинство задач, решаемые сейчас с помощью Celery могут решаться (и даже эффективнее) с помощью FastStream. Но это не потому что мы — «убийца Сelery», а просто потому что вы воткнули сельдерей туда, где он вам не нужен.

Если вам нужно взаимодействие между сервисами на разных языках и технологиях поверх брокеров — это FastStream.

Если вам нужны асинхронные «задачи» на той же кодовой базе — со всеми их «статусами», мониторингом через flower и тд — вам к Celery и его аналогам.

Вот такой простой вывод, прошу прощение за кликбейтный заголовок :)

Послесловие

Не смотря на свой относительно молодой возраст (релиз был в сентябре 2023), проект уже приобрел некоторую популярность. Все, кто присматривался — уже присмотрелись и стали активно внедрять. Я не вправе заявлять от лица компаний, что они используют наш инструмент, но интерес сообщества подкрепляется следующими фактами:

  • 2000 звезд на GitHub с момента релиза (меньше года)

  • Больше 100к установок в месяц

  • Включение в список Top-100 OpenSource Achievements 2023 года

  • Поддержка развития фреймворка со стороны команд RabbitMQ, NATS и AsyncAPI

  • Непрекращающийся поток запросов на новые фичи со стороны комьюнити

FastStream показал, каким должен быть современный MQ инструмент — и, надеемся, остальные библиотеки потянутся за нами. Например, создатель pydantic уже анонсировал Roadmap на arq, в котором хочет завезти фичи, которые показали мы.

Вы также можете поддержать наш проект:

  • [ ] поставив звезду на GitHub

  • [ ] рассказав о нем своим коллегам и знакомым

  • [ ] вступив в наше RU-telergram соощество, где можно принять непосредственное участие в обсуждении и проектировании нового функционала фреймворка

Также, если вам интересно будущее проекта, вы можете ознакомиться с нашим Roadmap по этой ссылке:

https://github.com/airtai/faststream/issues/1510

© Habrahabr.ru