Как построить асинхронное Python-приложение для рассылки уведомлений о событии
В этой статье рассмотрим создание асинхронного приложения на Python с использованием библиотеки httpx для рассылки уведомлений пользователям о предстоящих событиях, на которые они зарегистрировались.
Приложение будет запускаться раз в сутки с помощью планировщика Cron на Linux сервере. Для отправки SMS-уведомлений воспользуемся платформой МТС Exolve.
Зачем это нужно
Рассылка уведомлений пользователям — важная задача во многих сервисах. Она позволяет информировать пользователей о важных обновлениях, новостях или других интересных им событиях. При этом SMS — это инструмент с гарантированной доставкой и высокой открываемостью, сообщения дойдут даже до пользователей с отключенным интернетом.
Асинхронное приложение на Python с использованием библиотеки httpx позволяет эффективно реализовать такую рассылку, обеспечивая высокую производительность и отзывчивость системы.
Почему асинхронно
В нашем случае, использование асинхронного подхода позволяет приложению эффективно обрабатывать большое количество запросов на рассылку уведомлений, минимизируя задержки и обеспечивая отзывчивость системы.
Пример асинхронной реализации клиента на httpx
Рассмотрим пример реализации отправки SMS-уведомлений через приложение на языке python. Для начала, убедитесь, что у вас установлены все необходимые зависимости. Мы используем httpx для создания приложения и сопутствующую библиотеку asyncio.
Проект будет со следующей структурой:
aclient_exmp/
/venv
/example_db
__init__.py
handle_data.py
info_db.py
/apimodul
__init__.py
mtt_client.py
/helper
__init__.py
decorators.py
logging.yaml
config.py
main.py
dev.env
В файле dev.env хранятся переменные окружения: API-ключ, URL-адрес запроса к МТС Exolve и номер телефона для отправки SMS.
В файле config.py получим данные из переменных окружения.
from dotenv import dotenv_values
info_env = dotenv_values('dev.env')
API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
BASE_URL = info_env.get('BASE_URL')
Логирование
Добавим логирование к нашему проекту. Файл logging.yaml представляет собой файл конфигурации, в котором определяются форматирование, обработчики и уровни логирования для различных логгеров в проекте. В данном примере файла logging.yaml мы определяем форматирование для логов, создаём обработчик для вывода логов в консоль и настраиваем корневой логгер для уровня DEBUG.
version: 1
formatters:
info:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
console_handler:
class: logging.StreamHandler
level: INFO
formatter: info
stream: ext://sys.stdout
root:
level: DEBUG
handlers: [console_handler]
Дополним код в файле config.py, который выполнит загрузку конфигурации логирования из файла logging.yaml и настройку логгеров в проекте.
import logging.config
import os
import sys
import yaml
from dotenv import dotenv_values
info_env = dotenv_values('dev.env')
API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
BASE_URL=f"{info_env.get('BASE_URL')}"
CURRENT_FILE_PATH = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(CURRENT_FILE_PATH)
LOGGING_CONF = BASE_DIR + '/logging.yaml'
if os.path.isfile(LOGGING_CONF) and os.access(LOGGING_CONF, os.R_OK):
_lc_stream = open(LOGGING_CONF, 'r')
_lc_conf = yaml.load(_lc_stream, Loader=yaml.FullLoader)
_lc_stream.close()
logging.config.dictConfig(_lc_conf)
else:
print(
"ERROR: logger config file '%s' not exsits or not readable\n" %
LOGGING_CONF)
sys.exit(1)
В этом примере кода мы проверяем, существует ли файл logging.yaml и доступен ли он для чтения. Затем мы открываем файл, загружаем его содержимое в переменную _lc_conf с помощью модуля yaml, и закрываем файл. Далее мы используем метод dictConfig из модуля logging.config, чтобы применить загруженную конфигурацию к логгерам в проекте.
Если файл logging.yaml не существует или не доступен для чтения, выводится сообщение об ошибке.
Получение и обработка данных
Для простого примера в качестве базы данных будем использовать список событий, который содержит элементы:
event_list = [
{
'name': 'Music Show',
'date': '2023:12:12',
'time': '17:00',
'mentor': 'Jazz Band',
'guests': [
{'name': 'Ivan Gubov', 'phone': '79007771101'},.....
{'name': 'Mansur Berdiev', 'phone': '79800002001'}]
},...........,
{
'name': 'Music Show',
'date': '2023:11:14',
'time': '20:00',
'mentor': 'Jazz Band',
'guests': [{'name': 'Olga Lomova', 'phone': '79055551101'}]}]
По ключу guests получим список гостей с номерами телефонов, которые зарегистрировались на событие. Данные разместим в файле info_db.py.
Далее перейдем в файл handle_data.py инапишем простую функцию, возвращающую список событий, которые приходятся на текущую дату.
import datetime
from example_db.info_db import event_list
def get_event_today(event_list):
current_date = datetime.date.today().strftime("%Y:%m:%d")
result_list = []
for event in event_list:
if event['date'] == current_date:
result_list.append(event)
return result_list
В реальном проекте, когда нужны события за текущую дату, возможно придётся составить более сложные запросы к базе данных или API. Например, может потребоваться выполнить запрос к базе данных для получения списка событий с определёнными условиями, включая текущую дату. Однако, для нашего простого примера, функция get_event_today подходит, чтобы показать основной механизм выборки событий.
Ограничение числа запросов в единицу времени и контроль ошибок
Для того, чтобы контролировать число запросов в единицу времени и обработки ошибок, связанных с перегрузкой сервера (например, ошибку 500), создадим функцию декоратор rate_limit.
Декоратор rate_limit имеет два параметра: limit и interval. Параметр limit определяет максимальное количество запросов, которое можно выполнить в указанную единицу времени interval. Внутри декоратора определена функция wrapper. Это обёртка для оригинальной асинхронной функции. Внутри wrapper происходит контроль скорости выполнения запросов и обработка ошибок.
import asyncio
import logging
import functools
import httpx
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
def rate_limit(limit: int, interval: int):
def decorator(func):
last_call = datetime.min
retries = 0
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
nonlocal last_call, retries
elapsed = datetime.now() - last_call
if elapsed < timedelta(seconds=interval):
await asyncio.sleep((timedelta(seconds=interval) - elapsed).total_seconds())
last_call = datetime.now()
try:
return await func(self, *args, **kwargs)
except httpx.HTTPError as http_err:
if http_err.response.status_code == 500 and retries <= 5:
retries += 1
await asyncio.sleep(3)
logger.info(f"HTTP ошибка:{func.__name__} \n Дополнительная попытка запроса: {retries}")
return await wrapper(self, *args, **kwargs)
else:
logger.error(f":{http_err}. \n Код ошибки: {http_err.response.status_code}")
raise
except Exception as e:
logger.error(f"Программная ошибка:{e} \n В функции: {func.__name__}")
raise
return wrapper
return decorator
Если при выполнении запроса возникает ошибка типа httpx.HTTPError, wrapper проверяет код ошибки. Если код ошибки равен 500 и количество попыток повторного выполнения запроса меньше или равно 5, то wrapper делает паузу в 3 секунды с помощью await asyncio.sleep (3) и повторно вызывает себя для выполнения запроса ещё раз:
return await wrapper(self, *args, **kwargs)
Если код ошибки не равен 500 или количество попыток повторного выполнения запроса превышает 5, то wrapper генерирует исключение, которое будет обработано в вызывающем коде.
Ограничение числа запросов в единицу времени помогает снизить нагрузку на сервер и повысить производительность приложения. Обработка 500 ошибки и повторное выполнение запроса позволяет улучшить стабильность работы приложения, особенно в случаях, когда сервер временно недоступен или перегружен.
Использование асинхронного декоратора позволяет эффективно управлять асинхронными запросами и обеспечивает отзывчивость приложения даже при выполнении большого количества запросов.
Классы клиентов для работы с МТС Exolve
В представленном ниже коде модуля mtt_client.py есть два класса: MTSClient и SendingSMS. Оба класса предназначены для работы с API МТС Exolve и обеспечивают функциональность отправки сообщений.
import logging
import typing
import httpx
import asyncio
from config import API_KEY, PHONE_SEND, BASE_URL
from helper.decorators import rate_limit
logger = logging.getLogger(__name__)
class MTSClient:
"""Клиент умеющий делать запрос к API MTC."""
def __init__(
self,
base_url: str,
token: str,
aclient: httpx.AsyncClient or None = None
):
"""
:param base_url: API URL
:param token: Токен
:param aclient: Асинхронный клиент
"""
self.base_url = base_url
self.token = token
self.aclient = aclient or httpx.AsyncClient()
self.headers = {"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}"}
@rate_limit(limit=1, interval=1)
async def send_message(self, body) -> dict:
"""
Отправить sms c напоминанием о событии.
:param body: Тело запроса
:return: Тело ответа в формате json с message_id
"""
url = f"{self.base_url}"
response = await self.aclient.post(url=url, json=body, headers=self.headers, timeout=5)
response.raise_for_status()
decoded_response = response.json()
if response.status_code == 200:
logger.info(f"id сообщения: {decoded_response}")
logger.info(f"Напоминание о событии успешно отправлено пользователю с номером: {body['destination']}")
return decoded_response
class SendingSMS():
def __init__(self, client: MTSClient):
self.client = client
async def send_all_guests(self, event_today: list) -> bool:
"""
Рассылает напоминание всем гостям по списку.
:param event_today: Cписок событий на сегодня
:return: Логическое значение в случае успеха
"""
for event in event_today:
guests_data = event["guests"]
for element in guests_data:
sms_data = {
"number": PHONE_SEND,
"destination": element["phone"],
"text": f"Ждем Вас сегодня в {event['time']} на событие {event['name']}"}
message_info = await self.client.send_message(body=sms_data)
return True
Класс MTSClient
Класс MTSClient — клиент для работы с API МТС Exolve. Он содержит методы и атрибуты для отправки запросов и обработки ответов от сервера.
В конструкторе класса MTSClient инициализируются следующие атрибуты:
base_url: URL API MTS Exolve.
token: токен для аутентификации при отправке запросов.
aclient: асинхронный клиент httpx.AsyncClient или None (по умолчанию).
Класс MTSClient также содержит метод send_message, который асинхронно отправляет SMS-сообщение с напоминанием о событии. Метод принимает тело запроса body и возвращает словарь с информацией о сообщении. На данный метод для контроля ошибок и частоты запросов мы помеcтили упомянутый ранее декоратор rate_limit.
Назначение этого класса инкапсулировать логику отправки запросов и обработки ответов, что позволяет легко использовать его в различных частях приложения. В случае изменения API МТС Exolve, достаточно внести изменения только внутри класса MTSClient, не затрагивая другие части кода.
Класс SendingSMS
Класс SendingSMS представляет собой обертку над клиентом MTSClient.
В конструкторе класса SendingSMS инициализируется атрибут client, это экземпляр класса MTSClient. Он позволяет использовать функциональность MTSClient для отправки сообщений.
Класс SendingSMS содержит метод send_all_guests, который асинхронно отправляет напоминание о событии всем гостям из списка. Метод принимает список событий event_today и возвращает True, если все сообщения успешно отправлены.
Этот класс предоставляет удобный интерфейс для отправки SMS-сообщений гостям событий. Он скрывает детали взаимодействия с API МТС Exolve и позволяет сосредоточиться на бизнес-логике приложения.
Функция amain
Создадим функцию amain, которая будет асинхронной точкой входа в приложение.
async def amain(event_today):
async with httpx.AsyncClient() as aclient:
mtt_client = MTSClient(
base_url=BASE_URL,
token=API_KEY,
aclient=aclient)
sms_sender = SendingSMS(client=mtt_client)
tasks = [sms_sender.send_all_guests(event_today)]
result_work_send = await asyncio.gather(*tasks)
Внутри функции создается экземпляр класса MTSClient и класса SendingSMS с передачей асинхронного клиента httpx.AsyncClient. Затем создаются асинхронные задания для отправки сообщений гостям событий.
Функцию amain можно вызывать в главном файле приложения с помощью asyncio.run, что позволяет асинхронно выполнить задачи отправки сообщений.
В модуле mtt_client.py представленные классы и функция обеспечивают удобство, гибкость и расширяемость при работе с API МТС Exolve для отправки SMS-сообщений.
Основной файл приложения
Сформируем главный файл приложения main.py
import sys
import logging
import asyncio
import apimodul.mtt_client as mtt
from example_db import handle_data, info_db
logger = logging.getLogger(__name__)
if __name__ == "__main__":
while True:
try:
event_today = handle_data.get_event_today(info_db.event_list)
if event_today:
asyncio.run(mtt.amain(event_today))
break
except AssertionError as e:
logger.error(e, exc_info=True)
logger.error("Произошла ошибка при работе с данными.")
except KeyboardInterrupt:
logger.error(f" Произошло прерывание программы с клавиатуры.")
sys.exit(1)
except Exception as e:
logger.error(f" Ошибка выполнения программы: {e}")
sys.exit(1)
sys.exit(0)
Внутри цикла while True происходит вызов функции get_event_today из модуля handle_data, которая получает список событий, запланированных на текущий день. Если в списке есть события, вызывается асинхронная функция amain из модуля mtt_client.py, которая обрабатывает эти события.
Такая структура главного файла проекта обеспечивает контроль выполнения программы и обработку возможных ошибок. Цикл while True позволяет повторять выполнение программы до тех пор, пока не будет выполнено условие выхода из цикла.
В блоке try-except происходит обработка исключений, которые могут возникнуть во время выполнения кода. Различные типы исключений обрабатываются по-разному:
AssertionError: Если возникает утверждение (assertion) внутри функции get_event_today, то генерируется исключение AssertionError. В этом случае, в лог-файл записывается ошибка и информация о произошедшем исключении.
KeyboardInterrupt: Если пользователь прерывает выполнение программы с клавиатуры (нажимает Ctrl+C), то генерируется исключение KeyboardInterrupt. В этом случае, в лог-файл записывается информация о прерывании программы.
Exception: Если возникает любое другое исключение, не попадающее в предыдущие категории, то генерируется исключение Exception. В этом случае, в лог-файл записывается информация об ошибке выполнения программы.
После обработки исключений в случае успешного выполнения, программа завершается с помощью вызова sys.exit (0) или sys.exit (1), если возникли ошибки, где:
0- сигнал в операционную систему об успешном завершении программы, его можно учитывать для последующего запуска других приложений.
1- сигнал в операционную систему об ошибке.
Запуск по расписанию
Наша программа рассылки готова. Теперь необходимо подумать об автоматическом запуске в определённое время. Для этого используем планировщик заданий Cron. Откроем терминал Linux и введём команду:
crontab -e
У нас откроется файл с интервалами запуска. Время и дата внутри файла задаётся особым синтаксисом, рассмотрим настройку одной задачи Cron:
* * * * * cd /home/projects/program
[минута][час][день][месяц][день недели][команда(ы)]
В нашем случае запустим скрипт один раз в день в 11:15 утра. Тогда итоговая запись в cron-файле будет выглядеть так:
15 11 * * * cd PycharmProjects/aclient_exemp && venv/bin/python3.8 main.py
То есть, сначала задаём время, переходим в папку с проектом и через виртуальное окружение запускаем скрипт. Так как проект находится в папке PycharmProjects, и запускаю я планировщик не из под супер-пользователя, а из под обычного, здесь нет необходимости записывать полный путь до папки с проектом, потому что пользователь находится по умолчанию в папке /home.
Теперь скрипт каждый день ровно в 11:30 будет отправлять SMS гостям с напоминанием о событии.
Более подробно о планировщике можно прочитать в этой статье. Стоит отметить, что среди плюсов планировщика Cron простота, надёжность и поддержка работы на большинстве систем Linux и Unix. Это означает, что вы без проблем можете использовать его на различных платформах.
Заключение
Этот проект — пример того, как можно использовать httpx и планировщик Cron для создания системы SMS-оповещений. Клиент может послужить отправной точкой для разработки более сложных приложений для напоминаний о мероприятии. С помощью этого простого примера удалось показать, что Cron предоставляет простой и понятный способ планирования задач. Мы можем легко настроить их выполнение по определённому расписанию, указав время и дату их выполнения.