Пишем полезный сервис на Python для получении ИНН

В этой статье хочу рассказать о том, как написать полезный сервис, для получения ИНН по персональным данным (паспортные данные). ИНН физического лица получаем с использование сайта https://service.nalog.ru/. Похожая функциональность, скорее всего, уже где-то и кем-то была реализована. Основная идея статьи — поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB. Работа с клиентами сервиса реализована через RabbitMQ в режиме непрерывного чтения очереди, отправкой результата в выходную очередь. Сервис будет жить в Kubernetes, что требует наличие liveness и readiness проб. Для этого используется веб-сервер.

Фото by Christina Morillo from Pexels

Фото by Christina Morillo from Pexels

Общие сведения

Сервис будем реализовывать на Python 3.10 с использованием библиотек aio-pika, fastapi, pydantic, motor и других библиотек, которые будут указаны в pyproject.toml проекта. В качестве базы данных используем MongoDB 4+. Обращение к сервису налоговой выполняется при помощи библиотеки aiohttp. Проект размещён в публичном доступе на GitHub.

Приложение функционирует как слушатель входной очереди и веб-сервер для отдачи liveness и readiness-проб. При получении сообщения в очередь, из заголовка reply-to вычитывается имя выходной очереди, в которую будет направлен ответ. Обработка запроса передаётся в сервис, который проверяет наличие похожего запроса в базе данных. В случае отсутствия данных по клиенту, выполняется запрос к внешнему сервису. Внешний сервис может обработать какое-то количество сообщений без запроса капчи. После превышения лимитов, которые доподлинно не известны (но изменяются при общей повышенной нагрузке), сообщение помещается в мёртвую очередь и через указанное в настройках время возвращается в обработку.

Подготовительные работы для базы данных не требуется. При первом подключении к MongoDB будут созданы необходимые коллекции и индексы.

Контракт общения с сервисом

Определим контракт входного сообщения в формате JSON:

Hidden text

{
	 "requestId": str,
	 "firstName": str,
	 "lastName": str,
	 "middleName": str,
	 "birthDate": date,
	 "documentSerial": str,
	 "documentNumber": str,
	 "documentDate": date
}

Все поля интуитивно понятны. Атрибут requestId должен быть уникален в пределах всех сообщений, имеет смысл передавать его как строковое представление GUID.

Имя выходной очереди может передаваться через поле reply-to заголовка сообщения.

Контракт выходного сообщения будет следующим:

Hidden text

{
	"requestId": str,
	"inn": str,
	"cached": bool,
	"details": str,
	"elapsedTime": float
}

В ответе будем отдавать код запроса, собственно ИНН и время, за которое отработал сервис запрос и признак кэшированного ответа.

Структура проекта

Общая структура директорий проекта следующая.

src
  |--inn_service
    |--clients
    |--connection_managers
    |--core
    |--infrastructure
       |--controllers
       |--handlers
       |--http_server
       |--queue_manager                     
    |--models    
    |--repositories
    |--serializers
    |--services    
  main.py
.env.example
.gitignore
docker-compose.yaml
pyproject.yaml

В корневой директории будут размещаться инструменты запуска проекта: docker-compose, make-файл запуска линтинга и тестов. Собственно проект размещён в src/inn_service и содержит:

  • clients — клиенты для подключения к действительным поставщикам данных (nalog.ru и прочие);

  • connection_managers — инфраструктурные подключения к базе данных, очередям;

  • core — общий код приложения (собственно приложение, контейнер);

  • infrastructure — менеджер обработчиков очередей, сами обработчики, инфраструктурные контроллеры;

  • models — моделей приложения, DTO-объекты;

  • repositories — репозиторий для работы с базой данных;

  • serializers — сериализаторы входных запросов, данных для отправки в провайдер ИНН;

  • services — сервисы приложения.

Работу по созданию виртуального подключения переложим на PyCharm и poetry. Краткая команда установки: poetry install.

Настройки приложения

