Асинхронный SQLAlchemy 2: улучшение кода, методы обновления и удаления данных

dba434295afaeb8d18dac0df78a5fb48.jpg

Друзья, приветствую!

Надеюсь, вы ждали выхода третьей статьи из серии «Асинхронный SQLAlchemy 2». Напоминаю, что на Хабре уже можно найти мои предыдущие статьи:

  1. Асинхронный SQLAlchemy 2: простой пошаговый гайд по настройке, моделям, связям и миграциям с использованием Alembic.

  2. Асинхронный SQLAlchemy 2: пошаговый гайд по управлению сессиями, добавлению и извлечению данных с помощью Pydantic.

Ознакомьтесь с ними перед изучением данного материала, так как я буду опираться на предыдущие наработки.

Чем займемся сегодня?

В этой статье мы сделаем значительный шаг вперед в освоении асинхронного SQLAlchemy 2.

Программа действий:

  • Оптимизация кода:  усовершенствуем базовый класс (BaseDao) для работы с таблицами и декоратор для генерации сессий, сделав их более гибкими и эффективными.

  • Обновление данных:  научимся выполнять одиночные и массовые обновления записей в таблицах.

  • Удаление данных:  освоим методы удаления отдельных записей и групп данных.

  • Асинхронный подход:  все операции будут выполняться асинхронно, что позволит приложениям работать быстрее и эффективнее.

Материала будет много, так что запасаемся терпением, открываем IDE и начинаем кодить!

Улучшение кода и реорганизация проекта

Сразу начнем с улучшения и реорганизации кода.

  1. Переместим файл database.py в пакет dao.

  2. В пакете dao создадим файл session_maker.py.

  3. Перенесем декоратор connection в файл session_maker.py.

Вот, как выглядел старый код декоратора:

def connection(method):    async def wrapper(*args, **kwargs):        async with async_session_maker() as session:            try:                # Явно не открываем транзакции, так как они уже есть в контексте                return await method(*args, session=session, **kwargs)            except Exception as e:                await session.rollback()  # Откатываем сессию при ошибке                raise e  # Поднимаем исключение дальше            finally:                await session.close()  # Закрываем сессию    return wrapper

Основная цель этого декоратора — избежать ручного создания сессии каждый раз. Мы его немного переработаем и добавим пояснения.

Новый декоратор connection (файл dao/session_maker.py):

from functools import wrapsfrom typing import Optionalfrom sqlalchemy import textfrom dao.database import async_session_makerdef connection(self, isolation_level: Optional[str] = None, commit: bool = True):    """    Декоратор для управления сессией с возможностью настройки уровня изоляции и коммита.    Параметры:    - `isolation_level`: уровень изоляции для транзакции (например, "SERIALIZABLE").    - `commit`: если `True`, выполняется коммит после вызова метода.    """    def decorator(method):        @wraps(method)        async def wrapper(*args, **kwargs):            async with self.session_maker() as session:                try:                    if isolation_level:                        await session.execute(text(f"SET TRANSACTION ISOLATION LEVEL {isolation_level}"))                    result = await method(*args, session=session, **kwargs)                    if commit:                        await session.commit()                    return result                except Exception as e:                    await session.rollback()                    raise                finally:                    await session.close()        return wrapper    return decorator

Основные улучшения нового декоратора:

  1. Управление уровнем изоляции

    Теперь можно выбирать подходящий уровень изоляции для разных операций:

    • READ COMMITTED — для обычных запросов (по умолчанию в PostgreSQL).

    • SERIALIZABLE — для финансовых операций, требующих максимальной надежности.

    • REPEATABLE READ — для отчетов и аналитики.

  2. Управление коммитами

    Гибкое управление коммитами полезно в следующих случаях:

    • Объединение нескольких операций в одну транзакцию.

    • Дополнительная логика до коммита.

    • Коммит выполняется в другом месте кода.

    • При чтении данных, где коммит не требуется.

  3. Улучшенная обработка результатов

    result = await method(*args, session=session, **kwargs)if commit:    await session.commit()return result

