Python, MSA, Kafka

Всем привет! Сегодня микросервисная архитектура, что называется «на хайпе». Я перечитал достаточно много статей по данной тематике, но обнаружил, что среди всего прочего, не так много публикаций, объясняющих данную концепцию на конкретном примере (может, плохо искал). Сегодня я бы хотел пополнить ряды авторов и написать свою первую публикацию, не судите строго!

Оглавление

  1. Для кого эта статья?

  2. Краткое описание

  3. Структура

  4. FASTApi сервис (web)

  5. Currency сервис (Scrapy)

  6. Notify сервис (бот)

  7. Инфраструктура

  8. Заключение

Для кого эта статья?

Я надеюсь, что эта статья станет отличным и понятным примером для разработчика, только начинающего свой путь в микросервисах. Многие аспекты данной концепции требуют сбора большого количества информации из разных источников. При этом попутно возникающие вопросы часто сбивают с толку и отвлекают от первой конечной цели — «пощупать» своими руками взаимодействие нескольких независимых приложений, настроить между ними работу. Когда некое ядро понимания происходящего будет сформировано, полагаю, вы вернетесь к более узким и специфическим вопросам в этой теме, и понять их вам станет легче.

Краткое описание

Вашему вниманию представлена система, состоящая из 3-ех микросервисов:

  • Приложение на FASTApi;

  • Веб-парсер на Scrapy;

  • Телеграмм бот на Aiogram;

Что эта «система» собирается делать? Все достаточно просто — мы создадим end-point, который будет принимать в себя два параметра — телеграм ID и код валюты. Далее эти данные отправятся в парсер (Scrapy), который по полученному коду валюты, узнает ее курс в рублях. Напоследок, собранную информацию примет бот и отправит пользователю. Чтобы стало еще понятнее, предлагаю посмотреть на рис. 1:

Рис. 1. Схема работы микросервисов

Рис. 1. Схема работы микросервисов

В качестве языка программирования будем использовать Python, также нам понадобится Redis, Kafka и Zookeper. Чтобы «поднять» все это зло добро, прибегнем к Docker-у.

Структура

Для того, чтобы в дальнейшем читатель не «забуксовал» в попытке понять расположение того или иного файла или куска кода, считаю нужным пояснить как будет располагаться структура файлов и папок:

MSA/

docker‑compose.yml;

.env;

services/ (в ней будут находиться все наши микросервисы);

currency/ (сервис на Scrapy, ответственный за парсинг валют);

web/ (сервис на FASTApi, ответственный за получение входных данных);

notify/ (сервис с ботом, ответственный за отправку сообщения пользователю);

Конечно, у каждого сервиса должен быть свой .env файл. Однако здесь мы умышленно согрешим упростим и создадим один общий для всех. Также на первой строчке каждого блока кода, в качестве комментария будет располагаться путь к этому файлу.

Hidden text

Весь код для изучения/клонирования можно найти в этом репозитории. Однако я настоятельно рекомендую прописать все своими руками.

FASTApi сервис (web)

Начнем с самого главного — точки входа в нашу систему. Установим все необходимые зависимости, их, кстати, будем хранить в poetry. В web сервисе инициализируем окружение.

cd services/web && poetry init

И сразу установим зависимости

poetry add fastapi uvicorn aiokafka

Как уже говорилось раннее — мы будем использовать Apache Kafka в качестве брокера сообщений. В Python есть асинхронный пакет — aiokafka — им мы и воспользуемся.

Чтобы избежать «кучи-малы» и не мешать .py файлы с другими — внутри директории web создадим еще одну директорию app — и в ней уже будем располагать весь код.

Начнем с создания схем, здесь ничего сложного, мы ожидаем всего два параметра:

# services/web/app/schemas.py
from pydantic import BaseModel


class Message(BaseModel):
    currency_char_code: str
    telegram_id: int

Опишем нужный нам end-point:

# services/web/app/main.py
from fastapi import FastAPI
from web.schemas import Message


app = FastAPI()


@app.post("/currency-info")
async def send(message: Message):
    return message.model_dump()

Пока что будем просто возвращать то, что передал пользователь, а логику с брокером добавим позже. Далее укажем необходимые настройки и переменные, которые в будущем будем использовать для взаимодействия с брокером:

# services/web/app/settings.py
import os

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")