Начнём разработку с создания настроек приложения, используя BaseSettings из пакета pydantic.

В файле settings.py будут находиться настройки.

Hidden text

class Settings(BaseSettings):  
    app_name: str = 'INN service'  
    app_request_retry_times: int  # Количество попыток обработки внешнего запроса  
    app_request_retry_sec: int  # Время задержки в секундах перед повторной обработкой запроса  
  
    http_host: str  
    http_port: int  
    http_handler: str = 'asyncio'  
  
    mongo_host: str  
    mongo_port: str  
    mongo_user: str  
    mongo_pass: str  
    mongo_name: str  
    mongo_rs: Optional[str] = None  
    mongo_auth: str  
    mongo_timeout_server_select: int = 5000  
  
    rabbitmq_host: str  
    rabbitmq_port: int  
    rabbitmq_user: str  
    rabbitmq_pass: str  
    rabbitmq_vhost: str  
    rabbitmq_exchange_type: str  
    rabbitmq_prefetch_count: int  
    rabbitmq_source_queue_name: str  
  
    client_nalog_url: str  # Адрес внешнего сервиса для получения ИНН  
    client_nalog_timeout_sec: int  # Таймаут ожидания ответа от сервиса  
    client_nalog_retries: int  # Количество попыток запросов к внешнему сервису  
    client_nalog_wait_sec: int  # Время ожидания между попытками client_nalog_retries  
  
    @property  
    def mongo_dsn(self) -> str:  
        mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(  
            self.mongo_user,  
            self.mongo_pass,  
            self.mongo_host,  
            self.mongo_port,  
            self.mongo_auth  
        )  
  
        if self.mongo_rs:  
            mongo_dsn += f'?replicaSet={self.mongo_rs}'  
  
        return mongo_dsn  
  
    @property  
    def rabbitmq_dsn(self) -> str:  
        return 'amqp://{}:{}@{}:{}/{}'.format(  
            self.rabbitmq_user,  
            self.rabbitmq_pass,  
            self.rabbitmq_host,  
            self.rabbitmq_port,  
            self.rabbitmq_vhost  
        )

Предлагаю не указывать значения по умолчанию для настроек. Если что-то пойдёт не так, то сразу увидим проблему. В этот момент можно подготовить сразу и файл .env.example, содержащий настройки по-умолчанию для сервиса.

Подключения к инфраструктуре

Создадим слой подключения к инфраструктуре rabbitmq, mongodb через компоненты aio-pika и motor:

poetry add motor aio-pika fast fastapi uvicorn injector

Слой подключения будет размещаться в connection_managers и предназначен для организация подключения к базе данных и менеджеру очередей. Добавим две миксины для создания механизма регистрации автозапуска и завершения приложения. Механизм автозапуска функций применяется при старте приложения для инициализации подключения к RabbitMQ и MongoDB, а также для создания индексов в коллекции базы данных. В случае возникновения ошибок при подключении, приложение не стартует и выдаётся ошибка в логи.

Hidden text

class StartupEventMixin(ABC):  
  
    @abstractmethod  
    def startup(self) -> Coroutine:  
        raise NotImplementedError  
  
  
class ShutdownEventMixin(ABC):  
  
    @abstractmethod  
    def shutdown(self) -> Coroutine:  
        raise NotImplementedError

На примере RabbitConnectionManager продемонстрируем реализацию.

Hidden text