Сравнение с предыдущей версией:

  • Гибкость:  была одинаковая для всех операций, теперь можно настроить под каждую задачу.

  • Безопасность:  отсутствовал контроль изоляции, теперь гарантирован уровень изоляции для критичных операций.

  • Контроль транзакций:  не было управления коммитами, теперь есть гибкие настройки для различных сценариев.

Примеры использования:

# Чтение данных@connection(isolation_level="READ COMMITTED")async def get_user(self, session, user_id: int):    ...# Финансовая операция@connection(isolation_level="SERIALIZABLE", commit=False)async def transfer_money(self, session, from_id: int, to_id: int):    ...

Новый декоратор существенно повышает гибкость и безопасность работы с БД, позволяя точно настраивать поведение транзакций под конкретные задачи.

В сегодняшней статье мы будем использовать подход декоратора, но в будущем мы создадим универсальный класс, который сможет генерировать методы для создания сессии:

Этот класс можно описать всего один раз, а затем использовать в самых разных проектах, где требуется работа с базами данных: от простых Telegram-ботов до сложных FastApi-сервисов.

В моем Telegram‑канале «Легкий путь в Python» вы можете найти готовую заготовку для разработки приложений на FastAPI, которая включает:

  • Классовую реализацию session_maker.

  • Расширенный класс BaseDao с богатым функционалом.

  • Модуль для авторизации и аутентификации.

Улучшаем BaseDao

Мы вынесли коммиты в декоратор. Следовательно, теперь нет необходимости использовать commit в методах класса. Давайте заменим все await session.commit() на await session.flush().

В прошлой статье мы подробно рассматривали, зачем нужен метод flush() и как он работает. Если кратко, то, в отличие от commit(),  flush() выполняет временное сохранение в базе данных. С учетом того, что commit() мы вынесли на уровень декоратора, использование flush() в некоторых сценариях может дать значительный прирост производительности.

Кроме того, если в рамках ваших функций необходимо выполнять несколько связанных фиксаций в базе данных за пределами базового класса BaseDao, вы также можете использовать flush().

Наша реализация декоратора теперь позволяет выполнять commit() (сохранение изменений в базе данных) автоматически. Это создает ситуацию, при которой вы можете выполнить множество временных фиксаций через flush() быстро и оптимально, а затем не переживать о том, что изменения будут сохранены.

Надеюсь, что объяснение было понятным.

Это не все изменения, которые мы сегодня внесем в код. Следующее важное изменение — отказ от использования **values для передачи параметров в методы.

Текущая реализация, хоть и удобная, гибкая и быстрая, может привести к множеству проблем, таким как передача несуществующих параметров или неверных ключей, что потенциально может вызвать серьезные ошибки и проблемы.

Для улучшения класса в этой части нам поможет Pydantic. Напоминаю, что в прошлой статье мы учились его использовать для получения данных. Кроме того, в моем блоге на Хабре вы найдете подробную статью про Pydantic 2, которая имеет название «Pydantic 2: Полное руководство для Python-разработчиков — от основ до продвинутых техник». Желательно ознакомиться с ней перед тем, как продолжить чтение.

Для примера, внесем изменения в метод find_one_or_none и разберем их на практике:

@classmethodasync def find_one_or_none(cls, session: AsyncSession, filters: BaseModel):    # Найти одну запись по фильтрам    filter_dict = filters.model_dump(exclude_unset=True)    try:        query = select(cls.model).filter_by(**filter_dict)        result = await session.execute(query)        record = result.scalar_one_or_none()        return record    except SQLAlchemyError as e:        raise

Вместо передачи фильтров напрямую через **kwargs, теперь метод ожидает объект класса BaseModel из Pydantic.

Затем мы преобразуем объект класса BaseModel в обычный питоновский словарь с помощью метода model_dump. Параметр exclude_unset=True гарантирует, что в словарь попадут только явно заданные поля.

Благодаря флагу exclude_unset=True исключаются поля со значениями по умолчанию, которые не были явно установлены. Это позволяет создавать более гибкие и точные запросы, учитывая только те фильтры, которые действительно были заданы пользователем.

