Пишем универсальный прототип бэкенд-приложения: Litestar, FastStream, dishka

1fa436df1cb38075864f538b66b7792a.jpg

Привет, Хабр! Меня зовут Сергей, я техлид в команде PT BlackBox. Мы с коллегами разрабатываем продукт, который позволяет обнаруживать уязвимости в приложениях методом черного ящика. Фактически мы сами и пишем веб-приложения, и именно о них пойдет речь в статье.

Я бы хотел с вами поделиться своими наработками по теме бэкенд-приложений и предоставить вам шаблон-прототип, который, как мне кажется, может закрыть подавляющее большинство потребностей при их разработке.

Для написания прототипа я буду использовать Litestar, FastStream и dishka. Эта статья будет особенно полезна тем, кто пишет на Tornado, Django, Flask или AIOHTTP и хочет перейти на более актуальные технологии для дальнейшего развития своих проектов на современных рельсах.

Для быстрого перехода к разделам

Использование Litestar, FastStream и dishka позволяет успешно следовать принципам чистой архитектуры (сlean architecture) и почти не допускать «протекания» абстракций. В самом прототипе также будет использоваться Pydantic и SQLAlchemy, но я не буду уделять им особого внимания ввиду того, что эти технологии довольно популярны и в сообществе уже есть огромное количество гайдов по их применению.

Litestar

Litestar — это легковесный ASGI-фреймворк с подробной и проработанной документацией, на данный момент один из самых производительных ASGI-фреймворков.

fc3cd4ced855fa5042b40ba6c1e20d93.png

Litestar — перспективный бэкенд-фреймворк с большим количеством «батареек» на разные случаи жизни. Фреймворк активно разрабатывается и поддерживается сообществом, но при всем этом обделен вниманием на Хабре, не считая анонса доклада на PHDays Fest 2.

9d751181db56c7acc52b8c97bc62e911.png

FastStream

FastStream — это event source фреймворк который упрощает и унифицирует работу с брокерами, а также избавляет от необходимости каждый раз изобретать велосипед.
В нем есть валидация, автогенерация AsyncAPI, CLI-утилита поверх ChatGPT, которая генерирует готовые FastStream-сервисы по вашему описанию. Фреймворк поддерживает такие брокеры, как Kafka, RabbitMQ, NATS, Redis Stream.

Чистая архитектура (SOLID)

Как уже упоминалось выше, прототип построен на принципах чистой архитектуры. При его написании я руководствовался принципами SOLID и делением приложения на слои.

Для тех, кто не знаком с концепцией чистой архитектуры или хочет освежить ее в памяти, я подготовил это раздел. Хочу заметить, что это не ультимативный гайд по чистой архитектуре и некоторые вещи могут быть намеренно упрощены. Вот иллюстрация чистой архитектуры, созданная Робертом Мартином:

источник

Каждая окружность соответствует различным составляющим приложения. Чем ближе слой к центру, тем менее он подвержен изменениям, поэтому нижестоящие слои не должны зависеть от вышестоящих.

Frameworks and Drivers

В этом слое размещены внешние сущности, например конкретная база данных или интерфейс вашего приложения. В моем прототипе я не завязывал бизнес-логику приложения на этот слой.

Interface Adapters

Здесь размещены конкретные реализации, имплементирующие общение с внешними сервисами, например с базой данных. Можно представить это так:

class BooksGateway:
    def get_books(self) -> list[Books]:
        query = 'SELECT * FROM books'
        rows = self.psql.execute(query)
        return [Books(**r) for r in rows]

Application business rules

В этом слое размещен код для реализации пользовательских сценариев. Здесь находятся сущности, которые описывают конкретный use case, агрегировав в себе необходимые зависимости. Схематично это можно изобразить так:

def interactor(self) -> bool:
    """ User buy books """
    books = self.get_books_from_card()
    full_price = sum([b.price for b in books])
    self.write_off_money(full_price)
    self.send_books_to_user(books)

Enterprise business rules

