Инфраструктура для Data-Engineer DBT

Введение

dbt является мощным фреймворком, который включает в себя два популярных языка: SQL + Python.

При помощи dbt можно создавать разные »слои» данных или выделить dbt только под один слой, к примеру dm.

При помощи понятного и всем известного SQL интерфейса можно создавать консистентные модели, которыми смогут пользоваться и понимать все участники data-команды. Ну также по моему мнению плюс dbt — возможность дать аналитикам самостоятельно создавать dm слой со своей логикой, что освобождает ресурс data-команды. Но стоит иметь ввиду, что нужен хороший процесс доставки кода: Code Review, паттерны, линтеры, принципы и прочее.

Стоит также упомянуть, что сервис может работать как отдельный инстанс через докер, так и питон пакетом. Все дальнейшие примеры я буду проводить через питон пакет.

Инициализация проекта

Код всего проекта вы можете найти в моём репозитории.

В начале поднимем нужные нам сервисы используя команду: docker-compose up -d

После этого давайте настроим локальное Python окружение для дальнейшей работы:

python3.12 -m venv venv && \
source venv/bin/activate && \
pip install --upgrade pip && \
pip install poetry && \
poetry lock && \
poetry install

После того, когда мы установили все зависимости, мы можем проверить версию dbt командой: dbt --version.

Следующим этапом при работе с dbt является — инициализация проекта, поэтому выполним команду: dbt init pg_analytics --profiles-dir . и укажем все необходимые параметры, которые он будет спрашивать во время создания проекта. Наш проект я назвал: pg_analytics.

Важно: При инициализации проекта вы можете не указывать параметр --profiles-dir.

Если не указать --profiles-dir, то при инициализации проекта у нас создатся глобальный profiles.yml, который будет хранится в ~/.dbt/.

Но для демонстрации мы не будем его использовать, а воспользуемся локальным profiles.yml внутри самого проекта. Поэтому мы выполним команду dbt init pg_analytics --profiles-dir . чтобы создать profiles.yml внутри нашего проекта.

Важно: рекомендуется создавать profiles.yml вне проекта, чтобы все параметры для подключения держать вне проекта и также profiles.yml будет содержать информацию по всем проектам, если у вас их больше одного.

После инициализации мы получим следующий profiles.yml:

pg_analytics:  
  outputs:  
    dev:  
      dbname: dbt  
      host: localhost  
      pass: postgres  
      port: 5432  
      schema: dbt  
      threads: 4  
      type: postgres  
      user: postgres  
  target: dev

Проверить коннект можно командой dbt debug (предварительно зайдя в проект pg_analytics).

Также стоит обратить, что когда мы создали проект, то он содержит стандартный набор директорий:

  • analyses

  • logs

  • macros

  • models

  • seeds

  • snapshots

  • target

  • tests Более подробно о логике и работе директорий вы можете узнать из документации. Ниже мы рассмотрим часть из них на примерах

И файлов:

Стандартные примеры dbt

Все настройки завершены и поэтому мы можем запустить наш проект. В начале у нас в папке models хранятся модели, которые будут созданы изначально. На них проверим корректность работы dbt.

Для этого выполним команду: dbt run и увидим логи нашей команды:

08:01:36  Running with dbt=1.8.8
08:01:36  Registered adapter: postgres=1.8.2
08:01:37  Unable to do partial parsing because saved manifest not found. Starting full parse.
08:01:37  Found 2 models, 4 data tests, 423 macros
08:01:37  
08:01:37  Concurrency: 4 threads (target='dev')
08:01:37  
08:01:37  1 of 2 START sql table model dbt.my_first_dbt_model ............................ [RUN]
08:01:37  1 of 2 OK created sql table model dbt.my_first_dbt_model ....................... [SELECT 2 in 0.07s]
08:01:37  2 of 2 START sql view model dbt.my_second_dbt_model ............................ [RUN]
08:01:37  2 of 2 OK created sql view model dbt.my_second_dbt_model ....................... [CREATE VIEW in 0.06s]
08:01:37  
08:01:37  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 0.27 seconds (0.27s).
08:01:37  
08:01:37  Completed successfully
08:01:37  
08:01:37  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

