Строим Data Vault на данных TPC-H – Greenplum + dbtVault
Привет! На связи Артемий — энтузиаст в сфере Data Warehousing, Analytics, DataOps.
Уже продолжительное время я занимаюсь моделированием DWH с использованием dbt, и сегодня пришло время познакомить вас с package для построения Data Vault — dbtVault.
В публикации:
Готовим датасет TPC-H
Поднимаем кластер Greenplum в Яндекс.Облаке
Погружаемся в кодогенерацию и макросы dbtVault
Cимулируем инкрементальное наполнение Data Vault
Кодогенерация для Data Vault
Подход к построению Хранилища Данных на основе методолгии Data Vault или гибридных, схожих с ней, обретает новый виток популярности и интереса в последнее время. Это неудивительно — несмотря на сложность и обилие объектов в БД, преимущества и гибкость однозначно перевешивают в долгосрочной перспективе:
Единая логическая модель данных — мыслим бизнес-сущностями, а не системами-источниками
Возможность быстрого, параллельного и инкрементального наполнения Хранилища
Гибкость расширения модели и схемы данных — новые сущности и атрибуты
Хэш-сумма для генерации суррогатных ключей и отслеживания изменений атрибутов
Простая схема Data Vault
И любой, кто когда-либо изучал Data Vault согласится, что обойтись без инструментов кодогенерации будет весьма затруднительно. Инструменты этого класса призваны решить ряд задач:
Генерация кода по шаблонам для десятков и сотен объектов
Управление метаданными
Построение графа зависимостей (DAG)
Документация проекта
Одним из таких инструментов является проект dbtVault, который представляет из себя модуль для dbt.
Готовим исходные данные — TPC-H
Мы будем работать со знаменитым датасетом для сравнения производительности баз данных (benchmarking) TPC-H. Это синтетические данные, описывающие предметную область оптовых поставок-продаж. К тому же, при генерации можно указать scale factor и получить данные в кратном объеме (х10, х100, х1000).
Для тех, кому не терпится приступить к моделированию, я заботливо сгенерировал исходные файлы общим объемом 10Гб и разместил в Yandex Object Storage:
mkdir tpch && cd tpch
# option 1 – curl
curl -O "https://storage.yandexcloud.net/otus-dwh/tpch-dbgen/{customer,lineitem,nation,orders,part,partsupp,region,supplier}.csv"
# option 2 – aws s3
aws configure # enter your Key ID / Secret Key
aws --endpoint-url=https://storage.yandexcloud.net s3 ls s3://otus-dwh/tpch-dbgen/ # list files
aws --endpoint-url=https://storage.yandexcloud.net s3 sync s3://otus-dwh/tpch-dbgen/ . # sync files
В github gist есть инструкция по генерации файлов самостоятельно: Generate data with DBGen
Готовим кластер Greenplum в Яндекс.Облаке
В целях демонстрации я предлагаю использовать кластер Greenplum в Яндекс.Облаке. Следующей конфигурации будет достаточно для работы с нашим датасетом:
Альтернативно — можно пробовать использовать любую другую СУБД семейства PostgreSQL: Redshift, Vertica, Greenplum. Еще более альтернативно — с минимальными адаптациями код может быть исполнен почти в любой СУБД на ваш выбор. Об этом чуть ниже.
Наполним Greenplum данными
Сначала создадим определения для таблиц:
DDL scripts to create table
CREATE TABLE customer
(C_CUSTKEY INT,
C_NAME VARCHAR(25),
C_ADDRESS VARCHAR(40),
C_NATIONKEY INTEGER,
C_PHONE CHAR(15),
C_ACCTBAL DECIMAL(15,2),
C_MKTSEGMENT CHAR(10),
C_COMMENT VARCHAR(117))
WITH (appendonly=true, orientation=column)
DISTRIBUTED BY (C_CUSTKEY);
CREATE TABLE lineitem
(L_ORDERKEY BIGINT,
L_PARTKEY INT,
L_SUPPKEY INT,
L_LINENUMBER INTEGER,
L_QUANTITY DECIMAL(15,2),
L_EXTENDEDPRICE DECIMAL(15,2),
L_DISCOUNT DECIMAL(15,2),
L_TAX DECIMAL(15,2),
L_RETURNFLAG CHAR(1),
L_LINESTATUS CHAR(1),
L_SHIPDATE DATE,
L_COMMITDATE DATE,
L_RECEIPTDATE DATE,
L_SHIPINSTRUCT CHAR(25),
L_SHIPMODE CHAR(10),
L_COMMENT VARCHAR(44))
WITH (appendonly=true, orientation=column, compresstype=ZSTD)
DISTRIBUTED BY (L_ORDERKEY,L_LINENUMBER)
PARTITION BY RANGE (L_SHIPDATE)
(start('1992-01-01') INCLUSIVE end ('1998-12-31') INCLUSIVE every (30),
default partition others);
CREATE TABLE nation
(N_NATIONKEY INTEGER,
N_NAME CHAR(25),
N_REGIONKEY INTEGER,
N_COMMENT VARCHAR(152))
WITH (appendonly=true, orientation=column)
DISTRIBUTED BY (N_NATIONKEY);
CREATE TABLE orders
(O_ORDERKEY BIGINT,
O_CUSTKEY INT,
O_ORDERSTATUS CHAR(1),
O_TOTALPRICE DECIMAL(15,2),
O_ORDERDATE DATE,
O_ORDERPRIORITY CHAR(15),
O_CLERK CHAR(15),
O_SHIPPRIORITY INTEGER,
O_COMMENT VARCHAR(79))
WITH (appendonly=true, orientation=column, compresstype=ZSTD)
DISTRIBUTED BY (O_ORDERKEY)
PARTITION BY RANGE (O_ORDERDATE)
(start('1992-01-01') INCLUSIVE end ('1998-12-31') INCLUSIVE every (30),
default partition others);
CREATE TABLE part
(P_PARTKEY INT,
P_NAME VARCHAR(55),
P_MFGR CHAR(25),
P_BRAND CHAR(10),
P_TYPE VARCHAR(25),
P_SIZE INTEGER,
P_CONTAINER CHAR(10),
P_RETAILPRICE DECIMAL(15,2),
P_COMMENT VARCHAR(23))
WITH (appendonly=true, orientation=column)
DISTRIBUTED BY (P_PARTKEY);
CREATE TABLE partsupp
(PS_PARTKEY INT,
PS_SUPPKEY INT,
PS_AVAILQTY INTEGER,
PS_SUPPLYCOST DECIMAL(15,2),
PS_COMMENT VARCHAR(199))
WITH (appendonly=true, orientation=column)
DISTRIBUTED BY (PS_PARTKEY,PS_SUPPKEY);
CREATE TABLE region
(R_REGIONKEY INTEGER,
R_NAME CHAR(25),
R_COMMENT VARCHAR(152))
WITH (appendonly=true, orientation=column)
DISTRIBUTED BY (R_REGIONKEY);
CREATE TABLE supplier
(S_SUPPKEY INT,
S_NAME CHAR(25),
S_ADDRESS VARCHAR(40),
S_NATIONKEY INTEGER,
S_PHONE CHAR(15),
S_ACCTBAL DECIMAL(15,2),
S_COMMENT VARCHAR(101))
WITH (appendonly=true, orientation=column)
DISTRIBUTED BY (S_SUPPKEY);
Затем наполним таблицы данными. На машине с установленной CLI-утилитой psql загрузим csv-файлы в базу:
export GREENPLUM_URI="postgres://greenplum:@:5432/postgres"
psql $GREENPLUM_URI
\copy customer from '/home/dbgen/tpch-dbgen/data/customer.csv' WITH (FORMAT csv, DELIMITER '|');
\copy lineitem from '/home/dbgen/tpch-dbgen/data/lineitem.csv' WITH (FORMAT csv, DELIMITER '|');
\copy nation from '/home/dbgen/tpch-dbgen/data/nation.csv' WITH (FORMAT csv, DELIMITER '|');
\copy orders from '/home/dbgen/tpch-dbgen/data/orders.csv' WITH (FORMAT csv, DELIMITER '|');
\copy part from '/home/dbgen/tpch-dbgen/data/part.csv' WITH (FORMAT csv, DELIMITER '|');
\copy partsupp from '/home/dbgen/tpch-dbgen/data/partsupp.csv' WITH (FORMAT csv, DELIMITER '|');
\copy region from '/home/dbgen/tpch-dbgen/data/region.csv' WITH (FORMAT csv, DELIMITER '|');
\copy supplier from '/home/dbgen/tpch-dbgen/data/supplier.csv' WITH (FORMAT csv, DELIMITER '|');
Ура! Теперь мы готовы к наполнению Data Vault.
Инициируем проект dbt
1. Склонируйте себе репо с проектом dbt dbtvault_greenplum_demo
git clone https://github.com/kzzzr/dbtvault_greenplum_demo.git
2. Настройте подключение к СУБД Greenplum
dbt будет искать файл с описанием подключения (хост/порт/логин/пасс) в директории ~/.dbt/profiles.yml
. Подробнее можно почитать в документации dbt — Configure your profile. По понятным причинам файл не версионируется в репозитории.
Пример содержимого файла profiles.yml
:
config:
send_anonymous_usage_stats: False
use_colors: True
partial_parse: True
dbtvault_greenplum_demo:
outputs:
dev:
type: postgres
threads: 2
host: {yc-greenplum-host}
port: 5432
user: greenplum
pass: {yc-greenplum-pass}
dbname: postgres
schema: public
target: dev
3. Установите dbt версии 0.19.0
Проект был подготовлен и протестирован именно на этой версии. dbt — это не что иное как python-приложение. Есть множество вариантов установки dbt. Но самый простой вариант — использовать готовый Pipfile в репозитории:
pipenv install
pipenv shell
Проверьте корректность установки и подключение к СУБД:
dbt --version
dbt debug
4. Импортируем модуль dbtVault
Здесь начинается особая магия. Для кодогенерации Data Vault нам понадобится зависимость (модуль или package) dbtVault. Оригинальная версия модуля предназначена для работы только с СУБД Snowflake. Но после ряда нехитрых манипуляций модуль готов к работе с Greenplum (PostgreSQL): 47e0261.
Устанавливаемые модули объявляются в файле packages.yml
проекта:
packages:
# - package: Datavault-UK/dbtvault
# version: 0.7.3
- git: "https://github.com/kzzzr/dbtvault.git"
revision: master
warn-unpinned: false
Установим модуль командой:
dbt deps
Cимулируем инкрементальную загрузку данных для TPC-H
Одно из ключевых преимуществ Data Vault в быстром инкрементальном наполнении детального слоя данных. Из статического датасета TPC-H мы попытаемся симулировать ежедневные инкрементальные пакеты данных, нарезая исходный набор данных по дням.
Всего в TPC-H имеем 4 атрибута с датами:
ORDERDATE (ORDERS)
SHIPDATE (LINEITEM)
RECEIPTDATE (LINEITEM)
COMMITDATE (LINEITEM)
В большинстве случаев факты идут в хрнологическом порядке: ORDERDATE (заказ), SHIPDATE (отправка), RECEIPTDATE (оплата), COMMITDATE (получение). Минималная дата ORDERDATE в датасете — 1992-01-01
, максимальная — 1998-08-02
. 2405 дней — вполне достаточно, чтобы имитировать инкрементальное историческое наполнение.
В итоге, из 8-ми таблиц исходного TPC-H мы формируем слой Raw Stage
состоящий из 3-х таблиц:
raw_inventory — статический датасет складского учета
raw_orders — заказы, которые будем грузить подневно
raw_transactions — транзакции к заказам
dbt run -m tag:raw
Готовим слой Stage
А теперь давайте приступим к подготовке атрибутов, необходимых для наполнения Data Vault.
Хэш-суммы для суррогатных ключей и отслеживания изменений
Переимнование атрибутов
Константы и метаданные
Для подготовки этого слоя моделей мы воспользуемся макросом dbtvault.stage()
:
{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
RECORD_SOURCE: '!RAW_TRANSACTIONS'
LOAD_DATE: (TRANSACTION_DATE + 1 * INTERVAL '1 day')
EFFECTIVE_FROM: 'TRANSACTION_DATE'
hashed_columns:
TRANSACTION_PK:
- 'CUSTOMER_ID'
- 'TRANSACTION_NUMBER'
CUSTOMER_PK: 'CUSTOMER_ID'
ORDER_PK: 'ORDER_ID'
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
В рамках самого кода модели мы задаем метаданные и передаем в качестве аргументов в макрос:
Таблица с исходными данными:
source_model
Расчетные колонки:
RECORD_SOURCE
,LOAD_DATE
,EFFECTIVE_FROM
Колонки с хэш-суммой:
TRANSACTION_PK
,CUSTOMER_PK
,ORDER_PK
С кодом самого макроса stage.sql можно ознакомиться в репозитории dbtVault или в папке ./dbt_modules/dbtvault/macros/
нашего dbt-проекта, как и со всеми остальными макросами, которые помогают строить Data Vault. Это и есть те самые шаблоны для кодогенерации.
В результате, к исходным данным добавляется ряд новых и необходимых атрибутов:
Наполняем Raw Data Vault день за днём
Каждая модель типа hub, link, satellite собирается соответствующим макросом. Пример:
{%- set source_model = "v_stg_orders" -%}
{%- set src_pk = "CUSTOMER_PK" -%}
{%- set src_nk = "CUSTOMERKEY" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model) }}
Мы готовы к наполнению нашего детального слоя Data Vault:
dbt run -m tag:raw_vault
Итого:
Обработан пакет данных за одни сутки 1992–01–08
Сформировано 25 моделей за 19 секунд
Среди них: 7 hubs, 8 links, 11 satellites
Обратите внимание на то, что повторный запуск за те же самые сутки будет выполнен почти мгновенно и вставит 0 записей. Это происхоит потому, что таблицы Data Vault наполняются инкрементально и те записи, которые уже попали в детальный слой, вставлены повторно не будут!
Чтобы загрузить инкремент за следующие сутки, просто поменяйте значение переменной load_date
в файле dbt_profiles.yml
на следующий день и запустите загрузку повторно:
# dbt_profiles.yml
vars:
load_date: '1992-01-08' # increment by one day '1992-01-09'
Дальнейшие шаги
1. Посмотрите историю адаптации исходного проекта для нашего демо на Greenplum — commit history:
eafed95 — configure dbt_project.yml for greenplum
aa25600 — configure package (adapted dbt_vault) for greenplum
bba7437 — configure data sources for greenplum
dfc5866 — configure raw layer for greenplum
a97a224 — adapt prepared staging layer for greenplum
А также github gist Data Vault 2.0 + Greenplum + dbtVault assignment.
2. Изучите документацию проекта, визуальный граф моделей (DAG) в автоматически сгенерированном веб-приложении:
dbt docs generate
dbt docs serve
3. Разберитесь с макросами и кодогенерацией dbtVault
4. Ознакомьтесь с литературой по теме:
5. Приходите на live сессии и вебинары
Я и мои коллеги стремимся делиться своим лучшим опытом и знаниями в рамках занятий на курсах Data Engineer и Analytics Engineer:
Практики от лидеров отрасли в рамках живого общения
Ваучер Яндекс.Облака на все эксперименты и задания
3 вебинара в программе только по теме Data Vault
Движуха в Slack и сообщество
Спасибо за внимание!