Доменный слой приложения, в котором располагается основа приложения.

SOLID

Чистая архитектура предполагает не наличие слоев как таковых, а следование принципам SOLID. Слои — это всего лишь следствие, вытекающее из SOLID и парочки других принципов.

Это принципы построения любых приложений, они универсальны и не зависят от языка. Можно назвать их принципами вытекающими из ООП, хотя это не совсем корректно, потому что все эти принципы подходят для проектирования как микросервисной архитектуры, так и конкретного приложения. Исключительно к ООП можно отнести только один из пяти принципов.

S — Single responsibility principle

Принцип единой ответственности: модуль должен иметь одну и только одну причину для изменения.

O — Open-closed principle

Принцип открытости — закрытости: он означает, что должна быть возможность добавлять новые компоненты без изменения уже существующих.

L — Liskov substitution principle

Принцип подстановки Барбары Лисков заключается в том, что дочерний класс должен соответствовать поведению родителя. Это, собственно, тот принцип, который актуален только для ООП.

I — Interface segregation principle

Принцип разделения интерфейсов: компоненты не должны зависеть от методов, которые они не используют.

D — Dependency inversion principle

Принцип инверсии зависимостей: компоненты должны зависеть от абстрактных интерфейсов, а не от конкретных реализаций.

Dependency injection (dishka)

Dependency injection — это принцип, который позволяет уменьшить связанность (coupling) и увеличить сплоченность (cohesion) сущностей в приложении. Это происходит за счет следующего: если сущность А требует В для своей работы, она должна получить ее извне, а не создавать самостоятельно.

Для лучшего понимания можно провести следующую аналогию: связанность — это суперклей, которым мы соединили части нашего приложения, а сплоченность скорее похожа на конструктор. Подробнее об этом принципе можно узнать из статьи.

Пример кода с высокой связанностью:

import os

class ApiClient:

    def __init__(self) -> None:
        self.api_key = os.getenv("API_KEY")  # <-- dependency
        self.timeout = int(os.getenv("TIMEOUT"))  # <-- dependency

class Service:

    def __init__(self) -> None:
        self.api_client = ApiClient()  # <-- dependency

def main() -> None:
    service = Service()  # <-- dependency
    ...

if __name__ == "__main__":
    main()

Тот же самый код, но с использованием принципа внедрения зависимостей:

import os

class ApiClient:

    def __init__(self, api_key: str, timeout: int) -> None:
        self.api_key = api_key  # <-- dependency is injected
        self.timeout = timeout  # <-- dependency is injected

class Service:

    def __init__(self, api_client: ApiClient) -> None:
        self.api_client = api_client  # <-- dependency is injected

def main(service: Service) -> None:  # <-- dependency is injected
    ...

if __name__ == "__main__":
    main(
        service=Service(
            api_client=ApiClient(
                api_key=os.getenv("API_KEY"),
                timeout=int(os.getenv("TIMEOUT")),
            ),
        ),
    )

По ходу написания прототипа я придерживался принципа внедрения зависимостей.

Dishka — IoC-контейнер, фреймворк, который позволяет управлять зависимостями в проекте. Несмотря на то что и у Litestar, и у FastStream есть свои решения для этого, использование внешней технологии позволит запустить их в рамках одной кодовой базы.

Подробнее с принципом можно ознакомиться тут. В том же материале можно прочитать о том, что такое IoC-контейнер.

Реализация прототипа

В рамках этой статьи я написал прототип приложения, которое позволяет хранить информацию о книгах. Оно «слушает» очередь, в которую приходят события (events) и записывает информацию из этих событий в PostgreSQL.

Ниже я не буду подробно останавливаться на таких вещах, как написание docker compose или настройка миграций — на GitHub размещена готовая реализация, в которой все это сделано.