class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):  
    def startup(self) -> Coroutine:  
        return self.create_connection()

	def shutdown(self) -> Coroutine:  
        return self.close_connection()  
  
	async def create_connection(self) -> None:  
	    self.logger.info('Create connection RabbitMQ')  
	    try:  
	        self._connection = await connect_robust(self._dsn)  
	        self._connection.reconnect_callbacks.add(self.on_connection_restore)  
	        self._connection.close_callbacks.add(self.on_close_connection)  
	        self.connected = True  
	    except ConnectionError as exc:  
	        err_message = f'Rabbit connection problem: {exc}'  
	        self.logger.error(err_message)  
	        raise ConnectionError(err_message)  
	  
	async def close_connection(self) -> None:  
	    if self._connection:  
	        await self._connection.close()
	
	# ... некоторый код пропущен, полная версия на гитхабе
	
	def on_close_connection(self, *args):  
	    self.logger.error('Lost connection to RabbitMQ...')  
	    self.connected = False  
	  
	def on_connection_restore(self, *args):  
	    self.logger.info('Connection to RabbitMQ has been restored...')  
	    self._channel = None  
	    self._exchange = None  
	    self.connected = True

При подключении к RabbitMQ устанавливаются функции коллбэков для реагирования на потерю соединения и его восстановление.

Менеджер обработчиков

Менеджер обработчиков предназначен для управления слушателями (consumers) очередей. В проекте используется концепция «мёртвых очередей», которая позволяет отложить сообщение на некоторое время и вернуться к его обработке позже. Причиной для этого может являться долгий ответ от провайдера, временные ошибки провайдера, требование ввода капчи из-за нагрузки. Достаточно подробно механизм мёртвых очередей технически разобран в статье Отложенные ретраи силами RabbitMQ. Каждый обработчик очереди должен хранить и возвращать признак использования ретраев, время между возвратами в основную очередь на обработку, а также имя очереди, которую планирует слушать. Основной код обработчика находится в run_handler. От функции ожидается True при успешном обработке либо непоправимой ошибке запроса (некорректное тело сообщения) и False, если запрос не удалось обработать, но следует повторить позднее.

Код базового обработчика:

Hidden text

class BaseHandler(ABC):  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            rabbitmq_connection: RabbitConnectionManager  
    ) -> None:  
        self.settings = settings  
        self.logger = logger  
        self.rabbitmq_connection = rabbitmq_connection  
  
    @abstractmethod  
    def get_use_retry(self) -> bool:  
        raise NotImplementedError  
  
    def get_retry_ttl(self) -> int:  
        return 0  
  
    @abstractmethod  
    def get_source_queue(self) -> str:  
        raise NotImplementedError  
  
    def convert_seconds_to_mseconds(self, value: int) -> int:  
        return value * 1000  
  
    @abstractmethod  
    async def run_handler(  
            self,  
            message: dict,  
            request_id: Optional[str],  
            result_queue: Optional[str],  
            count_retry: Optional[int] = 0  
    ) -> bool:  
        raise NotImplementedError

Собственно единственный наследник класса RequestHandler, реализующий приём и обработку сообщения:

Hidden text

class RequestHandler(BaseHandler):  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            rabbitmq_connection: RabbitConnectionManager,  
            service: InnService  
    ) -> None:  
        super().__init__(settings, logger, rabbitmq_connection)  
        self.source_queue_name = self.settings.rabbitmq_source_queue_name  
        self.retry_times = self.settings.app_request_retry_times  
        self.retry_sec = self.settings.app_request_retry_sec  
        self.service = service  
  
    def get_source_queue(self) -> str:  
        return self.source_queue_name  
  
    def get_use_retry(self) -> bool:  
        return True  
  
    def get_retry_ttl(self) -> int:  
        return self.retry_sec  
  
    async def run_handler(  
            self,  
            message: dict,  
            request_id: Optional[str],  
            result_queue: Optional[str],  
            count_retry: Optional[int] = 0  
    ) -> bool:  
        if count_retry > self.retry_times:  
            self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')  
            return True  
  
        self.logger.info(f'Get request {request_id} for response {result_queue}')  
  
        client_data = RequestSerializer.parse_obj(message)  
  
        response = await self.service.get_client_inn(client_data)  
  
        if result_queue:  
            json_message = response.dict()  
            await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)  
  
        return True

