Проектирование DWH с помощью Data Vault

7a23af439bb5df2028470dfbb1e9a26f.jpg

Привет, Хабр!

Методология Data Vault была разработана Дэном Линстедом в конце 1990-х годов и предлагает гибкий, масштабируемый и проверяемый способ управления данными. Data Vault сочетает в себе самые лучшие черты нормализованных моделей данных и звездных схем.

В этой статье мы рассмотрим эту методологию и как с помощью нее проектировать DWH на примере.

Основные компоненты

Hubs (хабы)

Hubs представляют собой центральные таблицы, которые хранят бизнес-ключи. Бизнес-ключи идентифицируют основные бизнес-сущности, типо как клиенты, продукты или транзакции. Каждый бизнес-ключ является уникальным и неизменяемым идентификатором, который связывает данные между различными системами и модулями.

Хабы состоят из:

  • Бизнес-ключ — уникальный идентификатор бизнес-объекта.

  • Суррогатный ключ — внутренний идентификатор, используемый для связи хаба с другими таблицами.

  • Источник записи — информация о системе, из которой был загружен бизнес-ключ.

Пример хаба — таблица «Customer» H_Customer, где бизнес-ключом будет уникальный идентификатор клиента Customer_ID, а суррогатный ключ используется для внутренних связей.

Links (ссылки)

Links представляют отношения между хабами. Т.е таблицы, которые фиксируют взаимодействия или транзакции между бизнес-ключами. Основные элементы links:

  • Бизнес-ключи — идентификаторы из связанных хабов.

  • Суррогатный ключ — уникальный ключ для каждой комбинации бизнес-ключей.

  • Дата и время загрузки — метка времени, указывающая, когда запись была впервые загружена.

  • Источник записи — информация о системе, из которой была загружена связь​.

Ссылки дают возможность добавлять новые отношения между хабами без изменения существующей структуры. Например, ссылка между таблицами «Customer» и «Product» может фиксировать транзакции покупки.

Satellites (сателлиты)

Спутники содержат дескриптивные атрибуты, связанные с бизнес-ключами или отношениями в хабах и ссылках. Спутники позволяют хранить исторические данные, фиксируя изменения атрибутов во времени. Основные элементы спутников:

  • Идентификатор хаба или ссылки — уникальный ключ, связанный с хабом или ссылкой.

  • Атрибуты — дескриптивные данные, такие как имя клиента, адрес, характеристики продукта и т.д.

  • Дата начала и окончания — временные метки, указывающие период действия данных.

  • Источник записи — информация о системе, из которой были загружены данные​.

Например, спутник для хаба «Customer» может содержать атрибуты, как имя клиента, адрес и номер телефона.

Этапы создания Data Vault на примере

Допустим, есть компания, которая занимается электронной коммерцией и она хочет построить хранилище данных для анализа покупательских данных и управления запасами.

Первый шаг — идентификация ключевых бизнес-объектов и создание соответствующих хабов. В нашем случае это будут клиенты, продукты и заказы. Хабы будут содержать уникальные идентификаторы для каждого бизнес-объекта.

Создаем хабы для клиентов:

CREATE TABLE dv.H_Customer (
    Customer_HK VARCHAR(64) PRIMARY KEY, -- Hash Key
    Customer_ID VARCHAR(64) NOT NULL,    -- Business Key
    Load_DTS TIMESTAMP NOT NULL,         -- Load Timestamp
    Record_Source VARCHAR(64) NOT NULL   -- Record Source
);

Customer_HK является хэш-ключом, созданным на основе бизнес-ключа Customer_ID.

Следующий шаг — создание ссылок для фиксации отношений между бизнес-объектами. Например, ссылка между клиентами и заказами.

Создаем ссылки между клиентами и заказами:

CREATE TABLE dv.L_Customer_Order (
    Customer_Order_HK VARCHAR(64) PRIMARY KEY, -- Hash Key
    Customer_HK VARCHAR(64) NOT NULL,          -- Foreign Key to H_Customer
    Order_HK VARCHAR(64) NOT NULL,             -- Foreign Key to H_Order
    Load_DTS TIMESTAMP NOT NULL,               -- Load Timestamp
    Record_Source VARCHAR(64) NOT NULL         -- Record Source
);

Customer_Order_HK — хэш-ключ, созданный на основе комбинации ключей Customer_HK и Order_HK.

Спутники позволят содержать дескриптивные атрибуты, связанные с бизнес-объектами. Создадим спутники для хранения информации о клиентах:

CREATE TABLE dv.S_Customer (
    Customer_HK VARCHAR(64) NOT NULL,    -- Foreign Key to H_Customer
    Load_DTS TIMESTAMP NOT NULL,         -- Load Timestamp
    End_DTS TIMESTAMP,                   -- End Timestamp
    Record_Source VARCHAR(64) NOT NULL,  -- Record Source
    Customer_Name VARCHAR(255),          -- Customer Name
    Customer_Email VARCHAR(255),         -- Customer Email
    Customer_Phone VARCHAR(20)           -- Customer Phone
);

Customer_HK используется для связи со соответствующим хабом, а столбцы Customer_Name, Customer_Email и Customer_Phone хранят дескриптивную информацию о клиентах.

После определения структуры необходимо загрузить данные в хабы, ссылки и спутники. Для этого используется процесс ETL.

ETL-процесс для загрузки данных в хаб клиентов:

INSERT INTO dv.H_Customer (Customer_HK, Customer_ID, Load_DTS, Record_Source)
SELECT 
    MD5(CONCAT(Customer_ID, Record_Source)) AS Customer_HK, -- Creating the Hash Key
    Customer_ID,
    NOW() AS Load_DTS,
    'SourceSystem1' AS Record_Source
FROM 
    staging.Customer
WHERE 
    NOT EXISTS (
        SELECT 1 
        FROM dv.H_Customer 
        WHERE dv.H_Customer.Customer_ID = staging.Customer.Customer_ID
    );

Важно мониторить изменения. Для этого используются столбцы Load_DTS и End_DTS в спутниках.

Пример обновления спутника при изменении данных о клиентах:

UPDATE dv.S_Customer
SET End_DTS = NOW()
WHERE Customer_HK = :Customer_HK
  AND End_DTS IS NULL;

INSERT INTO dv.S_Customer (Customer_HK, Load_DTS, Record_Source, Customer_Name, Customer_Email, Customer_Phone)
VALUES (:Customer_HK, NOW(), 'SourceSystem1', :Customer_Name, :Customer_Email, :Customer_Phone);

Этот процесс сначала закрывает текущую запись, установив End_DTS, а затем вставляет новую запись с обновленными данными.

Бизнес-слой добавляется поверх сырых данных для включения бизнес-правил, вычислений и агрегатов. слой помогает обеспечить более удобный доступ к данным и повышает их полезность для аналитики и отчетности.

Пример создания бизнес-слоя для вычисления жизненной ценности клиента:

CREATE TABLE dv.B_Customer_CLV (
    Customer_HK VARCHAR(64) NOT NULL,    -- Foreign Key to H_Customer
    CLV DECIMAL(18, 2),                  -- Calculated Lifetime Value
    Load_DTS TIMESTAMP NOT NULL,         -- Load Timestamp
    Record_Source VARCHAR(64) NOT NULL   -- Record Source
);

INSERT INTO dv.B_Customer_CLV (Customer_HK, CLV, Load_DTS, Record_Source)
SELECT 
    H_Customer.Customer_HK,
    SUM(O.Order_Amount) AS CLV,        -- Summing the order amounts to calculate CLV
    NOW() AS Load_DTS,
    'BusinessCalculation' AS Record_Source
