Python, MSA, Kafka
Всем привет! Сегодня микросервисная архитектура, что называется «на хайпе». Я перечитал достаточно много статей по данной тематике, но обнаружил, что среди всего прочего, не так много публикаций, объясняющих данную концепцию на конкретном примере (может, плохо искал). Сегодня я бы хотел пополнить ряды авторов и написать свою первую публикацию, не судите строго!
Оглавление
Для кого эта статья?
Краткое описание
Структура
FASTApi сервис (web)
Currency сервис (Scrapy)
Notify сервис (бот)
Инфраструктура
Заключение
Для кого эта статья?
Я надеюсь, что эта статья станет отличным и понятным примером для разработчика, только начинающего свой путь в микросервисах. Многие аспекты данной концепции требуют сбора большого количества информации из разных источников. При этом попутно возникающие вопросы часто сбивают с толку и отвлекают от первой конечной цели — «пощупать» своими руками взаимодействие нескольких независимых приложений, настроить между ними работу. Когда некое ядро понимания происходящего будет сформировано, полагаю, вы вернетесь к более узким и специфическим вопросам в этой теме, и понять их вам станет легче.
Краткое описание
Вашему вниманию представлена система, состоящая из 3-ех микросервисов:
Приложение на FASTApi;
Веб-парсер на Scrapy;
Телеграмм бот на Aiogram;
Что эта «система» собирается делать? Все достаточно просто — мы создадим end-point, который будет принимать в себя два параметра — телеграм ID и код валюты. Далее эти данные отправятся в парсер (Scrapy), который по полученному коду валюты, узнает ее курс в рублях. Напоследок, собранную информацию примет бот и отправит пользователю. Чтобы стало еще понятнее, предлагаю посмотреть на рис. 1:
Рис. 1. Схема работы микросервисов
В качестве языка программирования будем использовать Python, также нам понадобится Redis, Kafka и Zookeper. Чтобы «поднять» все это зло добро, прибегнем к Docker-у.
Структура
Для того, чтобы в дальнейшем читатель не «забуксовал» в попытке понять расположение того или иного файла или куска кода, считаю нужным пояснить как будет располагаться структура файлов и папок:
MSA/
docker‑compose.yml;
.env;
services/ (в ней будут находиться все наши микросервисы);
currency/ (сервис на Scrapy, ответственный за парсинг валют);
web/ (сервис на FASTApi, ответственный за получение входных данных);
notify/ (сервис с ботом, ответственный за отправку сообщения пользователю);
Конечно, у каждого сервиса должен быть свой .env файл. Однако здесь мы умышленно согрешим упростим и создадим один общий для всех. Также на первой строчке каждого блока кода, в качестве комментария будет располагаться путь к этому файлу.
Hidden text
Весь код для изучения/клонирования можно найти в этом репозитории. Однако я настоятельно рекомендую прописать все своими руками.
FASTApi сервис (web)
Начнем с самого главного — точки входа в нашу систему. Установим все необходимые зависимости, их, кстати, будем хранить в poetry. В web сервисе инициализируем окружение.
cd services/web && poetry init
И сразу установим зависимости
poetry add fastapi uvicorn aiokafka
Как уже говорилось раннее — мы будем использовать Apache Kafka в качестве брокера сообщений. В Python есть асинхронный пакет — aiokafka — им мы и воспользуемся.
Чтобы избежать «кучи-малы» и не мешать .py
файлы с другими — внутри директории web создадим еще одну директорию app — и в ней уже будем располагать весь код.
Начнем с создания схем, здесь ничего сложного, мы ожидаем всего два параметра:
# services/web/app/schemas.py
from pydantic import BaseModel
class Message(BaseModel):
currency_char_code: str
telegram_id: int
Опишем нужный нам end-point:
# services/web/app/main.py
from fastapi import FastAPI
from web.schemas import Message
app = FastAPI()
@app.post("/currency-info")
async def send(message: Message):
return message.model_dump()
Пока что будем просто возвращать то, что передал пользователь, а логику с брокером добавим позже. Далее укажем необходимые настройки и переменные, которые в будущем будем использовать для взаимодействия с брокером:
# services/web/app/settings.py
import os
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
PRODUCE_TOPIC = os.getenv("WEB_TOPIC")
# .env
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
WEB_TOPIC=web
WEB_TOPIC — топик, куда наш web-сервис будет отправлять сообщения. В будущем Scrapy сервис будет этот топик слушать.
KAFKA_BOOTSTRAP_SERVERS — адрес сервера брокера (их может быть несколько, в нашем случае всего один).
Теперь перейдем к написанию класса-оболочки над нашим «продьюсером»:
# services/web/app/producer.py
from aiokafka import AIOKafkaProducer
from web.app import settings
import asyncio
event_loop = asyncio.get_event_loop()
class AIOWebProducer(object):
def __init__(self):
self.__producer = AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
loop=event_loop,
)
self.__produce_topic = settings.PRODUCE_TOPIC
async def start(self) -> None:
await self.__producer.start()
async def stop(self) -> None:
await self.__producer.stop()
async def send(self, value: bytes) -> None:
await self.start()
try:
await self.__producer.send(
topic=self.__produce_topic,
value=value,
)
finally:
await self.stop()
def get_producer() -> AIOWebProducer:
return AIOWebProducer()
Кто такой «продьюсер»? Говоря простым языком — это специальный класс, который может взаимодействовать с брокером сообщений и предоставляет интерфейс отправки сообщения по нужному топику.
Мы воспользовались классом AIOKafkaProducer
из модуля aiokafka
. Он представляет из себя интерфейс взаимодействия с брокером от лица производителя (продьюсера) сообщений. В качестве параметров мы указали текущий цикл событий и KAFKA_BOOTSTRAP_SERVERS
.
В нашем классе-обертке ничего сложного нет. Методы start
и stop
просто оболочки над одноименными методами класса AIOKafkaProducer
. start
— открывает соединение с кластером Kafka, stop
— очищает все ожидающие данные и закрывает соединение.
Метод send
— такая же надстройка, только она инкапсулирует создание и закрытие соединения перед отправкой сообщения по нужному топику. Также стоит отметить, что отправлять данные мы будем в виде байтов. Это следует из документации пакета и устройства самой Kafka.
Функция get_producer
нужна для удобной инъекции продюсера в наш единственный end-point. Давайте как раз этим и займемся:
# web/app/main.py
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi import FastAPI, Depends
from web.app.schemas import Message
from web.app.producer import get_producer
import json
if TYPE_CHECKING:
from web.app.producer import AIOWebProducer
app = FastAPI()
@app.post("/currency-info")
async def send(message: Message, producer: AIOWebProducer = Depends(get_producer)) -> None:
message_to_produce = json.dumps(message.model_dump()).encode(encoding="utf-8")
await producer.send(value=message_to_produce)
В целом, практически ничего не изменилось, кода действительно немного (2 строчки). Сначала мы сериализуем объект сообщения (переводим его в байты), после чего отправляем полученный набор байтов в назначенный топик при помощи метода send. Все.
Последнее, что нам осталось сделать для этого микросервиса — написать для него Dockerfile:
# services/web/Dockerfile
FROM python:3.11-slim-buster
WORKDIR /usr/src/web
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH=/usr/src
COPY poetry.lock pyproject.toml ./
RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root
COPY app app/
WORKDIR ./app
Currency сервис (Scrapy)
Как и до этого инициализируем окружение и добавим необходимые зависимости:
cd services/currency && poetry init
poetry add scrapy redis aiokafka
В этот раз мы не будем создавать папку app, так как Scrapy сделает это за нас и даже лучше. Вместо этого инициализируем проект при помощи команды:
scrapy startproject currency
Давайте избавимся от лишней вложенности папок currency, немного раскроем матрешку и приведем ее к такому виду:
Рис. 2. Структура файлов Scrapy сервиса
Теперь поговорим о том, как будет парситься информация и куда она будет попадать. Итак, в Scrapy есть «пауки», суть которых как раз парсить данные с чего-либо. После того как данные будут получены, мы положим их в Redis, чтобы на каждый запрос пользователя не ходить в интернет. Сгенеририруем базовую комплектацию «паука», для этого введем:
scrapy genspider currency_v1 https://www.cbr.ru/scripts/xml_daily.asp
Далее в папке spiders должен был появиться каркас нашего будущего «паука». Давайте немного подредактируем его код:
# services/currency/currency/spiders/currency_v1.py
import scrapy
class CurrencyV1Spider(scrapy.Spider):
name = "currency_v1"
allowed_domains = ["www.cbr.ru"]
start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"]
def parse(self, response, **kwargs) -> None:
currencies = response.xpath("//ValCurs//Valute")
for currency in currencies:
print(currency)
Итак, чтобы убедиться, что «паук» действительно что-либо парсит, предлагаю запустить его и проверить, для этого пишем в консоль:
scrapy crawl currency_v1
Если в терминале посыпалось кучу различной информации вида
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
WEB_TOPIC=web
REDIS_HOST=redis
REDIS_PORT=6379
А, во-вторых, добавим необходимые настройки, чтобы наш сервис подключался к Redis:
# services/currency/currency/settings.py
import os
BOT_NAME = "currency"
SPIDER_MODULES = ["currency.spiders"]
NEWSPIDER_MODULE = "currency.spiders"
ROBOTSTXT_OBEY = True
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"
# --- REDIS SETTINGS
CURRENCY_REDIS_CACHE_TIME_IN_SECONDS = 60
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}"
Я удалил абсолютно весь закомментированный автосгенерированный фреймворком код и добавил настройки для нашего NoSQL хранилища. Теперь в методе parse
нашего паука будем сохранять поступившую информацию в Redis:
# services/currency/currency/spiders/currency_v1.py
from currency.currency import settings
import scrapy
import redis
import json
class CurrencyV1Spider(scrapy.Spider):
name = "currency_v1"
allowed_domains = ["www.cbr.ru"]
start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"]
def parse(self, response, **kwargs) -> None:
currencies = response.xpath("//ValCurs//Valute")
with redis.from_url(url=settings.REDIS_URL) as redis_client:
for currency in currencies:
redis_client.set(
name=self.get_currency_redis_key(currency),
value=self.get_currency_redis_value(currency),
ex=settings.CURRENCY_REDIS_CACHE_TIME_IN_SECONDS,
)
@staticmethod
def get_currency_redis_key(selector) -> str:
return selector.xpath(".//CharCode//text()").get()
@staticmethod
def get_currency_redis_value(selector) -> bytes:
return json.dumps(
{
"currency_value": selector.xpath(".//Value//text()").get()
}
).encode("utf-8")
Теперь давайте создадим какой-нибудь класс-контроллер, с помощью которого мы сможем получить курс валюты по ее коду:
# services/currency/currency/controller.py
from multiprocessing import Process, Queue
from scrapy.crawler import CrawlerProcess
from currency.currency import settings
from currency.currency.spiders.currency_v1 import CurrencyV1Spider
from redis import asyncio as aioredis
import json
class CurrencyController(object):
def update_redis_cache(self) -> None:
queue = Queue()
process = Process(target=self._crawl_currency, args=(queue,))
process.start()
none_or_exception = queue.get()
process.join()
if none_or_exception is not None:
raise none_or_exception
@staticmethod
def _crawl_currency(queue: Queue) -> None:
try:
process = CrawlerProcess()
process.crawl(CurrencyV1Spider)
process.start()
queue.put(None)
except Exception as exc:
queue.put(exc)
async def get_currency_info(self, currency_char_code: str) -> dict:
if currency_info := await self.get_currency_from_redis(currency_char_code):
return currency_info
self.update_redis_cache()
return await self.get_currency_from_redis(currency_char_code)
@staticmethod
async def get_currency_from_redis(char_code: str) -> dict | None:
async with aioredis.from_url(settings.REDIS_URL) as redis_client:
if currency_info := await redis_client.get(name=char_code):
return json.loads(currency_info)
Можно не обращать внимание на танцы с бубном вокруг multiprocessing
, просто Scrapy не позволяет запустить паук в том же самом процессе, поэтому для каждого запуска мы создаем новый. Возможно (и скорее всего) это не самое лучшее решение, однако я допускаю его в рамках данной статьи для упрощения. Итак, возвращаемся к брокеру.
В этом сервисе нам необходим как производитель, так и потребитель. Потребитель будет слушать на входящие сообщения от web сервиса, производитель отправлять notify сервису. Добавим необходимые настройки:
# services/currency/currency/settings.py
# --- SCRAPY SETTINGS
...
# --- REDIS SETTINGS
...
# --- KAFKA SETTINGS
PRODUCE_TOPIC = os.getenv("CURRENCY_TOPIC")
CONSUME_TOPIC = os.getenv("WEB_TOPIC")
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
Не забываем про .env:
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
WEB_TOPIC=web
REDIS_HOST=redis
REDIS_PORT=6379
CURRENCY_TOPIC=currency
И пишем код в main.py
для прослушивания и отправки сообщений посредством брокера:
# services/currency/currency/main.py
import json
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from currency.currency import settings
from currency.currency.controller import CurrencyController
async def start_consumer() -> AIOKafkaConsumer:
consumer = AIOKafkaConsumer(
settings.CONSUME_TOPIC,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
)
await consumer.start()
return consumer
async def start_producer() -> AIOKafkaProducer:
producer = AIOKafkaProducer(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
return producer
async def main() -> None:
controller = CurrencyController()
consumer = await start_consumer()
producer = await start_producer()
try:
async for message in consumer:
decoded_message = json.loads(message.value)
currency_info = await controller.get_currency_info(
currency_char_code=decoded_message.get("currency_char_code"),
) or dict()
currency_info["telegram_id"] = decoded_message["telegram_id"]
info_to_send = json.dumps(currency_info).encode(encoding="utf-8")
await producer.send(topic=settings.PRODUCE_TOPIC, value=info_to_send)
finally:
await consumer.stop()
await producer.stop()
if __name__ == '__main__':
asyncio.run(main())
Здесь мы инициализируем производителя и потребителя, слушаем на входящие сообщения, как только оно появляется — парсим информацию о валюте и отправляем уже телеграмм боту. Еще раз: слушаем WEB_TOPIC, а производим в CURRENCY_TOPIC .
Осталось написать Dockerfile:
# services/currency/Dockerfile
FROM python:3.11-slim-buster
WORKDIR /usr/src/currency/
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH=/usr/src
COPY poetry.lock pyproject.toml ./
RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root
COPY currency currency/
WORKDIR ./currency
Notify сервис (бот)
Считаю нецелесообразно рассказывать в этой статье, как создавать своего телеграмм-бота. В интернете масса информации по этой теме. Начинаем мы с того, что у вас есть токен и вы знаете зачем он нужен.
Как и в предыдущем сервисе инициализируем poetry:
cd services/notify && poetry init
И добавим необходимые зависимости:
poetry add aiogram aiokafka
Создадим внутри директории notify новую директорию app
, а в ней settings.py
и добавим туда следующий код:
import os
BOT_TOKEN = os.getenv("BOT_TOKEN")
CONSUME_TOPIC = os.getenv("CURRENCY_TOPIC")
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
Из интересного здесь то, что в качестве прослушиваемого топика, мы используем CURRENCY_TOPIC
, куда отправляет сообщение наш микро-сервис на Scrapy. Также не забудьте в .env
файл добавить переменную BOT_TOKEN
и указать в ней токен вашего бота. Файл с переменными окружениями по итогу должен выглядеть примерно так:
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
WEB_TOPIC=web
REDIS_HOST=redis
REDIS_PORT=6379
CURRENCY_TOPIC=currency
BOT_TOKEN=#токен
Далее рядом создадим main.py,
в нем будет запуск бота и прослушивание входящих сообщений:
from aiogram import Bot
from aiogram import Dispatcher
from aiokafka import AIOKafkaConsumer
from notify.app import settings
import asyncio
import json
dispatcher = Dispatcher()
BOT = Bot(token=settings.BOT_TOKEN)
async def consume() -> None:
consumer = AIOKafkaConsumer(
settings.CONSUME_TOPIC,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
)
await consumer.start()
try:
async for msg in consumer:
serialized = json.loads(msg.value)
await BOT.send_message(
chat_id=serialized.get("telegram_id"),
text=serialized.get("currency_value") or "Валюта не найдена",
)
finally:
await consumer.stop()
async def main() -> None:
polling = asyncio.create_task(dispatcher.start_polling(BOT))
consuming = asyncio.create_task(consume())
await asyncio.gather(polling, consuming)
print("Bot has successfully started polling")
if __name__ == "__main__":
asyncio.run(main())
Будем отделять мух от котлет и обращать внимание на код, связанный с потреблением сообщений.
Мы воспользовались классом
AIOKafkaConsumer
, куда передали топик, который мы слушаем, иBOOTSTRAP_SERVERS
.Асинхронная функция
consume
состоит глобально из трех частей: устанавливаем соединение с брокером —await consumer.start()
, слушаем на входящие сообщения —async for msg in consumer
. Если сообщение есть — десериализуем его и отправляем пользователю в его телеграмм при помощи методаsend_message
.
Как вы поняли, здесь отсутствует «продьюсер», так как это конечная точка нашей системы, откуда сообщение уже отправится пользователю в телеграмм бот.
Также напишем боту Dockerfile, он почти ничем не отличается от предыдущего:
# services/notify/Dockerfile
FROM python:3.11-slim-buster
WORKDIR /usr/src/notify
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH=/usr/src
COPY poetry.lock pyproject.toml ./
RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root
COPY app app/
WORKDIR ./app
Инфраструктура
Для того, чтобы развернуть все наши шестеренки, опишем docker-compose.yml
:
Hidden text
version: '3.3'
services:
web:
restart: on-failure
build:
context: services/web/
dockerfile: Dockerfile
container_name: "web"
ports:
- "8000:8000"
environment:
KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS
WEB_TOPIC: $WEB_TOPIC
command: uvicorn main:app --reload --host 0.0.0.0 --port 8000
currency:
restart: on-failure
build:
context: services/currency/
dockerfile: Dockerfile
container_name: "currency"
environment:
KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS
WEB_TOPIC: $WEB_TOPIC
CURRENCY_TOPIC: $CURRENCY_TOPIC
REDIS_HOST: $REDIS_HOST
REDIS_PORT: $REDIS_PORT
command: python main.py
bot:
restart: on-failure
build:
context: services/notify/
dockerfile: Dockerfile
container_name: "notify"
environment:
KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS
CURRENCY_TOPIC: $CURRENCY_TOPIC
BOT_TOKEN: $BOT_TOKEN
command: python main.py
zookeeper:
image: 'bitnami/zookeeper:3.7.0'
container_name: zookeeper
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2.8.0'
container_name: kafka
ports:
- "9093:9093"
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
redis:
image: redis
container_name: redis
environment:
REDIS_HOST: REDIS_HOST
ports:
- "6379:6379"
Момент истины — поднимаем:
docker compose up -d --build
Заранее следует проверить, что ничего не «отвалилось»:
docker ps
В выводе должно быть шесть рабочих контейнеров. Если это так — убедитесь, что вы начали диалог с ботом и, если оно тоже так, смело переходим по адресу и отправляем POST-запрос нашему единственному енд-поинту, куда передаем свой телеграмм ID и, допустим, USD. Ответом на это действие должно быть сообщение в телеграмме от нашего бота с одиноким и неприятным числом.
Заключение
Напоследок, хотелось бы сказать, что в данной статье намеренно пропущены некоторые моменты с настройкой Kafka, с ее переменными, которые можно указать в docker-compose.yml
. Я не хотел перегружать статью информацией, так как, полагаю, что для человека, который будет работать с этим инструментом впервые, важнее получить практический опыт.
Я благодарен, если вы дочитали статью до этого момента. Надеюсь, я смог в понятной форме рассказать и, что главное, показать, как работает брокер на поверхностном уровне, не залезая в дебри. До новых встреч!