И если сейчас подключиться к нашему PostgreSQL, то мы сможем увидеть, что у нас создалась схема dbt и в ней создалась таблица my_first_dbt_model и представление my_second_dbt_model.

Также мы можем »скомпилировать» наши модели и сразу провести тестирование.

А сейчас давайте выполним команду dbt build и мы увидим, что у нас часть процессов не прошла, потому что один из тестов провален.

Мы видим, что у нас провален этап not_null_my_first_dbt_model_id чтобы это исправить, уберём комментарий в модели where id is not null и если заново выполнить build проекта, то у нас пройдут все тесты и все этапы.

Практика

Всё, мы убедились, что наш dbt работает, поэтому давайте создадим нашу первую модель.

Но в начале нам необходимо запустить файл create_and_fill_users.py из папки py_files.

После создания и заполнения таблицы мы можем над ней работать при помощи dbt-моделей.

Для этого нам нужно создать новую папку dm, которая будет содержать dm-таблицы, затем создадим fct_registred_by_date.sql, в котором напишем простой запрос:

SELECT  
    created_at::date AS registred_date,  
    count(*)  
FROM  
    users AS u  
GROUP BY  
    1

Важно: Как вы назовёте файл, так и будет называться ваша таблица, поэтому выбирайте правильно.

Стоит отметить, что основной dbt-проекта является dbt_project.yml и в нём указывается вся информация по проекту: настройки компиляции, материализация таблиц, какие папки включать и пр.

И так как мы добавили новую директорию dm, то её необходимо добавить в проект, чтобы модели внутри директории запускались.

Для этого необходимо в dbt_project.yml добавить нашу директорию и указать то, как мы хотим видеть наши модели в БД. Я сделаю таблицы с уровнем материализации table:

...
dm:  
  +materialized: table

Более подробно о материализации.

Дополнительно мы можем прописать мета-данные для нашей модели, чтобы в дальнейшем было удобнее ориентироваться в ней.

Для этого создадим файл schema.yaml:

models:  
  - name: fct_registred_by_date  
    description: "Количество регистраций за день"  
    columns:  
      - name: registred_date  
        description: "Дата регистрации"  
      - name: count  
        description: "Количество регистраций"

Важно: Если вы некорректно укажите имя столбца, к примеру вместо name укажите namefoo, то у вас не отобразится никаких исключений.

Теперь если выполнить команду dbt run, то получим следующее сообщение:

...
OK created sql table model dbt.fct_registred_by_date ...[SELECT 365 in 0.37s]
...

И теперь наша таблица fct_registred_by_date готова к использованию.

Стоит обратить внимание, что таблица заполнила значения за текущий момент и если в таблице users появятся новые записи, то мы в нашей витрине этого не увидим. Поэтому это всё нужно оркестрировать.

Оркестрация доступна в dbt Cloud или сторонними решениями. Как одно из популярных решений — это объединение dbt + Airflow. Об одной из таких реализаций вы можете ознакомиться из доклада: dbt — ядро современной платформы данных.

Теперь давайте посмотрим как будет отрабатывать модель, если в таблице users появятся новые записи.

Для этого выполните файл add_new_month.py и у нас появятся записи за январь 2025-го.

И если после этого снова выполнить команду dbt run, то получим следующее сообщение:

...
OK created sql table model dbt.fct_registred_by_date ... [SELECT 396 in 0.08s]
...

Важно: он перезаписывает всю таблицу, в дальнейшем мы сделаем заполнение недостающих данных.

Для того, чтобы исключить полное перезаписывание витрины в dbt существует формат материализации incremental. Давайте им воспользуемся и создадим для этого новый файл fct_registred_by_date_incremental.sql:

{{  
    config(  
        materialized='incremental',  
        unique_key='registred_date'  
    )  
}}  
  
SELECT  
    created_at::date AS registred_date,  
    count(*)  
FROM  
    users AS u  
  
{% if is_incremental() %}  
  
  WHERE created_at >= (SELECT coalesce(max(registred_date), '1900-01-01') FROM {{ this }})  
  
{% endif %}  
  
GROUP BY  
    1

На что здесь стоит обратить внимание:

  1. Ранее в dbt_project.yml мы указали, что все модели в директори dm будут создаваться как table. Но в данном случае мы переопределяем основной config на incremental.

  2. Для incremental является обязательным указанием unique_key, по которому он как раз и будет выполнять вставки.