FROM 
    dv.H_Customer AS H_Customer
JOIN 
    dv.L_Customer_Order AS L_Customer_Order
    ON H_Customer.Customer_HK = L_Customer_Order.Customer_HK
JOIN 
    dv.S_Order AS S_Order
    ON L_Customer_Order.Order_HK = S_Order.Order_HK
GROUP BY 
    H_Customer.Customer_HK;

Создали новую таблицу в бизнес-слое, которая вычисляет и хранит жизненную ценность клиентов, основываясь на данных о заказах.

Можно автоматизировать процессы ETL, чтобы минимизировать ручной труд. Например, с помощью Apache Airflow:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def load_customer_data(**kwargs): # загрузка данных в хаб клиентов
    import psycopg2
    from psycopg2 import sql
    import hashlib

    conn = psycopg2.connect(
        dbname="your_db",
        user="your_user",
        password="your_password",
        host="your_host",
        port="your_port"
    )
    cursor = conn.cursor()

    # SQL запрос для загрузки данных в хаб клиентов
    load_customer_sql = """
    INSERT INTO dv.H_Customer (Customer_HK, Customer_ID, Load_DTS, Record_Source)
    SELECT 
        MD5(CONCAT(Customer_ID, Record_Source)) AS Customer_HK, 
        Customer_ID,
        NOW() AS Load_DTS,
        'SourceSystem1' AS Record_Source
    FROM 
        staging.Customer
    WHERE 
        NOT EXISTS (
            SELECT 1 
            FROM dv.H_Customer 
            WHERE dv.H_Customer.Customer_ID = staging.Customer.Customer_ID
        );
    """

    cursor.execute(load_customer_sql)
    conn.commit()
    cursor.close()
    conn.close()

def load_order_data(**kwargs):  # загрузка данных в хаб заказов
    import psycopg2
    from psycopg2 import sql
    import hashlib

    conn = psycopg2.connect(
        dbname="your_db",
        user="your_user",
        password="your_password",
        host="your_host",
        port="your_port"
    )
    cursor = conn.cursor()

    # SQL запрос для загрузки данных в хаб заказов
    load_order_sql = """
    INSERT INTO dv.H_Order (Order_HK, Order_ID, Load_DTS, Record_Source)
    SELECT 
        MD5(CONCAT(Order_ID, Record_Source)) AS Order_HK, 
        Order_ID,
        NOW() AS Load_DTS,
        'SourceSystem1' AS Record_Source
    FROM 
        staging.Order
    WHERE 
        NOT EXISTS (
            SELECT 1 
            FROM dv.H_Order 
            WHERE dv.H_Order.Order_ID = staging.Order.Order_ID
        );
    """

    cursor.execute(load_order_sql)
    conn.commit()
    cursor.close()
    conn.close()


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 5, 21),
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG('data_vault_etl', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)

load_customers = PythonOperator(
    task_id='load_customer_data',
    provide_context=True,
    python_callable=load_customer_data,
    dag=dag
)

load_orders = PythonOperator(
    task_id='load_order_data',
    provide_context=True,
    python_callable=load_order_data,
    dag=dag
)

end = DummyOperator(task_id='end', dag=dag)

start >> load_customers >> load_orders >> end

Можно интегрировать Grafana и Prometheus.

Пример настройки мониторинга с использованием Grafana и Prometheus:

Установка Prometheus для сбора метрик:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'airflow'
    static_configs:
      - targets: ['localhost:8080']

Настройка Grafana для визуализации данных из Prometheus:

{
  "datasource": "Prometheus",
  "query": "sum(rate(airflow_task_duration_seconds_sum[1m])) by (task)"
}

В общих чертах проектирование выглядит так.

Не забывайте документировать все процессы, архитектуру и используемые инструменты.

А больше про инструменты архитектуры можно узнать в рамках практических онлайн-курсов. Подробнее в каталоге.

© Habrahabr.ru