PRODUCE_TOPIC = os.getenv("WEB_TOPIC")
# .env
KAFKA_BOOTSTRAP_SERVERS=kafka:9092

WEB_TOPIC=web

WEB_TOPIC — топик, куда наш web-сервис будет отправлять сообщения. В будущем Scrapy сервис будет этот топик слушать.

KAFKA_BOOTSTRAP_SERVERS — адрес сервера брокера (их может быть несколько, в нашем случае всего один).

Теперь перейдем к написанию класса-оболочки над нашим «продьюсером»:

# services/web/app/producer.py
from aiokafka import AIOKafkaProducer
from web.app import settings
import asyncio

event_loop = asyncio.get_event_loop()


class AIOWebProducer(object):
    def __init__(self):
        self.__producer = AIOKafkaProducer(
            bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
            loop=event_loop,
        )
        self.__produce_topic = settings.PRODUCE_TOPIC

    async def start(self) -> None:
        await self.__producer.start()

    async def stop(self) -> None:
        await self.__producer.stop()

    async def send(self, value: bytes) -> None:
        await self.start()
        try:
            await self.__producer.send(
                topic=self.__produce_topic,
                value=value,
            )
        finally:
            await self.stop()


def get_producer() -> AIOWebProducer:
    return AIOWebProducer()

Кто такой «продьюсер»? Говоря простым языком — это специальный класс, который может взаимодействовать с брокером сообщений и предоставляет интерфейс отправки сообщения по нужному топику.

Мы воспользовались классом AIOKafkaProducerиз модуля aiokafka. Он представляет из себя интерфейс взаимодействия с брокером от лица производителя (продьюсера) сообщений. В качестве параметров мы указали текущий цикл событий и KAFKA_BOOTSTRAP_SERVERS.

В нашем классе-обертке ничего сложного нет. Методы start и stop просто оболочки над одноименными методами класса AIOKafkaProducer. start — открывает соединение с кластером Kafka, stop — очищает все ожидающие данные и закрывает соединение.

Метод send — такая же надстройка, только она инкапсулирует создание и закрытие соединения перед отправкой сообщения по нужному топику. Также стоит отметить, что отправлять данные мы будем в виде байтов. Это следует из документации пакета и устройства самой Kafka.

Функция get_producer нужна для удобной инъекции продюсера в наш единственный end-point. Давайте как раз этим и займемся:

# web/app/main.py
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi import FastAPI, Depends
from web.app.schemas import Message
from web.app.producer import get_producer
import json

if TYPE_CHECKING:
    from web.app.producer import AIOWebProducer


app = FastAPI()


@app.post("/currency-info")
async def send(message: Message, producer: AIOWebProducer = Depends(get_producer)) -> None:
    message_to_produce = json.dumps(message.model_dump()).encode(encoding="utf-8")
    await producer.send(value=message_to_produce)

В целом, практически ничего не изменилось, кода действительно немного (2 строчки). Сначала мы сериализуем объект сообщения (переводим его в байты), после чего отправляем полученный набор байтов в назначенный топик при помощи метода send. Все.

Последнее, что нам осталось сделать для этого микросервиса — написать для него Dockerfile:

# services/web/Dockerfile

FROM python:3.11-slim-buster

WORKDIR /usr/src/web

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH=/usr/src

COPY poetry.lock pyproject.toml ./

RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root

COPY app app/

WORKDIR ./app

Currency сервис (Scrapy)

Как и до этого инициализируем окружение и добавим необходимые зависимости:

cd services/currency && poetry init
poetry add scrapy redis aiokafka

В этот раз мы не будем создавать папку app, так как Scrapy сделает это за нас и даже лучше. Вместо этого инициализируем проект при помощи команды:

scrapy startproject currency

Давайте избавимся от лишней вложенности папок currency, немного раскроем матрешку и приведем ее к такому виду:

Рис. 2. Структура файлов Scrapy сервиса

Рис. 2. Структура файлов Scrapy сервиса

Теперь поговорим о том, как будет парситься информация и куда она будет попадать. Итак, в Scrapy есть «пауки», суть которых как раз парсить данные с чего-либо. После того как данные будут получены, мы положим их в Redis, чтобы на каждый запрос пользователя не ходить в интернет. Сгенеририруем базовую комплектацию «паука», для этого введем:

scrapy genspider currency_v1 https://www.cbr.ru/scripts/xml_daily.asp

Далее в папке spiders должен был появиться каркас нашего будущего «паука». Давайте немного подредактируем его код:

# services/currency/currency/spiders/currency_v1.py
import scrapy


class CurrencyV1Spider(scrapy.Spider):
    name = "currency_v1"
    allowed_domains = ["www.cbr.ru"]
    start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"]

    def parse(self, response, **kwargs) -> None:
        currencies = response.xpath("//ValCurs//Valute")
        for currency in currencies:
            print(currency)

Итак, чтобы убедиться, что «паук» действительно что-либо парсит, предлагаю запустить его и проверить, для этого пишем в консоль:

scrapy crawl currency_v1 

Если в терминале посыпалось кучу различной информации вида — отлично, давайте подключим Redis, чтобы эта информация где-то хранилась. Для этого, во-первых, обновим .env:

KAFKA_BOOTSTRAP_SERVERS=kafka:9092

WEB_TOPIC=web

REDIS_HOST=redis

REDIS_PORT=6379

А, во-вторых, добавим необходимые настройки, чтобы наш сервис подключался к Redis:

# services/currency/currency/settings.py
import os

BOT_NAME = "currency"
SPIDER_MODULES = ["currency.spiders"]
NEWSPIDER_MODULE = "currency.spiders"
ROBOTSTXT_OBEY = True
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"

# --- REDIS SETTINGS

CURRENCY_REDIS_CACHE_TIME_IN_SECONDS = 60

REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}"

Я удалил абсолютно весь закомментированный автосгенерированный фреймворком код и добавил настройки для нашего NoSQL хранилища. Теперь в методе parse нашего паука будем сохранять поступившую информацию в Redis:

# services/currency/currency/spiders/currency_v1.py
from currency.currency import settings
import scrapy
import redis
import json


class CurrencyV1Spider(scrapy.Spider):
    name = "currency_v1"
    allowed_domains = ["www.cbr.ru"]
    start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"]

    def parse(self, response, **kwargs) -> None:
        currencies = response.xpath("//ValCurs//Valute")
        with redis.from_url(url=settings.REDIS_URL) as redis_client:
            for currency in currencies:
                redis_client.set(
                    name=self.get_currency_redis_key(currency),
                    value=self.get_currency_redis_value(currency),
                    ex=settings.CURRENCY_REDIS_CACHE_TIME_IN_SECONDS,
                )

    @staticmethod
    def get_currency_redis_key(selector) -> str:
        return selector.xpath(".//CharCode//text()").get()

    @staticmethod
    def get_currency_redis_value(selector) -> bytes:
        return json.dumps(
            {
                "currency_value": selector.xpath(".//Value//text()").get()
            }
        ).encode("utf-8")

Теперь давайте создадим какой-нибудь класс-контроллер, с помощью которого мы сможем получить курс валюты по ее коду:

# services/currency/currency/controller.py
from multiprocessing import Process, Queue
from scrapy.crawler import CrawlerProcess
from currency.currency import settings
from currency.currency.spiders.currency_v1 import CurrencyV1Spider
from redis import asyncio as aioredis
import json


class CurrencyController(object):

    def update_redis_cache(self) -> None:
        queue = Queue()
        process = Process(target=self._crawl_currency, args=(queue,))
        process.start()
        none_or_exception = queue.get()
        process.join()
        if none_or_exception is not None:
            raise none_or_exception

    @staticmethod
    def _crawl_currency(queue: Queue) -> None:
        try:
            process = CrawlerProcess()
            process.crawl(CurrencyV1Spider)
            process.start()
            queue.put(None)
        except Exception as exc:
            queue.put(exc)

    async def get_currency_info(self, currency_char_code: str) -> dict:
        if currency_info := await self.get_currency_from_redis(currency_char_code):
            return currency_info
        self.update_redis_cache()
        return await self.get_currency_from_redis(currency_char_code)
    
    @staticmethod
    async def get_currency_from_redis(char_code: str) -> dict | None:
        async with aioredis.from_url(settings.REDIS_URL) as redis_client:
            if currency_info := await redis_client.get(name=char_code):
                return json.loads(currency_info)