Теперь если мы повторим действия над таблицей users как ранее, то получим корректный результат. Наша таблица после каждого выполнения команды dbt run будет инкрементально заполняться.

Больше подробностей и тонкостей инкрементальной материализации описано в документации.

Source

Для корректного обращения к источникам давайте их пропишем в нашей папке models, чтобы мы могли на них ссылаться через Jinja-шаблоны, а не прописывая их самостоятельно.

Для этого необходимо создать файл source.yml и в нём прописать конфигурацию для источников:

version: 2  
  
sources:  
  - name: public  
    database: dbt  
    schema: public  
    tables:  
      - name: users

И теперь давайте попробуем воспользоваться данным источником, чтобы скопировать таблицу users к себе в схему dbt.

Для этого создадим файл dbt_source_users.sql в новой директории as_is внутри models и добавим такой код:

SELECT * FROM {{ source("public", "users") }}

Данный код на первый взгляд может показаться не понятным, но сейчас мы его скомпилируем и получим привычный нам SQL.

Давайте выполним команду: dbt compile и если у нас всё будет хорошо, то в папке target мы сможем найти наш скомпилированный файл, который выглядит следующим образом:

SELECT * FROM "dbt"."public"."users"

Если мы возьмём этот запрос и выполним его в нашей БД, то получим корректный результат. Теперь мы можем запустить эту модель в БД, чтобы она выполнила заложенную логику.

Важно: компилятор вызовет исключение и укажет на ошибку, если ваш запрос составлен некорректно.

Выполним команду: dbt run и увидим, что у нас создалось представление, а это потому, что мы не указали как материализовывать наши модели из папки as_is, давайте укажем в dbt_project.yml, по какой логике должна работать наша модель:

as_is:  
  +materialized: table

И если ещё раз выполнить команду dbt run, то мы увидим, что у нас создалась таблица, а не представление (в логах будет указано SELECT ***, а не CREATE VIEW).

В дальнейшем все наши источники, которые будут использованы во время проекта будем указывать внутри source.yml

Snapshots

Для аналитики довольным важным моментом является — »путешествие во времени». Что это означает? Это означает, что аналитик желает посмотреть какое у нас было состояние по пользователям/продажам/прочее на тот или иной момент.

Поэтому наша задача как дата-инженеров организовать такую возможность и её можно реализовать при помощи SCD2. Более подробно о SCD можете почитать по ссылке.

При инициализации проекта у нас создалась папка snapshots и мы в ней будем создавать нашу SCD2 таблицу.

В начале нам нужно создать файл users_scd2.sql:

{% snapshot users_timestamp %}  
  
    {{  
        config(  
            target_database='dbt',  
            target_schema='snapshots',  
            strategy='timestamp',  
            unique_key='id',  
            updated_at='updated_at',  
        )  
    }}  
  
    SELECT * FROM {{ source('public', 'users') }}  
  
{% endsnapshot %}

Здесь мы также используем Jinja-шаблоны:

  • Указываем, что это таблица типа snapshot.

  • Указываем все необходимые атрибуты для работы с таблицей.

  • Указываем запрос на извлечение данных из таблицы. Здесь также используем наш source, который мы определили ранее.

После создания данного файла мы можем выполнить команду: dbt snapshot и увидим в логе SELECT *** — это означает, что таблица создана и перенесла все записи из таблицы users.

Давайте теперь изменим какую-нибудь запись, чтобы увидеть работу SCD2.

Так как в поле у нас хранится uuid и он каждый раз различный, вы можете выбрать любого »пользователя» для изменения.

   UPDATE users SET (first_name, updated_at) = ('dummy', now())
   WHERE id = '86433147-fafe-4d74-b67c-59c3a283974c'

Мы изменили имя пользователя и чтобы создать новую запись в SCD-таблице необходимо снова выполнить команду: dbt snapshot. После выполнения мы увидим, что у нас добавилась новая строка в таблицу:

id

created_at

updated_at

first_name

dbt_scd_id

dbt_updated_at

dbt_valid_from

dbt_valid_to

86433147-fafe-4d74-b67c-59c3a283974c

