Автоматизация процессов в DWH с помощью Python и Snowflake

1ae79cd7a361a8c72e2bb9e90e6473d5.png

Привет, Хабр!

Сегодня рассмотрим тему автоматизации процессов в хранилищах данных с помощью мощного тандема — Snowflake и Python. Разберем, как с помощью Python можно легко подключаться к Snowflake, загружать данные, управлять таблицами и автоматизировать повседневные задачи.

Настройка среды

Создаем виртуальное окружение для проекта, чтобы изолировать зависимости:

# Для Windows
python -m venv venv
venv\Scripts\activate

# Для macOS/Linux
python3 -m venv venv
source venv/bin/activate

Устанавливаем официальный коннектор Snowflake для Python с помощью pip:

pip install snowflake-connector-python

Затем настраиваем подключение к базе данных Snowflake:

import snowflake.connector

conn = snowflake.connector.connect(
    user='YOUR_USERNAME',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT_IDENTIFIER',
    warehouse='YOUR_WAREHOUSE',
    database='YOUR_DATABASE',
    schema='YOUR_SCHEMA'
)

# курсор для выполнения операций
cursor = conn.cursor()

Помимо коннектора Snowflake, могут понадобиться следующие библиотеки:

  • pandas;

  • sqlalchemy и snowflake-sqlalchemy: для использования ORM и облегчения взаимодействия с БД;

  • python-dotenv: для управления переменными окружения и безопасного хранения учетных данных;

  • schedule или APScheduler: для планирования и автоматизации задач.

Теперь создаем файл .env и добавляем в него учетные данные:

SNOWFLAKE_USER=YOUR_USERNAME
SNOWFLAKE_PASSWORD=YOUR_PASSWORD
SNOWFLAKE_ACCOUNT=YOUR_ACCOUNT_IDENTIFIER

Затем в скрипте загружаем эти переменные:

import os
from dotenv import load_dotenv

load_dotenv()

user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
account = os.getenv('SNOWFLAKE_ACCOUNT')

#  переменные для подключения
conn = snowflake.connector.connect(
    user=user,
    password=password,
    account=account
)

Синтаксис Python для работы с Snowflake

После настройки окружения и установки необходимых библиотек, можно приступать к работе с Snowflake через Python. Основным инструментом для этого является Snowflake Connector for Python.

Пример подключения и выполнения простого запроса:

import snowflake.connector

# соединение
conn = snowflake.connector.connect(
    user='YOUR_USERNAME',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT_IDENTIFIER'
)

# курсор
cursor = conn.cursor()

# SQL-запрос
cursor.execute("SELECT CURRENT_VERSION()")

# результат
version = cursor.fetchone()
print(f"Текущая версия Snowflake: {version[0]}")

# закрытие соединения
cursor.close()
conn.close()

В этом примере подключаемся к Snowflake, выполняем запрос для получения текущей версии базы данных и выводим результат.

При работе с базами данных важно правильно управлять транзакциями и обрабатывать возможные ошибки. В Snowflake Connector for Python управление транзакциями осуществляется с помощью методов commit() и rollback().

Пример управления транзакциями и обработки исключений:

try:
    # начало транзакции
    conn.cursor().execute("BEGIN")

    # выполнение операций
    cursor.execute("INSERT INTO employees (id, name) VALUES (1, 'Artem')")
    cursor.execute("INSERT INTO employees (id, name) VALUES (2, 'Ivan')")

    # подтверждение транзакции
    conn.cursor().execute("COMMIT")
except Exception as e:
    # откат транзакции в случае ошибки
    conn.cursor().execute("ROLLBACK")
    print(f"Ошибка: {e}")
finally:
    cursor.close()
    conn.close()

В этом коде, если во время выполнения операций произойдет ошибка, транзакция будет откатана, и данные не будут сохранены.

Загрузка данных может быть выполнена с помощью команд PUT и COPY INTO, либо с использованием pandas.

Пример загрузки данных из CSV-файла с помощью COPY INTO:

# загрузка файла в внутренний stage
cursor.execute("""
    PUT file://path/to/data.csv @%your_table
""")

# копирование данных из файла в таблицу
cursor.execute("""
    COPY INTO your_table
    FROM @%your_table/data.csv
    FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1)
""")