При получении сообщения проверяем количество повторного попадания в очередь через параметр count_retry. В случае превышения — отправляем статус обработки сообщения (ошибку) в выходную очередь и приостанавливаем обработку данного сообщения. RequestSerializer.parse_obj(message) не обёрнут в try…except блок потому как менеджер очередей контролирует ошибки преобразования сообщений ValidationError.

Работа с базой данных

Выбор на MongoDB пал из-за простоты использования, отсутствия миграций, гибкой схемы обработки данных. В задаче нет необходимости в хранении зависимых данных, оформление связей между таблицами. Для работы с данными будем использовать паттерн Репозиторий.

В базовом репозитории расположены функции работы с данными, индексами в нотации Mongo, а в конкретных классах реализуем необходимые сервису функции. Создание индексов выполняется при старте приложения в фоновом режиме (флаг background), для чего используется имплементация миксины StartupEventMixin. Запросы набора данных поддерживают пагинацию и сортировку.

Конкретный класс создаётся на каждую отдельную коллекцию. В проекте один репозиторий для клиентских запросов. Модель для хранения данных находится в директории models и называется ClientDataModel. Клиентская модель создана с типизацией, поддерживаемой MongoDB (datetime вместо date), для атрибута created_at указана функция генерации значения по умолчанию через default_factory. Также в модель добавлена функция подсчёта времени обработки запроса elapsed_time и метод класса для создания объекта из клиентского запроса.

Hidden text

class ClientDataModel(BaseModel):  
    created_at: datetime = Field(default_factory=datetime.utcnow)  
    request_id: str  
    first_name: str  
    last_name: str  
    middle_name: str  
    birth_date: datetime  
    birth_place: str = Field(default='')  
    passport_num: str  
    document_date: datetime  
    executed_at: Optional[datetime]  
    inn: Optional[str]  
    error: Optional[str]  
  
    @classmethod  
    def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':  
        return ClientDataModel(  
            request_id=request.request_id,  
            first_name=request.first_name,  
            last_name=request.last_name,  
            middle_name=request.middle_name,  
            birth_date=datetime.combine(request.birth_date, datetime.min.time()),  
            passport_num='{} {}'.format(request.document_serial, request.document_number),  
            document_date=datetime.combine(request.document_date, datetime.min.time()),  
        )  
  
    @property  
    def elapsed_time(self) -> float:  
        end = self.executed_at or datetime.utcnow()  
        return (end - self.created_at).total_seconds()

Код базового репозитория:

Hidden text

class BaseRepository(StartupEventMixin):  
  
    def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:  
        self.mongodb_connection_manager = mongodb_connection_manager  
        self.db_name = setting.mongo_name  
  
    @property  
    def collection_name(self) -> str:  
        raise NotImplementedError  
  
    @property  
    def collection_indexes(self) -> Iterable[IndexDef]:  
        raise NotImplementedError  
  
    def startup(self) -> Coroutine:  
        return self.create_indexes()  
  
    async def create_index(self, field_name: str, sort_id: int) -> None:  
        connection = await self.mongodb_connection_manager.get_connection()  
        collection = connection[self.db_name][self.collection_name]  
        await collection.create_index([(field_name, sort_id), ], background=True)  
  
    async def create_indexes(self) -> None:  
        tasks = []  
        for index_item in self.collection_indexes:  
            tasks.append(self.create_index(index_item.name, index_item.sort))  
        asyncio.ensure_future(asyncio.gather(*tasks))  
  
    async def get_one_document(self, criteria: dict) -> Optional[dict]:  
        connection = await self.mongodb_connection_manager.get_connection()  
        collection = connection[self.db_name][self.collection_name]  
        return await collection.find_one(criteria)  
  
    async def get_list_document(  
            self,  
            criteria: dict,  
            sort_criteria: Optional[list] = None,  
            limit: Optional[int] = 0,  
            skip: Optional[int] = 0,  
    ) -> List[dict]:  
        if not sort_criteria:  
            sort_criteria = []  
        connection = await self.mongodb_connection_manager.get_connection()  
        cursor = connection[self.db_name][self.collection_name].find(  
            criteria,  
            limit=limit,  
            skip=skip,  
            sort=sort_criteria  
        )  
  
        result = list()  
        async for data in cursor:  
            result.append(data)  
        return result  
  
    async def save_document(self, data: dict) -> str:  
        connection = await self.mongodb_connection_manager.get_connection()  
        result = await connection[self.db_name][self.collection_name].insert_one(data)  
        return result.inserted_id  
  
    async def update_document(self, criteria: dict, data: dict) -> None:  
        connection = await self.mongodb_connection_manager.get_connection()  
        await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})

