Как я сделал Telegram-бота для студентов РТСУ
Начало
Привет, Хабр! Я учусь в Российско-Таджикском Славянском университете (на первом курсе), собственно у нас в университете действует так называемая кредитно-бальная система.
Для просмотра количества набранных баллов и так далее, у нас есть приложение которое было разработано университетом.
Оно доступно для Android.
Однако, я недавно перешёл на iOS-систему, собственно к моему удивлению приложения там не оказалось.
Результаты поиска в App Store
Ну и тут, я подумал что надо бы разработать что-то типа Telegram-бота для просмотра успеваемости, в конце концов это многим должно помочь.
Да и вообще, Telegram работает везде, это моя любимая платформа для обмена сообщениями.
Поиск endpoint’ов…
Разумеется, университет сделал некий API для своего Android-фронтенда, для получения эндпоинтов не я, мой друг, декомпилировал APK файл и предоставил его мне, позже проанализировав «выхлоп» я нашел четыре необходимых мне эндпоинта
В частности эндпоинты для
Авторизации
Получения информации о профиле студента
Получения семестров
Получения данных об успеваемости по всем дисциплинам конкретного семестра.
Ну, а дальше, дело за малым, надо просто написать клиент для этого API и так далее.
Инициализация проекта с помощью Poetry, написание обёртки под API
Создаём проект
poetry init
Я сразу создал почти на самом верхнем уровне проекта пакетник rtsu
где и будет лежать наша обёртка.
Давайте посмотрим на api.py
from aiohttp import ClientSession, ContentTypeError, client_exceptions
from cashews import cache
from typing import Optional, Union, Dict, TypeVar, Type, List, Self
from pydantic import BaseModel, parse_obj_as
from .exceptions import NotAuthorizedError, RtsuContentTypeError, ServerError, AuthError
from .schemas import AuthSchema, Profile, Subject, AcademicYear
RTSU_API_BASE_URL = "https://mobile.rtsu.tj/api/v1"
P = TypeVar("P", bound=BaseModel)
class RTSUApi:
"""
This class provides for you functionality of RTSU public API
"""
def __init__(self, token: Optional[str] = None):
"""
Initializes `self`
:param token: A rtsu-api token (optional)
"""
self._api_token = token
self._http_client = ClientSession()
def set_token(self, token: str):
"""
Setups token
:param token: A token
:return:
"""
self._api_token = token
async def _make_request(
self,
method: str,
url_part: str,
response_model: Type[Union[List[BaseModel], BaseModel]],
json: Optional[Dict[str, Union[str, int]]] = None,
params: Optional[Dict[str, str]] = None,
auth_required: bool = False,
) -> Union[P, List[P]]:
"""
Makes call to RTSU API
:param url_part: Part of RTSU-API url, example - /auth
:param json: A json for sending
:param params: URI parameters for sending
:return: Response object
"""
if not json:
json = {}
if not params:
params = {}
headers = {}
if auth_required:
if not self._api_token:
raise NotAuthorizedError("Not authorized, use `.auth` method.")
headers['token'] = self._api_token
try:
response = await self._http_client.request(
method,
f"{RTSU_API_BASE_URL}/{url_part}",
json=json,
params=params,
headers=headers,
ssl=False,
)
except (client_exceptions.ClientConnectionError, client_exceptions.ClientConnectorError) as e:
raise ServerError(f"Connection error, details: {e}")
if response.status != 200:
details = await response.text()
raise ServerError(
f"Server returned {response.status}, details: {details}"
)
try:
deserialized_data = await response.json()
except ContentTypeError as e:
raise RtsuContentTypeError(
e.message,
)
return parse_obj_as(response_model, deserialized_data)
async def auth(self, login: str, password: str) -> AuthSchema:
"""
Authenticates user
:param login: A login of user
:param password: A password of user
:return: RTSU token on success
"""
try:
response: AuthSchema = await self._make_request(
"POST",
"auth",
AuthSchema,
params={
"login": login,
"password": password,
}
)
except ServerError as e:
raise AuthError(
f"Auth error, check login and password, message from server: {e.message}"
)
self._api_token = response.token
return response
@cache.soft(ttl="24h", soft_ttl="1m")
async def get_profile(self) -> Profile:
"""
Returns profile of RTSU student
:return: `Profile`-response
"""
return await self._make_request(
"GET",
"student/profile",
Profile,
auth_required=True,
)
async def get_academic_years(self) -> List[AcademicYear]:
"""
Returns `List` with `AcademicYear` objects
:return:
"""
return await self._make_request(
"GET",
"student/academic_years",
List[AcademicYear],
auth_required=True,
)
@cache.soft(ttl="24h", soft_ttl="1m")
async def get_academic_year_subjects(self, year_id: int) -> List[Subject]:
"""
Returns `List` with `Subjects` of some year
:return:
"""
return await self._make_request(
"GET",
f"student/grades/{year_id}",
List[Subject],
auth_required=True,
)
async def get_current_year_id(self) -> int:
"""
Returns identifier of current year
:return:
"""
years = await self.get_academic_years()
return years[0].id
async def __aenter__(self) -> Self:
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close_session()
def __str__(self) -> str:
"""
Stringifies `RTSUApi` objects
:return:
"""
return f"{self.__class__.__name__}"
async def close_session(self):
"""Frees inner resources"""
await self._http_client.close()
Что тут у нас? В _make_request
у нас осуществляется запрос к серверу, а также десериализация json в pydantic-схему (ну или модель?)
Прошу заметить, что я использую замечательную cashews для кеширования результатов, в частности soft-ttl который еще и сильно помогает когда сервера университета падают.
В остальных же методах я просто указываю эндпоинт и response-schema
ну и дёргаю тот же _make_request
Также, тут есть методы для того чтобы закрыть текущую aiohttp-сессию, ну тут понятно, помимо этого реализованы магические методы __aenter__
и __aexit__
для того чтобы использовать клиент в with
конструкциях.
Ну и set_token
метод которое просто устанавливает значения токена для того чтобы вручную «впихнуть» токен в клиент, это пригодиться нам чуть позже.
Pydantic-схемы
Допустим, заглянем в profile.py
где лежит Profile-schema
from pydantic import Field
from .base import Base
class FullName(Base):
ru: str = Field(alias='RU')
tj: str = Field(alias='TJ')
class Faculty(FullName):
...
class Speciality(FullName):
...
class Profile(Base):
id: int = Field(alias='RecordBookNumber')
full_name: FullName = Field(alias='FullName')
faculty: Faculty = Field(alias="Faculty")
course: int = Field(alias='Course')
training_period: int = Field(alias='TrainingPeriod')
level: str = Field(alias="TrainingLevel")
entrance_year: str = Field(alias='YearUniversityEntrance')
Почему так? API возвращает мне информацию сразу на двух языках (русском и таджикском)
Собственно, это действительно свидетельствует о том, что API сделали очень криво и убого, но что поделать, тут я просто наследую каждый field от FullName чтобы не писать по два раза RU, TJ и так далее.
Также, прошу заметить то, как элегантно можно сделать field’ы pythonic при помощи pydantic-aliases
После написания всего этого, я принялся тестировать клиент, на удивление он хорошо работает и предоставляет данные в том формате, в котором они мне нужны.
Собственно тесты к этому я тоже написал
import pytest
import pytest_asyncio
from rtsu_students_bot.rtsu import RTSUApi
from .config import settings
pytest_plugins = ('pytest_asyncio',)
@pytest_asyncio.fixture()
async def rtsu_client():
"""
Initializes client
:return: Prepared `RTSUApi` client
"""
async with RTSUApi() as api:
yield api
@pytest.mark.asyncio
async def test_rtsu_login(rtsu_client: RTSUApi):
"""
Tests rtsu login
:param rtsu_client: A RTSU API client
:return:
"""
resp = await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
assert resp.token is not None
@pytest.mark.asyncio
async def test_rtsu_profile_fetching(rtsu_client: RTSUApi):
"""
Tests rtsu profile fetching
:param rtsu_client:
:return:
"""
await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
profile = await rtsu_client.get_profile()
assert profile is not None
assert profile.full_name is not None
@pytest.mark.asyncio
async def test_rtsu_academic_years_fetching(rtsu_client: RTSUApi):
"""
Tests rtsu academic years fetching
:param rtsu_client:
:return:
"""
await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
years = await rtsu_client.get_academic_years()
assert type(years) == list
assert len(years) > 0
@pytest.mark.asyncio
async def test_rtsu_academic_year_subjects_fetching(rtsu_client: RTSUApi):
"""
Tests rtsu academic year fetching
:param rtsu_client:
:return:
"""
await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
ac_years = await rtsu_client.get_academic_years()
year = ac_years[0].id
years = await rtsu_client.get_academic_year_subjects(year)
assert type(years) == list
assert len(years) > 0
Тут тесты сделаны наверное грязно и тупо, я пока не читал про best-practises в тестировании API, буду рад предложениям в комментах по этому поводу.
Ах, да, для тестов тут используется отдельный конфиг, в конце статьи я покажу как его заполнить.
На руках у нас уже есть wrapper
База данных и SQLAlchemy
Установив алхимию, я принялся создавать пакет моделей
Но для начала, покажу вам файлик database.py в котором настроено подключение к базе.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from rtsu_students_bot.config import settings
engine = create_async_engine(
settings.db.url,
)
SessionLocal = sessionmaker(bind=engine, class_=AsyncSession)
Тут нет ничего такого, просто вместо обычной сессии я использую асинхронную (по очевидным причинам)
Вернёмся к пакетнику с моделями.
from sqlalchemy import Integer, Column, String, Boolean, BigInteger
from .base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
full_name = Column(String(length=255), nullable=True)
token = Column(String(length=600), nullable=True)
is_authorized = Column(Boolean, default=False)
telegram_id = Column(BigInteger)
def __str__(self):
return f"{self.__class__.__name__}"
Тут всё очень просто, у нас в базе будет храниться токен, telegram-id, ну и флажок который будет сообщать о том, авторизован ли пользователь.
Далее, пилим пакетник service
который будет помогать нам работать с базой данных
Содержимое user.py
from typing import Optional
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.models import User
from .exceptions import UserNotFound, UserAlreadyExists
async def get_user_by_tg_id(
session: AsyncSession,
telegram_id: int,
) -> Optional[User]:
"""
Returns user by tg-id
:param session: An `AsyncSession` object
:param telegram_id: A telegram-ID
:return: `User` or `None`
"""
stmt = select(User).where(User.telegram_id == telegram_id)
result = await session.execute(stmt)
return result.scalars().first()
async def get_user_by_id(
session: AsyncSession,
user_id: int,
) -> Optional[User]:
"""
Returns user by its id
:param session: An `AsyncSession` object
:param user_id: An ID
:return: `User` or `None`
"""
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalars().first()
async def create_user(
session: AsyncSession,
telegram_id: int,
full_name: Optional[str] = None,
token: Optional[str] = None,
):
"""
Creates `User` object
:param session: An `AsyncSession` object
:param telegram_id: A telegram-id
:param full_name: Fullname of user
:param token: A token of user
:return: Created `User`
"""
existed_user = await get_user_by_tg_id(session, telegram_id)
if existed_user is not None:
raise UserAlreadyExists(f"User with ID {telegram_id} already exists.")
is_authorized = token is not None
obj = User(
telegram_id=telegram_id,
full_name=full_name,
token=token,
is_authorized=is_authorized,
)
session.add(obj)
await session.flush()
await session.refresh(obj)
return obj
async def update_user_token(
session: AsyncSession,
telegram_id: int,
token: Optional[str] = None,
) -> User:
"""
Authorizes `User`
:param telegram_id:
:param session:
:param token:
:return:
"""
user = await get_user_by_tg_id(session, telegram_id)
if not user:
raise UserNotFound(f"User with telegram-id {telegram_id} not found.")
is_authorized = token is not None
stmt = update(User).where(
int(user.id) == User.id
).values(
is_authorized=is_authorized,
token=token,
)
await session.execute(stmt)
return await get_user_by_tg_id(session, user.telegram_id)
async def update_user(
session: AsyncSession,
user_id: int,
telegram_id: Optional[int] = None,
full_name: Optional[str] = None,
) -> User:
"""
Updates telegram user
:param session:
:param user_id:
:param telegram_id:
:param full_name:
:return:
"""
user = await get_user_by_id(session, user_id)
if user is None:
raise UserNotFound(f"User with ID {user_id} not found.")
stmt = update(User).where(User.id == user_id)
if telegram_id is not None:
stmt = stmt.values(
telegram_id=telegram_id,
)
if full_name is not None:
stmt = stmt.values(
full_name=full_name
)
await session.execute(stmt)
return await get_user_by_id(session, user_id)
async def delete_user(session: AsyncSession, user_id: int):
"""
Deletes `User` object
:param user_id:
:param session: An `AsyncSession` object
:return:
"""
if await get_user_by_id(session, user_id) is None:
raise ValueError("Invalid user-id passed.")
stmt = delete(User).where(User.id == user_id)
await session.execute(stmt)
Здесь я реализовал обычные функции которые будут помогать мне работать с базой, хотя наверное лучше бы я применил паттерн «Репозиторий»
Тестируем созданный CRUD
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from rtsu_students_bot.service import user
from rtsu_students_bot.models import Base
from .config import settings
pytest_plugins = ('pytest_asyncio',)
engine = create_async_engine(
settings.db_url,
)
SessionLocal = sessionmaker(autoflush=True, bind=engine, class_=AsyncSession)
@pytest_asyncio.fixture()
async def session():
"""
Initializes client
:return: Prepared `RTSUApi` client
"""
async with SessionLocal() as e, e.begin():
yield e
@pytest.mark.asyncio
async def test_tables_creating():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@pytest.mark.asyncio
async def test_user_creation(session: AsyncSession):
"""
Tests user-creation
:return:
"""
user_data = {
"full_name": "Vladimir Putin",
"telegram_id": 1,
}
created_user = await user.create_user(session, **user_data)
assert created_user.full_name == user_data.get("full_name")
assert created_user.telegram_id == user_data.get("telegram_id")
@pytest.mark.asyncio
async def test_user_update(session: AsyncSession):
"""
Tests user updating
:param session:
:return:
"""
updating_data = {
"full_name": "Volodymir Zelensky"
}
first_user = await user.get_user_by_tg_id(session, 1)
updated_user = await user.update_user(session, first_user.id, **updating_data)
assert first_user.id == updated_user.id
assert first_user.telegram_id == updated_user.telegram_id
assert updated_user.full_name == updating_data.get("full_name")
@pytest.mark.asyncio
async def test_user_token_updating(session: AsyncSession):
"""
Tests user-token updating
:param session:
:return:
"""
first_user = await user.get_user_by_tg_id(session, 1)
assert not first_user.is_authorized
first_user = await user.update_user_token(session, first_user.telegram_id, token="test token")
assert first_user.is_authorized
assert first_user.token == "test token"
assert first_user.telegram_id == 1
@pytest.mark.asyncio
async def test_user_deleting(session: AsyncSession):
"""
Tests user-token updating
:param session:
:return:
"""
first_user = await user.get_user_by_tg_id(session, 1)
assert first_user is not None
await user.delete_user(session, first_user.id)
first_user = await user.get_user_by_tg_id(session, 1)
assert first_user is None
Тут также создается подключение к базе данных, я просто проверяю CRUD на работоспособность, собственно данные тесты помогли мне найти одну ошибку, поэтому я думаю писал я их не зря =)
Пилим самого бота
Для работы с Telegram Bot API я несомненно выбрал Aiogram ну и для этого сделал еще один пакетник.
Middlewares
Для того чтобы «протаскивать» необходимые ресурсы к хендлерам (API-клиент и AsyncSession), мне нужен специальный для этих целей мидлварь
import logging
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram import types
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.rtsu import RTSUApi
from rtsu_students_bot.database import engine
class ResourcesMiddleware(BaseMiddleware):
"""
Middleware for providing resources like db-connection and RTSU-client
"""
def __init__(self):
"""
Initializes self
"""
self._logger = logging.getLogger("resources_middleware")
super().__init__()
@staticmethod
async def _provide_api_client() -> RTSUApi:
"""
Provides `RTSU` api client
:return: Initialized client
"""
client = RTSUApi()
return client
@staticmethod
async def _provide_db_session() -> AsyncSession:
"""
Provides `AsyncSession` object
:return: Initialized session
"""
session = AsyncSession(engine)
return session
async def _provide_resources(self) -> dict:
"""
Initializes & provides needed resources, such as `RTSU-api-client` and `AsyncSession`
:return:
"""
self._logger.debug("Providing resources")
api_client = await self._provide_api_client()
db_session = await self._provide_db_session()
resources = {
"rtsu": api_client,
"db_session": db_session,
}
return resources
async def _cleanup(self, data: dict):
"""
Closes connections & etc.
:param data:
:return:
"""
self._logger.debug("Cleaning resources")
if "db_session" in data:
self._logger.debug("SQLAlchemy session detected, closing connection.")
session: AsyncSession = data["db_session"]
await session.commit() # Commit changes
await session.close()
if "rtsu" in data:
self._logger.debug("RTSU API Client detected, closing resource.")
api_client: RTSUApi = data["rtsu"]
await api_client.close_session()
async def on_pre_process_message(self, update: types.Message, data: dict):
"""
For pre-processing `types.Update`
:param data: Data from other middlewares
:param update: A telegram-update
:return:
"""
resources = await self._provide_resources()
data.update(resources)
return data
async def on_pre_process_callback_query(self, query: types.CallbackQuery, data: dict):
"""
Method for preprocessing callback-queries
:param query: A callback-query
:param data: A data from another middleware
:return:
"""
resources = await self._provide_resources()
data.update(resources)
return data
async def on_post_process_callback_query(self, query: types.CallbackQuery, data_from_handler: list, data: dict):
"""
Method for post-processing callback query
:param data_from_handler: Data from handler
:param query: A callback query
:param data: A data from another middleware
:return:
"""
await self._cleanup(data)
async def on_post_process_message(self, message: types.Message, data_from_handler: list, data: dict):
"""
For post-processing message
:param data_from_handler:
:param message:
:param data:
:return:
"""
await self._cleanup(data)
Тут все очень просто, при старте обработки сообщений/CallbackQuery я просто подгружаю ресурсы и передаю их в data
Также, после обработки всего этого дела, я вызываю _cleanup
который при обнаружении сессий закроет их (ну и сделает коммит в случае с алхимией)
Получение пользователя
Для этого я сделал тоже отдельный мидлварь, который разумеется отрабатывает после первого
import logging
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram import types
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.service import user
from rtsu_students_bot.rtsu import RTSUApi
class UserMiddleware(BaseMiddleware):
"""
Middleware for providing a `User` object
"""
def __init__(self):
"""
Initializes self
"""
self._logger = logging.getLogger("users_middleware")
super().__init__()
async def _provide_user(self, user_id: int, data: dict) -> dict:
"""
Fetches and returns user
"""
if 'db_session' not in data:
raise RuntimeError("AsyncSession not found.")
if 'rtsu' not in data:
raise RuntimeError("RTSU API client not found.")
db_session: AsyncSession = data.get("db_session")
rtsu_client: RTSUApi = data.get("rtsu")
self._logger.debug(f"Getting user with ID {user_id}")
u = await user.get_user_by_tg_id(db_session, user_id)
if u is None:
self._logger.debug(f"User with ID {user_id} not found, creating...")
u = await user.create_user(db_session, telegram_id=user_id)
self._logger.debug(f"User provided, {u}")
# If user is authorized, lets setup `RTSU` client
if u.is_authorized:
rtsu_client.set_token(u.token)
self._logger.debug("User is authorized, API-client's token initialized.")
data["user"] = u
return data
async def on_pre_process_message(self, message: types.message, data: dict):
"""
Method for preprocessing messages (provides user)
:param message: A message
:param data: A data from another middleware
:return: None
"""
return await self._provide_user(message.from_user.id, data)
async def on_pre_process_callback_query(self, query: types.CallbackQuery, data: dict):
"""
Method for preprocessing callback-queries (provides user)
:param data:
:param query:
:return:
"""
return await self._provide_user(query.from_user.id, data)
Конкретно тут, мы просто создаем пользователя если на нашли его в бд, помимо того, если пользователь авторизован (токен есть в базе данных) мы также инициализируем токенAPI-клиента.
Теперь у нас есть почти всё что нужно для обработки сообщений.
Шаблонизатор Jinja2
Хорошо бы выделить «шаблоны» куда-то, собственно, я решил воспользоватся шаблонизатором «Нинздя»
Для этих целей я создал файлик template_engine.py
from typing import Optional, Any, Dict
from jinja2 import Environment, PackageLoader, select_autoescape
env = Environment(
loader=PackageLoader('rtsu_students_bot', 'templates'),
autoescape=select_autoescape(['html'])
)
def render_template(name: str, values: Optional[Dict[str, Any]] = None, **kwargs):
"""
Renders template & returns text
:param name: Name of template
:param values: Values for template (optional)
:param kwargs: Keyword-arguments for template (high-priority)
"""
template = env.get_template(name)
if values:
rendered_template = template.render(values, **kwargs)
else:
rendered_template = template.render(**kwargs)
return rendered_template
Тут я инициализировал шаблонизатор и сделал функцию render_template
которая просто рендерит шаблон и возвращает готовый текст.
Шаблоны
Для них я сделал отдельную директорию в проекте, там ничего интересного я думаю вы не найдете.
Вот например шаблон для отправки профиля студента.