Проектирование DWH с помощью Data Vault
Привет, Хабр!
Методология Data Vault была разработана Дэном Линстедом в конце 1990-х годов и предлагает гибкий, масштабируемый и проверяемый способ управления данными. Data Vault сочетает в себе самые лучшие черты нормализованных моделей данных и звездных схем.
В этой статье мы рассмотрим эту методологию и как с помощью нее проектировать DWH на примере.
Основные компоненты
Hubs (хабы)
Hubs представляют собой центральные таблицы, которые хранят бизнес-ключи. Бизнес-ключи идентифицируют основные бизнес-сущности, типо как клиенты, продукты или транзакции. Каждый бизнес-ключ является уникальным и неизменяемым идентификатором, который связывает данные между различными системами и модулями.
Хабы состоят из:
Бизнес-ключ — уникальный идентификатор бизнес-объекта.
Суррогатный ключ — внутренний идентификатор, используемый для связи хаба с другими таблицами.
Источник записи — информация о системе, из которой был загружен бизнес-ключ.
Пример хаба — таблица «Customer» H_Customer
, где бизнес-ключом будет уникальный идентификатор клиента Customer_ID
, а суррогатный ключ используется для внутренних связей.
Links (ссылки)
Links представляют отношения между хабами. Т.е таблицы, которые фиксируют взаимодействия или транзакции между бизнес-ключами. Основные элементы links:
Бизнес-ключи — идентификаторы из связанных хабов.
Суррогатный ключ — уникальный ключ для каждой комбинации бизнес-ключей.
Дата и время загрузки — метка времени, указывающая, когда запись была впервые загружена.
Источник записи — информация о системе, из которой была загружена связь.
Ссылки дают возможность добавлять новые отношения между хабами без изменения существующей структуры. Например, ссылка между таблицами «Customer» и «Product» может фиксировать транзакции покупки.
Satellites (сателлиты)
Спутники содержат дескриптивные атрибуты, связанные с бизнес-ключами или отношениями в хабах и ссылках. Спутники позволяют хранить исторические данные, фиксируя изменения атрибутов во времени. Основные элементы спутников:
Идентификатор хаба или ссылки — уникальный ключ, связанный с хабом или ссылкой.
Атрибуты — дескриптивные данные, такие как имя клиента, адрес, характеристики продукта и т.д.
Дата начала и окончания — временные метки, указывающие период действия данных.
Источник записи — информация о системе, из которой были загружены данные.
Например, спутник для хаба «Customer» может содержать атрибуты, как имя клиента, адрес и номер телефона.
Этапы создания Data Vault на примере
Допустим, есть компания, которая занимается электронной коммерцией и она хочет построить хранилище данных для анализа покупательских данных и управления запасами.
Первый шаг — идентификация ключевых бизнес-объектов и создание соответствующих хабов. В нашем случае это будут клиенты, продукты и заказы. Хабы будут содержать уникальные идентификаторы для каждого бизнес-объекта.
Создаем хабы для клиентов:
CREATE TABLE dv.H_Customer (
Customer_HK VARCHAR(64) PRIMARY KEY, -- Hash Key
Customer_ID VARCHAR(64) NOT NULL, -- Business Key
Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp
Record_Source VARCHAR(64) NOT NULL -- Record Source
);
Customer_HK
является хэш-ключом, созданным на основе бизнес-ключа Customer_ID
.
Следующий шаг — создание ссылок для фиксации отношений между бизнес-объектами. Например, ссылка между клиентами и заказами.
Создаем ссылки между клиентами и заказами:
CREATE TABLE dv.L_Customer_Order (
Customer_Order_HK VARCHAR(64) PRIMARY KEY, -- Hash Key
Customer_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Customer
Order_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Order
Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp
Record_Source VARCHAR(64) NOT NULL -- Record Source
);
Customer_Order_HK
— хэш-ключ, созданный на основе комбинации ключей Customer_HK
и Order_HK
.
Спутники позволят содержать дескриптивные атрибуты, связанные с бизнес-объектами. Создадим спутники для хранения информации о клиентах:
CREATE TABLE dv.S_Customer (
Customer_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Customer
Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp
End_DTS TIMESTAMP, -- End Timestamp
Record_Source VARCHAR(64) NOT NULL, -- Record Source
Customer_Name VARCHAR(255), -- Customer Name
Customer_Email VARCHAR(255), -- Customer Email
Customer_Phone VARCHAR(20) -- Customer Phone
);
Customer_HK
используется для связи со соответствующим хабом, а столбцы Customer_Name
, Customer_Email
и Customer_Phone
хранят дескриптивную информацию о клиентах.
После определения структуры необходимо загрузить данные в хабы, ссылки и спутники. Для этого используется процесс ETL.
ETL-процесс для загрузки данных в хаб клиентов:
INSERT INTO dv.H_Customer (Customer_HK, Customer_ID, Load_DTS, Record_Source)
SELECT
MD5(CONCAT(Customer_ID, Record_Source)) AS Customer_HK, -- Creating the Hash Key
Customer_ID,
NOW() AS Load_DTS,
'SourceSystem1' AS Record_Source
FROM
staging.Customer
WHERE
NOT EXISTS (
SELECT 1
FROM dv.H_Customer
WHERE dv.H_Customer.Customer_ID = staging.Customer.Customer_ID
);
Важно мониторить изменения. Для этого используются столбцы Load_DTS
и End_DTS
в спутниках.
Пример обновления спутника при изменении данных о клиентах:
UPDATE dv.S_Customer
SET End_DTS = NOW()
WHERE Customer_HK = :Customer_HK
AND End_DTS IS NULL;
INSERT INTO dv.S_Customer (Customer_HK, Load_DTS, Record_Source, Customer_Name, Customer_Email, Customer_Phone)
VALUES (:Customer_HK, NOW(), 'SourceSystem1', :Customer_Name, :Customer_Email, :Customer_Phone);
Этот процесс сначала закрывает текущую запись, установив End_DTS
, а затем вставляет новую запись с обновленными данными.
Бизнес-слой добавляется поверх сырых данных для включения бизнес-правил, вычислений и агрегатов. слой помогает обеспечить более удобный доступ к данным и повышает их полезность для аналитики и отчетности.
Пример создания бизнес-слоя для вычисления жизненной ценности клиента:
CREATE TABLE dv.B_Customer_CLV (
Customer_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Customer
CLV DECIMAL(18, 2), -- Calculated Lifetime Value
Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp
Record_Source VARCHAR(64) NOT NULL -- Record Source
);
INSERT INTO dv.B_Customer_CLV (Customer_HK, CLV, Load_DTS, Record_Source)
SELECT
H_Customer.Customer_HK,
SUM(O.Order_Amount) AS CLV, -- Summing the order amounts to calculate CLV
NOW() AS Load_DTS,
'BusinessCalculation' AS Record_Source
FROM
dv.H_Customer AS H_Customer
JOIN
dv.L_Customer_Order AS L_Customer_Order
ON H_Customer.Customer_HK = L_Customer_Order.Customer_HK
JOIN
dv.S_Order AS S_Order
ON L_Customer_Order.Order_HK = S_Order.Order_HK
GROUP BY
H_Customer.Customer_HK;
Создали новую таблицу в бизнес-слое, которая вычисляет и хранит жизненную ценность клиентов, основываясь на данных о заказах.
Можно автоматизировать процессы ETL, чтобы минимизировать ручной труд. Например, с помощью Apache Airflow:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def load_customer_data(**kwargs): # загрузка данных в хаб клиентов
import psycopg2
from psycopg2 import sql
import hashlib
conn = psycopg2.connect(
dbname="your_db",
user="your_user",
password="your_password",
host="your_host",
port="your_port"
)
cursor = conn.cursor()
# SQL запрос для загрузки данных в хаб клиентов
load_customer_sql = """
INSERT INTO dv.H_Customer (Customer_HK, Customer_ID, Load_DTS, Record_Source)
SELECT
MD5(CONCAT(Customer_ID, Record_Source)) AS Customer_HK,
Customer_ID,
NOW() AS Load_DTS,
'SourceSystem1' AS Record_Source
FROM
staging.Customer
WHERE
NOT EXISTS (
SELECT 1
FROM dv.H_Customer
WHERE dv.H_Customer.Customer_ID = staging.Customer.Customer_ID
);
"""
cursor.execute(load_customer_sql)
conn.commit()
cursor.close()
conn.close()
def load_order_data(**kwargs): # загрузка данных в хаб заказов
import psycopg2
from psycopg2 import sql
import hashlib
conn = psycopg2.connect(
dbname="your_db",
user="your_user",
password="your_password",
host="your_host",
port="your_port"
)
cursor = conn.cursor()
# SQL запрос для загрузки данных в хаб заказов
load_order_sql = """
INSERT INTO dv.H_Order (Order_HK, Order_ID, Load_DTS, Record_Source)
SELECT
MD5(CONCAT(Order_ID, Record_Source)) AS Order_HK,
Order_ID,
NOW() AS Load_DTS,
'SourceSystem1' AS Record_Source
FROM
staging.Order
WHERE
NOT EXISTS (
SELECT 1
FROM dv.H_Order
WHERE dv.H_Order.Order_ID = staging.Order.Order_ID
);
"""
cursor.execute(load_order_sql)
conn.commit()
cursor.close()
conn.close()
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 5, 21),
'email_on_failure': False,
'email_on_retry': False,
}
dag = DAG('data_vault_etl', default_args=default_args, schedule_interval='@daily')
start = DummyOperator(task_id='start', dag=dag)
load_customers = PythonOperator(
task_id='load_customer_data',
provide_context=True,
python_callable=load_customer_data,
dag=dag
)
load_orders = PythonOperator(
task_id='load_order_data',
provide_context=True,
python_callable=load_order_data,
dag=dag
)
end = DummyOperator(task_id='end', dag=dag)
start >> load_customers >> load_orders >> end
Можно интегрировать Grafana и Prometheus.
Пример настройки мониторинга с использованием Grafana и Prometheus:
Установка Prometheus для сбора метрик:
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'airflow'
static_configs:
- targets: ['localhost:8080']
Настройка Grafana для визуализации данных из Prometheus:
{
"datasource": "Prometheus",
"query": "sum(rate(airflow_task_duration_seconds_sum[1m])) by (task)"
}
В общих чертах проектирование выглядит так.
Не забывайте документировать все процессы, архитектуру и используемые инструменты.
А больше про инструменты архитектуры можно узнать в рамках практических онлайн-курсов. Подробнее в каталоге.