Как построить асинхронное Python-приложение для рассылки уведомлений о событии

08cb3265d6c2850e817c4dad03df583b.png

В этой статье рассмотрим создание асинхронного приложения на Python с использованием библиотеки httpx для рассылки уведомлений пользователям о предстоящих событиях, на которые они зарегистрировались.

Приложение будет запускаться раз в сутки с помощью планировщика Cron на Linux сервере. Для отправки SMS-уведомлений воспользуемся платформой МТС Exolve.

Зачем это нужно

Рассылка уведомлений пользователям — важная задача во многих сервисах. Она позволяет информировать пользователей о важных обновлениях, новостях или других интересных им событиях. При этом SMS — это инструмент с гарантированной доставкой и высокой открываемостью, сообщения дойдут даже до пользователей с отключенным интернетом.

Асинхронное приложение на Python с использованием библиотеки httpx позволяет эффективно реализовать такую рассылку, обеспечивая высокую производительность и отзывчивость системы.

Почему асинхронно

В нашем случае, использование асинхронного подхода позволяет приложению эффективно обрабатывать большое количество запросов на рассылку уведомлений, минимизируя задержки и обеспечивая отзывчивость системы.

Пример асинхронной реализации клиента на httpx

Рассмотрим пример реализации отправки SMS-уведомлений через приложение на языке python. Для начала, убедитесь, что у вас установлены все необходимые зависимости. Мы используем httpx для создания приложения и сопутствующую библиотеку asyncio.

Проект будет со следующей структурой:

aclient_exmp/
    /venv
    /example_db
        __init__.py
        handle_data.py
        info_db.py
    /apimodul
        __init__.py
        mtt_client.py
   /helper
        __init__.py
        decorators.py
    logging.yaml
    config.py
    main.py
    dev.env

В файле dev.env хранятся переменные окружения: API-ключ, URL-адрес запроса к МТС Exolve и номер телефона для отправки SMS. 

В файле config.py получим данные из переменных окружения.

from dotenv import dotenv_values

info_env = dotenv_values('dev.env')

API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
BASE_URL = info_env.get('BASE_URL')

Логирование

Добавим логирование к нашему проекту. Файл logging.yaml представляет собой файл конфигурации, в котором определяются форматирование, обработчики и уровни логирования для различных логгеров в проекте. В данном примере файла logging.yaml мы определяем форматирование для логов, создаём обработчик для вывода логов в консоль и настраиваем корневой логгер для уровня DEBUG.

version: 1

formatters:
  info:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

handlers:
  console_handler:
    class: logging.StreamHandler
    level: INFO
    formatter: info
    stream: ext://sys.stdout

root:
  level: DEBUG
  handlers: [console_handler]

Дополним код в файле config.py, который выполнит загрузку конфигурации логирования из файла logging.yaml и настройку логгеров в проекте.

import logging.config
import os
import sys

import yaml
from dotenv import dotenv_values


info_env = dotenv_values('dev.env')

API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
BASE_URL=f"{info_env.get('BASE_URL')}"



CURRENT_FILE_PATH = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(CURRENT_FILE_PATH)
LOGGING_CONF = BASE_DIR + '/logging.yaml'

if os.path.isfile(LOGGING_CONF) and os.access(LOGGING_CONF, os.R_OK):
	_lc_stream = open(LOGGING_CONF, 'r')
	_lc_conf = yaml.load(_lc_stream, Loader=yaml.FullLoader)
	_lc_stream.close()
	logging.config.dictConfig(_lc_conf)
else:
	print(
    	"ERROR: logger config file '%s' not exsits or not readable\n" %
    	LOGGING_CONF)
	sys.exit(1)

В этом примере кода мы проверяем, существует ли файл logging.yaml и доступен ли он для чтения. Затем мы открываем файл, загружаем его содержимое в переменную _lc_conf с помощью модуля yaml, и закрываем файл. Далее мы используем метод dictConfig из модуля logging.config, чтобы применить загруженную конфигурацию к логгерам в проекте.

Если файл logging.yaml не существует или не доступен для чтения, выводится сообщение об ошибке.

Получение и обработка данных

Для простого примера в качестве базы данных будем использовать список событий, который содержит элементы:

