Инфраструктура для Data-Engineer DBT
Введение
dbt является мощным фреймворком, который включает в себя два популярных языка: SQL + Python.
При помощи dbt можно создавать разные »слои» данных или выделить dbt только под один слой, к примеру dm
.
При помощи понятного и всем известного SQL интерфейса можно создавать консистентные модели, которыми смогут пользоваться и понимать все участники data-команды. Ну также по моему мнению плюс dbt — возможность дать аналитикам самостоятельно создавать dm
слой со своей логикой, что освобождает ресурс data-команды. Но стоит иметь ввиду, что нужен хороший процесс доставки кода: Code Review, паттерны, линтеры, принципы и прочее.
Стоит также упомянуть, что сервис может работать как отдельный инстанс через докер, так и питон пакетом. Все дальнейшие примеры я буду проводить через питон пакет.
Инициализация проекта
Код всего проекта вы можете найти в моём репозитории.
В начале поднимем нужные нам сервисы используя команду: docker-compose up -d
После этого давайте настроим локальное Python окружение для дальнейшей работы:
python3.12 -m venv venv && \
source venv/bin/activate && \
pip install --upgrade pip && \
pip install poetry && \
poetry lock && \
poetry install
После того, когда мы установили все зависимости, мы можем проверить версию dbt командой: dbt --version
.
Следующим этапом при работе с dbt является — инициализация проекта, поэтому выполним команду: dbt init pg_analytics --profiles-dir .
и укажем все необходимые параметры, которые он будет спрашивать во время создания проекта. Наш проект я назвал: pg_analytics
.
Важно: При инициализации проекта вы можете не указывать параметр --profiles-dir
.
Если не указать --profiles-dir
, то при инициализации проекта у нас создатся глобальный profiles.yml
, который будет хранится в ~/.dbt/
.
Но для демонстрации мы не будем его использовать, а воспользуемся локальным profiles.yml
внутри самого проекта. Поэтому мы выполним команду dbt init pg_analytics --profiles-dir .
чтобы создать profiles.yml
внутри нашего проекта.
Важно: рекомендуется создавать profiles.yml
вне проекта, чтобы все параметры для подключения держать вне проекта и также profiles.yml
будет содержать информацию по всем проектам, если у вас их больше одного.
После инициализации мы получим следующий profiles.yml
:
pg_analytics:
outputs:
dev:
dbname: dbt
host: localhost
pass: postgres
port: 5432
schema: dbt
threads: 4
type: postgres
user: postgres
target: dev
Проверить коннект можно командой dbt debug
(предварительно зайдя в проект pg_analytics
).
Также стоит обратить, что когда мы создали проект, то он содержит стандартный набор директорий:
analyses
logs
macros
models
seeds
snapshots
target
tests Более подробно о логике и работе директорий вы можете узнать из документации. Ниже мы рассмотрим часть из них на примерах
И файлов:
Стандартные примеры dbt
Все настройки завершены и поэтому мы можем запустить наш проект. В начале у нас в папке models хранятся модели, которые будут созданы изначально. На них проверим корректность работы dbt.
Для этого выполним команду: dbt run
и увидим логи нашей команды:
08:01:36 Running with dbt=1.8.8
08:01:36 Registered adapter: postgres=1.8.2
08:01:37 Unable to do partial parsing because saved manifest not found. Starting full parse.
08:01:37 Found 2 models, 4 data tests, 423 macros
08:01:37
08:01:37 Concurrency: 4 threads (target='dev')
08:01:37
08:01:37 1 of 2 START sql table model dbt.my_first_dbt_model ............................ [RUN]
08:01:37 1 of 2 OK created sql table model dbt.my_first_dbt_model ....................... [SELECT 2 in 0.07s]
08:01:37 2 of 2 START sql view model dbt.my_second_dbt_model ............................ [RUN]
08:01:37 2 of 2 OK created sql view model dbt.my_second_dbt_model ....................... [CREATE VIEW in 0.06s]
08:01:37
08:01:37 Finished running 1 table model, 1 view model in 0 hours 0 minutes and 0.27 seconds (0.27s).
08:01:37
08:01:37 Completed successfully
08:01:37
08:01:37 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
И если сейчас подключиться к нашему PostgreSQL, то мы сможем увидеть, что у нас создалась схема dbt
и в ней создалась таблица my_first_dbt_model
и представление my_second_dbt_model
.
Также мы можем »скомпилировать» наши модели и сразу провести тестирование.
А сейчас давайте выполним команду dbt build
и мы увидим, что у нас часть процессов не прошла, потому что один из тестов провален.
Мы видим, что у нас провален этап not_null_my_first_dbt_model_id
чтобы это исправить, уберём комментарий в модели where id is not null
и если заново выполнить build
проекта, то у нас пройдут все тесты и все этапы.
Практика
Всё, мы убедились, что наш dbt работает, поэтому давайте создадим нашу первую модель.
Но в начале нам необходимо запустить файл create_and_fill_users.py
из папки py_files
.
После создания и заполнения таблицы мы можем над ней работать при помощи dbt-моделей.
Для этого нам нужно создать новую папку dm, которая будет содержать dm-таблицы, затем создадим fct_registred_by_date.sql
, в котором напишем простой запрос:
SELECT
created_at::date AS registred_date,
count(*)
FROM
users AS u
GROUP BY
1
Важно: Как вы назовёте файл, так и будет называться ваша таблица, поэтому выбирайте правильно.
Стоит отметить, что основной dbt-проекта является dbt_project.yml
и в нём указывается вся информация по проекту: настройки компиляции, материализация таблиц, какие папки включать и пр.
И так как мы добавили новую директорию dm, то её необходимо добавить в проект, чтобы модели внутри директории запускались.
Для этого необходимо в dbt_project.yml
добавить нашу директорию и указать то, как мы хотим видеть наши модели в БД. Я сделаю таблицы с уровнем материализации table
:
...
dm:
+materialized: table
Более подробно о материализации.
Дополнительно мы можем прописать мета-данные для нашей модели, чтобы в дальнейшем было удобнее ориентироваться в ней.
Для этого создадим файл schema.yaml
:
models:
- name: fct_registred_by_date
description: "Количество регистраций за день"
columns:
- name: registred_date
description: "Дата регистрации"
- name: count
description: "Количество регистраций"
Важно: Если вы некорректно укажите имя столбца, к примеру вместо name
укажите namefoo
, то у вас не отобразится никаких исключений.
Теперь если выполнить команду dbt run
, то получим следующее сообщение:
...
OK created sql table model dbt.fct_registred_by_date ...[SELECT 365 in 0.37s]
...
И теперь наша таблица fct_registred_by_date
готова к использованию.
Стоит обратить внимание, что таблица заполнила значения за текущий момент и если в таблице users
появятся новые записи, то мы в нашей витрине этого не увидим. Поэтому это всё нужно оркестрировать.
Оркестрация доступна в dbt Cloud или сторонними решениями. Как одно из популярных решений — это объединение dbt + Airflow. Об одной из таких реализаций вы можете ознакомиться из доклада: dbt — ядро современной платформы данных.
Теперь давайте посмотрим как будет отрабатывать модель, если в таблице users
появятся новые записи.
Для этого выполните файл add_new_month.py
и у нас появятся записи за январь 2025-го.
И если после этого снова выполнить команду dbt run
, то получим следующее сообщение:
...
OK created sql table model dbt.fct_registred_by_date ... [SELECT 396 in 0.08s]
...
Важно: он перезаписывает всю таблицу, в дальнейшем мы сделаем заполнение недостающих данных.
Для того, чтобы исключить полное перезаписывание витрины в dbt существует формат материализации incremental
. Давайте им воспользуемся и создадим для этого новый файл fct_registred_by_date_incremental.sql
:
{{
config(
materialized='incremental',
unique_key='registred_date'
)
}}
SELECT
created_at::date AS registred_date,
count(*)
FROM
users AS u
{% if is_incremental() %}
WHERE created_at >= (SELECT coalesce(max(registred_date), '1900-01-01') FROM {{ this }})
{% endif %}
GROUP BY
1
На что здесь стоит обратить внимание:
Ранее в
dbt_project.yml
мы указали, что все модели в директори dm будут создаваться какtable
. Но в данном случае мы переопределяем основной config наincremental
.Для
incremental
является обязательным указаниемunique_key
, по которому он как раз и будет выполнять вставки.
Теперь если мы повторим действия над таблицей users
как ранее, то получим корректный результат. Наша таблица после каждого выполнения команды dbt run
будет инкрементально заполняться.
Больше подробностей и тонкостей инкрементальной материализации описано в документации.
Source
Для корректного обращения к источникам давайте их пропишем в нашей папке models, чтобы мы могли на них ссылаться через Jinja-шаблоны, а не прописывая их самостоятельно.
Для этого необходимо создать файл source.yml
и в нём прописать конфигурацию для источников:
version: 2
sources:
- name: public
database: dbt
schema: public
tables:
- name: users
И теперь давайте попробуем воспользоваться данным источником, чтобы скопировать таблицу users
к себе в схему dbt
.
Для этого создадим файл dbt_source_users.sql
в новой директории as_is внутри models и добавим такой код:
SELECT * FROM {{ source("public", "users") }}
Данный код на первый взгляд может показаться не понятным, но сейчас мы его скомпилируем и получим привычный нам SQL.
Давайте выполним команду: dbt compile
и если у нас всё будет хорошо, то в папке target мы сможем найти наш скомпилированный файл, который выглядит следующим образом:
SELECT * FROM "dbt"."public"."users"
Если мы возьмём этот запрос и выполним его в нашей БД, то получим корректный результат. Теперь мы можем запустить эту модель в БД, чтобы она выполнила заложенную логику.
Важно: компилятор вызовет исключение и укажет на ошибку, если ваш запрос составлен некорректно.
Выполним команду: dbt run
и увидим, что у нас создалось представление, а это потому, что мы не указали как материализовывать наши модели из папки as_is, давайте укажем в dbt_project.yml
, по какой логике должна работать наша модель:
as_is:
+materialized: table
И если ещё раз выполнить команду dbt run
, то мы увидим, что у нас создалась таблица, а не представление (в логах будет указано SELECT ***
, а не CREATE VIEW
).
В дальнейшем все наши источники, которые будут использованы во время проекта будем указывать внутри source.yml
Snapshots
Для аналитики довольным важным моментом является — »путешествие во времени». Что это означает? Это означает, что аналитик желает посмотреть какое у нас было состояние по пользователям/продажам/прочее на тот или иной момент.
Поэтому наша задача как дата-инженеров организовать такую возможность и её можно реализовать при помощи SCD2. Более подробно о SCD можете почитать по ссылке.
При инициализации проекта у нас создалась папка snapshots и мы в ней будем создавать нашу SCD2 таблицу.
В начале нам нужно создать файл users_scd2.sql
:
{% snapshot users_timestamp %}
{{
config(
target_database='dbt',
target_schema='snapshots',
strategy='timestamp',
unique_key='id',
updated_at='updated_at',
)
}}
SELECT * FROM {{ source('public', 'users') }}
{% endsnapshot %}
Здесь мы также используем Jinja-шаблоны:
Указываем, что это таблица типа
snapshot
.Указываем все необходимые атрибуты для работы с таблицей.
Указываем запрос на извлечение данных из таблицы. Здесь также используем наш
source
, который мы определили ранее.
После создания данного файла мы можем выполнить команду: dbt snapshot
и увидим в логе SELECT ***
— это означает, что таблица создана и перенесла все записи из таблицы users.
Давайте теперь изменим какую-нибудь запись, чтобы увидеть работу SCD2.
Так как в поле у нас хранится uuid и он каждый раз различный, вы можете выбрать любого »пользователя» для изменения.
UPDATE users SET (first_name, updated_at) = ('dummy', now())
WHERE id = '86433147-fafe-4d74-b67c-59c3a283974c'
Мы изменили имя пользователя и чтобы создать новую запись в SCD-таблице необходимо снова выполнить команду: dbt snapshot
. После выполнения мы увидим, что у нас добавилась новая строка в таблицу:
id | created_at | updated_at | first_name | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
---|---|---|---|---|---|---|---|
86433147-fafe-4d74-b67c-59c3a283974c | 2024–08–16 17:31:30.670 +0000 | 2024–02–29 14:53:29.191 +0000 | Алина | 0cea94932fa3921c59e771f5c0b53bb4 | 2024–02–29 14:53:29.191 +0000 | 2024–02–29 14:53:29.191 +0000 | 2024–10–26 03:07:44.519 +0000 |
86433147-fafe-4d74-b67c-59c3a283974c | 2024–08–16 17:31:30.670 +0000 | 2024–10–26 03:07:44.519 +0000 | dummy | cf61eb52bac629a0af70cd2c46e63f8a | 2024–10–26 03:07:44.519 +0000 | 2024–10–26 03:07:44.519 +0000 |
Теперь наша таблица users
будет версионироваться после каждого запуска команды dbt snapshot
.
Более подробно о возможностях snapshots
описано в официальной документации.
Функция ref
Ref помогает нам выстроить data lineage, для того чтобы он корректно отображался на графе и также мы могли правильно запускать/перезапускать наши модели.
Важным моментом здесь является то, что мы должны »затянуть» к себе в проект какую-то модель, чтобы ей в дальней пользоваться.
Основное отличие от source
здесь в том, что source
— это просто ссылка на объект в БД, а ref
— это ссылка на объект внутри dbt-проекта.
Давайте рассмотрим это на примере и для этого создадим файл count_users_by_city.sql
в папке dm со следующим кодом:
SELECT
city,
count(*)
FROM
{{ ref('dbt_source_users') }}
GROUP BY
1
В данном примере мы используем ранее созданную таблицу dbt_source_users
с пользователями. Если мы захотим обратиться изначально к нашей таблице указав {{ ref('users') }}
, то получим исключение:
Compilation Error
Model 'model.pg_analytics.count_users_by_city' (models/dm/count_users_by_city.sql) depends on a node named 'users' which was not found
Важно: Во время работы с ref
мы должны использовать только dbt-модели.
Более подробно о ref
функции описано в документации.
WEB UI
Мы сейчас с вами построили несколько моделей и теперь мы можем посмотреть взаимосвязи между ними.
Для этого нам сначала нужно выполнить команду: dbt docs generate
, которая выстроит зависимости между моделями и создаст документацию по нашему проекту.
Для того, чтобы запустить наш веб-сервер необходимо выполнить команду: dbt docs serve
. После этого страница откроется автоматически, а если не открылась, то будет доступна по адресу: http://localhost:8080/#!/overview
После запуска нашего сервера мы можем изучить список взаимосвязей в проекте. Что мы имеем сейчас:
Здесь стоит обратить внимание на висящую ноду fct_registred_by_date
, потому что она у нас была создана таким запросом:
SELECT
created_at::date AS registred_date,
count(*)
FROM
users AS u
GROUP BY
1
И как мы видим в данном запросе у нас не используется ни source
ни ref
. И поэтому в случае дебага или проверки зависимостей нужно будет самостоятельно изучать dbt-модели.
При работе с графом зависимостей часто говорят о запуске модели и её зависимостей. В dbt это реализовано довольно гибко. О чём описано в документации.
Но из основного я бы отметил, что мы можем регулировать что запускать при запуске модели. К примеру:
Запустить саму модель:
dbt run -m name_of_model
Запустить саму модель и зависимости ДО:
dbt run -m +name_of_model
Запустить саму модель и зависимости ПОСЛЕ:
dbt run -m name_of_model+
Запустить саму модель и зависимости ДО и ПОСЛЕ:
dbt run -m +name_of_model+
Теги
Помимо деления по папкам мы также можем добавить деление по tags. Для этого необходимо внутри самого главного конфиг файла dbt_project.yml
задать tags:
models:
pg_analytics:
example:
+materialized: view
dm:
+materialized: table
tags: "marts"
as_is:
+materialized: table
И теперь благодаря такой настройке мы можем запускать модели по tags такой командой: dbt run -m tag:marts
Более подробно о работе tags описано в документации.
Тесты
Тесты — это неотъемлемая часть в дата-инженерии и аналитике. Наша задача — сделать так, чтобы данным могли доверять.
И один из способов обеспечить доверие данным — это покрыть их тестами и проверками.
Давайте начнём со стандартных тестов, который предоставляет dbt »из коробки»:
unique
: проверяет, что значения должны быть уникальным, без дублей.not_null
: проверяет, что в колонке нетNULL
значений.accepted_values
: проверяет, что колонка содержит нужные значения. К примеру: для колонкиorders
должны быть значения:'placed'
,'shipped'
,'completed'
,'returned'
relationships
: Проверяет, что внешний ключ целостный и консистентный и у нас в витрине или другой модели не появилось »новых» ключей.
Все тесты указываются для колонок, к примеру так можно указать тесты для нашей витрины fct_registred_by_date
:
version: 2
models:
- name: fct_registred_by_date
description: "Количество регистраций за день"
columns:
- name: registred_date
description: "Дата регистрации"
data_tests:
- unique
- not_null
- name: count
description: "Количество регистраций"
Все тесты указываются в schema.yml
для нужной нам модели и нужной колонки
Но помимо стандартных тестов мы можем самостоятельно писать тесты для наших моделей. Основная логика dbt тестов — запрос ничего не должен возвращать. Если тест вернул хотя бы одну строку, то тест считается проваленным.
Давайте сделаем свой not_null
для колонки count
таблицы fct_registred_by_date
.
Для этого создадим файл not_null_count_fct_registred_by_date.sql
в папке tests со следующим кодом:
SELECT
*
FROM
{{ ref('fct_registred_by_date') }}
WHERE
"count" IS NULL
И если мы снова запустим наши тесты командой dbt test
, то сможем увидеть, что наш тест запустился и прошёл успешно.
...
test not_null_count_fct_registred_by_date ... [RUN]
...
not_null_count_fct_registred_by_date ... [PASS in 0.06s]
...
Макросы
Макросы — это создание некого шаблона, который мы сможем использовать в нашем коде при помощи Jinja.
Давайте создадим наш первый макрос. Для этого создадим файл cast_to_date.sql
в папке macros:
{% macro cast_to_date(column_name) %}
{{ column_name }}::date
{% endmacro %}
И теперь давайте создадим ещё раз нашу таблицу fct_registred_by_date
, но уже с использованием макроса:
SELECT
{{ cast_to_date('created_at') }} AS registred_date,
count(*)
FROM
users AS u
GROUP BY
1
И если сейчас мы выполним команду: dbt compile
, то получим такой скомпилированный файл (форматирование оставил, как сделал dbt):
SELECT
created_at::date
AS registred_date,
count(*)
FROM
users AS u
GROUP BY
1
Важно: Не стоит делать сильно сложные макросы, потому что их довольно сложно дебажить и также стоит учитывать, что макрос будет использоваться в разных частях проектах и при дальнейших его изменениях можно сломать часть бизнес-логики.
Более подробно о работе и возможностях макросов описано в документации.
Пакеты
Для dbt существует пакетный менеджер, как и для Python. Со всеми пакетами вы можете ознакомиться на официальном dbt-hub.
Но мы сейчас рассмотрим самый популярный пакет для dbt — dbt_utils.
Данный пакет предоставляет: новый тесты для наших моделей, макросы для создания моделей и разные полезные инструменты для более комфортной работы с dbt.
Для того, чтобы установить dbt_utils необходимо в корне нашего проекта создать файл packages.yml
и указать следующие параметры:
packages:
- package: dbt-labs/dbt_utils
version: 1.3.0
И затем необходимо выполнить команду: dbt deps
Теперь мы имеем возможность использовать функции dbt_utils, давайте рассмотрим одну из — dbt_utils.star
.
Для этого примера мы продублируем таблицу fct_registred_by_date
. Давайте создадим файл dbt_utils_source_fct_registred_by_date.sql
и напишем такой код:
SELECT
{{ dbt_utils.star(ref("fct_registred_by_date")) }}
FROM
{{ ref("fct_registred_by_date") }}
И если сейчас выполнить команду: dbt compile
, то получим такой код:
SELECT
"registred_date",
"count"
FROM
"dbt"."dbt"."fct_registred_by_date"
Он сделал нам перечисление всех колонок из таблицы, а не использовал стандартную команду: *
.
Если мы хотим получать не все колонки, то можно их исключить следующим образом:
SELECT
{{ dbt_utils.star(ref("fct_registred_by_date"), except=["count"]) }}
FROM
{{ ref("fct_registred_by_date") }}
И после компиляции получим такой код:
SELECT
"registred_date"
FROM
"dbt"."dbt"."fct_registred_by_date"
Понятное дело, что я привожу простой пример, на практике таблицы могут иметь более одной колонки. В данном случае я хотел продемонстрировать возможности dbt_utils, которых великое множество.
Прочие коннекты
На текущий момент dbt имеет множество коннектов к различным системам: OLAP БД, OLTP БД, Data Lake.
Более подробно о подключении дата платформ в dbt описано в документации.
Давайте попробуем создать подключение к DuckDB.
Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей — Всё что нужно знать про DuckDB.
Так как dbt не позволяет держать несколько разных типов коннекта нам необходимо будет создать новый dbt-проект и для этого выполним команду: dbt init duckdb_analytics --profiles-dir .
После инициализации проекта необходимо внутри него создать profiles.yml
:
duckdb_analytics:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 4
extensions:
- httpfs
- parquet
- postgres
target: dev
Я создал также папку duckdb_example в models и поэтому нам необходимо указать как материализовывать модели из неё.
...
models:
duckdb_analytics:
duckdb_example:
+materialized: view
После данной подготовки мы готовы создать нашу первую модель с использованием duckdb, для этого создадим файл users_from_pg_extension.sql
:
SELECT
*
FROM
postgres_scan(
'host=localhost port=5432 dbname=dbt user=postgres password=postgres',
'public',
'users'
)
Важно: данный пример является учебным. Поэтому не стоит в открытую использовать параметры для подключения. Их лучше прятать в окружение или при cicd процессе проставлять. Подробнее о том, как скрыть параметры для подключения описано в документации.
Также стоит отметить, что я в примере использую старый способ подключения к PostgreSQL, так как новый через ATTACH
в dbt не работает. Подробнее о работе с PostgreSQL через DuckDB описано в документации. И ещё можно под себя настроить коннект, с указанием конкретных плагинов и их настроек, подробнее описано в Configuring dbt-duckdb Plugins.
Так как мы создали нашу первую модель мы можем делать ref
на неё, поэтому давайте повторим задачу, которую мы делали в PostgreSQL — посчитаем количество регистраций за каждый день.
Для этого cоздадим fct_registred_by_date.sql
:
SELECT
created_at::date AS registred_date,
count(id) AS count
FROM
{{ ref("users_from_pg_extension") }}
GROUP BY
1
И если выполнить команду dbt run
, то у нас создастся ещё одно представление в DuckDB. И эта информация также отобразится в графе:
Давайте теперь поработаем с S3 через DuckDB.
Если вы не знаете что такое S3 и как его можно использовать, то можете ознакомиться с моей статьей — Инфраструктура для data engineer S3.
Добавим новый сервис в наш изначальный docker-compose.yaml
:
minio:
image: minio/minio:RELEASE.2024-10-13T13-34-11Z
restart: always
command: server /data --console-address ":9001"
volumes:
- ./data:/data
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
- MINIO_DOMAIN=minio
ports:
- "9000:9000" # MinIO S3 API
- "9001:9001" # MinIO Console
Важно: создание бакета, генерацию ключей я пропущу, так как она описана в статье выше.
В самом начале нашей работы S3 + DuckDB необходимо создать параметры для подключения. Поэтому мы немного изменим profiles.yml
:
duckdb_analytics:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 4
extensions:
- httpfs
- parquet
- postgres
settings:
s3_access_key_id: "GYZDiyIV4wnNoNTzp2Q5"
s3_secret_access_key: "zHOERNJFxcBYXXsa4XUXXSSy0apl8OED7BVp1A3Q"
s3_endpoint: "localhost:9000"
s3_use_ssl: "FALSE"
s3_url_style: "path"
target: dev
Теперь при работе DuckDB у нас будут использоваться настройки, которые мы прописали.
Давайте в начале запишем пользователей в S3, для этого необходимо выполнить скрипт create_and_fill_users_s3.py
.
После этого у нас появится .parquet
с пользователями в нашем бакете prod
.
Теперь мы можем создать первую модель, которая будет читать файл из S3, создадим файл users_from_s3_extension.sql
:
SELECT * FROM read_parquet('s3://prod/ods/users.parquet')
Если выполнить команду dbt run
, то в нашей БД будет создано представление.
Теперь давайте выполним наш пример с подсчётом регистраций и для этого создадим модель fct_registred_by_date_from_s3.sql
:
{{ config(materialized='table') }}
SELECT
created_at::date AS registred_date,
count(id) AS count
FROM
{{ ref("users_from_s3_extension") }}
GROUP BY
1
Важно: Я изменил основной конфиг материализации на таблицу, чтобы проверить корректность работы модели.
Стоит также отметить, что когда мы создаём таблицы/представления при помощи dbt, те настройки, которые мы указали в profiles.yml
создаются на сессию.
Это означает то, что если мы подключимся к нашей физической dev.duckdb
и попробуем выполнить запрос к S3 у нас не будет работать коннект, потому что мы создали новую сессию и в ней нужно заново инициализировать подключение к S3.
Повторюсь, если вы не знаете о DuckDB и том как создавать подключение к S3, то рекомендую ознакомиться с моей статьей — Всё что нужно знать про DuckDB.
И теперь после создания всех нужных нам моделей мы можем выполнить dbt run
и подключиться к DuckDB чтобы увидеть результат.
И также все наши модели отображаются в графе зависимостей:
Также данный пакет позволяет писать в S3. Для того чтобы совершить запись в S3 необходимо обратиться к документации — Writing to external files. Запись во внешние таблицы имеет специфический синтаксис и о нём стоит знать.
Давайте запишем результат нашего привычного запроса в S3 и для этого создадим файл fct_registred_by_date_from_s3_to_s3.sql
:
{{ config(materialized='external', location='s3://prod/dm/fct_registred_date.gzip.parquet') }}
SELECT
created_at::date AS registred_date,
count(id) AS count
FROM
{{ ref("users_from_s3_extension") }}
GROUP BY
1
И теперь помимо того, что мы сделали запись в S3 так это ещё отображается корректно в нашем графе:
Важно: я показал только пример использования dbt + DuckDB, но таких интеграций может быть множество: ClickHouse, MySQL и прочие коннекторы.
Заключение
Как я сказал в начале — dbt является мощным инструментом.
Я постарался показать основные моменты, но в dbt есть множество тонкостей и особенностей работы, с которыми вы сможете познакомиться на практике.
dbt также является довольно популярным инструментом, вы его можете найти у многих компаний. Поэтому знание dbt вам точно пригодится.
На что я бы обратил внимание:
Необходимо организовать хранение моделей так, чтобы было удобно всем участникам data-команды, но и было понятно как происходит деление моделей (по слоям, по логике и пр.).
Не стоит избегать тегов при работе с моделями.
Заполняйте мета-данные по моделям сразу.
Определите владельца мета-данных, чтобы не было свалки по моделям и колонкам.
Покрывайте модели тестами и не только стандартными, но из модуля dbt_utils.
Осторожнее с макросами.
Старайтесь использовать
ref
иsource
, чтобы выстраивать корректный data lineage и иметь возможность прогрузить всю цепочку.У dbt хорошая документация, не стоит ей пренебрегать.
Если вы не можете найти ответ на свой вопрос, то вы всегда можете обратиться в официальный Slack dbt.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.