Сервисный слой

Сервисный слой выполняет всю необходимую обработку с данными.

  • обращение в базу данных для поиска аналогичного запроса (request_id и паспортные данные);

  • отдать результат, если данные были найдены;

  • выполнить запрос к API;

  • сохранить результат запроса в базу данных;

  • вернуть ответ.

В сервисном слое попытался абстрагироваться от работы с инфраструктурой. Возврат ответа производится в вызывающую функцию, которая должна знать куда вернуть ответ. В данном случае, менеджер очередей «знает» куда ему ответить благодаря наличию поля reply-to в заголовке запроса. Возвращаемое значение оформлено в виде DTO-объекта (RequestDTO).

Код класса InnService:

Hidden text

class InnService:  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            client: NalogApiClient,  
            storage: RequestRepository  
    ) -> None:  
        self.settings = settings  
        self.logger = logger  
        self.client = client  
        self.storage_repository = storage  
  
    async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:  
        client_passport = f'{client_data.document_serial} {client_data.document_number}'  
        client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)  
        return client_request  
  
    def update_status(self, model: RequestModel, inn: str, error: str) -> None:  
        model.inn = inn  
        model.error = error  
  
    async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:  
        """Получение клиентского ИНН"""  
        start_process = datetime.utcnow()  
        model = RequestModel.create_from_request(client_data)  
  
        # Получить данные из БД  
        existing_data = await self.get_client_inn_from_storage(client_data)  
        if existing_data:  
            elapsed_time = (datetime.utcnow() - start_process).total_seconds()  
            return RequestDTO(  
                request_id=client_data.request_id,  
                inn=existing_data.inn,  
                elapsed_time=elapsed_time,  
                cashed=True  
            )  
  
        # Сделать фактический запрос в Nalog API  
        request = NalogApiRequestSerializer.create_from_request(client_data)  
        error, result = None, ''  
        try:  
            result = await self.client.send_request_for_inn(request)  
        except NalogApiClientException as exception:  
            self.logger.error('Error request to Nalog api service', details=str(exception))  
            error = str(exception)  
  
        self.update_status(model, result, error)  
        await self.storage_repository.save_request(model)  
  
        return RequestDTO(  
            request_id=model.request_id,  
            inn=model.inn,  
            details=model.error,  
            elapsed_time=model.elapsed_time  
        )

Второй сервис в приложении — это сервис опроса инфраструктуры для health-check. Инфраструктурные менеджеры, которые необходимо мониторить, должны наследоваться от миксины EventLiveProbeMixin и реализовать функцию is_connected.

Клиент

Клиент NalogApiClient предназначен для выполнения POST запроса к https://service.nalog.ru/inn.do и разбора статуса ответа. Функция непосредственного оформления запроса обёрнута в retry декоратор повторителя запроса при возникновении ошибок. Настройки повторителя в общих настройках приложения.

Hidden text