Далее логика работы остается как в старой реализации. Полученный словарь фильтров применяется к запросу с помощью filter_by(**filter_dict), и мы получаем либо нужное значение, либо None.

Динамические и обычные модели в Pydantic

Перед началом тестирования нового подхода в методах хочу поделиться трюком, который позволяет «на лету» формировать простые модели Pydantic. Это будет полезно, когда не требуется глубокая валидация и описание полей.

Для этой цели в Pydantic есть функция create_model. Она позволяет описывать модели, передавая в них простое описание полей. Вот простой пример применения:

from pydantic import create_modelMyModel = create_model('DynamicModel',                        name=(str, ...),                       age=(int, 42))instance = MyModel(name="test")print(instance.dict())print(MyModel.__name__)  # Выведет 'DynamicModel'
  • MyModel — переменная, в которой сохраняется созданная модель.

  • DynamicModel (первый аргумент) — строка, задающая имя модели для внутреннего представления и вывода информации о ней.

Это разделение позволяет создавать модели с одинаковыми именами, но разными структурами в разных частях кода, избегая конфликтов имен.

Практическое применение create_model

Ранее у нас был пример старого подхода к методу find_one_or_none:

@connectionasync def select_full_user_info_email(session, user_id, email):    rez = await UserDAO.find_one_or_none(session=session, id=user_id, email=email)    if rez:        return UserPydantic.from_orm(rez).dict()    return {'message': f'Пользователь с ID {user_id} не найден!'}

Теперь у нас многое изменилось. Во-первых, наш декоратор стал фабрикой декораторов, поэтому его нужно использовать со скобками, и в данном контексте можно передать commit=False, так как мы просто получаем данные из базы.

Метод find_one_or_none больше не принимает kwargs. Теперь нам нужна модель Pydantic, в которую мы передадим корректные значения.

Кроме того, для преобразования данных из SQLAlchemy ORM в модель Pydantic я использовал методы from_orm() и dict() для создания словаря. В новой версии Pydantic 2 эти методы были переименованы, хотя их прежние названия пока остаются поддерживаемыми:

Вот итоговая реализация:

@connection(commit=False)async def select_full_user_info_email(session: AsyncSession, user_id: int, email: str):    FilterModel = create_model(        'FilterModel',        id=(int, ...),        email=(EmailStr, ...)    )    user = await UserDAO.find_one_or_none(session=session, filters=FilterModel(id=user_id, email=email))    if user:        # Преобразуем ORM-модель в Pydantic-модель и затем в словарь        return UserPydantic.model_validate(user).model_dump()    return {'message': f'Пользователь с ID {user_id} не найден!'}

Также можно описать модель фильтра следующим образом:

class FilterModel(BaseModel):    id: int    email: EmailStr

Этот вариант работал бы аналогично.

В итоге, кода стало немного больше, но это дает ясность для вас и других разработчиков, какие параметры нужно передавать и чего ожидать. Поэтому настоятельно рекомендую внедрять Pydantic в свои проекты.

Добавление дженериков

Прежде чем перепишем оставшиеся методы BaseDAO, давайте сделаем еще одно улучшение в нашем базовом классе с методами, а именно, добавим дженерики.

Сначала пропишем код, а после будем разбираться.

from typing import Generic, TypeVar, Listfrom pydantic import BaseModelfrom sqlalchemy import selectfrom sqlalchemy.exc import SQLAlchemyErrorfrom sqlalchemy.ext.asyncio import AsyncSessionfrom dao.database import Base# Объявляем типовой параметр T с ограничением, что это наследник BaseT = TypeVar("T", bound=Base)class BaseDAO(Generic[T]):    model: type[T]

Это изменение связано с улучшением типизации и использованием дженериков в Python.

Давайте разберем ключевые аспекты:

Generic[T]:

  • Делает класс BaseDAO дженериком.

  • Позволяет указать конкретный тип модели при наследовании.

