Как я сделал Telegram-бота для студентов РТСУ

Начало

Привет, Хабр! Я учусь в Российско-Таджикском Славянском университете (на первом курсе), собственно у нас в университете действует так называемая кредитно-бальная система.

Для просмотра количества набранных баллов и так далее, у нас есть приложение которое было разработано университетом.

Оно доступно для Android.

e82b5f66b5ba1f4c95e3a6522cac9e7a.jpg

Однако, я недавно перешёл на iOS-систему, собственно к моему удивлению приложения там не оказалось.

Результаты поиска в App Store

Результаты поиска в App Store

Ну и тут, я подумал что надо бы разработать что-то типа Telegram-бота для просмотра успеваемости, в конце концов это многим должно помочь.

Да и вообще, Telegram работает везде, это моя любимая платформа для обмена сообщениями.

Поиск endpoint’ов…

Разумеется, университет сделал некий API для своего Android-фронтенда, для получения эндпоинтов не я, мой друг, декомпилировал APK файл и предоставил его мне, позже проанализировав «выхлоп» я нашел четыре необходимых мне эндпоинта

В частности эндпоинты для

  • Авторизации

  • Получения информации о профиле студента

  • Получения семестров

  • Получения данных об успеваемости по всем дисциплинам конкретного семестра.

Ну, а дальше, дело за малым, надо просто написать клиент для этого API и так далее.

Инициализация проекта с помощью Poetry, написание обёртки под API

Создаём проект

poetry init 

eeb5865502d0dd02d239c822ac5130ba.png

Я сразу создал почти на самом верхнем уровне проекта пакетник rtsu где и будет лежать наша обёртка.

Давайте посмотрим на api.py

from aiohttp import ClientSession, ContentTypeError, client_exceptions
from cashews import cache
from typing import Optional, Union, Dict, TypeVar, Type, List, Self

from pydantic import BaseModel, parse_obj_as

from .exceptions import NotAuthorizedError, RtsuContentTypeError, ServerError, AuthError
from .schemas import AuthSchema, Profile, Subject, AcademicYear

RTSU_API_BASE_URL = "https://mobile.rtsu.tj/api/v1"
P = TypeVar("P", bound=BaseModel)