class NalogApiClient:  
    CLIENT_EXCEPTIONS = (  
        NalogApiClientException,  
        aiohttp.ClientProxyConnectionError,  
        aiohttp.ServerTimeoutError,  
    )  
  
    def __init__(self, settings: Settings, logger: AppLogger):  
        self.nalog_api_service_url = settings.client_nalog_url  
        self.request_timeout = settings.client_nalog_timeout_sec  
        self.retries_times = settings.client_nalog_retries  
        self.retries_wait = settings.client_nalog_wait_sec  
        self.logger = logger  
        self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)  
  
    @property  
    def _headers(self):  
        return {  
            "Accept": "application/json, text/javascript, */*; q=0.01",  
            "Accept-Language": "ru-RU,ru",  
            "Connection": "keep-alive",  
            "Origin": "https://service.nalog.ru",  
            "Referer": self.nalog_api_service_url,  
            "Sec-Fetch-Dest": "empty",  
            "Sec-Fetch-Mode": "cors",  
            "Sec-Fetch-Site": "same-origin",  
            "Sec-GPC": "1",  
            "X-Requested-With": "XMLHttpRequest",  
        }  
  
    async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:  
        self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')  
  
        form_data = nalog_api_request.dict(by_alias=True)  
  
        @retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)  
        async def make_request(client_session: aiohttp.ClientSession):  
            async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:  
                if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:  
                    response_text = await response.text()  
                    raise NalogApiClientException(response_text)  
                data = await response.json()  
                code = data.get('code')  
                captcha_required = data.get('captchaRequired')  
                if captcha_required:  
                    raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')  
                if code == 0:  
                    return 'no inn'  
                elif code == 1:  
                    return data.get('inn')  
                else:  
                    raise NalogApiClientException(f'Unable to parse response! Details: {response}')  
  
        async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:  
            return await make_request(session)

Контейнер

Контейнер предназначен для сборки необходимых зависимостей и передачи их в приложение. Наш контейнер собран в классе ApplicationContainer. Все зависимости пробрасываются в виде синглтонов @singleton и регистрируются как провайдеры зависимостей типов @provider предоставляемых библиотекой injector. При написании тестов необходимо подготовить другой контейнер с актуальными fake или stub-объектами.

Основной интерес по работе с контейнером сосредоточен в классе ContainerManager, который используется для проверки реализации миксин EventSubscriberMixin и EventLiveProbeMixin. Функция get_event_collection формирует списки функций обратного вызова для старта и выхода из приложения. Собственно проход по спискам и вызов функций обратного вызова реализован в функциях: run_startup и run_shutdown.

Hidden text

class ContainerManager:  
  
    def __init__(self, cls_container: Type[Container]) -> None:  
        self._container = Injector(cls_container())  
        self._bindings = self._container.binder._bindings  
  
    def get_container(self) -> Injector:  
        return self._container  
  
    def get_live_probe_handlers(self) -> List[Type[Callable]]:  
        result = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, EventLiveProbeMixin):  
                binding_obj = self._container.get(binding)  
                result.append(binding_obj.is_connected)  
        return result  
  
    def get_startup_handlers(self):  
        handlers = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, StartupEventMixin):  
                binding_obj = self._container.get(binding)  
                handlers.append(binding_obj.startup())  
        return handlers  
  
    def get_shutdown_handlers(self):  
        handlers = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, ShutdownEventMixin):  
                binding_obj = self._container.get(binding)  
                handlers.append(binding_obj.shutdown())  
        return handlers  
  
    async def run_startup(self) -> None:  
        exception = None  
        for handler in self.get_startup_handlers():  
            if exception:  
                handler.close()  
            else:  
                try:  
                    await handler  
                except Exception as exc:  
                    exception = exc  
  
        if exception is not None:  
            raise exception  
  
    async def run_shutdown(self) -> None:  
        handlers = []  
        for handler in self.get_shutdown_handlers():  
            handlers.append(handler)  
        await asyncio.gather(*handlers)

Собственно сам контейнер, в котором производится инициализация нужных экземпляров классов. При написании тестов будет создан аналогичный контейнер.

Hidden text

