Повышаем Data Quality: щепотка Soda для ваших данных

Привет! Меня зовут Александр Кудрявцев, я аналитик данных в команде Data Platform Банки.ру. Недавно мы озадачились вопросом контроля качества данных (Data Quality) и стали искать комплексное решение. Один из инструментов, который попал в поле зрения, — Soda Core. О нем и пойдет речь в материале.

Почему Data Quality — это важно

Не думаю, что местной публике стоит раскрывать избитые тезисы о важности использования данных и о том, как компании извлекают из них ощутимую пользу. Остановиться хочу на качестве.

Мы в Банки.ру используем data-driven подход при принятии бизнес-решений. С экспоненциальными темпами роста объема данных Data Quality становится критически важным. Принятые на основе некачественных данных бизнес-решения будут стоить компании очень дорого. Помимо прямых последствий ошибки, ее еще нужно найти, исправить, переобучить модель машинного обучения (если данные готовились для команды ML), потратить на это ресурс команды и так далее.

В идеале, Data Quality подход должен работать на упреждение: проверять данные от их источника до целевых витрин в моменты трансформаций. Важно, чтобы трансформации данных и пайплайны ETL запускались уже только после проверки качества — принцип GIGO (Garbage In — Garbage Out) никто не отменял.

d0d02972a5fd87f5cf2567ebfce61de6.png

За эти процессы у нас отвечают как и чисто технические, так и бизнес-мониторинги. Они рассылают сообщения в десятки каналов корпоративного мессенджера, попутно тегая ответственных лиц. Но существуют две проблемы.

  1. Мониторингов очень много, их инициализируют и оркестрируют разные процессы. А проверки хочется структурировать, описать всю логику в одном месте.

  2. Большинство проверок происходят уже постфактум, хотя они нужны в самих пайплайнах обработки и поставки данных.

Чтобы от этих проблем избавиться, мы решили попробовать Soda Core.

Soda Core для самых маленьких

