Повышаем Data Quality: щепотка Soda для ваших данных
Привет! Меня зовут Александр Кудрявцев, я аналитик данных в команде Data Platform Банки.ру. Недавно мы озадачились вопросом контроля качества данных (Data Quality) и стали искать комплексное решение. Один из инструментов, который попал в поле зрения, — Soda Core. О нем и пойдет речь в материале.
Почему Data Quality — это важно
Не думаю, что местной публике стоит раскрывать избитые тезисы о важности использования данных и о том, как компании извлекают из них ощутимую пользу. Остановиться хочу на качестве.
Мы в Банки.ру используем data-driven подход при принятии бизнес-решений. С экспоненциальными темпами роста объема данных Data Quality становится критически важным. Принятые на основе некачественных данных бизнес-решения будут стоить компании очень дорого. Помимо прямых последствий ошибки, ее еще нужно найти, исправить, переобучить модель машинного обучения (если данные готовились для команды ML), потратить на это ресурс команды и так далее.
В идеале, Data Quality подход должен работать на упреждение: проверять данные от их источника до целевых витрин в моменты трансформаций. Важно, чтобы трансформации данных и пайплайны ETL запускались уже только после проверки качества — принцип GIGO (Garbage In — Garbage Out) никто не отменял.
За эти процессы у нас отвечают как и чисто технические, так и бизнес-мониторинги. Они рассылают сообщения в десятки каналов корпоративного мессенджера, попутно тегая ответственных лиц. Но существуют две проблемы.
Мониторингов очень много, их инициализируют и оркестрируют разные процессы. А проверки хочется структурировать, описать всю логику в одном месте.
Большинство проверок происходят уже постфактум, хотя они нужны в самих пайплайнах обработки и поставки данных.
Чтобы от этих проблем избавиться, мы решили попробовать Soda Core.
Soda Core для самых маленьких
Soda Core (https://docs.soda.io/) — это open-source проект для проверки качества данных. Под капотом — собственный язык проверок SodaCL (Soda Check Language), чеки прописываются в файле YAML в интуитивно понятной форме.
В базовой версии инструмент можно установить в виде CLI или Python-библиотеки и развернуть в своей инфраструктуре. Можно также использовать платную cloud-based версию. В ней доступно хранение истории проверок, красивый GUI с флажками, автопроверки и различные интеграции из коробки (например, пушинг алертов в Slack и другие мессенджеры).
Как установить SODA на локальный компьютер
Привожу пример для Windows.
Создаем новый каталог, переходим в него, готовим виртуальное окружение и активируем. Для Soda Core необходим Python версии не ниже 3.8.
python -m venv venv
.venv\Scripts\activate
Обновляем версию pip и устанавливаем сам пакет. В качестве примера я использую базу Postgres, поэтому устанавливаю пакет с коннектором к этой базе —
soda-core-postgres
. Полный список доступных коннекторов доступен в доках на сайте Soda — https://docs.soda.io/soda-library/install.html#install-soda-library-1.
pip install --upgrade pip
pip install -i https://pypi.cloud.soda.io soda-core-postgres
Формируем два YAML файла. Один описывает конфигурацию подключения к базе данных, второй содержит инструкции для самих проверок на языке SodaCL. Еще понадобится простая табличка для тестирования. В документации (ссылка выше) можно скачать специальный docker образ с базой Postgres и семплом данных. Но я использую уже развернутую у нас песочницу.
data_source postgres_test:
type: postgres
host: ${POSTGRES_TEST_HOST}
username: ${POSTGRES_TEST_USERNAME}
password: ${POSTGRES_TEST_PASS}
database: postgres_test
schema: temp
Из директории проекта создаём файл checks.yml и пишем первую простую проверку для нашей таблицы. Будем проверять, что количество строк в ней больше нуля. Тут все достаточно просто:
checks for t_soda_test:
- row_count > 0
Создаем тестовую таблицу и наполняем её данными выдуманных заказов.
CREATE TABLE IF NOT EXISTS "temp".t_soda_test AS (
SELECT
floor(random() * 10000000)::int AS order_id,
dt AS order_dt,
floor(random() * 10000 + 300)::int AS user_id,
(ARRAY[1,2,3,4,5])[floor(random() * 5 +1)] AS product_id,
floor(random() * 10 + 5)::numeric AS qty,
ROUND((random() * 500)::numeric, 2) AS revenue
FROM generate_series('2023-01-09'::timestamp, current_timestamp, '3 hours'::INTERVAL) dt
);
Запускаем проверку из терминала и видим в консоли вывод о том, что она прошла успешно. Ура!
soda scan -d postgres_test -c configuration.yml checks.yml
Soda Core 3.3.1
Scan summary:
1/1 check PASSED:
t_soda_test in postgres_test
row_count > 0 [PASSED]
All is good. No failures. No warnings. No errors.
Какие проверки можно писать на SodaCL
Приведу примеры основных проверок качества данных, которые можно реализовать в Soda Core. С полным перечнем можно ознакомиться в документации по языку SodaCL.
Картинка, которая в общих чертах показывает синтаксис SodaCL.
Проверки числовых метрик
Пожалуй, самый частый тип. Соотносим числовую метрику данных с некой границей (trashhold). Границы можно задавать в виде диапазона от и до, а самим проверкам присваивать читаемые и понятные для конечных пользователей наименования.
В этом примере я проверяю общее количество строк, среднюю выручку и показатель минимального количества товаров в заказе, когда оно (количество товаров) больше или равно единице. Для первого параметра задаю диапазон, для второго указываю требование =>1, для последнего использую только один параметр — 0.
checks for t_soda_test:
- row_count > 0:
name: Общее количество строк больше нуля
- avg(revenue) between 100 and 500:
name: Средняя выручка между 100 и 500
- min(qty) >= 1:
name: Минимальное количество товаров в заказе больше или равно единице
- duplicate_count(order_id) = 0:
name: Количество дубликатов по order_id
Soda Core 3.3.1
Using DefaultSampler
Using DefaultSampler
Scan summary:
4/4 checks PASSED:
t_soda_test in postgres_test
Общее количество строк больше нуля [PASSED]
Средняя выручка между 100 и 500 [PASSED]
Минимальное количество товаров заказе больше или равно единице [PASSED]
Количество дубликатов по order_id [PASSED]
All is good. No failures. No warnings. No errors.
Проверка отсутствия данных
Здесь тоже все максимально просто: проверяем, не содержат ли наши данные пропусков в критичных колонках. SodaCL позволяет задавать разные уровни проверок. Например, если объем пропусков не превышает 10%, выдаем предупреждение, а больше 10% — это фейл.
checks for t_soda_test:
- missing_count(user_id) = 0
- missing_percent(product_id):
warn: when > 0%
fail: when >= 10%
Soda Core 3.3.1
Scan summary:
2/2 checks PASSED:
t_soda_test in postgres_test
missing_count(user_id) = 0 [PASSED]
missing_percent(product_id) warn when > 0% fail when >= 10% [PASSED]
All is good. No failures. No warnings. No errors.
Проверка актуальности данных
Практически во всех таблицах хранятся данные о времени совершения событий, или содержится чисто техническое поле с датой и временем загрузки сведений в таблицу. Soda Core позволяет перед запуском последующих обработок проверить «свежесть» данных. В доках — https://docs.soda.io/soda-cl/freshness.html можно найти описание таких проверок. В нашем случае проверяем что:
в таблице есть заказы, сформированные через час или меньше от времени последней проверки;
в таблице есть данные о заказах, совершенных в течение дня.
Для этого указываем колонку в таблице, которая должна быть в формате date/timestamp. Проверки можно кастомизировать: задать дату, с которой хотим сравнить данные, но по дефолту Soda будет проверять данные и сравнивать их с моментом запуска последней.
checks for t_soda_test:
- freshness(order_dt) <= 1h
- freshness(order_dt) <= 1d
Soda Core 3.3.1
Scan summary:
1/2 checks PASSED:
t_soda_test in postgres_test
freshness(order_dt) <= 1d [PASSED]
1/2 checks FAILED:
t_soda_test in postgres_test
freshness(order_dt) <= 1h [FAILED]
max_column_timestamp: 2024-04-30 15:00:00+03:00
max_column_timestamp_utc: 2024-04-30 12:00:00+00:00
now_variable_name: NOW
now_timestamp: 2024-04-30T15:12:53.186588+00:00
now_timestamp_utc: 2024-04-30 15:12:53.186588+00:00
freshness: 3:12:53.186588
Oops! 1 failures. 0 warnings. 0 errors. 1 pass.
Проверка валидности данных
С помощью проверок этого типа мы можем, например, убедиться, что имейлы пользователей написаны в правильном формате или валидировать колонки с датой рождения покупателей с помощью регулярных выражений…
Сделать это можно:
с помощью встроенных в SodaCL проверок. В библиотеке уже зашиты популярные форматы. Например, format email знает, как должен выглядеть валидный имейл, также есть номера телефонов.
написав свои проверки с помощью регулярных выражений. Как раз для случая, когда нужно валидировать колонку с датой рождения или с другими данными.
checks for t_soda_customer_test invalid_count(email_address) = 0: valid format: email invalid_percent(birthday) < 5%: valid regex: (0?[0-9]|1[012])
Собственные проверки
В Soda Core можно достаточно легко реализовывать проверки с собственной логикой. Для этого нужно написать SQL выражение прямо в рамках yaml-файла. В отличие от проверки числовых метрик, в этом случае мы проводим вычисления. Содержание самой таблицы при этом не меняем.
Например, проверим, что выручка за вычетом скидки в 20% находится в диапазоне между 100 и 500.
checks for t_soda_test:
avg_revenue_with_discount between 100 and 500:
avg_revenue_with_discount expression: AVG(revenue * 0.8)
name: Средняя выручка за вычетом скидки 20%
Soda Core 3.3.1
Scan summary:
1/1 check PASSED:
t_soda_test in dwh_test
Средняя выручка за вычетом скидки 20% [PASSED]
All is good. No failures. No warnings. No errors.
Кажется, это очень полезная функция для накручивания специфической бизнес-логике. При этом все на родном и понятном SQL.
Что еще доступно из коробки в Soda
Выше мы пробежались по основным проверкам. Если захотите копнуть глубже, возможности «из коробки» приятно удивят. Я подготовил небольшую таблицу с кратким описанием того, что еще можно проверить с помощью Soda Core. В таблице добавил ссылки на соответствующие разделы официальной документации.
Тип проверки | Описание | Ссылка на документацию |
Distribution checks | Проверка распределения метрики, удобно для выявления аномалий и выбросов на раннем этапе (нужно устанавливать доп. пакет) | https://docs.soda.io/soda-cl/distribution.html |
Failed rows checks | Проверяем строки таблицы на ошибочные значения | https://docs.soda.io/soda-cl/failed-rows-checks.html |
Anomaly detection checks | Поиск аномалий на базе значений таблицы | https://docs.soda.io/soda-cl/anomaly-detection.html |
Schema checks | Проверяем схему таблицы, типы данных и т.д. | https://docs.soda.io/soda-cl/schema.html |
Reference checks | Проверяем, что значения в столбце соответствуют значениям другой таблицы (Foreign Key) | https://docs.soda.io/soda-cl/reference.html |
Cross checks | Проверяем что, например, количество строк в двух таблицах совпадает (актуально даже для разных баз) | https://docs.soda.io/soda-cl/cross-row-checks.html |
Reconciliation checks | Можем проверить, например, при миграции данных из одного источника в другой, что целевые данные соответствуют данным на источнике | https://docs.soda.io/soda-cl/recon.html |
Автоматизируем проверки качества данных с помощью Apache Airflow
Выше я привел примеры запуска проверок данных непосредственно из командной строки. Для ознакомления с инструментом этого вполне достаточно, но для внедрения проверок в промышленных масштабах нужно их через что-то запускать и оркестрировать. Ниже продемонстрирую пример запуска проверки по расписанию с помощью Apache Airflow.
Вариантов тут может быть несколько:
Устанавливаем Soda Core CLI на удаленную машину. Используя SSHOperator, подключаемся к ней из Airflow и запускаем проверки через BashOperator.
Устанавливаем библиотеку в образ самого Airflow. Не забываем попутно проверить, что версии библиотек не конфликтуют между собой. После этого можно даже написать какой-нибудь оператор, которому на вход можно подавать названия файлов/схем для проверки.
Используем VirtualenvOperator в Airflow. При запуске дага создаем временное виртуальное окружение, скачиваем нужный пакет soda-core и запускаем проверки.
from datetime import timedelta
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
CONFIG_FILE = (Path(file).parent / 'configuration.yml').as_posix()
CHECKS_FILE = (Path(file).parent / 'checks.yml').as_posix()
DATA_SOURCE = 'postgres_test'
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
def run_soda_scan(config_file, data_source, checks_file):
from soda.scan import Scan
scan = Scan()
scan.set_verbose()
scan.add_configuration_yaml_file(config_file)
scan.set_data_source_name(data_source)
scan.add_sodacl_yaml_files(checks_file)
scan.execute()
print(scan.get_logs_text())
if scan.has_check_fails():
raise ValueError(f"Soda Scan failed with errors!")
else:
print("Soda scan successful")
return 0
with DAG(
'soda_core_airflow_test',
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
) as dag:
first_step = DummyOperator(
task_id='first_step'
)
soda_core_scan_op = PythonVirtualenvOperator(
task_id='dq_soda_scan',
python_callable=run_soda_scan,
requirements=["-i https://pypi.cloud.soda.io", "soda-core-postgres"],
op_kwargs={
'config_file': CONFIG_FILE,
'data_source': DATA_SOURCE,
'checks_file': CHECKS_FILE
},
system_site_packages=False
)
next_step = DummyOperator(
task_id='next_step'
)
first_step >> soda_core_scan_op >> next_step
Выводы
После первых тестов Soda Core показывает себя очень хорошо. Инструмент прост в освоении, универсален, имеет много коннекторов к различным БД. Можно даже настраивать end-to-end проверки от источника до целевой витрины.
Из явных плюсов Soda Core я бы выделил:
интуитивно понятный язык SodaCL;
низкий порог входа;
много решений из коробки (даже поиск аномалий в данных подвезли);
возможность с помощью понятного SQL реализовать собственную логику проверки данных.
Из явных минусов пока вижу только отсутствие в бесплатной версии хранения историчности проверок и красивого интерфейса. Еще нет интеграций с тем же Slack, но это не сложно реализовать самостоятельно.
В бою инструмент еще не проверен. Мы только готовимся проводить нагрузочное тестирование. Будем смотреть, как Soda покажет себя на больших объемах. Здесь есть некоторые опасения. Они связаны с тем, что запуск большого количества проверок дополнительно нагрузит БД. Правда, нагрузку можно регулировать: дополнительно фильтровать данные с помощью встроенных инструментов в SodaCL, снижая тем самым объем данных для проверки.
Из альтернативных и более популярных open-source решений можно выделить «Great eXpectations». Наши коллеги из ML-команды уже активно используют GX для проверок данных перед обучением моделей. И даже рассказывали об этом в статье на Хабре.
GX мы пока внимательно не исследовали. Издалека он выглядит функциональным: возможных метрик и проверок данных там очень много, есть «родная» интеграция с Airflow. Но пока кажется, что для его использования нужен больший ресурс на поддержку, нежели на SODA.
Спасибо, что прочитали эту статью. Буду рад по возможности ответить на ваши вопросы в комментариях. Также призываю делиться своим опытом по внедрению DQ и рассказывать об инструментах, которые для этого используете.