T = TypeVar («T», bound=Base):

  • Создает типовую переменную T.

  • Ограничивает T только наследниками класса Base (SQLAlchemy модели).

model: type[T]:

  • Указывает, что атрибут model будет типом, являющимся подклассом T.

Преимущества:

  • Улучшенная статическая типизация.

  • Более точные подсказки IDE.

  • Возможность использовать специфичные методы модели в дочерних DAO.

  • Предотвращение ошибок при работе с неправильными типами моделей.

Это изменение делает код более надежным и удобным для разработки, особенно в больших проектах с множеством моделей и DAO.

Изменения теперь касаются и дочерних классов. Теперь мы не просто наследуемся от BaseDAO, а можем указывать конкретные модели, с которыми работает дочерний класс.

Например, было:

class UserDAO(BaseDAO):    model = User

Стало:

class UserDAO(BaseDAO[User]):    model = User

Преимущества такого подхода:

  • Статическая типизация: IDE и инструменты статического анализа кода смогут правильно определить тип модели, с которой работает DAO.

  • Безопасность типов: Вы получите предупреждения или ошибки, если попытаетесь использовать методы с неправильными типами.

  • Улучшенные подсказки: IDE сможет предоставлять более точные подсказки при работе с методами DAO.

Теперь мы можем переписать все методы BaseDAO под новый формат и двигаться дальше.

from typing import Generic, TypeVar, Listfrom pydantic import BaseModelfrom sqlalchemy import selectfrom sqlalchemy.exc import SQLAlchemyErrorfrom sqlalchemy.ext.asyncio import AsyncSessionfrom dao.database import Base# Объявляем типовой параметр T с ограничением, что это наследник BaseT = TypeVar("T", bound=Base)class BaseDAO(Generic[T]):    model: type[T]    @classmethod    async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):        # Найти запись по ID        try:            query = select(cls.model).filter_by(id=data_id)            result = await session.execute(query)            record = result.scalar_one_or_none()            return record        except SQLAlchemyError as e:            raise    @classmethod    async def find_one_or_none(cls, session: AsyncSession, filters: BaseModel):        # Найти одну запись по фильтрам        filter_dict = filters.model_dump(exclude_unset=True)        try:            query = select(cls.model).filter_by(**filter_dict)            result = await session.execute(query)            record = result.scalar_one_or_none()            return record        except SQLAlchemyError as e:            raise    @classmethod    async def find_all(cls, session: AsyncSession, filters: BaseModel | None):        if filters:            filter_dict = filters.model_dump(exclude_unset=True)        else:            filter_dict = {}        try:            query = select(cls.model).filter_by(**filter_dict)            result = await session.execute(query)            records = result.scalars().all()            return records        except SQLAlchemyError as e:            raise    @classmethod    async def add(cls, session: AsyncSession, values: BaseModel):        # Добавить одну запись        values_dict = values.model_dump(exclude_unset=True)        new_instance = cls.model(**values_dict)        session.add(new_instance)        try:            await session.flush()        except SQLAlchemyError as e:            await session.rollback()            raise e        return new_instance    @classmethod    async def add_many(cls, session: AsyncSession, instances: List[BaseModel]):        # Добавить несколько записей        values_list = [item.model_dump(exclude_unset=True) for item in instances]        new_instances = [cls.model(**values) for values in values_list]        session.add_all(new_instances)        try:            await session.flush()        except SQLAlchemyError as e:            await session.rollback()            raise e        return new_instances

Как видите, изменения не особо глобальные. Тут мы просто добавили больше порядка, безопасности и читаемости кода.

Обновление данных через SQLAlchemy

К этому моменту у вас должны быть:

  • База данных с таблицами и заполненными строками в PostgreSQL

  • Описаны модели таблиц

  • Создан декоратор для генерации сессий

  • Создан базовый и дочерний классы для работы с информацией в базе данных (в базовом классе должны быть прописаны методы для добавления и получения данных из таблиц).

Если это так, то мы можем приступить к методу update.