.
├── docker-compose.yaml — здесь поднимем RabbitMQ и Postgress
├── .env — здесь опишем переменное окружение
└── book-club
    ├── domain — доменные сущности
    ├── application — здесь у нас будет лежать бизнес-логика приложения
    ├── infrastructure — здесь у нас будут хранится адаптеры и миграции
    ├── handlers — HTTP- и AMQP-контроллеры и DTO для них
    ├── ioc.py — контейнер с зависимостями 
    ├── config.py — конфигурация проекта
    ├── main.py — точка входа в приложение

Первое, с чего хотелось бы начать, — описание домена. Домен является основой любого приложения — того, ради чего оно пишется. Например: приложение по доставке пиццы пишется ради пиццы, приложение автосервиса — ради машины.

Для лучшего понимания я специально сильно упрощаю. На самом деле домен приложения куда шире, он может включать в себя клиентов, которые покупают пиццу, сотрудников, которые обслуживают машину, побочные домены для продажи напитков и салонных елочек. Для книжного клуба я ограничился одной доменной сущностью — BookDM.

# book_club/domain/entities.py
from dataclasses import dataclass   

@dataclass(slots=True)  
class BookDM:  # DM - Domain model
    uuid: str  
    title: str  
    pages: int  
    is_read: bool

Следующим шагом я описал бизнес-логику приложения. Это то, чем приложение занимается. Приложение может сохранить информацию о книге и выдать ее по запросу. Рассмотрим DTO, который необходим для слоя бизнес-логики.

# book_club/application/dto.py
from dataclasses import dataclass  
    
@dataclass(slots=True)  
class NewBookDTO:  
    title: str  
    pages: int  
    is_read: bool

DTO — это паттерн, который реализует обмен информацией между слоями. Внутренний слой не должен зависеть от внешнего, поэтому он не может работать с объектами из внешних слоев приложения, такими как pydantic model или объект request. NewBookDTO необходим, чтобы получать информацию о книгах из внешних слоев приложения.

Далее, исходя из бизнес-логики, я реализовал use-case interactor и описал интерфейсы, от которых они зависят.

# book_club/application/interfaces.py
from abc import abstractmethod  
from typing import Protocol  
from uuid import UUID  
  
from book_club.domain.entities import BookDM  
  
  
class BookSaver(Protocol):  
    @abstractmethod  
    async def save(self, book: BookDM) -> None:  
        ...  
  
  
class BookReader(Protocol):  
    @abstractmethod  
    async def read_by_uuid(self, uuid: str) -> BookDM | None:  
        ...  
  
  
class UUIDGenerator(Protocol):  
    def __call__(self) -> UUID:  
        ...  
  
  
class DBSession(Protocol):  
    @abstractmethod  
    async def commit(self) -> None:  
        ...  
  
    @abstractmethod  
    async def flush(self) -> None:  
        ...

BookSaver — сохранение информации о книгах.

BookReader — чтение информации о книгах.

UUIDGenerator — генерация UUID.

DBSession — интерфейс для работы с транзакциями. 

# book_club/application/interactors.py
from book_club.application import interfaces  
from book_club.application.dto import NewBookDTO  
from book_club.domain import entities  
  
  
class GetBookInteractor:  
    def __init__(  
            self,  
            book_gateway: interfaces.BookReader,  
    ) -> None:  
        self._book_gateway = book_gateway  
  
    async def __call__(self, uuid: str) -> entities.BookDM | None:  
        return await self._book_gateway.read_by_uuid(uuid)  
  
  
class NewBookInteractor:  
    def __init__(  
            self,  
            db_session: interfaces.DBSession,  
            book_gateway: interfaces.BookSaver,  
            uuid_generator: interfaces.UUIDGenerator,  
    ) -> None:  
        self._db_session = db_session  
        self._book_gateway = book_gateway  
        self._uuid_generator = uuid_generator  
  
    async def __call__(self, dto: NewBookDTO) -> str:  
        uuid = str(self._uuid_generator())  
        book = entities.BookDM(  
            uuid=uuid,  
            title=dto.title,  
            pages=dto.pages,  
            is_read=dto.is_read  
        )  
  
        await self._book_gateway.save(book)  
        await self._db_session.commit()  
        return uuid