2024–08–16 17:31:30.670 +0000

2024–02–29 14:53:29.191 +0000

Алина

0cea94932fa3921c59e771f5c0b53bb4

2024–02–29 14:53:29.191 +0000

2024–02–29 14:53:29.191 +0000

2024–10–26 03:07:44.519 +0000

86433147-fafe-4d74-b67c-59c3a283974c

2024–08–16 17:31:30.670 +0000

2024–10–26 03:07:44.519 +0000

dummy

cf61eb52bac629a0af70cd2c46e63f8a

2024–10–26 03:07:44.519 +0000

2024–10–26 03:07:44.519 +0000

Теперь наша таблица users будет версионироваться после каждого запуска команды dbt snapshot.

Более подробно о возможностях snapshots описано в официальной документации.

Функция ref

Ref помогает нам выстроить data lineage, для того чтобы он корректно отображался на графе и также мы могли правильно запускать/перезапускать наши модели.

Важным моментом здесь является то, что мы должны »затянуть» к себе в проект какую-то модель, чтобы ей в дальней пользоваться.

Основное отличие от source здесь в том, что source — это просто ссылка на объект в БД, а ref — это ссылка на объект внутри dbt-проекта.

Давайте рассмотрим это на примере и для этого создадим файл count_users_by_city.sql в папке dm со следующим кодом:

SELECT  
    city,  
    count(*)  
FROM  
    {{ ref('dbt_source_users') }}  
GROUP BY  
    1

В данном примере мы используем ранее созданную таблицу dbt_source_users с пользователями. Если мы захотим обратиться изначально к нашей таблице указав {{ ref('users') }}, то получим исключение:

Compilation Error

Model 'model.pg_analytics.count_users_by_city' (models/dm/count_users_by_city.sql) depends on a node named 'users' which was not found

Важно: Во время работы с ref мы должны использовать только dbt-модели.

Более подробно о ref функции описано в документации.

WEB UI

Мы сейчас с вами построили несколько моделей и теперь мы можем посмотреть взаимосвязи между ними.

Для этого нам сначала нужно выполнить команду: dbt docs generate, которая выстроит зависимости между моделями и создаст документацию по нашему проекту.

Для того, чтобы запустить наш веб-сервер необходимо выполнить команду: dbt docs serve. После этого страница откроется автоматически, а если не открылась, то будет доступна по адресу: http://localhost:8080/#!/overview

После запуска нашего сервера мы можем изучить список взаимосвязей в проекте. Что мы имеем сейчас:

2e92dd20d12df53884dad9d1966c4b11.png

Здесь стоит обратить внимание на висящую ноду fct_registred_by_date, потому что она у нас была создана таким запросом:

SELECT  
    created_at::date AS registred_date,  
    count(*)  
FROM  
    users AS u  
GROUP BY  
    1

И как мы видим в данном запросе у нас не используется ни source ни ref. И поэтому в случае дебага или проверки зависимостей нужно будет самостоятельно изучать dbt-модели.

При работе с графом зависимостей часто говорят о запуске модели и её зависимостей. В dbt это реализовано довольно гибко. О чём описано в документации.

Но из основного я бы отметил, что мы можем регулировать что запускать при запуске модели. К примеру:

  • Запустить саму модель: dbt run -m name_of_model

  • Запустить саму модель и зависимости ДО: dbt run -m +name_of_model

  • Запустить саму модель и зависимости ПОСЛЕ: dbt run -m name_of_model+

  • Запустить саму модель и зависимости ДО и ПОСЛЕ: dbt run -m +name_of_model+

Теги

Помимо деления по папкам мы также можем добавить деление по tags. Для этого необходимо внутри самого главного конфиг файла dbt_project.yml задать tags:

models:  
  pg_analytics:  
    example:  
      +materialized: view  
    dm:  
      +materialized: table  
      tags: "marts"  
    as_is:  
      +materialized: table

И теперь благодаря такой настройке мы можем запускать модели по tags такой командой: dbt run -m tag:marts

Более подробно о работе tags описано в документации.

Тесты

Тесты — это неотъемлемая часть в дата-инженерии и аналитике. Наша задача — сделать так, чтобы данным могли доверять.

И один из способов обеспечить доверие данным — это покрыть их тестами и проверками.

