Подружить Great Expectations с Impala: решение для больших данных

8d697d21c422956a9a6c19c8031420df.jpg

Всем привет, меня зовут Ольга Вишницкая, работаю главным аналитиком данных в одном департаментов Газпромбанка. Мы постоянно следим за развитием инструментов и технологий в области анализа данных, ищем и тестируем новые решения. И в какой-то момент один из наших стримов, который отвечает за качество данных, обратил внимание на Great Expectations (GX). Это отличная библиотека для анализа качества данных: от базовой валидации до сложного профилирования и автоматической генерации документации. 

Но при внедрении мы столкнулись с проблемой: GX официально не поддерживает Impala — наш основной SQL-движок для обработки данных в Hadoop. Сначала мы решили пойти обходным путем через pandas DataFrame, благо GX прекрасно с ним работает. На тестовых данных все выглядело многообещающе, однако DataFrame может обрабатывает только около 15 000 строк за раз. Данные нужно дробить на части и по результатам теста на действительно больших объемов, обработка растянулась больше чем на сутки, а часть возможностей библиотеки мы вообще потеряли.

Стало понятно — нужно возвращаться к идее использования Impala. Тем более что он обрабатывает те же объемы данных за считанные минуты. Оставалось только найти способ подружить его с GX. В документации GX ни слова о том, как запустить проверки через неподдерживаемый движок. Пришлось экспериментировать самим, и после серии проб и ошибок нашли решение. Решила им поделиться.

Примечание: в этой статье я описываю работу с GX версии 0.17.19. Сейчас доступны новые версии библиотеки, но описанные подходы и логика решений остаются, в целом, теми же.

Базовая настройка

Первым делом нужно настроить подключение к Impala и организовать структуру проекта. Начнем с подключения:

from sqlalchemy import create_engine
engine = create_engine('impala://host:port/database')

С помощью функции create_engine библиотеки SQLAlchemy создаем подключение к базе данных. Подробнее о конфигурации библиотеки можно почитать в документации.

Подключение кастомных проверок

Так же нужно добавить свои проверки. К сожалению, в документации GX нет информации о том, как подключать проверки из произвольной папки — описан только механизм замены через plugins или GitHub. Решение оказалось довольно простым:

  1. Создаем папку для кастомных проверок в директории с основным кодом GX

  2. В файле с основным кодом подключаем нужные проверки:

from .custom_expectations.expect_column_values_to_match_regex_impala import (
    ExpectColumnValuesToMatchRegexImpala
)

Основные проблемы и их решения

Проблема №1: Регулярные выражения

Первое серьезное препятствие возникло при попытке использовать проверки на регулярные выражения, такие как expect_column_values_to_match_regex. Проверки просто не запускались на Impala,   так как некоторые выражения Regex не поддерживается для диалекта.

Мы получали ошибки вида:

'ImpalaDialect' object has no attribute 'dialect'
Regex is not supported for dialect 

Было два пути решения:

  1. Сделать pull request с изменениями в библиотеку GX. Но это означало бы долгое ожидание подтверждения от разработчиков и последующие сложности с обновлением версий библиотеки в банковской инфраструктуре.

  2. Создать свою проверку, где будем игнорировать проверку диалекта и возвращать условие регулярного выражения в понятном для Impala формате. Этот путь и выбрали.

Решение

1.  Копируем в свою директорию исходный файл проверки регулярных выражений. Не забудьте изменить его название, а также поменять наименование проверки во всех местах, где она указана. В нашем случае мы везде добавили _impala:

map_metric = "column_values.match_regex_impala"

2.  В файле expect_column_values_to_match_regex_impala указываем подключение файла с проверкой:  

from .column_values_match_regex_impala import (ColumnValuesMatchRegexImpala)

Главное — заменить все названия проверок на свои.

3. Копируем в свою директорию еще один файл, меняем его название. А в самом файле меняем переменные condition_metric_name и regex_expression. В методе SQLAlchemy используется функция get_dialect_regex_expression, которая вызывает ошибку диалекта, так как в списках, поддерживаемых БД нет «импалы». Поэтому вызов этой функции мы можем отключить и сформировать свое условие (переменная regex_expression) регулярного выражения под нашу базу данных (мы взяли за основу выражение, которое используется для MySQL):

class ColumnValuesMatchRegexImpala(ColumnMapMetricProvider):
    condition_metric_name = "column_values.match_regex_impala"
    regex_expression = BinaryExpression(
        column, 
        literal(regex), 
        custom_op("REGEXP")
    )

4. Формируем условие регулярного выражения с помощью класса BinaryExpression из SQLAlchemy, где:

  • column — проверяемая колонка

  • sqlalchemy.literal (regex) — регулярное выражение

  • sqlalchemy.custom_op («REGEXP») — оператор

В результате выполнения этого выражения получается строка вида: table_nm REGEXP '%abv', в которой:

  • table_nm — колонка

  • REGEXP — оператор

  • '%abv' — регулярное выражение

Важно! Не забудьте добавить подключение BinaryExpressionl. Используйте literal (regex). Именно literal — без него не отработает. 

from sqlalchemy.sql.elements import BinaryExpression, literal

Проблема №2: Зарезервированные слова

При работе GX на Impala мы столкнулись с синтаксическими ошибками. Дело в том, что GX при создании запросов к БД использует слова, которые являются зарезервированными в Impala, использование которых в БД невозможно. Вот примеры ошибок:

DBAPIError: (impala.error.HiveServer2Error) ParseException: Syntax error in line 2:
FROM (SELECT sum(condition) AS unexpected_count
                ^
Encountered: A reserved word cannot be used as an identifier: condition
Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER
Syntax error in line 3:
...d IN ('0') THEN CAST(1 AS NUMERIC) ELSE CAST(0 AS NUME...
                                  ^
Encountered: A reserved word cannot be used as an identifier: NUMERIC
Expected: ARRAY, BIGINT, BINARY, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, REAL, FLOAT, INTEGER, MAP, SMALLINT, STRING, STRUCT, TIMESTAMP, TINYINT, VARCHAR

Решение

Нужно изменить метку (label), название которой является зарезервированным словом в Impala, на любые другие разрешённые наименования. Например, в первой ошибке зарезервированным словом является condition. Заменяем его на другое название и обновляем все файлы, где используется эта проверка.

В случае с NUMERIC просто меняем тип данных на INTEGER, который предложен в самой ошибке как один из допустимых вариантов.

Вот пример изменений в коде.

Было:  

# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later).
count_case_statement: List[sqlalchemy.Label] = sa.case(
    (
        unexpected_condition,
        sa.sql.expression.cast(1, sa.NUMERIC),
    ),
    else_=sa.sql.expression.cast(0, sa.NUMERIC),
).label("condition")

Стало:

# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later).
count_case_statement: List[sqlalchemy.Label] = sa.case(
    (
        unexpected_condition,
        sa.sql.expression.cast(1, sa.Integer),
    ),
    else_=sa.sql.expression.cast(0, sa.Integer),
).label("condition2")

Чтобы понять, какие файлы нужно заменять, мы смотрели логи — в них указывался путь возникновения ошибок. По этому пути находили проблемный файл на GitHub и дальше уже прослеживали цепочку подключений, чтобы увидеть, откуда тянется функция проверки.

Проблема №3: Подзапросы в Impala

Еще одна сложность возникла с подзапросами. При использовании некоторых проверок GX (например, ExpectColumnValuesToBeUnique) мы получали ошибку:

AnalysisException: Subqueries are not supported in the select list.

Дело в том, что GX использует вложенные подзапросы в разделе SELECT, которые запрещены в Impala.

Решение

Эту проблему можно обойти, если вынести не поддерживаемый Impala подзапрос в отдельный запрос и его результат подставить в основной подзапрос.

За основу мы взяли запрос из переменной unexpected_condition в файле map_condition_auxilliary_methods.py, вывели его через print и с помощью методов конструктора запросов SqlAlchemy сформировали такой же запрос в поддерживаемом Impala формате.

Вот как выглядит исходный запрос:

SELECT `UnexpectedCountSubquery`.unexpected_count
FROM (SELECT sum(condition2) AS unexpected_count
FROM (SELECT CASE WHEN (product_cd IS NOT NULL AND product_cd IN 
(SELECT product_cd
FROM (SELECT *
FROM sbx_041.kva_xref_product_ge_test
WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd
HAVING count(product_cd) > %(count_1)s)) 
THEN CAST(%(param_1)s AS INTEGER) 
ELSE CAST(%(param_2)s AS INTEGER) END AS condition2
FROM (SELECT *
FROM sbx_041.kva_xref_product_ge_test
WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2) AS anon_1) 
AS `UnexpectedCountSubquery`

Из этого запроса нужно выделить подзапрос, который Impala не может обработать:

SELECT product_cd
FROM (SELECT *
FROM baza.test
WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd
HAVING count(product_cd) > %(count_1)s

С помощью метода SqlAlchemy этот запрос будет выглядеть так:

query = (
    sa.select(sa.column(domain_kwargs['column']))
    .select_from(selectable)
    .group_by(sa.column(domain_kwargs['column']))
    .having(sa.func.count(sa.column(domain_kwargs['column'])) > 1)
)

После выполнения мы получаем результат (назовем его условно result_impala) и подставляем в окончательный запрос:

SELECT "UnexpectedCountSubquery".unexpected_count
FROM (SELECT sum(condition2) AS unexpected_count
FROM (SELECT CASE WHEN product_cd IS NOT NULL AND product_cd IN (result_impala) 
THEN CAST(:param_1 AS INTEGER) 
ELSE CAST(:param_2 AS INTEGER) END AS condition2
FROM (SELECT *
FROM baza.test
WHERE product_open_dt = :product_open_dt_1) AS anon_2) AS anon_1) 
AS "UnexpectedCountSubquery"

Последовательность замены файлов

При решении проблем с зарезервированными словами и подзапросами нам потребовалось заменить несколько файлов. Вот полная последовательность:

  1. Заменяем наименование проверок (чтобы было отлично от вариантов GX) в подключении и в классах: great_expectations/expectations/core/expect_column_values_to_be_unique.py

  2. Здесь заменяем только подключение: great_expectations/expectations/metrics/column_map_metrics/column_values_unique.py

  3. И здесь: great_expectations/expecttions/metrics/map_metric_provider/column_map_metric_provider.py

  4. И тут: great_expectations/expectations/metrics/map_metric_provider/map_metric_provider.py

  5. Изменяем файл (как описано в разделе «Проблема №1: Регулярные выражения»): great_expectations/expectations/metrics/map_metric_provider/ map_condition_auxilliary_methods.py

Чтобы добраться до места, где возникает ошибка, нам пришлось подменить цепочку файлов, которые вызывают эту функцию. В принципе, таким образом можно подменить и убрать ошибку. Хоть этот способ и выглядит как костыль, но он рабочий и позволяет быстро решать проблемы.

Заключение

В результате наших экспериментов мы получили рабочее решение для интеграции GX с Impala. Да, местами пришлось использовать не самые элегантные подходы — подмена файлов библиотеки может показаться спорным решением. Но когда нужно быстро получить работающий инструмент, этот подход себя оправдал. А если у вас есть идеи, как сделать по-другому —  пишите в комментариях. 

© Habrahabr.ru