Можно выгрузить данные из таблицы в локальный файл или работать с ними непосредственно в pandas.

Пример выгрузки данных в DataFrame:

import pandas as pd

# запрос и получение данных в DataFrame
df = pd.read_sql("SELECT * FROM your_table", conn)

Используя pandas, можно выполнять различные преобразования данных перед их загрузкой или после выгрузки.

Пример преобразования данных перед загрузкой:

# Чтение данных из CSV
df = pd.read_csv('path/to/data.csv')

# Преобразование данных
df['total_price'] = df['quantity'] * df['unit_price']

# Загрузка данных в Snowflake
from sqlalchemy import create_engine
engine = create_engine('snowflake://...')

df.to_sql('your_table', engine, if_exists='append', index=False)

5 сценариев применения

Автоматическая ежедневная загрузка данных

Допустим, нужно ежедневно загружать новые данные из локального CSV-файла в таблицу Snowflake:

def update_aggregates():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    # Обновление агрегатов
    cursor.execute("""
        INSERT INTO hourly_aggregates
        SELECT CURRENT_TIMESTAMP, COUNT(*)
        FROM transactions
        WHERE transaction_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP)
    """)

    conn.commit()
    cursor.close()
    conn.close()
    print("Агрегированные данные обновлены.")

# Планирование задачи на каждый час
schedule.every().hour.do(update_aggregates)

Обновление агрегированных данных

Необходимость в ежечасном обновление агрегированные данные для отчетности:

def clean_old_data():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    # Удаление старых записей
    cursor.execute("""
        DELETE FROM user_activity
        WHERE activity_date < DATEADD(year, -1, CURRENT_DATE)
    """)

    conn.commit()
    cursor.close()
    conn.close()
    print("Старые данные удалены.")

# Планирование задачи на каждое воскресенье в 03:00
schedule.every().sunday.at("03:00").do(clean_old_data)

Мониторинг и уведомления об ошибках

Сценарий: Отслеживать ошибки в процессе ETL и отправлять уведомления ответственному лицу.

Код:

def backup_critical_tables():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    critical_tables = ['customers', 'orders']
    for table in critical_tables:
        backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"
        cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")

    conn.commit()
    cursor.close()
    conn.close()
    print("Резервные копии созданы.")

# Планирование задачи на первое число каждого месяца в 01:00
schedule.every().month.at("01:00").do(backup_critical_tables)

Очистка устаревших данных

Допустим, нужно еженедельно удалять данные старше одного года для оптимизации хранилища:

def backup_critical_tables():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    critical_tables = ['customers', 'orders']
    for table in critical_tables:
        backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"
        cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")

    conn.commit()
    cursor.close()
    conn.close()
    print("Резервные копии созданы.")

# Планирование задачи на первое число каждого месяца в 01:00
schedule.every().month.at("01:00").do(backup_critical_tables)

Автоматическое резервное копирование данных

Сценарий: Ежемесячно создавать резервные копии критически важных таблиц.

Код:

def backup_critical_tables():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    critical_tables = ['customers', 'orders']
    for table in critical_tables:
        backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"
        cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")

    conn.commit()
    cursor.close()
    conn.close()
    print("Резервные копии созданы.")

# Планирование задачи на первое число каждого месяца в 01:00
schedule.every().month.at("01:00").do(backup_critical_tables)

Заключение

Автоматизация процессов в Snowflake с помощью Python открывает широкие возможности для оптимизации работы с хранилищем данных.

Напоминаю про открытый урок «Эффективный анализ данных: Погружение в мир DWH и аналитической инженерии», который пройдет в Otus 23 сентября. На этом уроке участники узнают:

  • Основы DWH (Data Warehouse): Понимание архитектуры и ключевых компонентов хранилищ данных, а также их роли в бизнес-аналитике.

  • Инструменты и технологии: Обзор современных инструментов для работы с DWH, таких как ETL-процессы, BI-платформы и языки запросов (SQL).

  • Практические кейсы: Разбор реальных примеров использования DWH для принятия обоснованных бизнес-решений и оптимизации процессов.

Записаться можно на странице курса «Data Warehouse Analyst».

© Habrahabr.ru