Давайте начнём со стандартных тестов, который предоставляет dbt »из коробки»:

  • unique: проверяет, что значения должны быть уникальным, без дублей.

  • not_null: проверяет, что в колонке нет NULL значений.

  • accepted_values: проверяет, что колонка содержит нужные значения. К примеру: для колонки orders должны быть значения:  'placed',  'shipped',  'completed',  'returned'

  • relationships: Проверяет, что внешний ключ целостный и консистентный и у нас в витрине или другой модели не появилось »новых» ключей.

Все тесты указываются для колонок, к примеру так можно указать тесты для нашей витрины fct_registred_by_date:

version: 2  
  
models:  
  - name: fct_registred_by_date  
    description: "Количество регистраций за день"  
    columns:  
      - name: registred_date  
        description: "Дата регистрации"  
        data_tests:  
          - unique  
          - not_null  
      - name: count  
        description: "Количество регистраций"

Все тесты указываются в schema.yml для нужной нам модели и нужной колонки

Но помимо стандартных тестов мы можем самостоятельно писать тесты для наших моделей. Основная логика dbt тестов — запрос ничего не должен возвращать. Если тест вернул хотя бы одну строку, то тест считается проваленным.

Давайте сделаем свой not_null для колонки count таблицы fct_registred_by_date.

Для этого создадим файл not_null_count_fct_registred_by_date.sql в папке tests со следующим кодом:

SELECT  
    *  
FROM  
    {{ ref('fct_registred_by_date') }}  
WHERE  
    "count" IS NULL

И если мы снова запустим наши тесты командой dbt test, то сможем увидеть, что наш тест запустился и прошёл успешно.

...
test not_null_count_fct_registred_by_date ... [RUN]
...
not_null_count_fct_registred_by_date ... [PASS in 0.06s]
...

Макросы

Макросы — это создание некого шаблона, который мы сможем использовать в нашем коде при помощи Jinja.

Давайте создадим наш первый макрос. Для этого создадим файл cast_to_date.sql в папке macros:

{% macro cast_to_date(column_name) %}  
    {{ column_name }}::date  
{% endmacro %}

И теперь давайте создадим ещё раз нашу таблицу fct_registred_by_date, но уже с использованием макроса:

SELECT  
    {{ cast_to_date('created_at') }} AS registred_date,  
    count(*)  
FROM  
    users AS u  
GROUP BY  
    1

И если сейчас мы выполним команду: dbt compile, то получим такой скомпилированный файл (форматирование оставил, как сделал dbt):

SELECT  
    created_at::date  
 AS registred_date,  
    count(*)  
FROM  
    users AS u  
GROUP BY  
    1

Важно: Не стоит делать сильно сложные макросы, потому что их довольно сложно дебажить и также стоит учитывать, что макрос будет использоваться в разных частях проектах и при дальнейших его изменениях можно сломать часть бизнес-логики.

Более подробно о работе и возможностях макросов описано в документации.

Пакеты

Для dbt существует пакетный менеджер, как и для Python. Со всеми пакетами вы можете ознакомиться на официальном dbt-hub.

Но мы сейчас рассмотрим самый популярный пакет для dbt — dbt_utils.

Данный пакет предоставляет: новый тесты для наших моделей, макросы для создания моделей и разные полезные инструменты для более комфортной работы с dbt.

Для того, чтобы установить dbt_utils необходимо в корне нашего проекта создать файл packages.yml и указать следующие параметры:

packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0

И затем необходимо выполнить команду: dbt deps

Теперь мы имеем возможность использовать функции dbt_utils, давайте рассмотрим одну из — dbt_utils.star.

Для этого примера мы продублируем таблицу fct_registred_by_date. Давайте создадим файл dbt_utils_source_fct_registred_by_date.sql и напишем такой код:

SELECT  
    {{ dbt_utils.star(ref("fct_registred_by_date")) }}  
FROM  
    {{ ref("fct_registred_by_date") }}

И если сейчас выполнить команду: dbt compile, то получим такой код:

SELECT  
	"registred_date",  
	"count"  
FROM  
    "dbt"."dbt"."fct_registred_by_date"

Он сделал нам перечисление всех колонок из таблицы, а не использовал стандартную команду: *.