class RTSUApi:
    """
    This class provides for you functionality of RTSU public API
    """

    def __init__(self, token: Optional[str] = None):
        """
        Initializes `self`
        :param token: A rtsu-api token (optional)
        """

        self._api_token = token
        self._http_client = ClientSession()

    def set_token(self, token: str):
        """
        Setups token
        :param token: A token
        :return:
        """
        self._api_token = token

    async def _make_request(
            self,
            method: str,
            url_part: str,
            response_model: Type[Union[List[BaseModel], BaseModel]],
            json: Optional[Dict[str, Union[str, int]]] = None,
            params: Optional[Dict[str, str]] = None,
            auth_required: bool = False,
    ) -> Union[P, List[P]]:
        """
        Makes call to RTSU API
        :param url_part: Part of RTSU-API url, example - /auth
        :param json: A json for sending
        :param params: URI parameters for sending
        :return: Response object
        """

        if not json:
            json = {}

        if not params:
            params = {}

        headers = {}

        if auth_required:
            if not self._api_token:
                raise NotAuthorizedError("Not authorized, use `.auth` method.")

            headers['token'] = self._api_token

        try:
            response = await self._http_client.request(
                method,
                f"{RTSU_API_BASE_URL}/{url_part}",
                json=json,
                params=params,
                headers=headers,
                ssl=False,
            )
        except (client_exceptions.ClientConnectionError, client_exceptions.ClientConnectorError) as e:
            raise ServerError(f"Connection error, details: {e}")

        if response.status != 200:
            details = await response.text()
            raise ServerError(
                f"Server returned {response.status}, details: {details}"
            )

        try:
            deserialized_data = await response.json()
        except ContentTypeError as e:
            raise RtsuContentTypeError(
                e.message,
            )

        return parse_obj_as(response_model, deserialized_data)

    async def auth(self, login: str, password: str) -> AuthSchema:
        """
        Authenticates user
        :param login: A login of user
        :param password: A password of user
        :return: RTSU token on success
        """

        try:
            response: AuthSchema = await self._make_request(
                "POST",
                "auth",
                AuthSchema,
                params={
                    "login": login,
                    "password": password,
                }
            )
        except ServerError as e:
            raise AuthError(
                f"Auth error, check login and password, message from server: {e.message}"
            )

        self._api_token = response.token

        return response

    @cache.soft(ttl="24h", soft_ttl="1m")
    async def get_profile(self) -> Profile:
        """
        Returns profile of RTSU student
        :return: `Profile`-response
        """

        return await self._make_request(
            "GET",
            "student/profile",
            Profile,
            auth_required=True,
        )

    async def get_academic_years(self) -> List[AcademicYear]:
        """
        Returns `List` with `AcademicYear` objects
        :return:
        """

        return await self._make_request(
            "GET",
            "student/academic_years",
            List[AcademicYear],
            auth_required=True,
        )

    @cache.soft(ttl="24h", soft_ttl="1m")
    async def get_academic_year_subjects(self, year_id: int) -> List[Subject]:
        """
        Returns `List` with `Subjects` of some year
        :return:
        """

        return await self._make_request(
            "GET",
            f"student/grades/{year_id}",
            List[Subject],
            auth_required=True,
        )

    async def get_current_year_id(self) -> int:
        """
        Returns identifier of current year
        :return:
        """

        years = await self.get_academic_years()

        return years[0].id

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close_session()

    def __str__(self) -> str:
        """
        Stringifies `RTSUApi` objects
        :return:
        """

        return f"{self.__class__.__name__}"

    async def close_session(self):
        """Frees inner resources"""
        await self._http_client.close()

Что тут у нас? В _make_request у нас осуществляется запрос к серверу, а также десериализация json в pydantic-схему (ну или модель?)

Прошу заметить, что я использую замечательную cashews для кеширования результатов, в частности soft-ttl который еще и сильно помогает когда сервера университета падают.

В остальных же методах я просто указываю эндпоинт и response-schema ну и дёргаю тот же _make_request

Также, тут есть методы для того чтобы закрыть текущую aiohttp-сессию, ну тут понятно, помимо этого реализованы магические методы __aenter__ и __aexit__ для того чтобы использовать клиент в withконструкциях.

Ну и set_tokenметод которое просто устанавливает значения токена для того чтобы вручную «впихнуть» токен в клиент, это пригодиться нам чуть позже.

Pydantic-схемы

Допустим, заглянем в profile.py где лежит Profile-schema

from pydantic import Field

from .base import Base


class FullName(Base):
    ru: str = Field(alias='RU')
    tj: str = Field(alias='TJ')


class Faculty(FullName):
    ...


class Speciality(FullName):
    ...


class Profile(Base):
    id: int = Field(alias='RecordBookNumber')
    full_name: FullName = Field(alias='FullName')
    faculty: Faculty = Field(alias="Faculty")
    course: int = Field(alias='Course')
    training_period: int = Field(alias='TrainingPeriod')
    level: str = Field(alias="TrainingLevel")
    entrance_year: str = Field(alias='YearUniversityEntrance')

Почему так? API возвращает мне информацию сразу на двух языках (русском и таджикском)

Собственно, это действительно свидетельствует о том, что API сделали очень криво и убого, но что поделать, тут я просто наследую каждый field от FullName чтобы не писать по два раза RU, TJ и так далее.

Также, прошу заметить то, как элегантно можно сделать field’ы pythonic при помощи pydantic-aliases

После написания всего этого, я принялся тестировать клиент, на удивление он хорошо работает и предоставляет данные в том формате, в котором они мне нужны.

Собственно тесты к этому я тоже написал

import pytest
import pytest_asyncio

from rtsu_students_bot.rtsu import RTSUApi

from .config import settings

pytest_plugins = ('pytest_asyncio',)


