Propan — Python фреймворк для написания микросервисов с использованием брокеров сообщений

Так исторически сложилось, что последние 5 лет своей продуктовой разработки я работаю с микросервисами вокруг брокеров сообщений (преимущественно RabbitMQ и Kafka).

И все это время меня не покидало чувство некой неудовлетворенности и неполноценности инструментария, который был мне доступен.

Приходя из мира HTTP фреймворков, ты чувствуешь себя как будто на костылях — ни тебе hotreload'а, который есть практически в любом wsgi-asgi сервере, хочешь тестировать — поднимай контейнеры окружения или мокай зависимости (особенно удобно в CI, ага), не забудь о реконнектах, логировании, трассировке и тд и тп.

И вот, таская от сервиса к сервису ворох всех этих проблем (и код который эти проблемы решает), до меня дошла гениальная идея: оформить весь однотипный код, общий для всех сервисов в единый пакет!

Так появился на свет фреймворк Propan.

11e643bc417af0bbee951d55f10afdc5.jpg

Зачем вам использовать Propan

Фреймворк слеплен по образу и подобию FastAPI, но для работы с брокерами сообщений и с учетом лично моих болей, которые возникали при работе с этим HTTP-фреймворком. Propan в большей степени открыт для расширения и не диктует вам, как именно его использовать.

От FastAPI мы имеем:

  • Валидацию и приведение типов входящих сообщений с помощью Pydantic

  • Систему внедрения зависимостей

  • Максимально простой и понятный всем и каждому способ написания приложения

Особенности, которые выделяют Propan из ряда других фреймворков и нативных библиотек для работы с брокерами сообщений:

  • Независимость от брокеров — код, который вы пишете, не зависит от используемого брокера. Вы можете легко мигрировать с RabbitMQ на Kafka или Nats при росте нагрузки.

  • RPC поверх MQ — вам не нужно думать, как превратить Messaging в RPC. Если запрос ожидает ответ — фреймворк сделает все за вас.

  • Тестируемость — фреймворк позволяет эмулировать поведение брокеров и тестировать ваши сервисы без необходимости подключения к внешним зависимостям.

  • Собственный CLI — позволяет управлять настройками приложения, количеством запущенных инстансов, генерирует шаблоны проекта, а также перезагружает ваш проект при изменениях в коде (мне этого очень не хаватало при локальной разработке).

На текущий момент фрейморк поддерживает работу с RabbitMQ, Redis Pub/Sub, Nats. Поддержка Kafka ожидается в ближайший месяц, затем — работа над генерацией схемы в соответсвии с AsyncAPI.

Пример

Давайте немного опустим все эти сухие слова и перейдем к практическому примеру.

Например, вот код «приложения» для работы с RabbitMQ с использование aio-pika

import asyncio
import aio_pika

async def main():
	connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")

	async with connection:
		channel = await connection.channel()

		queue = await channel.declare_queue("test_queue")
		async with queue.iterator() as queue_iter:
			async for message in queue_iter:
				async with message.process():
					print(message.body)

asyncio.run(main())

А вот, тот же пример с использование Propan

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

@broker.handle("test_queue")
async def base_handler(body: str):  # тело сообщения будет приведено к `str`
	print(body)

В нем вам не нужно самостоятельно объявлять все подключения, очереди, заниматься обработкой сообщения: вы просто пишете код, а фреймворк делает все за вас. Однако, при необходимости, у вас остается возможность управлять этим поведением вручную.

При этом, пример с использованием Propan дает вам все преимущества, описанные в предыдущем разделе.

Пример с использованием Redis или Nats идентичен с точностью до аргументов функций.

from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")

app = PropanApp(broker)

@broker.handle("test")
async def base_handler(body: str):
	print(body)

Использование в HTTP сервисах

Очень часто нам нужно использовать логику работы с брокерами сообщений в рамках приложений, которые обрабатывает и HTTP-запросы.

Без проблем! Просто запустите и остановите Propan вместе с вашим приложением.

Пример с aiohttp

from aiohttp import web
from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.handle("test")
async def base_handler(body):
	print(body)

async def start_broker(app):
	await broker.start()

async def stop_broker(app):
	await broker.close()

app = web.Application()
app.on_startup.append(start_broker)  # запускаем Propan при старте
app.on_cleanup.append(stop_broker)   # и останавливаем вместе с приложением
web.run_app(app)

Кроме этого, если вы используете FastAPI, вы можете использовать Propan напрямую — как часть вашего приложения FastAPI.

from fastapi import FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

app = FastAPI()
router = RabbitRouter("amqp://guest:guest@localhost:5672")

class Incoming(BaseModel):
	...
  
@router.event("test")
async def hello(body: Incoming):
	print(body)

app.include_router(router)

Заключение

Не вижу смысла пересказывать всю документацию фреймворка, вы можете найти ее здесь. Если по ходу чтения статьи у вас возникли какие-то вопросы, возможно, ответы вы также найдете там.

Сейчас фреймворк активно развивается только моими силами. Если вы заинтересованы в его дальнейшем развитии, я буду крайне благодарен за любую помощь: указание на неточности/непонятные моменты в документации, написание кода, тестирование.

В любом случае, я буду рад любому фидбеку: критике, предложениям для реализации. Если вы решите использовать фреймворк в своих проектах — буду рад услышать о вашем опыте.

© Habrahabr.ru