Mela: асинхронный фреймворк на Python для сервисов, работающих с RabbitMQ
WARNING: длинная вступительная часть. Если хотите перейти сразу к делу — листайте до Getting Started.
Вступление
В 2023 году писать сервисы, взаимодействующие друг с другом через RabbitMQ, всё ещё неоправданно сложно. Ещё больше сложностей возникает с тестированием бизнес-логики в них, с согласованием контрактов между ними, с организацией монорепозиториев.
В недрах компании Alem Research, ещё года с 19-го, я начал писать легковесный фреймворк, который по задумке должен был стать чем-то вроде Flask или FastAPI для работы с RabbitMQ.
Удалось ли? Пожалуй, да. При помощи этого фреймворка наши дата-саентисты смогли самостоятельно писать и упаковывать в докер сервисы, которые лёгким движением руки и парой перебиндов встраиваются в трубу.
Впрочем, первая версия работала ещё с kombu, а позже ядро было переписано на aio-pika, и фреймворк стал ещё и асинхронным. Что даёт асинхронность? Например, можно написать фетчер HTTP-страниц, который при увеличении prefetch_count
будет фетчить параллельно огромное количество страниц, и даже сможет утилизировать сеть практически полностью, чего без асинхронности было бы довольно сложно добиться.
Кроме того, асинхронность позволяет легче лёгкого реализовать паттерн RPC через RabbitMQ в рамках одного процесса, что довольно удобно для тестирования. Да и интеграция с FastAPI становится очень удобной. А скорость обработки растёт в десятки раз.
В 2020-м мы совместно с фаундером и тогдашним генеральным директором решили, что фреймворк смело можно выкладывать в open-source. И в целом, казалось бы, всё хорошо.
Однако, есть и проблемы. Главная из них — это нехватка живых пользователей и контрибьюторов. Я ушёл из Alem Research, и в компании продолжать работать над фреймворком в данный момент некому. Иногда кто-то из компании пишет мне запросы по фреймворку, но мне очень лень выкатывать обновления для одной-единственной компании: это получается своего рода работа на работодателя, который тебе больше не платит :)
Я давно уже собирался написать питчинг-статью про Mela, чтобы привлечь новых пользователей, а возможно даже и контрибьюторов, но не решался, потому что код был очень далёк от идеала. Да-да, меня просто жрал внутренний перфекционизм и стыд за то, чтобы выкатить что-то не очень красивое. Но сегодня я понял, что стыдиться мне особо нечего: да, архитектура ядра не идеальна, но за последние пару месяцев, пока я сидел без работы, я уже продумал её со всех сторон, и знаю, что делать дальше. В общем, красивая версия не готова, но у меня есть план.
У меня есть план
И я решил, что будет неправильно (и долго) реализовывать свой план в одно лицо, лучше сразу попробовать привлечь к нему потенциальных контрибьюторов. В общем, если вас заинтересует то, что описано ниже — добро пожаловать на борт!
Извините за долгое вступление. Мы наконец-то отправляемся в путь.
Getting started
Конечно же, всё начинается с:
pip install mela=1.1.1
Версию я указал чтобы статья оставалась релевантной даже после обновлений.
Давайте напишем сервис, который будет получать сообщение из очереди, выводить его body распарсенный в json, и возвращать сообщение обратно.
# app.py
from mela import Mela
app = Mela(__name__)
@app.service("printer")
def printer(body, message):
print(body)
return body
if __name__ == '__main__':
app.run()
Как вы можете догадаться, это ещё не всё, что требуется для запуска приложения: здесь нет никакой информации об очередях и подключении. Это потому что она хранится в файле application.yml
.
# application.yml
connections:
default:
host: localhost
port: 5672
username: user
password: bitnami
services:
printer:
consumer:
exchange: general-sentiment-x
routing_key: general-sentiment-q
queue: general-sentiment-q
publisher:
exchange: general-sentiment-x
routing_key: general-sentiment-q
Вот и всё.
Этот пример знакомит нас сразу с тремя самыми основными высокоуровневыми концепциями Мелы: Publisher, Consumer и Service.
Если с паблишером и консьюмером всё понятно, то про Service, пожалуй, поясню, что это элемент трубы, который состоит из комбинации консьюмера и паблишера: он консьюмит сообщения из определённой очереди, обрабатывает его, и паблишит в указанный эксчейндж. По сути, сервис состоит из паблишера и консьюмера, что в том числе отражено в Yaml-файле.
Функция, которая находится под декоратором @app.service(...)
, как не сложно догадаться, является коллбэком консьюмера, а то, что она возвращает — будет отправлено в паблишер, привязанный к сервису.
Pydantic
Какой современный фреймворк без Pydantic, правда? И их есть у меня.
# app.py
from pydantic import BaseModel
from datetime import datetime
from mela import Mela
app = Mela(__name__)
class Document(BaseModel):
text: str
url: str
date: datetime
@app.service('validator')
def validator(body: Document) -> Document:
if '#' in body.url:
body.url = body.url.split('#')[0]
return body
if __name__ == '__main__':
app.run()
В данном случае мы используем new-style сигнатуру обработчика, и явно указывать второй аргумент, в который в предыдущем примере прилетел бы объект Message, нам не нужно.
body сначала переводится в json, потом этот json скармливается в класс Document.
При ошибках валидации ошибки будут выведены в консоль, а сообщение вернётся обратно в очередь. Если, конечно, в консьюмереrequeue_broken_messages=True
(по умолчанию так) и если у консьюмера не задан dead letter exchange
. То же самое будет происходить если в коллбэке зарейзится любая другая ошибка.
Управление ack/nack
Есть несколько способов управлять ответами. Давайте рассмотрим все сразу.
from datetime import datetime
from datetime import timedelta
from pydantic import BaseModel
from mela import IncomingMessage
from mela import Mela
from mela.components.exceptions import NackMessageError
app = Mela(__name__)
class Document(BaseModel):
text: str
url: str
date: datetime
@app.service("filter")
async def filter_(body: Document, message: IncomingMessage):
if body.date > datetime.utcnow():
# First way: we can raise special exception with some `requeue` value
raise NackMessageError("We are not working with time travellers", requeue=False)
elif body.date < datetime.utcnow() - timedelta(days=365):
# Second way: we can manually nack message via IncomingMessage object
# As you can see, in this case we can't write any message about requeue reason.
# But it is still useful if you need to silently send message to DLX
await message.nack(requeue=False) # Go to archive, dude
if body.url == '':
# Third way: we can raise almost any exception. The message should be or should not
# be requeued based on `requeue_broken_messages` value
raise AssertionError("Message without url is not acceptable")
return body
if __name__ == '__main__':
app.run()
Чистый консьюмер
Консьюмер создаётся точно так же, как сервис, но с другим декоратором.
from pydantic import BaseModel
from pydantic import EmailStr
from mela import Mela
app = Mela(__name__)
class EmailNotification(BaseModel):
template_name: str
vars: dict
receiver: EmailStr
@app.consumer("email-sender")
def printer(body: EmailNotification):
# Some Jinja2 and SMTP integration
pass
if __name__ == '__main__':
app.run()
Если в ходе обработки сообщения не произошло никакой ошибки, то сообщение будет акнуто автоматически. Это поведение можно изменить. Однако, чаще всего оно всех устраивает. В любом случае, можно вручную акнуть сообщение, думаю, вы уже догадались, как это сделать.
Ещё важно показать application.yml
для консьюмера:
connections:
default:
host: localhost
port: 5672
username: user
password: bitnami
consumers:
email-sender:
exchange: notifications-x
exchange_type: topic
routing_key: "email.#"
queue: email-sender-q
Собственно, да. Просто другое название блока. И невзначай показанный пример работы с эксчейнджами других типов :)
Чистый паблишер и интеграция с FastAPI
from datetime import datetime
from uuid import uuid4
from fastapi import FastAPI
from mela import Mela
from mela.settings import Settings
from pydantic import BaseModel
app = FastAPI()
mela_app = Mela(__name__)
mela_app.settings = Settings()
class ReportRequest(BaseModel):
start_date: datetime
end_date: datetime
user_id: str
report_id: str | None = None
@app.post("/report")
async def read_root(report_request: ReportRequest):
if report_request.report_id is None:
report_request.report_id = str(uuid4())
# some DB writing
publisher = await mela_app.publisher_instance('report-generator')
await publisher.publish(report_request)
return report_request
application.yml
:
connections:
default:
host: localhost
port: 5672
username: user
password: bitnami
publishers:
report-generator:
exchange: report-x
routing_key: new-report
Тут даже не знаю что комментировать. Разве что добавлю, что в будущем хотелось бы сделать внешний декоратор, чтобы инжектить паблишеры и RPC клиенты в сторонние функции так же, как в следующем примере. Но это уже к планам на будущее. А пока перейдём к следующему примеру.
Инъекции дополнительных паблишеров
Иногда нам нужно реализовать сплиттер или просто по ходу обработки сообщения что-то куда-то запаблишить. Можно, конечно, сделать как в предыдущем разделе, но можно сделать круче:
from datetime import datetime
from pydantic import BaseModel
from mela import Mela
from mela.components import Publisher
app = Mela(__name__)
class Document(BaseModel):
text: str
url: str
date: datetime
has_images: bool = False
@app.service('archiver')
async def archiver(document: Document, images_downloader: Publisher = 'images-downloader') -> Document:
# archiving document
if document.has_images:
await images_downloader.publish(document)
return document
if __name__ == '__main__':
app.run()
application.yml
:
connections:
default:
host: localhost
port: 5672
username: admin
password: admin
services:
archiver:
consumer:
exchange: archiver-x
routing_key: archiver-q
queue: archiver-q
publisher:
exchange: notify-archived-x
exchange_type: topic
routing_key: document.archived
publishers:
images-downloader:
exchange: images-downloader-x
routing_key: images-downloader-q
На данный момент инъекции реализованы неправильно с точки зрения типизации. Есть план переписать их на новый служебный тип Annotated
. Но… Главное работает.
DLX
Для любого консьюмера поддерживается Dead Letter Exchange.
Сделать его очень просто:
connections:
default:
host: localhost
port: 5672
username: admin
password: admin
services:
service_with_dlx:
consumer:
exchange: dlx-test-x
routing_key: dlx-test-k
queue: dlx-test-q
dead_letter_exchange: dlx-test-dead-letter-x
dead_letter_routing_key: dlx-test-dead-letter-k
publisher:
exchange: test-x
routing_key: test_queue
Но вам придётся самостоятельно следить за тем, чтобы была очередь, которая будет собирать сломанные сообщения из вашего эксчейнджа.
RPC
# server.py
import asyncio
import aio_pika
from mela import Mela
app = Mela(__name__)
async def fetch(url):
# asynchronously fetching url here and return its body
await asyncio.sleep(1)
return url
@app.rpc_service("fetcher")
async def fetcher(url: str):
return {"fetched": await fetch(url)}
bots = {}
def create_bot(bot_id, bot_username, bot_password):
bots[bot_id] = {'username': bot_username, 'password': bot_password}
def get_bot(bot_id):
return bots[bot_id]
@app.rpc_service("bot_manager")
async def fetcher(body, message: aio_pika.Message):
if message.headers['method'] == 'create_bot':
create_bot(**body)
return {'result': None, 'status': "OK"}
elif message.headers['method'] == 'get_bot':
return {'result': get_bot(**body), 'status': "OK"}
else:
return {'result': None, 'status': "ERROR_UNKNOWN_METHOD"}
if __name__ == '__main__':
app.run()
# client.py
import asyncio
from mela import Mela
app = Mela(__name__)
async def main():
# RPC calls over RabbitMQ never were simpler!
fetcher = await app.rpc_client_instance("fetcher")
bot_manager = await app.rpc_client_instance("bot_manager")
res = await fetcher.call({'url': "test"})
print(res)
# we can even gather call results!
g = await asyncio.gather(fetcher.call({'url': url1}), fetcher.call({'url': url2}))
print(g)
create_bot_result = await bot_manager.call({
'bot_id': 1,
'bot_username': "LalkaPalka",
'bot_password': "supersecret",
},
headers={'method': 'create_bot'},
)
print(f"create_bot result {create_bot_result}")
get_bot_result = await bot_manager.call({'bot_id': 1}, headers={'method': 'get_bot'})
print(f"get_bot_result {get_bot_result}")
unknown_method_result = await bot_manager.call({'bot_id': 4}, headers={'method': 'getBot'})
print(f"unknown method result: {unknown_method_result}")
if __name__ == '__main__':
url1 = (
'https://tengrinews.kz/kazakhstan_news/vorvalis-dom-izbili-'
'almatinka-rasskazala-zaderjanii-supruga-459127/'
)
url2 = (
'https://www.inform.kz/ru/skol-ko-lichnyh-podsobnyh-'
'hozyaystv-naschityvaetsya-v-kazahstane_a3896073'
)
app.run(main())
connections:
default:
host: localhost
port: 5672
username: user
password: bitnami
rpc-services:
fetcher:
exchange: fetcher-x
routing_key: fetcher-k
queue: fetcher-q
response_exchange: fetching-result-x
bot_manager:
exchange: botmanager-x
routing_key: botmanager-k
queue: botmanager-q
response_exchange: botmanager-result-x
Вот вам пример сразу с двумя не конфликтующими друг с другом RPC-сервисами в одном процессе.
Тут тоже не вижу смысла что-то объяснять, но если будут вопросы — с радостью отвечу.
Несколько подключений и переменные окружения
Очень распространённый кейс — когда нам нужно перекачать данные из одного кластера реббита в другой. Сейчас мы не будем обсуждать правильность этой практики, а просто покажем, как это можно сделать легко и непринуждённо.
# application.yml
connections:
input_connection:
host: $RABBIT_INPUT_HOST
port: ${RABBIT_INPUT_PORT|5672}
username: ${RABBIT_INPUT_USERNAME|rabbitmq-bridge}
password: ${RABBIT_INPUT_PASSWORD|rabbitmq-bridge}
output_connection:
host: $RABBIT_OUTPUT_HOST
port: ${RABBIT_OUTPUT_PORT|5672}
username: ${RABBIT_OUTPUT_USERNAME|rabbitmq-bridge}
password: ${RABBIT_OUTPUT_PASSWORD|rabbitmq-bridge}
services:
bridge:
consumer:
connection: input_connection
prefetch_count: ${RABBIT_INPUT_PREFETCH_COUNT|1}
routing_key: ${RABBIT_INPUT_ROUTING_KEY}
exchange: ${RABBIT_INPUT_EXCHANGE}
queue: ${RABBIT_INPUT_QUEUE}
publisher:
connection: output_connection
routing_key: ${RABBIT_OUTPUT_ROUTING_KEY}
exchange: ${RABBIT_OUTPUT_EXCHANGE}
# app.py
from mela import Mela
app = Mela(__name__)
@app.service("bridge")
async def serve(body, message):
return body
if __name__ == '__main__':
app.run()
Как вы можете заметить, код сервиса очень простой. А вот в конфигурационном файле есть кое-что новое. Первое, что бросается в глаза — это переменные окружения. Да, их очень просто сюда вшить. Вот, кстати, пример дотэнва:
RABBIT_INPUT_HOST=localhost
RABBIT_INPUT_ROUTING_KEY=routing-key
RABBIT_INPUT_EXCHANGE=exchange
RABBIT_INPUT_QUEUE=queue
RABBIT_OUTPUT_HOST=localhost
RABBIT_OUTPUT_PORT=5673
RABBIT_OUTPUT_ROUTING_KEY=routing-key
RABBIT_OUTPUT_EXCHANGE=exchange
Всё очень просто и прямолинейно, правда?
А ещё это по сути уже можно упаковывать в докер, и запускать через оркестратор.
Второе, на что нужно обратить внимание в конфиг-файле этого раздела — это то, что здесь два коннекшена, и для консьюмера и паблишера используются разные коннекшены.
На этом у меня юзкейсы закончились. Перейдём к следующему разделу.
Производительность
Цель этой статьи — не похвастаться, да и к тому же производительность фреймворка — это целиком и полностью заслуга авторов aio-pika
, а не моя. Детальные бенчмарки — это будет тема для отдельной статьи, но пока просто скажу, что на моём не самом мощном ноуте простой бридж между двумя реббитами обрабатывает порядка 500 сообщений в секунду. А лучшее, чего мне удавалось добиться от обычной pika
— это 80–100 сообщений в секунду. В случае с фетчингом страниц по всем понятным причинам не асинхронная pika
не могла показать вообще сколько-нибудь адекватный результат.
Заключение
Как мне кажется, мне удалось составить неплохую внешнюю апишку. За исключением инъекций, но про них я писал выше. Внутри — бардак. Всё работает, конечно, но мейнтейнить и дальше развивать фреймворк будет сложно. Я уже начал переписывать ядро в чистовой вариант, но мне не хватает мотивации.
Поэтому я и написал эту статью. Если статья получит отклик, если этот фреймворк кого-то заинтересует, если найдутся люди, которые будут им пользоваться, а может даже и контрибьютить, то я обязательно продолжу работу над ним. Можете даже просто звёздочек накидать, это тоже даст мне хоть какую-то обратную связь :)
В конце дублирую ссылку на GitHub
Пишите вопросы в комментах. Ответами на самые важные буду дополнять статью, на остальные отвечу просто in-place.