@pytest_asyncio.fixture()
async def rtsu_client():
    """
    Initializes client
    :return: Prepared `RTSUApi` client
    """

    async with RTSUApi() as api:
        yield api


@pytest.mark.asyncio
async def test_rtsu_login(rtsu_client: RTSUApi):
    """
    Tests rtsu login
    :param rtsu_client: A RTSU API client
    :return:
    """

    resp = await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)

    assert resp.token is not None


@pytest.mark.asyncio
async def test_rtsu_profile_fetching(rtsu_client: RTSUApi):
    """
    Tests rtsu profile fetching
    :param rtsu_client:
    :return:
    """

    await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)

    profile = await rtsu_client.get_profile()

    assert profile is not None
    assert profile.full_name is not None


@pytest.mark.asyncio
async def test_rtsu_academic_years_fetching(rtsu_client: RTSUApi):
    """
    Tests rtsu academic years fetching
    :param rtsu_client:
    :return:
    """

    await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)

    years = await rtsu_client.get_academic_years()

    assert type(years) == list
    assert len(years) > 0


@pytest.mark.asyncio
async def test_rtsu_academic_year_subjects_fetching(rtsu_client: RTSUApi):
    """
    Tests rtsu academic year fetching
    :param rtsu_client:
    :return:
    """

    await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)

    ac_years = await rtsu_client.get_academic_years()
    year = ac_years[0].id
    years = await rtsu_client.get_academic_year_subjects(year)

    assert type(years) == list
    assert len(years) > 0

Тут тесты сделаны наверное грязно и тупо, я пока не читал про best-practises в тестировании API, буду рад предложениям в комментах по этому поводу.

Ах, да, для тестов тут используется отдельный конфиг, в конце статьи я покажу как его заполнить.

На руках у нас уже есть wrapper

База данных и SQLAlchemy

Установив алхимию, я принялся создавать пакет моделей

Но для начала, покажу вам файлик database.py в котором настроено подключение к базе.

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from rtsu_students_bot.config import settings

engine = create_async_engine(
    settings.db.url,
)

SessionLocal = sessionmaker(bind=engine, class_=AsyncSession)

Тут нет ничего такого, просто вместо обычной сессии я использую асинхронную (по очевидным причинам)

Вернёмся к пакетнику с моделями.

6ee4eab56d514279e6cbf70089a82ec0.png

from sqlalchemy import Integer, Column, String, Boolean, BigInteger

from .base import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    full_name = Column(String(length=255), nullable=True)
    token = Column(String(length=600), nullable=True)
    is_authorized = Column(Boolean, default=False)
    telegram_id = Column(BigInteger)

    def __str__(self):
        return f"{self.__class__.__name__}"

Тут всё очень просто, у нас в базе будет храниться токен, telegram-id, ну и флажок который будет сообщать о том, авторизован ли пользователь.

Далее, пилим пакетник serviceкоторый будет помогать нам работать с базой данных

11c7e0fdc5dd4ab4028983023bd09e85.png

Содержимое user.py

from typing import Optional

from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession

from rtsu_students_bot.models import User

from .exceptions import UserNotFound, UserAlreadyExists


async def get_user_by_tg_id(
        session: AsyncSession,
        telegram_id: int,
) -> Optional[User]:
    """
    Returns user by tg-id
    :param session: An `AsyncSession` object
    :param telegram_id: A telegram-ID
    :return: `User` or `None`
    """

    stmt = select(User).where(User.telegram_id == telegram_id)

    result = await session.execute(stmt)

    return result.scalars().first()


async def get_user_by_id(
        session: AsyncSession,
        user_id: int,
) -> Optional[User]:
    """
    Returns user by its id
    :param session: An `AsyncSession` object
    :param user_id: An ID
    :return: `User` or `None`
    """

    stmt = select(User).where(User.id == user_id)

    result = await session.execute(stmt)

    return result.scalars().first()