Можно не обращать внимание на танцы с бубном вокруг multiprocessing, просто Scrapy не позволяет запустить паук в том же самом процессе, поэтому для каждого запуска мы создаем новый. Возможно (и скорее всего) это не самое лучшее решение, однако я допускаю его в рамках данной статьи для упрощения. Итак, возвращаемся к брокеру.

В этом сервисе нам необходим как производитель, так и потребитель. Потребитель будет слушать на входящие сообщения от web сервиса, производитель отправлять notify сервису. Добавим необходимые настройки:

# services/currency/currency/settings.py
# --- SCRAPY SETTINGS
...

# --- REDIS SETTINGS
...

# --- KAFKA SETTINGS

PRODUCE_TOPIC = os.getenv("CURRENCY_TOPIC")
CONSUME_TOPIC = os.getenv("WEB_TOPIC")
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")

Не забываем про .env:

KAFKA_BOOTSTRAP_SERVERS=kafka:9092

WEB_TOPIC=web

REDIS_HOST=redis

REDIS_PORT=6379

CURRENCY_TOPIC=currency

И пишем код в main.py для прослушивания и отправки сообщений посредством брокера:

# services/currency/currency/main.py

import json
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from currency.currency import settings
from currency.currency.controller import CurrencyController


async def start_consumer() -> AIOKafkaConsumer:
    consumer = AIOKafkaConsumer(
        settings.CONSUME_TOPIC,
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    )
    await consumer.start()
    return consumer


async def start_producer() -> AIOKafkaProducer:
    producer = AIOKafkaProducer(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)
    await producer.start()
    return producer


async def main() -> None:
    controller = CurrencyController()
    consumer = await start_consumer()
    producer = await start_producer()
    try:
        async for message in consumer:
            decoded_message = json.loads(message.value)
            currency_info = await controller.get_currency_info(
                currency_char_code=decoded_message.get("currency_char_code"),
            ) or dict()
            currency_info["telegram_id"] = decoded_message["telegram_id"]
            info_to_send = json.dumps(currency_info).encode(encoding="utf-8")
            await producer.send(topic=settings.PRODUCE_TOPIC, value=info_to_send)
    finally:
        await consumer.stop()
        await producer.stop()


if __name__ == '__main__':
    asyncio.run(main())

Здесь мы инициализируем производителя и потребителя, слушаем на входящие сообщения, как только оно появляется — парсим информацию о валюте и отправляем уже телеграмм боту. Еще раз: слушаем WEB_TOPIC, а производим в CURRENCY_TOPIC .

Осталось написать Dockerfile:

# services/currency/Dockerfile
FROM python:3.11-slim-buster

WORKDIR /usr/src/currency/

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH=/usr/src

COPY poetry.lock pyproject.toml ./

RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root

COPY currency currency/

WORKDIR ./currency

Notify сервис (бот)

Считаю нецелесообразно рассказывать в этой статье, как создавать своего телеграмм-бота. В интернете масса информации по этой теме. Начинаем мы с того, что у вас есть токен и вы знаете зачем он нужен.

Как и в предыдущем сервисе инициализируем poetry:

cd services/notify && poetry init

И добавим необходимые зависимости:

poetry add aiogram aiokafka

Создадим внутри директории notify новую директорию app, а в ней settings.py и добавим туда следующий код:

import os

BOT_TOKEN = os.getenv("BOT_TOKEN")

CONSUME_TOPIC = os.getenv("CURRENCY_TOPIC")

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")

Из интересного здесь то, что в качестве прослушиваемого топика, мы используем CURRENCY_TOPIC, куда отправляет сообщение наш микро-сервис на Scrapy. Также не забудьте в .env файл добавить переменную BOT_TOKEN и указать в ней токен вашего бота. Файл с переменными окружениями по итогу должен выглядеть примерно так:

KAFKA_BOOTSTRAP_SERVERS=kafka:9092

WEB_TOPIC=web

REDIS_HOST=redis

REDIS_PORT=6379

CURRENCY_TOPIC=currency

BOT_TOKEN=#токен

Далее рядом создадим main.py, в нем будет запуск бота и прослушивание входящих сообщений:

from aiogram import Bot
from aiogram import Dispatcher
from aiokafka import AIOKafkaConsumer
from notify.app import settings

import asyncio
import json

dispatcher = Dispatcher()
BOT = Bot(token=settings.BOT_TOKEN)