Use-case interactor — это реализация конкретного пользовательского действия. У пользователя только два доступных действия, которые и описаны в теле интеракторов.

На этом этапе основа приложения уже готова, теперь рассмотрим детали реализации.

Config приложения:

# book_club/config.py
from os import environ as env

from pydantic import Field, BaseModel


class RabbitMQConfig(BaseModel):
    host: str = Field(alias='RABBITMQ_HOST')
    port: int = Field(alias='RABBITMQ_PORT')
    login: str = Field(alias='RABBITMQ_USER')
    password: str = Field(alias='RABBITMQ_PASS')


class PostgresConfig(BaseModel):
    host: str = Field(alias='POSTGRES_HOST')
    port: int = Field(alias='POSTGRES_PORT')
    login: str = Field(alias='POSTGRES_USER')
    password: str = Field(alias='POSTGRES_PASSWORD')
    database: str = Field(alias='POSTGRES_DB')


class Config(BaseModel):
    rabbitmq: RabbitMQConfig = Field(default_factory=lambda: RabbitMQConfig(**env))
    postgres: PostgresConfig = Field(default_factory=lambda: PostgresConfig(**env))

Обращаю внимание, что я разделил config на отдельные составляющие, а потом объединил их путем композиции в классе Config. Такой подход позволяет более эффективно проводить инъекцию зависимостей. Если класс зависит от конкретных конфигурационных данных, то нужно предоставить ему только эти данные вместо конфигурации всего приложения.

# book_club/infrastructure/database.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker  
from sqlalchemy.ext.asyncio import create_async_engine  
  
from book_club.config import PostgresConfig  
  
  
def new_session_maker(psql_config: PostgresConfig) -> async_sessionmaker[AsyncSession]:  
    database_uri = 'postgresql+psycopg://{login}:{password}@{host}:{port}/{database}'.format(  
        login=psql_config.login,  
        password=psql_config.password,  
        host=psql_config.host,  
        port=psql_config.port,  
        database=psql_config.database,  
    )  
  
    engine = create_async_engine(  
        database_uri,  
        pool_size=15,  
        max_overflow=15,  
        connect_args={  
            "connect_timeout": 5,  
        },  
    )  
    return async_sessionmaker(engine, class_=AsyncSession, autoflush=False, expire_on_commit=False)

Обратите внимание, new_session_maker зависит только от PostgresConfig, он ничего не знает о RabbitMQConfig.

# book_club/infrastructure/broker.py
from faststream.rabbit import RabbitBroker  
from faststream.security import SASLPlaintext  
  
from book_club.config import RabbitMQConfig  
  
  
def new_broker(rabbitmq_config: RabbitMQConfig) -> RabbitBroker:  
    return RabbitBroker(  
        host=rabbitmq_config.host,  
        port=rabbitmq_config.port,  
        security=SASLPlaintext(  
            username=rabbitmq_config.login,  
            password=rabbitmq_config.password,  
        ),  
        virtualhost="/",  
    )

Аналогичная ситуация и для new_broker.

# book_club/infrastructure/gateways.py
from sqlalchemy.ext.asyncio import AsyncSession  
from sqlalchemy.sql import text  
  
from book_club.application.interfaces import BookReader, BookSaver  
from book_club.domain.entities import BookDM  
  
  
class BookGateway(  
    BookReader,  
    BookSaver,  
):  
    def __init__(self, session: AsyncSession):  
        self._session = session  
  
    async def read_by_uuid(self, uuid: str) -> BookDM | None:  
        query = text("SELECT * FROM books WHERE uuid = :uuid")  
        result = await self._session.execute(  
            statement=query,  
            params={"uuid": uuid},  
        )  
        row = result.fetchone()  
        if not row:  
            return None  
        return BookDM(  
            uuid=row.uuid,  
            title=row.title,  
            pages=row.pages,  
            is_read=row.is_read,  
        )  
  
    async def save(self, book: BookDM) -> None:  
        query = text("INSERT INTO books (uuid, title, pages, is_read) VALUES (:uuid, :title, :pages, :is_read)")  
        await self._session.execute(  
            statement=query,  
            params={  
                "uuid": book.uuid,  
                "title": book.title,  
                "pages": book.pages,  
                "is_read": book.is_read,  
            },  
        )