async def create_user(
        session: AsyncSession,
        telegram_id: int,
        full_name: Optional[str] = None,
        token: Optional[str] = None,
):
    """
    Creates `User` object
    :param session: An `AsyncSession` object
    :param telegram_id: A telegram-id
    :param full_name: Fullname of user
    :param token: A token of user
    :return: Created `User`
    """

    existed_user = await get_user_by_tg_id(session, telegram_id)

    if existed_user is not None:
        raise UserAlreadyExists(f"User with ID {telegram_id} already exists.")

    is_authorized = token is not None

    obj = User(
        telegram_id=telegram_id,
        full_name=full_name,
        token=token,
        is_authorized=is_authorized,
    )

    session.add(obj)
    await session.flush()
    await session.refresh(obj)

    return obj


async def update_user_token(
        session: AsyncSession,
        telegram_id: int,
        token: Optional[str] = None,
) -> User:
    """
    Authorizes `User`
    :param telegram_id:
    :param session:
    :param token:
    :return:
    """

    user = await get_user_by_tg_id(session, telegram_id)

    if not user:
        raise UserNotFound(f"User with telegram-id {telegram_id} not found.")

    is_authorized = token is not None

    stmt = update(User).where(
        int(user.id) == User.id
    ).values(
        is_authorized=is_authorized,
        token=token,
    )
    await session.execute(stmt)

    return await get_user_by_tg_id(session, user.telegram_id)


async def update_user(
        session: AsyncSession,
        user_id: int,
        telegram_id: Optional[int] = None,
        full_name: Optional[str] = None,
) -> User:
    """
    Updates telegram user
    :param session:
    :param user_id:
    :param telegram_id:
    :param full_name:
    :return:
    """

    user = await get_user_by_id(session, user_id)

    if user is None:
        raise UserNotFound(f"User with ID {user_id} not found.")

    stmt = update(User).where(User.id == user_id)

    if telegram_id is not None:
        stmt = stmt.values(
            telegram_id=telegram_id,
        )

    if full_name is not None:
        stmt = stmt.values(
            full_name=full_name
        )

    await session.execute(stmt)

    return await get_user_by_id(session, user_id)


async def delete_user(session: AsyncSession, user_id: int):
    """
    Deletes `User` object
    :param user_id:
    :param session: An `AsyncSession` object
    :return:
    """

    if await get_user_by_id(session, user_id) is None:
        raise ValueError("Invalid user-id passed.")

    stmt = delete(User).where(User.id == user_id)

    await session.execute(stmt)

Здесь я реализовал обычные функции которые будут помогать мне работать с базой, хотя наверное лучше бы я применил паттерн «Репозиторий»

Тестируем созданный CRUD

import pytest
import pytest_asyncio

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from rtsu_students_bot.service import user
from rtsu_students_bot.models import Base

from .config import settings

pytest_plugins = ('pytest_asyncio',)

engine = create_async_engine(
    settings.db_url,
)

SessionLocal = sessionmaker(autoflush=True, bind=engine, class_=AsyncSession)


@pytest_asyncio.fixture()
async def session():
    """
    Initializes client
    :return: Prepared `RTSUApi` client
    """

    async with SessionLocal() as e, e.begin():
        yield e


@pytest.mark.asyncio
async def test_tables_creating():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


@pytest.mark.asyncio
async def test_user_creation(session: AsyncSession):
    """
    Tests user-creation
    :return:
    """

    user_data = {
        "full_name": "Vladimir Putin",
        "telegram_id": 1,
    }

    created_user = await user.create_user(session, **user_data)

    assert created_user.full_name == user_data.get("full_name")
    assert created_user.telegram_id == user_data.get("telegram_id")


@pytest.mark.asyncio
async def test_user_update(session: AsyncSession):
    """
    Tests user updating
    :param session:
    :return:
    """

    updating_data = {
        "full_name": "Volodymir Zelensky"
    }

    first_user = await user.get_user_by_tg_id(session, 1)

    updated_user = await user.update_user(session, first_user.id, **updating_data)

    assert first_user.id == updated_user.id
    assert first_user.telegram_id == updated_user.telegram_id
    assert updated_user.full_name == updating_data.get("full_name")