event_list = [
	{
    	'name': 'Music Show',
    	'date': '2023:12:12',
    	'time': '17:00',
    	'mentor': 'Jazz Band',
    	'guests': [
        	{'name': 'Ivan Gubov', 'phone': '79007771101'},.....
        	{'name': 'Mansur Berdiev', 'phone': '79800002001'}]
	},...........,
{
'name': 'Music Show',
'date': '2023:11:14',
    	'time': '20:00',
    	'mentor': 'Jazz Band',
    	'guests': [{'name': 'Olga Lomova', 'phone': '79055551101'}]}]

По ключу guests получим список гостей с номерами телефонов, которые зарегистрировались на событие. Данные разместим в файле info_db.py.

Далее перейдем в файл handle_data.py инапишем простую функцию, возвращающую список событий, которые приходятся на текущую дату.

import datetime
from example_db.info_db import event_list

def get_event_today(event_list):
	current_date = datetime.date.today().strftime("%Y:%m:%d")
	result_list = []
	for event in event_list:
if event['date'] == current_date:
result_list.append(event)
	return result_list

В реальном проекте, когда нужны события за текущую дату, возможно придётся составить более сложные запросы к базе данных или API. Например, может потребоваться выполнить запрос к базе данных для получения списка событий с определёнными условиями, включая текущую дату. Однако, для нашего простого примера, функция get_event_today подходит, чтобы показать основной механизм выборки событий.

Ограничение числа запросов в единицу времени и контроль ошибок

Для того, чтобы контролировать число запросов в единицу времени и обработки ошибок, связанных с перегрузкой сервера (например, ошибку 500), создадим функцию декоратор rate_limit.

Декоратор rate_limit имеет два параметра: limit и interval. Параметр limit определяет максимальное количество запросов, которое можно выполнить в указанную единицу времени interval. Внутри декоратора определена функция wrapper. Это обёртка для оригинальной асинхронной функции. Внутри wrapper происходит контроль скорости выполнения запросов и обработка ошибок.

import asyncio
import logging
import functools
import httpx
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

def rate_limit(limit: int, interval: int):
	def decorator(func):
    	last_call = datetime.min
    	retries = 0

    	@functools.wraps(func)
    	async def wrapper(self, *args, **kwargs):
        	nonlocal last_call, retries
        	elapsed = datetime.now() - last_call
        	if elapsed < timedelta(seconds=interval):
            	await asyncio.sleep((timedelta(seconds=interval) - elapsed).total_seconds())
        	last_call = datetime.now()

        	try:

            	return await func(self, *args, **kwargs)
        	except httpx.HTTPError as http_err:
            	if http_err.response.status_code == 500 and retries <= 5:
                		retries += 1
                		await asyncio.sleep(3)
                		logger.info(f"HTTP ошибка:{func.__name__} \n Дополнительная попытка запроса: {retries}")
                		return await wrapper(self, *args, **kwargs)
            	else:
logger.error(f":{http_err}. \n Код ошибки: {http_err.response.status_code}")
                		raise
        	except Exception as e:
            	logger.error(f"Программная ошибка:{e} \n В функции: {func.__name__}")
            	raise
    		return wrapper
return decorator

Если при выполнении запроса возникает ошибка типа httpx.HTTPError, wrapper проверяет код ошибки. Если код ошибки равен 500 и количество попыток повторного выполнения запроса меньше или равно 5, то wrapper делает паузу в 3 секунды с помощью await asyncio.sleep (3) и повторно вызывает себя для выполнения запроса ещё раз:

return await wrapper(self, *args, **kwargs)

Если код ошибки не равен 500 или количество попыток повторного выполнения запроса превышает 5, то wrapper генерирует исключение, которое будет обработано в вызывающем коде.

Ограничение числа запросов в единицу времени помогает снизить нагрузку на сервер и повысить производительность приложения. Обработка 500 ошибки и повторное выполнение запроса позволяет улучшить стабильность работы приложения, особенно в случаях, когда сервер временно недоступен или перегружен.

Использование асинхронного декоратора позволяет эффективно управлять асинхронными запросами и обеспечивает отзывчивость приложения даже при выполнении большого количества запросов.

Классы клиентов для работы с МТС Exolve

В представленном ниже коде модуля mtt_client.py есть два класса: MTSClient и SendingSMS. Оба класса предназначены для работы с API МТС Exolve и обеспечивают функциональность отправки сообщений. 

import logging
import typing

import httpx
import asyncio

from config import API_KEY, PHONE_SEND, BASE_URL
from helper.decorators import rate_limit

