Propan — Python фреймворк для написания микросервисов с использованием брокеров сообщений
Так исторически сложилось, что последние 5 лет своей продуктовой разработки я работаю с микросервисами вокруг брокеров сообщений (преимущественно RabbitMQ и Kafka).
И все это время меня не покидало чувство некой неудовлетворенности и неполноценности инструментария, который был мне доступен.
Приходя из мира HTTP фреймворков, ты чувствуешь себя как будто на костылях — ни тебе hotreload'а, который есть практически в любом wsgi-asgi сервере, хочешь тестировать — поднимай контейнеры окружения или мокай зависимости (особенно удобно в CI, ага), не забудь о реконнектах, логировании, трассировке и тд и тп.
И вот, таская от сервиса к сервису ворох всех этих проблем (и код который эти проблемы решает), до меня дошла гениальная идея: оформить весь однотипный код, общий для всех сервисов в единый пакет!
Так появился на свет фреймворк Propan.
Зачем вам использовать Propan
Фреймворк слеплен по образу и подобию FastAPI, но для работы с брокерами сообщений и с учетом лично моих болей, которые возникали при работе с этим HTTP-фреймворком. Propan в большей степени открыт для расширения и не диктует вам, как именно его использовать.
От FastAPI мы имеем:
Валидацию и приведение типов входящих сообщений с помощью Pydantic
Систему внедрения зависимостей
Максимально простой и понятный всем и каждому способ написания приложения
Особенности, которые выделяют Propan из ряда других фреймворков и нативных библиотек для работы с брокерами сообщений:
Независимость от брокеров — код, который вы пишете, не зависит от используемого брокера. Вы можете легко мигрировать с RabbitMQ на Kafka или Nats при росте нагрузки.
RPC поверх MQ — вам не нужно думать, как превратить Messaging в RPC. Если запрос ожидает ответ — фреймворк сделает все за вас.
Тестируемость — фреймворк позволяет эмулировать поведение брокеров и тестировать ваши сервисы без необходимости подключения к внешним зависимостям.
Собственный CLI — позволяет управлять настройками приложения, количеством запущенных инстансов, генерирует шаблоны проекта, а также перезагружает ваш проект при изменениях в коде (мне этого очень не хаватало при локальной разработке).
На текущий момент фрейморк поддерживает работу с RabbitMQ, Redis Pub/Sub, Nats. Поддержка Kafka ожидается в ближайший месяц, затем — работа над генерацией схемы в соответсвии с AsyncAPI.
Пример
Давайте немного опустим все эти сухие слова и перейдем к практическому примеру.
Например, вот код «приложения» для работы с RabbitMQ с использование aio-pika
import asyncio
import aio_pika
async def main():
connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue("test_queue")
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
asyncio.run(main())
А вот, тот же пример с использование Propan
from propan import PropanApp, RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)
@broker.handle("test_queue")
async def base_handler(body: str): # тело сообщения будет приведено к `str`
print(body)
В нем вам не нужно самостоятельно объявлять все подключения, очереди, заниматься обработкой сообщения: вы просто пишете код, а фреймворк делает все за вас. Однако, при необходимости, у вас остается возможность управлять этим поведением вручную.
При этом, пример с использованием Propan дает вам все преимущества, описанные в предыдущем разделе.
Пример с использованием Redis или Nats идентичен с точностью до аргументов функций.
from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")
app = PropanApp(broker)
@broker.handle("test")
async def base_handler(body: str):
print(body)
Использование в HTTP сервисах
Очень часто нам нужно использовать логику работы с брокерами сообщений в рамках приложений, которые обрабатывает и HTTP-запросы.
Без проблем! Просто запустите и остановите Propan вместе с вашим приложением.
Пример с aiohttp
from aiohttp import web
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
async def start_broker(app):
await broker.start()
async def stop_broker(app):
await broker.close()
app = web.Application()
app.on_startup.append(start_broker) # запускаем Propan при старте
app.on_cleanup.append(stop_broker) # и останавливаем вместе с приложением
web.run_app(app)
Кроме этого, если вы используете FastAPI, вы можете использовать Propan напрямую — как часть вашего приложения FastAPI.
from fastapi import FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter
app = FastAPI()
router = RabbitRouter("amqp://guest:guest@localhost:5672")
class Incoming(BaseModel):
...
@router.event("test")
async def hello(body: Incoming):
print(body)
app.include_router(router)
Заключение
Не вижу смысла пересказывать всю документацию фреймворка, вы можете найти ее здесь. Если по ходу чтения статьи у вас возникли какие-то вопросы, возможно, ответы вы также найдете там.
Сейчас фреймворк активно развивается только моими силами. Если вы заинтересованы в его дальнейшем развитии, я буду крайне благодарен за любую помощь: указание на неточности/непонятные моменты в документации, написание кода, тестирование.
В любом случае, я буду рад любому фидбеку: критике, предложениям для реализации. Если вы решите использовать фреймворк в своих проектах — буду рад услышать о вашем опыте.