Что касается обновлений данных, то в SQLAlchemy можно выделить два основных подхода:

  1. Обновление отдельного объекта

  2. Использование метода update()

Обновление отдельного объекта

Этот метод использует паттерн Unit of Work и идеально подходит для обновления одной записи:

async def update_user(session: AsyncSession, user_id: int, new_name: str):    user = await session.get(User, user_id)    if user:        user.name = new_name        await session.commit()        return user    return None

В этом подходе:

  • Мы извлекаем объект из базы данных

  • Изменяем его атрибуты

  • SQLAlchemy отслеживает изменения

  • При вызове session.commit() изменения сохраняются в базе

Синхронный метод get

Обратите внимание, что я использовал метод get для получения данных. В прошлой статье я ввел вас в небольшое заблуждение: в последней версии SQLAlchemy этот метод стал асинхронным, тогда как раньше он был только синхронным.

Суть метода get в том, что он принимает модель таблицы и идентификатор PrimaryKey. Примечательно, что неважно, как называется колонка с PrimaryKey — get автоматически подставит имя нужной колонки.

Пример использования метода get:

@connection(commit=False)async def get_select(session: AsyncSession, user_id: int):    user = await session.get(User, user_id)    print(UserPydantic.model_validate(user).model_dump())    asyncio.run(get_select(user_id=21))

Результат:

{    "username": "bob_smith",    "email": "bob.smith@example.com",    "profile": {        "first_name": "Bob",        "last_name": "Smith",        "age": 25,        "gender": "мужчина",        "profession": "дизайнер",        "interests": ["gaming", "photography", "traveling"],        "contacts": {            "phone": "+987654321",            "email": "bob.smith@example.com"        }    }}

Теперь можно изменить метод базового класса BaseDao для получения информации о пользователе или None по ID.

Было:

@classmethodasync def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):    # Найти запись по ID    try:        query = select(cls.model).filter_by(id=data_id)        result = await session.execute(query)        record = result.scalar_one_or_none()        return record    except SQLAlchemyError as e:        raise

Стало:

@classmethodasync def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):    # Найти запись по ID    try:        return await session.get(cls.model, data_id)    except SQLAlchemyError as e:        print(f"Error occurred: {e}")        raise

Проверка:

@connection(commit=False)async def get_select(session: AsyncSession, user_id: int):    user = await UserDAO.find_one_or_none_by_id(session=session, data_id=user_id)    print(UserPydantic.model_validate(user).model_dump())asyncio.run(get_select(user_id=21))

Результат остаётся таким же.

Использование метода update()

Этот метод эффективен для массового обновления или когда нам не нужно загружать объект в память:

from sqlalchemy import updateasync def update_users_status(session: AsyncSession, age: int, new_status: str):    stmt = (        update(User)        .where(User.age > age)        .values(status=new_status)    )    result = await session.execute(stmt)    await session.commit()    return result.rowcount

Преимущества этого метода:

  • Более эффективен для массовых обновлений

  • Не требует предварительной загрузки объектов

  • Позволяет использовать сложные условия в where

Как мы видим, в этом случае снова необходимо обратиться к ранее изученной теме выборки данных. Вы можете использовать как метод where (другое название filter), так и filter_by. Однако важно помнить, что основной принцип остается неизменным.

Сначала вы должны определить, с какими объектами или одним объектом будет работать SQLAlchemy, а затем выполнить необходимые действия по обновлению.

Универсальные методы для обновления данных

Теперь, когда мы знаем общие подходы к обновлению информации, мы можем прописать универсальные методы в BaseDao.

Первый метод будет принимать идентификатор и Pydantic модель с новыми значениями и выполнять обновление.

@classmethodasync def update_one_by_id(cls, session: AsyncSession, data_id: int, values: BaseModel):    values_dict = values.model_dump(exclude_unset=True)    try:        record = await session.get(cls.model, data_id)        for key, value in values_dict.items():            setattr(record, key, value)        await session.flush()    except SQLAlchemyError as e:        print(e)        raise e