@pytest.mark.asyncio
async def test_user_token_updating(session: AsyncSession):
    """
    Tests user-token updating
    :param session:
    :return:
    """

    first_user = await user.get_user_by_tg_id(session, 1)

    assert not first_user.is_authorized

    first_user = await user.update_user_token(session, first_user.telegram_id, token="test token")

    assert first_user.is_authorized
    assert first_user.token == "test token"
    assert first_user.telegram_id == 1


@pytest.mark.asyncio
async def test_user_deleting(session: AsyncSession):
    """
    Tests user-token updating
    :param session:
    :return:
    """

    first_user = await user.get_user_by_tg_id(session, 1)

    assert first_user is not None

    await user.delete_user(session, first_user.id)

    first_user = await user.get_user_by_tg_id(session, 1)

    assert first_user is None

Тут также создается подключение к базе данных, я просто проверяю CRUD на работоспособность, собственно данные тесты помогли мне найти одну ошибку, поэтому я думаю писал я их не зря =)

Пилим самого бота

Для работы с Telegram Bot API я несомненно выбрал Aiogram ну и для этого сделал еще один пакетник.

11ccabb76e5103623d88fac3ecabe3b1.png

Middlewares

Для того чтобы «протаскивать» необходимые ресурсы к хендлерам (API-клиент и AsyncSession), мне нужен специальный для этих целей мидлварь

import logging

from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram import types
from sqlalchemy.ext.asyncio import AsyncSession

from rtsu_students_bot.rtsu import RTSUApi
from rtsu_students_bot.database import engine


class ResourcesMiddleware(BaseMiddleware):
    """
    Middleware for providing resources like db-connection and RTSU-client
    """

    def __init__(self):
        """
        Initializes self
        """

        self._logger = logging.getLogger("resources_middleware")

        super().__init__()

    @staticmethod
    async def _provide_api_client() -> RTSUApi:
        """
        Provides `RTSU` api client
        :return: Initialized client
        """

        client = RTSUApi()

        return client

    @staticmethod
    async def _provide_db_session() -> AsyncSession:
        """
        Provides `AsyncSession` object
        :return: Initialized session
        """

        session = AsyncSession(engine)

        return session

    async def _provide_resources(self) -> dict:
        """
        Initializes & provides needed resources, such as `RTSU-api-client` and `AsyncSession`
        :return:
        """
        self._logger.debug("Providing resources")
        api_client = await self._provide_api_client()
        db_session = await self._provide_db_session()

        resources = {
            "rtsu": api_client,
            "db_session": db_session,
        }

        return resources

    async def _cleanup(self, data: dict):
        """
        Closes connections & etc.
        :param data:
        :return:
        """

        self._logger.debug("Cleaning resources")

        if "db_session" in data:
            self._logger.debug("SQLAlchemy session detected, closing connection.")
            session: AsyncSession = data["db_session"]
            await session.commit()  # Commit changes
            await session.close()

        if "rtsu" in data:
            self._logger.debug("RTSU API Client detected, closing resource.")
            api_client: RTSUApi = data["rtsu"]
            await api_client.close_session()

    async def on_pre_process_message(self, update: types.Message, data: dict):
        """
        For pre-processing `types.Update`
        :param data: Data from other middlewares
        :param update: A telegram-update
        :return:
        """
        resources = await self._provide_resources()

        data.update(resources)

        return data

    async def on_pre_process_callback_query(self, query: types.CallbackQuery, data: dict):
        """
        Method for preprocessing callback-queries
        :param query: A callback-query
        :param data: A data from another middleware
        :return:
        """

        resources = await self._provide_resources()

        data.update(resources)

        return data

    async def on_post_process_callback_query(self, query: types.CallbackQuery, data_from_handler: list, data: dict):
        """
        Method for post-processing callback query
        :param data_from_handler: Data from handler
        :param query: A callback query
        :param data: A data from another middleware
        :return:
        """

        await self._cleanup(data)

    async def on_post_process_message(self, message: types.Message, data_from_handler: list, data: dict):
        """
        For post-processing message
        :param data_from_handler:
        :param message:
        :param data:
        :return:
        """
        await self._cleanup(data)

