Подружить Great Expectations с Impala: решение для больших данных
Всем привет, меня зовут Ольга Вишницкая, работаю главным аналитиком данных в одном департаментов Газпромбанка. Мы постоянно следим за развитием инструментов и технологий в области анализа данных, ищем и тестируем новые решения. И в какой-то момент один из наших стримов, который отвечает за качество данных, обратил внимание на Great Expectations (GX). Это отличная библиотека для анализа качества данных: от базовой валидации до сложного профилирования и автоматической генерации документации.
Но при внедрении мы столкнулись с проблемой: GX официально не поддерживает Impala — наш основной SQL-движок для обработки данных в Hadoop. Сначала мы решили пойти обходным путем через pandas DataFrame, благо GX прекрасно с ним работает. На тестовых данных все выглядело многообещающе, однако DataFrame может обрабатывает только около 15 000 строк за раз. Данные нужно дробить на части и по результатам теста на действительно больших объемов, обработка растянулась больше чем на сутки, а часть возможностей библиотеки мы вообще потеряли.
Стало понятно — нужно возвращаться к идее использования Impala. Тем более что он обрабатывает те же объемы данных за считанные минуты. Оставалось только найти способ подружить его с GX. В документации GX ни слова о том, как запустить проверки через неподдерживаемый движок. Пришлось экспериментировать самим, и после серии проб и ошибок нашли решение. Решила им поделиться.
Примечание: в этой статье я описываю работу с GX версии 0.17.19. Сейчас доступны новые версии библиотеки, но описанные подходы и логика решений остаются, в целом, теми же.
Базовая настройка
Первым делом нужно настроить подключение к Impala и организовать структуру проекта. Начнем с подключения:
from sqlalchemy import create_engine
engine = create_engine('impala://host:port/database')
С помощью функции create_engine
библиотеки SQLAlchemy создаем подключение к базе данных. Подробнее о конфигурации библиотеки можно почитать в документации.
Подключение кастомных проверок
Так же нужно добавить свои проверки. К сожалению, в документации GX нет информации о том, как подключать проверки из произвольной папки — описан только механизм замены через plugins или GitHub. Решение оказалось довольно простым:
Создаем папку для кастомных проверок в директории с основным кодом GX
В файле с основным кодом подключаем нужные проверки:
from .custom_expectations.expect_column_values_to_match_regex_impala import (
ExpectColumnValuesToMatchRegexImpala
)
Основные проблемы и их решения
Проблема №1: Регулярные выражения
Первое серьезное препятствие возникло при попытке использовать проверки на регулярные выражения, такие как expect_column_values_to_match_regex. Проверки просто не запускались на Impala, так как некоторые выражения Regex не поддерживается для диалекта.
Мы получали ошибки вида:
'ImpalaDialect' object has no attribute 'dialect'
Regex is not supported for dialect
Было два пути решения:
Сделать pull request с изменениями в библиотеку GX. Но это означало бы долгое ожидание подтверждения от разработчиков и последующие сложности с обновлением версий библиотеки в банковской инфраструктуре.
Создать свою проверку, где будем игнорировать проверку диалекта и возвращать условие регулярного выражения в понятном для Impala формате. Этот путь и выбрали.
Решение
1. Копируем в свою директорию исходный файл проверки регулярных выражений. Не забудьте изменить его название, а также поменять наименование проверки во всех местах, где она указана. В нашем случае мы везде добавили _impala:
map_metric = "column_values.match_regex_impala"
2. В файле expect_column_values_to_match_regex_impala указываем подключение файла с проверкой:
from .column_values_match_regex_impala import (ColumnValuesMatchRegexImpala)
Главное — заменить все названия проверок на свои.
3. Копируем в свою директорию еще один файл, меняем его название. А в самом файле меняем переменные condition_metric_name и regex_expression. В методе SQLAlchemy используется функция get_dialect_regex_expression, которая вызывает ошибку диалекта, так как в списках, поддерживаемых БД нет «импалы». Поэтому вызов этой функции мы можем отключить и сформировать свое условие (переменная regex_expression) регулярного выражения под нашу базу данных (мы взяли за основу выражение, которое используется для MySQL):
class ColumnValuesMatchRegexImpala(ColumnMapMetricProvider):
condition_metric_name = "column_values.match_regex_impala"
regex_expression = BinaryExpression(
column,
literal(regex),
custom_op("REGEXP")
)
4. Формируем условие регулярного выражения с помощью класса BinaryExpression из SQLAlchemy, где:
column — проверяемая колонка
sqlalchemy.literal (regex) — регулярное выражение
sqlalchemy.custom_op («REGEXP») — оператор
В результате выполнения этого выражения получается строка вида: table_nm REGEXP '%abv', в которой:
table_nm — колонка
REGEXP — оператор
'%abv' — регулярное выражение
Важно! Не забудьте добавить подключение BinaryExpressionl. Используйте literal (regex). Именно literal — без него не отработает.
from sqlalchemy.sql.elements import BinaryExpression, literal
Проблема №2: Зарезервированные слова
При работе GX на Impala мы столкнулись с синтаксическими ошибками. Дело в том, что GX при создании запросов к БД использует слова, которые являются зарезервированными в Impala, использование которых в БД невозможно. Вот примеры ошибок:
DBAPIError: (impala.error.HiveServer2Error) ParseException: Syntax error in line 2:
FROM (SELECT sum(condition) AS unexpected_count
^
Encountered: A reserved word cannot be used as an identifier: condition
Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER
Syntax error in line 3:
...d IN ('0') THEN CAST(1 AS NUMERIC) ELSE CAST(0 AS NUME...
^
Encountered: A reserved word cannot be used as an identifier: NUMERIC
Expected: ARRAY, BIGINT, BINARY, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, REAL, FLOAT, INTEGER, MAP, SMALLINT, STRING, STRUCT, TIMESTAMP, TINYINT, VARCHAR
Решение
Нужно изменить метку (label), название которой является зарезервированным словом в Impala, на любые другие разрешённые наименования. Например, в первой ошибке зарезервированным словом является condition. Заменяем его на другое название и обновляем все файлы, где используется эта проверка.
В случае с NUMERIC просто меняем тип данных на INTEGER, который предложен в самой ошибке как один из допустимых вариантов.
Вот пример изменений в коде.
Было:
# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later).
count_case_statement: List[sqlalchemy.Label] = sa.case(
(
unexpected_condition,
sa.sql.expression.cast(1, sa.NUMERIC),
),
else_=sa.sql.expression.cast(0, sa.NUMERIC),
).label("condition")
Стало:
# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later).
count_case_statement: List[sqlalchemy.Label] = sa.case(
(
unexpected_condition,
sa.sql.expression.cast(1, sa.Integer),
),
else_=sa.sql.expression.cast(0, sa.Integer),
).label("condition2")
Чтобы понять, какие файлы нужно заменять, мы смотрели логи — в них указывался путь возникновения ошибок. По этому пути находили проблемный файл на GitHub и дальше уже прослеживали цепочку подключений, чтобы увидеть, откуда тянется функция проверки.
Проблема №3: Подзапросы в Impala
Еще одна сложность возникла с подзапросами. При использовании некоторых проверок GX (например, ExpectColumnValuesToBeUnique) мы получали ошибку:
AnalysisException: Subqueries are not supported in the select list.
Дело в том, что GX использует вложенные подзапросы в разделе SELECT, которые запрещены в Impala.
Решение
Эту проблему можно обойти, если вынести не поддерживаемый Impala подзапрос в отдельный запрос и его результат подставить в основной подзапрос.
За основу мы взяли запрос из переменной unexpected_condition в файле map_condition_auxilliary_methods.py, вывели его через print и с помощью методов конструктора запросов SqlAlchemy сформировали такой же запрос в поддерживаемом Impala формате.
Вот как выглядит исходный запрос:
SELECT `UnexpectedCountSubquery`.unexpected_count
FROM (SELECT sum(condition2) AS unexpected_count
FROM (SELECT CASE WHEN (product_cd IS NOT NULL AND product_cd IN
(SELECT product_cd
FROM (SELECT *
FROM sbx_041.kva_xref_product_ge_test
WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd
HAVING count(product_cd) > %(count_1)s))
THEN CAST(%(param_1)s AS INTEGER)
ELSE CAST(%(param_2)s AS INTEGER) END AS condition2
FROM (SELECT *
FROM sbx_041.kva_xref_product_ge_test
WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2) AS anon_1)
AS `UnexpectedCountSubquery`
Из этого запроса нужно выделить подзапрос, который Impala не может обработать:
SELECT product_cd
FROM (SELECT *
FROM baza.test
WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd
HAVING count(product_cd) > %(count_1)s
С помощью метода SqlAlchemy этот запрос будет выглядеть так:
query = (
sa.select(sa.column(domain_kwargs['column']))
.select_from(selectable)
.group_by(sa.column(domain_kwargs['column']))
.having(sa.func.count(sa.column(domain_kwargs['column'])) > 1)
)
После выполнения мы получаем результат (назовем его условно result_impala) и подставляем в окончательный запрос:
SELECT "UnexpectedCountSubquery".unexpected_count
FROM (SELECT sum(condition2) AS unexpected_count
FROM (SELECT CASE WHEN product_cd IS NOT NULL AND product_cd IN (result_impala)
THEN CAST(:param_1 AS INTEGER)
ELSE CAST(:param_2 AS INTEGER) END AS condition2
FROM (SELECT *
FROM baza.test
WHERE product_open_dt = :product_open_dt_1) AS anon_2) AS anon_1)
AS "UnexpectedCountSubquery"
Последовательность замены файлов
При решении проблем с зарезервированными словами и подзапросами нам потребовалось заменить несколько файлов. Вот полная последовательность:
Заменяем наименование проверок (чтобы было отлично от вариантов GX) в подключении и в классах: great_expectations/expectations/core/expect_column_values_to_be_unique.py
Здесь заменяем только подключение: great_expectations/expectations/metrics/column_map_metrics/column_values_unique.py
И здесь: great_expectations/expecttions/metrics/map_metric_provider/column_map_metric_provider.py
И тут: great_expectations/expectations/metrics/map_metric_provider/map_metric_provider.py
Изменяем файл (как описано в разделе «Проблема №1: Регулярные выражения»): great_expectations/expectations/metrics/map_metric_provider/ map_condition_auxilliary_methods.py
Чтобы добраться до места, где возникает ошибка, нам пришлось подменить цепочку файлов, которые вызывают эту функцию. В принципе, таким образом можно подменить и убрать ошибку. Хоть этот способ и выглядит как костыль, но он рабочий и позволяет быстро решать проблемы.
Заключение
В результате наших экспериментов мы получили рабочее решение для интеграции GX с Impala. Да, местами пришлось использовать не самые элегантные подходы — подмена файлов библиотеки может показаться спорным решением. Но когда нужно быстро получить работающий инструмент, этот подход себя оправдал. А если у вас есть идеи, как сделать по-другому — пишите в комментариях.