Метод update_one_by_id демонстрирует элегантное применение объектно-ориентированного программирования (ООП) в Python для обновления записей в базе данных. Вот как это работает:

  1. Получение объекта записи:

    • Используя уникальный идентификатор (data_id), мы извлекаем соответствующую запись из базы данных.

    • Эта запись представлена в виде объекта модели таблицы, что позволяет работать с ней как с обычным Python-объектом.

  2. Обновление атрибутов объекта:

    • Новые значения, предоставленные через Pydantic модель, применяются к атрибутам объекта.

    • Мы используем setattr() для динамического обновления каждого атрибута, что обеспечивает гибкость.

  3. Фиксация изменений:

    • После обновления атрибутов вызываем session.flush(), синхронизируя изменения объекта с базой данных без полного коммита транзакции.

Этот подход объединяет мощь ORM (Object-Relational Mapping) с гибкостью динамического обновления атрибутов.

Протестируем метод

Создадим файл update_methods_dao.py и попробуем обновить имя конкретного пользователя:

import asynciofrom pydantic import create_modelfrom sqlalchemy.ext.asyncio import AsyncSessionfrom dao.dao import UserDAOfrom dao.session_maker import connection@connection(commit=True)async def update_username(session: AsyncSession, user_id: int, new_username: str):    ValueModel = create_model('ValueModel', username=(str, ...))    await UserDAO.update_one_by_id(session=session, data_id=user_id, values=ValueModel(username=new_username))asyncio.run(update_username(user_id=1, new_username='yakvenalexx'))

Метод массовой обработки по переданному ID

@connection(commit=True)async def update_user(session: AsyncSession, user_id: int, new_username: str, email: int):    ValueModel = create_model('ValueModel', username=(str, ...), email=(EmailStr, ...))    await UserDAO.update_one_by_id(session=session, data_id=user_id, values=ValueModel(username=new_username, email=email))asyncio.run(update_user(user_id=1, email='mail@mail.ru', new_username='admin'))

Чтобы обновить значение username из таблицы users и возраст из связанной таблицы profiles, нам нужно создать отдельный метод в дочернем классе UserDAO. Этот метод будет явно указывать, где хранятся соответствующие переменные. В остальном логика работы не будет сильно отличаться от универсального метода из BaseDAO.

Вот пример кода:

class UserDAO(BaseDAO[User]):    model = User    @classmethod    async def update_username_age_by_id(cls, session: AsyncSession, data_id: int, username: str, age: int):        user = await session.get(cls.model, data_id)        user.username = username        user.profile.age = age        await session.flush()

Массовое обновление через SQLAlchemy update

Пример массового обновления, где выбираются все пользователи с фамилией Smith, и каждому устанавливается возраст в 22 года:

@connection(commit=True)async def update_age_mass(session: AsyncSession, new_age: int, last_name: str):    try:        stmt = (            update(Profile)            .filter_by(last_name=last_name)            .values(age=new_age)        )        result = await session.execute(stmt)        updated_count = result.rowcount        print(f'Обновлено {updated_count} записей')        return updated_count    except SQLAlchemyError as e:        print(f"Error updating profiles: {e}")        raiseasyncio.run(update_age_mass(new_age=22, last_name='Smith'))

Универсальный метод для массового обновления

Опираясь на полученные знания, мы можем разработать универсальный метод, который будет использоваться для массового обновления записей в нашем классе BaseDao.

@classmethodasync def update_many(cls, session: AsyncSession, filter_criteria: BaseModel, values: BaseModel):    filter_dict = filter_criteria.model_dump(exclude_unset=True)    values_dict = values.model_dump(exclude_unset=True)    try:        stmt = (            update(cls.model)            .filter_by(**filter_dict)            .values(**values_dict)        )        result = await session.execute(stmt)        await session.flush()        return result.rowcount    except SQLAlchemyError as e:        print(f"Error in mass update: {e}")        raise

Напишем на основании него функцию.