Тут все очень просто, при старте обработки сообщений/CallbackQuery я просто подгружаю ресурсы и передаю их в data

Также, после обработки всего этого дела, я вызываю _cleanup который при обнаружении сессий закроет их (ну и сделает коммит в случае с алхимией)

Получение пользователя

Для этого я сделал тоже отдельный мидлварь, который разумеется отрабатывает после первого

import logging

from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram import types
from sqlalchemy.ext.asyncio import AsyncSession

from rtsu_students_bot.service import user
from rtsu_students_bot.rtsu import RTSUApi


class UserMiddleware(BaseMiddleware):
    """
    Middleware for providing a `User` object
    """

    def __init__(self):
        """
        Initializes self
        """

        self._logger = logging.getLogger("users_middleware")
        super().__init__()

    async def _provide_user(self, user_id: int, data: dict) -> dict:
        """
        Fetches and returns user
        """

        if 'db_session' not in data:
            raise RuntimeError("AsyncSession not found.")

        if 'rtsu' not in data:
            raise RuntimeError("RTSU API client not found.")

        db_session: AsyncSession = data.get("db_session")
        rtsu_client: RTSUApi = data.get("rtsu")

        self._logger.debug(f"Getting user with ID {user_id}")

        u = await user.get_user_by_tg_id(db_session, user_id)

        if u is None:
            self._logger.debug(f"User with ID {user_id} not found, creating...")
            u = await user.create_user(db_session, telegram_id=user_id)

        self._logger.debug(f"User provided, {u}")

        # If user is authorized, lets setup `RTSU` client
        if u.is_authorized:
            rtsu_client.set_token(u.token)
            self._logger.debug("User is authorized, API-client's token initialized.")

        data["user"] = u

        return data

    async def on_pre_process_message(self, message: types.message, data: dict):
        """
        Method for preprocessing messages (provides user)
        :param message: A message
        :param data: A data from another middleware
        :return: None
        """

        return await self._provide_user(message.from_user.id, data)

    async def on_pre_process_callback_query(self, query: types.CallbackQuery, data: dict):
        """
        Method for preprocessing callback-queries (provides user)
        :param data:
        :param query:
        :return:
        """

        return await self._provide_user(query.from_user.id, data)

Конкретно тут, мы просто создаем пользователя если на нашли его в бд, помимо того, если пользователь авторизован (токен есть в базе данных) мы также инициализируем токенAPI-клиента.

Теперь у нас есть почти всё что нужно для обработки сообщений.

Шаблонизатор Jinja2

Хорошо бы выделить «шаблоны» куда-то, собственно, я решил воспользоватся шаблонизатором «Нинздя»

Для этих целей я создал файлик template_engine.py

from typing import Optional, Any, Dict

from jinja2 import Environment, PackageLoader, select_autoescape

env = Environment(
    loader=PackageLoader('rtsu_students_bot', 'templates'),
    autoescape=select_autoescape(['html'])
)


def render_template(name: str, values: Optional[Dict[str, Any]] = None, **kwargs):
    """
    Renders template & returns text
    :param name: Name of template
    :param values: Values for template (optional)
    :param kwargs: Keyword-arguments for template (high-priority)
    """

    template = env.get_template(name)

    if values:
        rendered_template = template.render(values, **kwargs)
    else:
        rendered_template = template.render(**kwargs)

    return rendered_template

Тут я инициализировал шаблонизатор и сделал функцию render_template которая просто рендерит шаблон и возвращает готовый текст.

Шаблоны

e3bd1ffa7df0a4ccf626bfb45883b55d.png

Для них я сделал отдельную директорию в проекте, там ничего интересного я думаю вы не найдете.

Вот например шаблон для отправки профиля студента.


    
            

© Habrahabr.ru