logger = logging.getLogger(__name__)

class MTSClient:
	"""Клиент умеющий делать запрос к API MTC."""

	def __init__(
        	self,
    	base_url: str,
    	token: str,
    	aclient: httpx.AsyncClient or None = None
	):
    		"""
    		:param base_url: API URL
    		:param token: Токен
    		:param aclient: Асинхронный клиент
    		"""
    		self.base_url = base_url
    		self.token = token


    		self.aclient = aclient or httpx.AsyncClient()
    		self.headers = {"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}"}

	@rate_limit(limit=1, interval=1)
	async def send_message(self, body) -> dict:
    		"""
    		Отправить sms c напоминанием о событии.

    		:param body: Тело запроса
    		:return: Тело ответа в формате json с message_id
    		"""
    		url = f"{self.base_url}"
    		response = await self.aclient.post(url=url, json=body, headers=self.headers, timeout=5)
    		response.raise_for_status()
    		decoded_response = response.json()
    		if response.status_code == 200:
        		logger.info(f"id сообщения: {decoded_response}")
        		logger.info(f"Напоминание о событии успешно отправлено пользователю с номером: {body['destination']}")
    		return decoded_response


class SendingSMS():
	def __init__(self, client: MTSClient):
    	self.client = client

	async def send_all_guests(self, event_today: list) -> bool:
    	"""
    	Рассылает напоминание всем гостям по списку.
	:param event_today: Cписок событий на сегодня
    	:return: Логическое значение в случае успеха
    	"""

    		for event in event_today:
        		guests_data = event["guests"]
        		for element in guests_data:
            		sms_data = {
                		"number": PHONE_SEND,
                		"destination": element["phone"],
                		"text": f"Ждем Вас сегодня в {event['time']} на событие {event['name']}"}

            		message_info = await self.client.send_message(body=sms_data)

    		return True

Класс MTSClient

Класс MTSClient — клиент для работы с API МТС Exolve. Он содержит методы и атрибуты для отправки запросов и обработки ответов от сервера.

В конструкторе класса MTSClient инициализируются следующие атрибуты:

  • base_url: URL API MTS Exolve.

  • token: токен для аутентификации при отправке запросов.

  • aclient: асинхронный клиент httpx.AsyncClient или None (по умолчанию).

Класс MTSClient также содержит метод send_message, который асинхронно отправляет SMS-сообщение с напоминанием о событии. Метод принимает тело запроса body и возвращает словарь с информацией о сообщении. На данный метод для контроля ошибок и частоты запросов мы помеcтили упомянутый ранее декоратор rate_limit.

Назначение этого класса инкапсулировать логику отправки запросов и обработки ответов, что позволяет легко использовать его в различных частях приложения. В случае изменения API МТС Exolve, достаточно внести изменения только внутри класса MTSClient, не затрагивая другие части кода.

Класс SendingSMS

Класс SendingSMS представляет собой обертку над клиентом MTSClient.

В конструкторе класса SendingSMS инициализируется атрибут client, это экземпляр класса MTSClient. Он позволяет использовать функциональность MTSClient для отправки сообщений.

Класс SendingSMS содержит метод send_all_guests, который асинхронно отправляет напоминание о событии всем гостям из списка. Метод принимает список событий event_today и возвращает True, если все сообщения успешно отправлены.

Этот класс предоставляет удобный интерфейс для отправки SMS-сообщений гостям событий. Он скрывает детали взаимодействия с API МТС Exolve и позволяет сосредоточиться на бизнес-логике приложения.

Функция amain

Создадим функцию amain, которая будет асинхронной точкой входа в приложение.

async def amain(event_today):
	async with httpx.AsyncClient() as aclient:
    		mtt_client = MTSClient(
base_url=BASE_URL,
        			token=API_KEY,
        			aclient=aclient)
    		sms_sender = SendingSMS(client=mtt_client)
    		tasks = [sms_sender.send_all_guests(event_today)]
    		result_work_send = await asyncio.gather(*tasks)

Внутри функции создается экземпляр класса MTSClient и класса SendingSMS с передачей асинхронного клиента httpx.AsyncClient. Затем создаются асинхронные задания для отправки сообщений гостям событий.

Функцию amain можно вызывать в главном файле приложения с помощью asyncio.run, что позволяет асинхронно выполнить задачи отправки сообщений.