async def consume() -> None:
    consumer = AIOKafkaConsumer(
        settings.CONSUME_TOPIC,
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            serialized = json.loads(msg.value)
            await BOT.send_message(
                chat_id=serialized.get("telegram_id"),
                text=serialized.get("currency_value") or "Валюта не найдена",
            )
    finally:
        await consumer.stop()


async def main() -> None:
    polling = asyncio.create_task(dispatcher.start_polling(BOT))
    consuming = asyncio.create_task(consume())
    await asyncio.gather(polling, consuming)
    print("Bot has successfully started polling")


if __name__ == "__main__":
    asyncio.run(main())

Будем отделять мух от котлет и обращать внимание на код, связанный с потреблением сообщений.

  1. Мы воспользовались классом AIOKafkaConsumer, куда передали топик, который мы слушаем, и BOOTSTRAP_SERVERS.

  2. Асинхронная функция consume состоит глобально из трех частей: устанавливаем соединение с брокером — await consumer.start(), слушаем на входящие сообщения — async for msg in consumer. Если сообщение есть — десериализуем его и отправляем пользователю в его телеграмм при помощи метода send_message.

Как вы поняли, здесь отсутствует «продьюсер», так как это конечная точка нашей системы, откуда сообщение уже отправится пользователю в телеграмм бот.

Также напишем боту Dockerfile, он почти ничем не отличается от предыдущего:

# services/notify/Dockerfile
FROM python:3.11-slim-buster

WORKDIR /usr/src/notify

ENV PYTHONDONTWRITEBYTECODE 1

ENV PYTHONUNBUFFERED 1

ENV PYTHONPATH=/usr/src

COPY poetry.lock pyproject.toml ./

RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root

COPY app app/

WORKDIR ./app

Инфраструктура

Для того, чтобы развернуть все наши шестеренки, опишем docker-compose.yml:

Hidden text

version: '3.3'

services:
  web:
    restart: on-failure
    build:
      context: services/web/
      dockerfile: Dockerfile
    container_name: "web"
    ports:
      - "8000:8000"
    environment:
      KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS
      WEB_TOPIC: $WEB_TOPIC
    command: uvicorn main:app --reload --host 0.0.0.0 --port 8000

  currency:
    restart: on-failure
    build:
      context: services/currency/
      dockerfile: Dockerfile
    container_name: "currency"
    environment:
      KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS
      WEB_TOPIC: $WEB_TOPIC
      CURRENCY_TOPIC: $CURRENCY_TOPIC
      REDIS_HOST: $REDIS_HOST
      REDIS_PORT: $REDIS_PORT
    command: python main.py

  bot:
    restart: on-failure
    build:
      context: services/notify/
      dockerfile: Dockerfile
    container_name: "notify"
    environment:
      KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS
      CURRENCY_TOPIC: $CURRENCY_TOPIC
      BOT_TOKEN: $BOT_TOKEN
    command: python main.py

  zookeeper:
    image: 'bitnami/zookeeper:3.7.0'
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  
  kafka:
    image: 'bitnami/kafka:2.8.0'
    container_name: kafka
    ports:
      - "9093:9093"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  redis:
    image: redis
    container_name: redis
    environment:
      REDIS_HOST: REDIS_HOST
    ports:
      - "6379:6379"

Момент истины — поднимаем:

docker compose up -d --build

Заранее следует проверить, что ничего не «отвалилось»:

docker ps

В выводе должно быть шесть рабочих контейнеров. Если это так — убедитесь, что вы начали диалог с ботом и, если оно тоже так, смело переходим по адресу и отправляем POST-запрос нашему единственному енд-поинту, куда передаем свой телеграмм ID и, допустим, USD. Ответом на это действие должно быть сообщение в телеграмме от нашего бота с одиноким и неприятным числом.

Заключение

Напоследок, хотелось бы сказать, что в данной статье намеренно пропущены некоторые моменты с настройкой Kafka, с ее переменными, которые можно указать в docker-compose.yml. Я не хотел перегружать статью информацией, так как, полагаю, что для человека, который будет работать с этим инструментом впервые, важнее получить практический опыт.

Я благодарен, если вы дочитали статью до этого момента. Надеюсь, я смог в понятной форме рассказать и, что главное, показать, как работает брокер на поверхностном уровне, не залезая в дебри. До новых встреч!

© Habrahabr.ru