@connection(commit=True)async def update_age_mass_dao(session: AsyncSession, new_age: int, last_name: str):    filter_criteria = create_model('FilterModel', last_name=(str, ...))    values = create_model('ValuesModel', age=(int, ...))    await ProfileDAO.update_many(session=session,                                 filter_criteria=filter_criteria(last_name=last_name),                                 values=values(age=new_age))asyncio.run(update_age_mass_dao(new_age=33, last_name='Smith'))

Обратите внимание, тут я уже импортировал ProfileDAO. Сама запись получилась достаточно читаемой, а сам код понятный.

Удаление в ООП-стиле

При объектно-ориентированном подходе сначала нужно получить нужный объект из базы данных. Это можно сделать через методы get() или select():

  • Метод get() позволяет быстро найти запись по первичному ключу (например,  id). Это полезно, когда нужно удалить запись, ID которой известен заранее.

  • Метод select() позволяет создавать более сложные запросы, например, на основе нескольких полей, чтобы выбрать нужные объекты.

После получения объекта его можно удалить с помощью session.delete(obj), а затем подтвердить изменения вызовом commit():

# Пример удаления одного объектаobj = await session.get(User, 1)  # Получаем объект по первичному ключуif obj:    await session.delete(obj)  # Удаляем объект    await session.commit()  # Подтверждаем удаление

В этом примере, если объект с ID = 1 существует, он будет удален. Такой подход удобен, если перед удалением нужно выполнить проверки или другую логику.

Если вы выбираете несколько объектов через select, удаление можно выполнить в цикле:

# Получаем объекты, которые нужно удалитьresult = await session.execute(select(User).where(User.age > 30))users_to_delete = result.scalars().all()# Удаляем объекты в циклеfor user in users_to_delete:    await session.delete(user)# Подтверждаем измененияawait session.commit()

Этот способ подходит для выборочного удаления, но менее удобен для массового удаления большого числа записей.

Массовое удаление на уровне базы данных

Когда нужно удалить сразу несколько записей, лучше выполнять массовое удаление на стороне базы данных. Такой подход экономит ресурсы, так как позволяет избежать загрузки объектов в память.

Для этого используется метод delete() на уровне Session, что позволяет SQLAlchemy отправить один запрос на удаление напрямую в базу данных:

# Пример массового удаленияawait session.execute(    delete(User).where(User.age > 30))await session.commit()  # Подтверждаем массовое удаление

В этом примере все записи пользователей старше 30 лет удаляются одним запросом. Этот метод эффективен, когда нет необходимости в проверках перед удалением каждого объекта, и важна производительность.

Практика удаления

Теперь закрепим на практике удаление записей с нашей базы данных, а далее добавим универсальные методы для удаления записей с наших таблиц. Для написания функций для удаления записей создадим новый файл delete_methods_dao.py.

Для начала удалим пользователя с ID 5. Код будет выглядеть так:

from sqlalchemy.ext.asyncio import AsyncSessionfrom dao.session_maker import connectionfrom models import User@connection(commit=True)async def delete_user_by_id(session: AsyncSession, user_id: int):    user = await session.get(User, user_id)    if user:        await session.delete(user)asyncio.run(delete_user_by_id(user_id=5))

Как вы видите, код максимально «питоновский». Строкой user = await session.get(User, user_id) мы получаем объект пользователя, а далее его удаляем. Commit тут не прописан, так как он предусмотрен на уровне сессии в декораторе.

Тут сразу вырисовывается универсальный метод для BaseDao. Давайте его напишем.

@classmethodasync def delete_one_by_id(cls, data_id: int, session: AsyncSession):    # Найти запись по ID    try:        data = await session.get(cls.model, data_id)        if data:            await session.delete(data)            await session.flush()    except SQLAlchemyError as e:        print(f"Error occurred: {e}")        raise

Теперь напишем функцию уже с новым методом:

@connection(commit=True)async def delete_user_by_id_dao(session: AsyncSession, user_id: int):    await UserDAO.delete_one_by_id(session=session, data_id=user_id)asyncio.run(delete_user_by_id_dao(user_id=10))

Теперь напишем метод для удаления записей по условию. Тоже пока на уровне обычной функции, которую затем трансформируем в универсальный метод BaseDao.