В модуле mtt_client.py представленные классы и функция обеспечивают удобство, гибкость и расширяемость при работе с API МТС Exolve для отправки SMS-сообщений.

Основной файл приложения

Сформируем главный файл приложения main.py

import sys
import logging
import asyncio
import apimodul.mtt_client as mtt
from example_db import handle_data, info_db

logger = logging.getLogger(__name__)

if __name__ == "__main__":
	while True:
    		try:
        		event_today = handle_data.get_event_today(info_db.event_list)
        		if event_today:
            		asyncio.run(mtt.amain(event_today))
        		break
    		except AssertionError as e:
        		logger.error(e, exc_info=True)
        		logger.error("Произошла ошибка при работе с данными.")
    		except KeyboardInterrupt:
        		logger.error(f" Произошло прерывание программы с клавиатуры.")
        		sys.exit(1)
    		except Exception as e:
        		logger.error(f" Ошибка выполнения программы: {e}")
        		sys.exit(1)
	sys.exit(0)

Внутри цикла while True происходит вызов функции get_event_today из модуля handle_data, которая получает список событий, запланированных на текущий день. Если в списке есть события, вызывается асинхронная функция amain из модуля mtt_client.py, которая обрабатывает эти события.

Такая структура главного файла проекта обеспечивает контроль выполнения программы и обработку возможных ошибок. Цикл while True позволяет повторять выполнение программы до тех пор, пока не будет выполнено условие выхода из цикла.

В блоке try-except происходит обработка исключений, которые могут возникнуть во время выполнения кода. Различные типы исключений обрабатываются по-разному:

  • AssertionError: Если возникает утверждение (assertion) внутри функции get_event_today, то генерируется исключение AssertionError. В этом случае, в лог-файл записывается ошибка и информация о произошедшем исключении.

  • KeyboardInterrupt: Если пользователь прерывает выполнение программы с клавиатуры (нажимает Ctrl+C), то генерируется исключение KeyboardInterrupt. В этом случае, в лог-файл записывается информация о прерывании программы.

  • Exception: Если возникает любое другое исключение, не попадающее в предыдущие категории, то генерируется исключение Exception. В этом случае, в лог-файл записывается информация об ошибке выполнения программы.

После обработки исключений в случае успешного выполнения, программа завершается с помощью вызова sys.exit (0) или sys.exit (1), если возникли ошибки, где:  

  • 0- сигнал в операционную систему об успешном завершении программы, его можно учитывать для последующего запуска других приложений. 

  • 1- сигнал в операционную систему об ошибке.

Запуск по расписанию

Наша программа рассылки готова. Теперь необходимо подумать об автоматическом запуске в определённое время. Для этого используем планировщик заданий Cron. Откроем терминал Linux и введём команду:

crontab -e

У нас откроется файл с интервалами запуска. Время и дата внутри файла задаётся особым синтаксисом, рассмотрим настройку одной задачи Cron:

 *		*   * 	*	   * 		 cd /home/projects/program
[минута][час][день][месяц][день недели][команда(ы)]

В нашем случае запустим скрипт один раз в день в 11:15 утра. Тогда итоговая запись в cron-файле будет выглядеть так:

15 11 * * * cd PycharmProjects/aclient_exemp && venv/bin/python3.8 main.py

То есть, сначала задаём время, переходим в папку с проектом и через виртуальное окружение запускаем скрипт. Так как проект находится в папке PycharmProjects, и запускаю я планировщик не из под супер-пользователя, а из под обычного, здесь нет необходимости записывать полный путь до папки с проектом, потому что пользователь находится по умолчанию в папке /home.

Теперь скрипт каждый день ровно в 11:30 будет отправлять SMS гостям с напоминанием о событии.

Более подробно о планировщике можно прочитать в этой статье. Стоит отметить, что среди плюсов планировщика Cron простота, надёжность и поддержка работы на большинстве систем Linux и Unix. Это означает, что вы без проблем можете использовать его на различных платформах.

Заключение

Этот проект — пример того, как можно использовать httpx и планировщик Cron для создания системы SMS-оповещений. Клиент может послужить отправной точкой для разработки более сложных приложений для напоминаний о мероприятии. С помощью этого простого примера удалось показать, что Cron предоставляет простой и понятный способ планирования задач. Мы можем легко настроить их выполнение по определённому расписанию, указав время и дату их выполнения.

© Habrahabr.ru