Gateway инкапсулирует в себе работу с данными и позволяет работать с ними как с коллекцией, не вдаваясь в детали реализации.

Отмечу, что BookGateway удовлетворяет упомянутым ранее интерфейсам BookSaver и BookReader.

После реализации домена и адаптеров, я поместил зависимости в контейнер для их последующей инъекции.

# book_club/ioc.py
from typing import AsyncIterable  
from uuid import uuid4  
  
from dishka import Provider, Scope, provide, AnyOf, from_context  
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker  
  
from book_club.application import interfaces  
from book_club.application.interactors import (  
    GetBookInteractor,  
    NewBookInteractor  
)  
from book_club.config import Config  
from book_club.infrastructure.database import new_session_maker  
from dbook_club.infrastructure.gateways import BookGateway  
  
  
class AppProvider(Provider):
    ...

Таким образом, я описал провайдер для interfaces.UUIDGenerator, у него есть параметр scope. Этот параметр определяет срок жизни зависимости (его подробное описание можно найти в документации). Зависимость, у которой область видимости APP, не нуждается в пересоздании. Здесь я использую функцию uuid4 как объект первого типа, он удовлетворяет ранее описанному interfaces.UUIDGenerator. В случае с uuid4 достаточно создать объект один раз за все время жизни приложения, поэтому у него scope=Scope.APP.

...
@provide(scope=Scope.APP)  
def get_uuid_generator(self) -> interfaces.UUIDGenerator:  
    return uuid4
...

Далее я описал provider для config, у него тоже выставлен параметр scope=Scope.APP. Как и в случае с uuid4, нам достаточно создать объект один раз за все время жизни приложения. Однако from_context означает, что объект Config должен быть создан где-то вовне и передан при создании AppProvider.

...
config = from_context(provides=Config, scope=Scope.APP)
...

Следующий шаг — это создание sessionmaker из SqlAlchemy.

...
@provide(scope=Scope.APP)  
def get_session_maker(self, config: Config) -> async_sessionmaker[AsyncSession]: 
    return new_session_maker(config.postgres)
...

Тут я тоже выставил scope=Scope.APP, поскольку класс async_sessionmaker нам больше пересоздавать не требуется. Этот провайдер зависит от Config, что я и указал в сигнатуре функции; async_sessionmaker нужен для создания AsyncSession.

...
@provide(scope=Scope.REQUEST)  
async def get_session(self, session_maker: async_sessionmaker[AsyncSession]) -> AsyncIterable[AnyOf[  
    AsyncSession,  
    interfaces.DBSession,  
]]:  
    async with session_maker() as session:  
        yield session
...

Здесь уже scope=Scope.REQUEST. Этот объект будет создаваться каждый раз при вызове, по сути, после на каждый запрос пользователя будет создаваться новая сессия алхимии. Сигнатура этого провайдера говорит о том, что он зависит от async_sessionmaker[AsyncSession] и возвращаемый объект должен удовлетворять сигнатуре сразу двух интерфейсов: sqlalchemy.AsyncSession и interfaces.DBSession.
Использование генераторов позволяет хранить состояние сессии, а после ответа на запрос пользователя — вернуться в этот провайдер и выйти из контекстного менеджера, тем самым вернуть ресуры в connection pool.

...
book_gateway = provide(  
    BookGateway,  
    scope=Scope.REQUEST,  
    provides=AnyOf[interfaces.BookReader, interfaces.BookSaver]  
)
...

Этот провайдер отвечает за создание BookGateway, который зависит от sqlalchemy.AsyncSession и должен удовлетворять интерфейсам interfaces.BookReader и interfaces.BookSaver. От этих интерфейсов зависят interactors, создание которых выглядит так:

...
get_book_interactor = provide(GetBookInteractor, scope=Scope.REQUEST)  
create_new_book_interactor = provide(NewBookInteractor, scope=Scope.REQUEST)
...

Под капотом dishka вызывает конструкторы для BookGateway, GetBookInteractor и NewBookInteractor, сигнатура которых зависит от interfaces.BookReader, interfaces.BookSaver, interfaces.DBSession, interfaces.UUIDGenerator. В случае если у зависимостей выставлен параметр Scope.REQUEST, они будут созданы, если же выставлен Scope.APP, то dishka попытается найти их в своем хранилище. Если в нем нет зависимостей — dishka создаст их и разместит там.

При получении какого-либо события (будь то запрос по HTTP или сообщение из очереди) вызывается обработчик, который запускает цепочку создания зависимостей, а после окончания события происходит их финализация. В такой схеме на каждое событие будет только одно подключение к базе данных для всех зависимостей в этой цепочке, что позволит достигнуть атомарности транзакций и эффективного использования ресурсов пула подключений.

Теперь обратим внимание на реализацию слоя контроллеров.

BookSchema — это DTO, который мы используем для общения между слоем контроллеров и внешним слоем. Несмотря на то что в прототипе два контроллера, нам нужна всего одна модель, которая одинаково хорошо поддерживается как FastStream, так и Litestar.
На основе только этой модели строится валидация и автогенерация документации.

# controllers/schemas.py
from pydantic import BaseModel  
  
  
class BookSchema(BaseModel):  
    title: str  
    pages: int  
    is_read: bool
# book_club/controllers/amqp.py
from dishka.integrations.base import FromDishka as Depends  
from faststream.rabbit import RabbitRouter  
  
from book_club.application.dto import NewBookDTO  
from book_club.application.interactors import NewBookInteractor  
from book_club.controllers.schemas import BookSchema  
  
AMQPBookController = RabbitRouter()  


@AMQPBookController.subscriber("create_book")  
@AMQPBookController.publisher("book_statuses")  
async def handle(data: BookSchema, interactor: Depends[NewBookInteractor]) -> str:  
    dto = NewBookDTO(  
        title=data.title,  
        pages=data.pages,  
        is_read=data.is_read  
    )  
    uuid = await interactor(dto)  
    return uuid

Выше приведена реализация контроллера для сохранения информации о книге. Как видно из кода, контроллер получает на вход BookSchema, контроллер «слушает» очередь create_book и, в случае наличия события в очереди, пытается превратить данные оттуда в BookSchema. Если не получается, то вызывается исключение pydantic.ValidationError.

В случае успеха происходит вызов handle, который зависит от NewBookInteractor, и создание цепочки зависимостей, о которой я говорил выше. После этого отрабатывает интерактор и в очередь book_statuses отправляется UUID созданной записи о книге.

# book_club/controllers/http.py
from typing import Annotated  
from uuid import UUID  
  
from dishka.integrations.base import FromDishka as Depends  
from dishka.integrations.litestar import inject  
from litestar import Controller, route, HttpMethod  
from litestar import status_codes  
from litestar.exceptions import HTTPException  
from litestar.params import Body  
  
from book_club.application.interactors import GetBookInteractor  
from book_club.controllers.schemas import BookSchema  
  
  
class HTTPBookController(Controller):  
    path = "/book"  
  
    @route(http_method=HttpMethod.GET, path="/{book_id:uuid}")  
    @inject  
    async def get_book(  
            self,  
            book_id: Annotated[UUID, Body(description="Book ID", title="Book ID")],  
            interactor: Depends[GetBookInteractor],  
    ) -> BookSchema:  
        book_dm = await interactor(uuid=str(book_id))  
        if not book_dm:  
            raise HTTPException(status_code=status_codes.HTTP_404_NOT_FOUND, detail="Book not found")  
        return BookSchema(  
            title=book_dm.title,  
            pages=book_dm.pages,  
            is_read=book_dm.is_read,  
        )