Soda Core (https://docs.soda.io/) — это open-source проект для проверки качества данных. Под капотом — собственный язык проверок SodaCL (Soda Check Language), чеки прописываются в файле YAML в интуитивно понятной форме.

143ddcacb92dc0784bfd48d6b478910f.png

В базовой версии инструмент можно установить в виде CLI или Python-библиотеки и развернуть в своей инфраструктуре. Можно также использовать платную cloud-based версию. В ней доступно хранение истории проверок, красивый GUI с флажками, автопроверки и различные интеграции из коробки (например, пушинг алертов в Slack и другие мессенджеры).

Как установить SODA на локальный компьютер

Привожу пример для Windows.

  1. Создаем новый каталог, переходим в него, готовим виртуальное окружение и активируем. Для Soda Core необходим Python версии не ниже 3.8.

python -m venv venv
.venv\Scripts\activate
  1. Обновляем версию pip и устанавливаем сам пакет. В качестве примера я использую базу Postgres, поэтому устанавливаю пакет с коннектором к этой базе — soda-core-postgres. Полный список доступных коннекторов доступен в доках на сайте Soda — https://docs.soda.io/soda-library/install.html#install-soda-library-1.

pip install --upgrade pip
pip install -i https://pypi.cloud.soda.io soda-core-postgres
  1. Формируем два YAML файла. Один описывает конфигурацию подключения к базе данных, второй содержит инструкции для самих проверок на языке SodaCL. Еще понадобится простая табличка для тестирования. В документации (ссылка выше) можно скачать специальный docker образ с базой Postgres и семплом данных. Но я использую уже развернутую у нас песочницу.

 data_source postgres_test:
   type: postgres
   host: ${POSTGRES_TEST_HOST}
   username: ${POSTGRES_TEST_USERNAME}
   password: ${POSTGRES_TEST_PASS}
   database: postgres_test
   schema: temp
  • Из директории проекта создаём файл checks.yml и пишем первую простую проверку для нашей таблицы. Будем проверять, что количество строк в ней больше нуля. Тут все достаточно просто:

checks for t_soda_test:
  - row_count > 0
  1. Создаем тестовую таблицу и наполняем её данными выдуманных заказов.

CREATE TABLE IF NOT EXISTS "temp".t_soda_test AS (
SELECT
	floor(random() * 10000000)::int AS order_id,
	dt AS order_dt,
	floor(random() * 10000 + 300)::int AS user_id,
	(ARRAY[1,2,3,4,5])[floor(random() * 5 +1)] AS product_id,
	floor(random() * 10 + 5)::numeric AS qty,
	ROUND((random() * 500)::numeric, 2) AS revenue
FROM generate_series('2023-01-09'::timestamp, current_timestamp, '3 hours'::INTERVAL) dt
);
  1. Запускаем проверку из терминала и видим в консоли вывод о том, что она прошла успешно. Ура!

soda scan -d postgres_test -c configuration.yml checks.yml
Soda Core 3.3.1
Scan summary:
1/1 check PASSED: 
     t_soda_test in postgres_test
       row_count > 0 [PASSED]
All is good. No failures. No warnings. No errors.

Какие проверки можно писать на SodaCL

Приведу примеры основных проверок качества данных, которые можно реализовать в Soda Core. С полным перечнем можно ознакомиться в документации по языку SodaCL.

Картинка, которая в общих чертах показывает синтаксис SodaCL.

7835b31d75adddb33760ef4982662b40.png

Проверки числовых метрик

Пожалуй, самый частый тип. Соотносим числовую метрику данных с некой границей (trashhold). Границы можно задавать в виде диапазона от и до, а самим проверкам присваивать читаемые и понятные для конечных пользователей наименования.

В этом примере я проверяю общее количество строк, среднюю выручку и показатель минимального количества товаров в заказе, когда оно (количество товаров) больше или равно единице. Для первого параметра задаю диапазон, для второго указываю требование =>1, для последнего использую только один параметр — 0.

checks for t_soda_test:
  - row_count > 0:
      name: Общее количество строк больше нуля
  - avg(revenue) between 100 and 500:
      name: Средняя выручка между 100 и 500
  - min(qty) >= 1:
      name: Минимальное количество товаров в заказе больше или равно единице
  - duplicate_count(order_id) = 0:
      name: Количество дубликатов по order_id
Soda Core 3.3.1
Using DefaultSampler
Using DefaultSampler
Scan summary:
4/4 checks PASSED: 
t_soda_test in postgres_test
       Общее количество строк больше нуля [PASSED]
       Средняя выручка между 100 и 500 [PASSED]
       Минимальное количество товаров заказе больше или равно единице [PASSED]
       Количество дубликатов по order_id [PASSED]
 All is good. No failures. No warnings. No errors.

Проверка отсутствия данных

Здесь тоже все максимально просто: проверяем, не содержат ли наши данные пропусков в критичных колонках. SodaCL позволяет задавать разные уровни проверок. Например, если объем пропусков не превышает 10%, выдаем предупреждение, а больше 10% — это фейл.

checks for t_soda_test:
  - missing_count(user_id) = 0
  - missing_percent(product_id):
      warn: when > 0%
      fail: when >= 10%
Soda Core 3.3.1
Scan summary:
2/2 checks PASSED: 
     t_soda_test in postgres_test
       missing_count(user_id) = 0 [PASSED]
       missing_percent(product_id) warn when > 0% fail when >= 10% [PASSED]
 All is good. No failures. No warnings. No errors.

Проверка актуальности данных

Практически во всех таблицах хранятся данные о времени совершения событий, или содержится чисто техническое поле с датой и временем загрузки сведений в таблицу. Soda Core позволяет перед запуском последующих обработок проверить «свежесть» данных. В доках — https://docs.soda.io/soda-cl/freshness.html можно найти описание таких проверок. В нашем случае проверяем что:

  • в таблице есть заказы, сформированные через час или меньше от времени последней проверки;

  • в таблице есть данные о заказах, совершенных в течение дня.

Для этого указываем колонку в таблице, которая должна быть в формате date/timestamp. Проверки можно кастомизировать: задать дату, с которой хотим сравнить данные, но по дефолту Soda будет проверять данные и сравнивать их с моментом запуска последней.

checks for t_soda_test:
  - freshness(order_dt) <= 1h
  - freshness(order_dt) <= 1d
Soda Core 3.3.1
Scan summary:
 1/2 checks PASSED: 
     t_soda_test in postgres_test
       freshness(order_dt) <= 1d [PASSED]
 1/2 checks FAILED:
     t_soda_test in postgres_test
       freshness(order_dt) <= 1h [FAILED]
         max_column_timestamp: 2024-04-30 15:00:00+03:00
         max_column_timestamp_utc: 2024-04-30 12:00:00+00:00
         now_variable_name: NOW
         now_timestamp: 2024-04-30T15:12:53.186588+00:00
         now_timestamp_utc: 2024-04-30 15:12:53.186588+00:00
         freshness: 3:12:53.186588
Oops! 1 failures. 0 warnings. 0 errors. 1 pass.

Проверка валидности данных

С помощью проверок этого типа мы можем, например, убедиться, что имейлы пользователей написаны в правильном формате или валидировать колонки с датой рождения покупателей с помощью регулярных выражений…

Сделать это можно:

  • с помощью встроенных в SodaCL проверок. В библиотеке уже зашиты популярные форматы. Например, format email знает, как должен выглядеть валидный имейл, также есть номера телефонов.

  • написав свои проверки с помощью регулярных выражений. Как раз для случая, когда нужно валидировать колонку с датой рождения или с другими данными.

    checks for t_soda_customer_test
      invalid_count(email_address) = 0: valid format: email
      invalid_percent(birthday) < 5%: valid regex: (0?[0-9]|1[012])
    

Собственные проверки

В Soda Core можно достаточно легко реализовывать проверки с собственной логикой. Для этого нужно написать SQL выражение прямо в рамках yaml-файла. В отличие от проверки числовых метрик, в этом случае мы проводим вычисления. Содержание самой таблицы при этом не меняем.

Например, проверим, что выручка за вычетом скидки в 20% находится в диапазоне между 100 и 500.

checks for t_soda_test:
  avg_revenue_with_discount between 100 and 500: 
    avg_revenue_with_discount expression: AVG(revenue * 0.8) 
    name: Средняя выручка за вычетом скидки 20%
Soda Core 3.3.1
Scan summary:
 1/1 check PASSED: 
     t_soda_test in dwh_test
       Средняя выручка за вычетом скидки 20% [PASSED]
 All is good. No failures. No warnings. No errors.

Кажется, это очень полезная функция для накручивания специфической бизнес-логике. При этом все на родном и понятном SQL.

Что еще доступно из коробки в Soda

Выше мы пробежались по основным проверкам. Если захотите копнуть глубже, возможности «из коробки» приятно удивят. Я подготовил небольшую таблицу с кратким описанием того, что еще можно проверить с помощью Soda Core. В таблице добавил ссылки на соответствующие разделы официальной документации.

Тип проверки

Описание

Ссылка на документацию

Distribution checks

Проверка распределения метрики, удобно для выявления аномалий и выбросов на раннем этапе (нужно устанавливать доп. пакет)

https://docs.soda.io/soda-cl/distribution.html

Failed rows checks

Проверяем строки таблицы на ошибочные значения

https://docs.soda.io/soda-cl/failed-rows-checks.html

Anomaly detection checks

Поиск аномалий на базе значений таблицы

https://docs.soda.io/soda-cl/anomaly-detection.html

Schema checks

Проверяем схему таблицы, типы данных и т.д.

https://docs.soda.io/soda-cl/schema.html

Reference checks

Проверяем, что значения в столбце соответствуют значениям другой таблицы (Foreign Key)

https://docs.soda.io/soda-cl/reference.html

Cross checks

Проверяем что, например, количество строк в двух таблицах совпадает (актуально даже для разных баз)

https://docs.soda.io/soda-cl/cross-row-checks.html

Reconciliation checks

Можем проверить, например, при миграции данных из одного источника в другой, что целевые данные соответствуют данным на источнике

https://docs.soda.io/soda-cl/recon.html

Автоматизируем проверки качества данных с помощью Apache Airflow

Выше я привел примеры запуска проверок данных непосредственно из командной строки. Для ознакомления с инструментом этого вполне достаточно, но для внедрения проверок в промышленных масштабах нужно их через что-то запускать и оркестрировать. Ниже продемонстрирую пример запуска проверки по расписанию с помощью Apache Airflow.

Вариантов тут может быть несколько:

  1. Устанавливаем Soda Core CLI на удаленную машину. Используя SSHOperator, подключаемся к ней из Airflow и запускаем проверки через BashOperator.

  2. Устанавливаем библиотеку в образ самого Airflow. Не забываем попутно проверить, что версии библиотек не конфликтуют между собой. После этого можно даже написать какой-нибудь оператор, которому на вход можно подавать названия файлов/схем для проверки.

  3. Используем VirtualenvOperator в Airflow. При запуске дага создаем временное виртуальное окружение, скачиваем нужный пакет soda-core и запускаем проверки.

from datetime import timedelta 
from pathlib import Path

from airflow import DAG 
from airflow.operators.python import PythonVirtualenvOperator 
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

CONFIG_FILE = (Path(file).parent / 'configuration.yml').as_posix()
CHECKS_FILE = (Path(file).parent / 'checks.yml').as_posix()
DATA_SOURCE = 'postgres_test'

default_args = { 
  'owner': 'airflow', 
  'retries': 1, 
  'retry_delay': timedelta(minutes=5)
}

def run_soda_scan(config_file, data_source, checks_file):
  from soda.scan import Scan
  scan = Scan()
  scan.set_verbose()
  scan.add_configuration_yaml_file(config_file)
  scan.set_data_source_name(data_source)
  scan.add_sodacl_yaml_files(checks_file)
  scan.execute()

  print(scan.get_logs_text())
  
  if scan.has_check_fails():
     raise ValueError(f"Soda Scan failed with errors!")
  else:
    print("Soda scan successful")
    return 0

with DAG(
  'soda_core_airflow_test',
  default_args=default_args,
  schedule_interval=None,
  start_date=days_ago(1), 
) as dag:

  first_step = DummyOperator( 
    task_id='first_step' 
  )

  soda_core_scan_op = PythonVirtualenvOperator(
    task_id='dq_soda_scan',
    python_callable=run_soda_scan,
    requirements=["-i https://pypi.cloud.soda.io", "soda-core-postgres"],
    op_kwargs={
      'config_file': CONFIG_FILE,
      'data_source': DATA_SOURCE,
      'checks_file': CHECKS_FILE 
    }, 
    system_site_packages=False
  )

  next_step = DummyOperator(
    task_id='next_step'
  )

  first_step >> soda_core_scan_op >> next_step
  

cc7e06a980e5cd6b75be7207ec6451dc.png

Выводы

После первых тестов Soda Core показывает себя очень хорошо. Инструмент прост в освоении, универсален, имеет много коннекторов к различным БД. Можно даже настраивать end-to-end проверки от источника до целевой витрины.

Из явных плюсов Soda Core я бы выделил:

  • интуитивно понятный язык SodaCL;

  • низкий порог входа;

  • много решений из коробки (даже поиск аномалий в данных подвезли);

  • возможность с помощью понятного SQL реализовать собственную логику проверки данных.

Из явных минусов пока вижу только отсутствие в бесплатной версии хранения историчности проверок и красивого интерфейса. Еще нет интеграций с тем же Slack, но это не сложно реализовать самостоятельно.

В бою инструмент еще не проверен. Мы только готовимся проводить нагрузочное тестирование. Будем смотреть, как Soda покажет себя на больших объемах. Здесь есть некоторые опасения. Они связаны с тем, что запуск большого количества проверок дополнительно нагрузит БД. Правда, нагрузку можно регулировать: дополнительно фильтровать данные с помощью встроенных инструментов в SodaCL, снижая тем самым объем данных для проверки.

Из альтернативных и более популярных open-source решений можно выделить «Great eXpectations». Наши коллеги из ML-команды уже активно используют GX для проверок данных перед обучением моделей. И даже рассказывали об этом в статье на Хабре.

GX мы пока внимательно не исследовали. Издалека он выглядит функциональным: возможных метрик и проверок данных там очень много, есть «родная» интеграция с Airflow. Но пока кажется, что для его использования нужен больший ресурс на поддержку, нежели на SODA.

Спасибо, что прочитали эту статью. Буду рад по возможности ответить на ваши вопросы в комментариях. Также призываю делиться своим опытом по внедрению DQ и рассказывать об инструментах, которые для этого используете.

© Habrahabr.ru