Если мы хотим получать не все колонки, то можно их исключить следующим образом:

SELECT  
    {{ dbt_utils.star(ref("fct_registred_by_date"), except=["count"]) }}  
FROM  
    {{ ref("fct_registred_by_date") }}

И после компиляции получим такой код:

SELECT  
    "registred_date"  
FROM  
    "dbt"."dbt"."fct_registred_by_date"

Понятное дело, что я привожу простой пример, на практике таблицы могут иметь более одной колонки. В данном случае я хотел продемонстрировать возможности dbt_utils, которых великое множество.

Прочие коннекты

На текущий момент dbt имеет множество коннектов к различным системам: OLAP БД, OLTP БД, Data Lake.

Более подробно о подключении дата платформ в dbt описано в документации.

Давайте попробуем создать подключение к DuckDB.

Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей — Всё что нужно знать про DuckDB.

Так как dbt не позволяет держать несколько разных типов коннекта нам необходимо будет создать новый dbt-проект и для этого выполним команду: dbt init duckdb_analytics --profiles-dir .

После инициализации проекта необходимо внутри него создать profiles.yml:

duckdb_analytics:  
  outputs:  
    dev:  
      type: duckdb  
      path: dev.duckdb  
      threads: 4  
      extensions:  
        - httpfs  
        - parquet  
        - postgres  
  target: dev

Я создал также папку duckdb_example в models и поэтому нам необходимо указать как материализовывать модели из неё.

...
models:  
  duckdb_analytics:  
    duckdb_example:  
      +materialized: view

После данной подготовки мы готовы создать нашу первую модель с использованием duckdb, для этого создадим файл users_from_pg_extension.sql:

SELECT  
    *  
FROM  
    postgres_scan(  
        'host=localhost port=5432 dbname=dbt user=postgres password=postgres',  
        'public',  
        'users'  
    )

Важно: данный пример является учебным. Поэтому не стоит в открытую использовать параметры для подключения. Их лучше прятать в окружение или при cicd процессе проставлять. Подробнее о том, как скрыть параметры для подключения описано в документации.

Также стоит отметить, что я в примере использую старый способ подключения к PostgreSQL, так как новый через ATTACH в dbt не работает. Подробнее о работе с PostgreSQL через DuckDB описано в документации. И ещё можно под себя настроить коннект, с указанием конкретных плагинов и их настроек, подробнее описано в Configuring dbt-duckdb Plugins.

Так как мы создали нашу первую модель мы можем делать ref на неё, поэтому давайте повторим задачу, которую мы делали в PostgreSQL — посчитаем количество регистраций за каждый день.

Для этого cоздадим fct_registred_by_date.sql:

SELECT  
    created_at::date AS registred_date,  
    count(id) AS count  
FROM  
     {{ ref("users_from_pg_extension") }}  
GROUP BY  
    1

И если выполнить команду dbt run, то у нас создастся ещё одно представление в DuckDB. И эта информация также отобразится в графе:

9cee6332058f28cf150dd4b05b6fddcf.png

Давайте теперь поработаем с S3 через DuckDB.

Если вы не знаете что такое S3 и как его можно использовать, то можете ознакомиться с моей статьей — Инфраструктура для data engineer S3.

Добавим новый сервис в наш изначальный docker-compose.yaml:

minio:  
    image: minio/minio:RELEASE.2024-10-13T13-34-11Z  
    restart: always  
    command: server /data --console-address ":9001"  
    volumes:  
      - ./data:/data  
    environment:  
      - MINIO_ROOT_USER=minioadmin  
      - MINIO_ROOT_PASSWORD=minioadmin  
      - MINIO_DOMAIN=minio  
    ports:  
      - "9000:9000"  # MinIO S3 API  
      - "9001:9001"  # MinIO Console

Важно: создание бакета, генерацию ключей я пропущу, так как она описана в статье выше.

В самом начале нашей работы S3 + DuckDB необходимо создать параметры для подключения. Поэтому мы немного изменим profiles.yml:

duckdb_analytics:  
  outputs:  
    dev:  
      type: duckdb  
      path: dev.duckdb  
      threads: 4  
      extensions:  
        - httpfs  
        - parquet  
        - postgres  
      settings:  
        s3_access_key_id: "GYZDiyIV4wnNoNTzp2Q5"  
        s3_secret_access_key: "zHOERNJFxcBYXXsa4XUXXSSy0apl8OED7BVp1A3Q"  
        s3_endpoint: "localhost:9000"  
        s3_use_ssl: "FALSE"  
        s3_url_style: "path"  
  target: dev

Теперь при работе DuckDB у нас будут использоваться настройки, которые мы прописали.

Давайте в начале запишем пользователей в S3, для этого необходимо выполнить скрипт create_and_fill_users_s3.py.

После этого у нас появится .parquet с пользователями в нашем бакете prod.

Теперь мы можем создать первую модель, которая будет читать файл из S3, создадим файл users_from_s3_extension.sql:

SELECT * FROM read_parquet('s3://prod/ods/users.parquet')

Если выполнить команду dbt run, то в нашей БД будет создано представление.

Теперь давайте выполним наш пример с подсчётом регистраций и для этого создадим модель fct_registred_by_date_from_s3.sql:

{{ config(materialized='table') }}  
  
SELECT  
    created_at::date AS registred_date,  
    count(id) AS count  
FROM  
     {{ ref("users_from_s3_extension") }}  
GROUP BY  
    1

Важно: Я изменил основной конфиг материализации на таблицу, чтобы проверить корректность работы модели.

Стоит также отметить, что когда мы создаём таблицы/представления при помощи dbt, те настройки, которые мы указали в profiles.yml создаются на сессию.

Это означает то, что если мы подключимся к нашей физической dev.duckdb и попробуем выполнить запрос к S3 у нас не будет работать коннект, потому что мы создали новую сессию и в ней нужно заново инициализировать подключение к S3.

Повторюсь, если вы не знаете о DuckDB и том как создавать подключение к S3, то рекомендую ознакомиться с моей статьей — Всё что нужно знать про DuckDB.

И теперь после создания всех нужных нам моделей мы можем выполнить dbt run и подключиться к DuckDB чтобы увидеть результат.

И также все наши модели отображаются в графе зависимостей:

b6fd8352a833056c401dcd7276797ec9.png

Также данный пакет позволяет писать в S3. Для того чтобы совершить запись в S3 необходимо обратиться к документации — Writing to external files. Запись во внешние таблицы имеет специфический синтаксис и о нём стоит знать.

Давайте запишем результат нашего привычного запроса в S3 и для этого создадим файл fct_registred_by_date_from_s3_to_s3.sql:

{{ config(materialized='external', location='s3://prod/dm/fct_registred_date.gzip.parquet') }}  
SELECT  
    created_at::date AS registred_date,  
    count(id) AS count  
FROM  
    {{ ref("users_from_s3_extension") }}  
GROUP BY  
    1

И теперь помимо того, что мы сделали запись в S3 так это ещё отображается корректно в нашем графе:

6626ca5594d37aaf4640f94baefb8872.png

Важно: я показал только пример использования dbt + DuckDB, но таких интеграций может быть множество: ClickHouse, MySQL и прочие коннекторы.

Заключение

Как я сказал в начале — dbt является мощным инструментом.

Я постарался показать основные моменты, но в dbt есть множество тонкостей и особенностей работы, с которыми вы сможете познакомиться на практике.

dbt также является довольно популярным инструментом, вы его можете найти у многих компаний. Поэтому знание dbt вам точно пригодится.

На что я бы обратил внимание:

  • Необходимо организовать хранение моделей так, чтобы было удобно всем участникам data-команды, но и было понятно как происходит деление моделей (по слоям, по логике и пр.).

  • Не стоит избегать тегов при работе с моделями.

  • Заполняйте мета-данные по моделям сразу.

  • Определите владельца мета-данных, чтобы не было свалки по моделям и колонкам.

  • Покрывайте модели тестами и не только стандартными, но из модуля dbt_utils.

  • Осторожнее с макросами.

  • Старайтесь использовать ref и source, чтобы выстраивать корректный data lineage и иметь возможность прогрузить всю цепочку.

  • У dbt хорошая документация, не стоит ей пренебрегать.

  • Если вы не можете найти ответ на свой вопрос, то вы всегда можете обратиться в официальный Slack dbt.

Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.

© Habrahabr.ru