Выше представлена реализация контроллера, который отвечает за выдачу информации о книгах. Логика построения цепочки зависимостей абсолютно такая же, как и в примере выше. На вход ожидается book_id в виде UUID, по которому идет поиск книги в базе данных.

Теперь можно все собрать, и точка входа в приложение будет выглядеть следующим образом:

# book_club/main.py
from dishka import make_async_container  
from dishka.integrations import faststream as faststream_integration  
from dishka.integrations import litestar as litestar_integration  
from faststream import FastStream  
from litestar import Litestar  
  
from book_club.config import Config  
from book_club.controllers.amqp import AMQPBookController  
from book_club.controllers.http import HTTPBookController  
from book_club.infrastructure.broker import new_broker  
from book_club.ioc import AppProvider  
  
config = Config()  
container = make_async_container(AppProvider(), context={Config: config})  
  
  
def get_faststream_app() -> FastStream:  
    broker = new_broker(config.rabbitmq)  
    faststream_app = FastStream(broker)  
    faststream_integration.setup_dishka(container, faststream_app, auto_inject=True)  
    broker.include_router(AMQPBookController)  
    return faststream_app  
  
  
def get_litestar_app() -> Litestar:  
    litestar_app = Litestar(  
        route_handlers=[HTTPBookController],  
    )  
    litestar_integration.setup_dishka(container, litestar_app)  
    return litestar_app  
  
  
def get_app():  
    faststream_app = get_faststream_app()  
    litestar_app = get_litestar_app()  
  
    litestar_app.on_startup.append(faststream_app.broker.start)  
    litestar_app.on_shutdown.append(faststream_app.broker.close)  
  
    return litestar_app

Config и container вынесены в глобальную область видимости, это необходимо, чтобы была возможность запустить HTTP и AMQP-часть изолированно друг от друга в разных процессах, если вдруг возникнет такая необходимость.

Теперь это приложение полностью готово, и его можно запустить и проверить.

Обратите внимание, что для корректной работы приложения необходимы контейнеры c RabbitMQ и Postgress, виртуальное окружение и миграции. Подробные инструкции, docker-compose.yaml и миграции вы найдете в исходном коде проекта.

Запускаем приложение:

uvicorn --factory book_club.main:get_app --reload

Пишем в очередь create_book сообщение:

rabbitmqadmin -u $RABBITMQ_USER -p $RABBITMQ_PASS \
publish exchange=amq.default routing_key=create_book \
payload='{"title": "The Brothers Karamazov", "pages": 928, "is_read": true}'

Читаем из очереди book_statuses сообщение и получаем UUID:

rabbitmqadmin -u $RABBITMQ_USER -p $RABBITMQ_PASS get queue=book_statuses count=1
+---------------+--------------------------------------+
|  routing_key  |               payload                | 
+---------------+--------------------------------------+
| book_statuses | bc5fe42d-bedd-42e4-b518-89d088808237 |
+---------------+--------------------------------------+

С этим UUID идем на HTTP API и забираем информацию о книге:

curl http://localhost:8000/book/bc5fe42d-bedd-42e4-b518-89d088808237
{"title":"The Brothers Karamazov","pages":928,"is_read":true}

Если вы хотите запустить HTTP и AMQP в разных процессах, это можно сделать вот так:

faststream --factory book_club.main:get_faststream_app --reload

uvicorn --factory book_club.main:get_litestar_app --reload

Заключение

Спасибо за внимание! Призываю вас зайти в исходный код прототипа и изучить его самостоятельно. Если этот туториал окажется полезным (я это пойму по плюсикам к статье), выпущу вторую часть с демонстрацией, как описанный проект тестировать. Также вступайте в комьюнити — там вы можете встретить контрибьюторов технологий, о которых шла речь в статье, и задать им интересующие вас вопросы: Russian ASGI Community, FastStream Community, Reagento Community.

© Habrahabr.ru