Давайте удалим всех пользователей, у которых username начинается на «ja». Код будет выглядеть так:

@connection(commit=True)async def delete_user_username_ja(session: AsyncSession, start_letter: str = 'ja'):    stmt = delete(User).where(User.username.like(f"{start_letter}%"))    await session.execute(stmt)asyncio.run(delete_user_username_ja())

В этом методе сначала мы формируем SQL-запрос для удаления записи с базы данных:

stmt = delete(User).where(User.username.like(f"{start_letter}%"))

Далее, методом session.execute() мы просто выполняем этот запрос. Подход удобен тем, что он гибкий в связке с where / filter_by, и у нас нет необходимости сохранять данные на стороне приложения перед удалением, как в методе get, когда мы выполняем запрос await session.delete(obj).

Напишем универсальный метод для удаления записей с базы данных, но в его основу положим filter_by. Сложные фильтры, такие как where (filter), будем выносить на сторону дочерних классов, когда это будет необходимо.

Метод будет иметь следующий вид:

@classmethodasync def delete_many(cls, session: AsyncSession, filters: BaseModel | None):    if filters:        filter_dict = filters.model_dump(exclude_unset=True)        stmt = delete(cls.model).filter_by(**filter_dict)    else:        stmt = delete(cls.model)    try:        result = await session.execute(stmt)        await session.flush()        return result.rowcount    except SQLAlchemyError as e:        print(f"Error occurred: {e}")        raise

Я сделал метод универсальным и для удаления всех данных из таблицы, так что будьте внимательны. Если, используя его, вы не передадите никаких фильтров (модели Pydantic), то все данные будут удалены.

Проверим.

@connection(commit=True)async def delete_user_by_password(session: AsyncSession, password: str):    filter_criteria = create_model('FilterModel', password=(str, ...))    await UserDAO.delete_many(session=session, filters=filter_criteria(password=password))asyncio.run(delete_user_by_password(password='asdasd'))

Этой записью мы удалили всех пользователей с паролем «asdasd».

Для удаления всех данных о пользователях можно было бы написать такую функцию:

@connection(commit=True)async def delete_all_users(session: AsyncSession):    await UserDAO.delete_many(session=session, filters=None)

Заключение

Сегодня мы подробно разобрали ключевые аспекты работы с SQLAlchemy, сосредоточив внимание на операциях удаления и обновления данных. Я старался донести материал так, чтобы у вас сформировалось не только понимание конкретных действий, но и целостное представление о принципах работы с SQLAlchemy.

Хотя этот фреймворк может показаться многослойным и сложным, SQLAlchemy остаётся гибким и удобным инструментом, который доступен для освоения каждому. Главное — понять основные концепции классов и постепенно углубляться в практическую работу с кодом. Если вы внимательно изучили предыдущие статьи об асинхронном SQLAlchemy 2 и сегодняшнюю, у вас уже есть все необходимые знания для уверенного использования табличных баз данных в Python. Дальнейшее развитие — это лишь вопрос опыта и практики.

Мы обсудили два основных подхода к обновлению и удалению данных:

  1. Объектно‑ориентированный подход, при котором данные загружаются на сторону приложения и обрабатываются как объекты. Этот метод интуитивно понятен и удобен, но может потребовать больше ресурсов.

  2. Прямое выполнение SQL‑запросов на стороне базы данных, что помогает оптимизировать использование ресурсов приложения, особенно при массовых операциях.

Полный исходный код проекта и эксклюзивные материалы доступны в моем Telegram‑канале «Легкий путь в Python», где уже более 1000 единомышленников! А если вам требуется развернуть проект на сервере и доставлять в него обновления тремя командами в IDE, зарегистрируйтесь в Amvera Cloud, и получите 111 руб. на тестирование функционала.

В планах — ещё несколько статей по асинхронной работе с SQLAlchemy. Продолжение этой серии будет зависеть от вашего отклика и интереса.

Спасибо за внимание! До скорых встреч!

© Habrahabr.ru