class ApplicationContainer(Container):  
  
    @singleton  
    @provider    
    def provide_settings(self) -> Settings:  
        return Settings()  
  
	# ... немного кода пропущено

    @singleton  
    @provider    
    def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:  
        return MongoConnectionManager(settings, logger)  
  
    @singleton  
    @provider    
    def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:  
        return RabbitConnectionManager(settings, logger)  
  
    @singleton  
    @provider    
    def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:  
        return NalogApiClient(settings, logger)  
  
    @singleton  
    @provider    
    def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:  
        return RequestRepository(mongo_connection, settings)

Приложение

Основная задача приложения — собрать всё воедино и запустить общий поток выполнения. Код сборки приложения предельно простой, инициализацию классов выполняет менеджер контейнера. Сборка приложения выполняется следующими шагами:

  • получение контейнера, передача его в менеджер контейнеров;

  • инициализация event_loop;

  • добавление обработчиков для очередей;

  • запуск инициализаторов для инфраструктурного слоя (реализующих startup миксины);

  • запуск веб-сервера FastAPI для отдачи health-check;

  • включение глобального обработчика ошибок.

Hidden text

class Application:  
  
    def __init__(self, cls_container: Type[Container]) -> None:  
        self.loop = asyncio.get_event_loop()  
        self.container_manager = ContainerManager(cls_container)  
        self.container = self.container_manager.get_container()  
        self.settings = self.container.get(Settings)  
        self.logger = self.container.get(AppLogger)  
        self.live_probe_service = self.container.get(LiveProbeService)  
        self.queue_manager = self.container.get(QueueManager)  
        self.app_name = self.settings.app_name  
        self.http_server = None  
  
    def init_application(self):  
        self.http_server = ServerAPIManager(self.container)  
  
        request_handler = self.container.get(RequestHandler)  
        self.queue_manager.add_handler(request_handler)  
  
        live_probe_handlers = self.container_manager.get_live_probe_handlers()  
        for handler in live_probe_handlers:  
            self.live_probe_service.add_component(handler)  
  
    def run(self) -> None:  
        self.logger.info(f'Starting application {self.app_name}')  
  
        self.init_application()  
  
        try:  
            self.loop.run_until_complete(self.container_manager.run_startup())  
  
            tasks = asyncio.gather(  
                self.http_server.serve(),  
                self.queue_manager.run_handlers_async(),  
            )  
            self.loop.run_until_complete(tasks)  
  
            self.loop.run_forever()  
        except BaseException as exception:  
            exit(1)  
        finally:  
            self.loop.run_until_complete(self.container_manager.run_shutdown())  
  
            self.loop.close()  
            self.logger.info('Application disabled')

Приложение стартует из main-скрипта с использованием небольшой библиотеки typer. Маленькая библиотека имеет возможность удобно обрабатывать параметры командной строки.

Hidden text

import typer  
from core.application import Application  
from app_container import ApplicationContainer  
  
  
def main():  
    try:  
        application = Application(ApplicationContainer)  
        application.run()  
    except BaseException as exc:  
        typer.echo(f'Error starting application. Details: {str(exc)}')  
  
  
if __name__ == "__main__":  
    typer.run(main)

Как это всё запустить?

Проект содержит файл docker-compose для сборки. Необходимо скопировать файл .env.example в файл .env .

docker compose build
docker compose up

После выполнения этих команд, будет запущен экземпляр mongodb на 27017 порту и rabbitmq на 5672 порту с админкой на 15672. В административную панель RabbitMQ можно зайти по адресу http://localhost:15672. В разделе очередей необходимо создать новую очередь, в которую будут направляться результаты работы сервиса и прибиндить её к exchange по умолчанию (direct).

Продолжение следует

В статье рассмотрена тема разработки приложения на Python с использованием очередей, контейнером зависимостей и поддержкой health-check. Предлагаю обсудить архитектуру в комментариях, а затем продолжить развивать сервис. Следующими итерациями планирую добавить гипотетического не бесплатного клиента, которого будем использовать после определённого количества запросов в бесплатный сервис. И в завершении написать тесты.

Материалы, которые могут быть полезны для понимания материала:

© Habrahabr.ru