Как тестировать в Databricks: Nutter Framework

f23944f89774454e220faa6879be308a.png

Disclaimer

Статья предполает, что вы знаете что такое Databricks, что такое ноутбуки, кластера и джобы (воркфлоус) в нём. Поиск по хабру предлагает вот эту статью для ознакомления.

Вступление

Я продолжаю серию статей, где анализирую свой текущий проект в области BigData. Первая статья была посвящена найму питонистов, вторая — проблемам внедрения новых процессов в распределённых командах. Однако помимо чисто управленческих задач, в проекте есть ряд технических сложностей. Одной из таких сложностей является тестирование платформы обработки данных.

Если с тестированием более привычных программных продуктов более-менее ясно, то вот с BigData возникает множество вопросов. Если у вас Java — у вас есть как минимум JUnit, а абсолютное большинство фреймворков заботятся о простоте тестирования. Например Spring посвящает этому очень много документации. Тестирование фронтенда тоже хорошо проработано: от Selenium до JestJs. Тестировать блокчейн и смарт-контракты одно удовольствие (хотя бы на Ethereum сети благодаря Truffle Suite)

На Питоне тоже есть свои фреймворки для тестирования и вопрос этот вполне себе проработан. Даже сам Databricks, на базе которого построена наша платформа обработки данных, предлагает свои пути для тестирования. Например вот неплохой официальный гайд по тестированию: Unit testing for notebooks. Однако нужно иметь репозиторий внутри самого Databricks. А у нас код хранится в корпоративном GitLab, который недоступен из нашего же Databricks. Очень неудобно, но отдел безопасности не даёт разрешения настроить доступ из внешнего ресурса во внутреннюю сеть.

Есть возможность по тестированию прямо внутри ноутбуков Databricks: Test Databricks notebooks. Из минусов: сложность в организации кода, урезанный импорт ноутбуков внутри Databricks.

Мы сделали несколько попыток тестировать нашу платформу и так, и сяк, но всё выходило весьма «велосипедно». И тут мы наткнулись и решили попробовать Nutter Framework. То, что он обещал очень подходило для наших потребностей. Респект Microsoft за такой тул в opensource.

Nutter: очень краткое руководство

Главная цель данного фреймворка — дать возможность легко и быстро тестировать ноутбуки в Databricks. Nutter предлагает определённый подход к написанию тестов и несколько подходов к их выполнению.

Самый простой тест

Во-первых, чтобы начать работать с тестами, необходимо поставить библиотеку nutter на тот кластер, где будут выполняться тестовые ноутбуки:

1f55cdb0931e625f79ee66539e7644ee.png

Далее создаём ноутбук, импортируем базовый класс NutterFixture и начинаем писать тестовый класс:

from runtime.nutterfixture import NutterFixture, tag
class FirstTestFixture(NutterFixture):
    def run_test(self):
        dbutils.notebook.run('notebook_to_test', 600)
    def assertion_test(self):
        assert True
    def run_test_secundo(self):
        dbutils.notebook.run('another_notebook_to_test', 600)
    def assertion_test_secundo(self):
        assert True

Чтобы выполнить и увидеть результаты выполнения тестов, запускаем сам тестовый класс:

result = FirstTestFixture().execute_tests()
print(result.to_string())

Тесты можно запускать через коммандную строку (см. ниже как это сделать). И чтобы вернуть результаты выполнения тестов в nutter-cli, нужно выполнить в конце ноутбука:

result.exit(dbutils)

Однако есть неприятное ограничение: внутри result.exit вызывает dbutils.notebook.exit(), что приводит к тому, что Databricks прячет все выводы от команд print . Поэтому если запускать тесты прямо ноутбуках, строчку с exit нужно закомментировать.

Правила именования тестов

Внутри одного тест-класса может быть несколько тестов. Каждый тест состоит из 1 обязательного метода и 3 дополнительных:

  • before_(testname) — выполнятеся перед run Используется для настройки тестов и выполнения подготовительных действий. Необязателен.

  • run_(testname) — выполняется после before (если он имеется) или первым. Тут должны быть действия, которые непосредственно тестируются, например вызов ноутбука. Необязателен.

  • assertion_(testname) — выполняется после run (если он есть). Содержит проверки состояний. Можно использовать assert из любых питоновских тестовых библиотек. Каждый тест-класс должен содержать хотя бы 1 assertion-метод.

  • after_(testname) — выполняется после assertion. Обычно используется, чтобы вернуть состояние тестовых объектов в исходное или «почистить» что-то после выполнения теста.

Пример: методы run_checkpoint_location_generation и assertion_checkpoint_location_generation будут трактоваться как 1 тест-кейс.

Дополнительно есть два метода:

  • before_all

  • after_all

Они выполняются соответственно до и после всех тестов. Если вы хотите использовать несколько отдельных assertion на 1 тест-кейс, тогда нужно использовать before_all и для подготовки теста и для вызова тестируемых действий.

from runtime.nutterfixture import NutterFixture, tag
class MultiTestFixture(NutterFixture):
  def before_all(self):
    dbutils.notebook.run('notebook_under_test', 600, args) 
    #...
  def assertion_test_case_1(self):
    #...
  def assertion_test_case_2(self):
    #...
  def after_all(self):
    #... 

Nutter гарантирует выполнение тестов в алфавитном порядке на основе имени тест-кейса.

Также Nutter позволяет сохранять состояние и передавать данные между тест-кейсами через параметры конструктора:

 class TestFixture(NutterFixture):
  def __init__(self):
    self.file = '/data/myfile'
    NutterFixture.__init__(self)

Паралелльный запуск тестов

Когда тестов много, то хочется запускать их в несколько потоков и сократить время выполнения. Последняя версия Nutter`а (0.1.35) позволяет это сделать с помощью NutterFixtureParallelRunner :

from runtime.runner import NutterFixtureParallelRunner
#...
parallel_runner = NutterFixtureParallelRunner(num_of_workers=2)
parallel_runner.add_test_fixture(FirstTestFixture())
parallel_runner.add_test_fixture(AnotherTestFixture())
result = parallel_runner.execute()
print(result.to_string())

Резульаты тестов объединяются и показываются в удобном виде:

Notebook: N/A - Lifecycle State: N/A, Result: N/A
Run Page URL: N/A
============================================================
PASSING TESTS
------------------------------------------------------------
test_another_case (40.98681362399998 seconds)
test (40.99089991400001 seconds)
test_secundo (10.680228194999927 seconds)


============================================================

Nutter CLI

Одна из главных фишек Nutter`а — это возможность запускать тесты из командной строки.

Для начала установим Nutter через pip:

$ pip install nutter

Затем зададим 2 переменных окружения, чтобы предоставить доступ Databricks:

Linux

export DATABRICKS_HOST=
export DATABRICKS_TOKEN=

Windows PowerShell

$env:DATABRICKS_HOST="HOST"
$env:DATABRICKS_TOKEN="TOKEN"

Для начала можно показать какие тесты могут быть исполнены:

 $ nutter list /common/test/nutter

Получаем что-то такое:

Nutter Version 0.1.35
++++++++++++++++++++++++++++++++++++++++++++++++++

--> Looking for tests in /common/test/nutter
--> 3 tests found

Tests Found
-------------------------------------------------------
Name:   test_StreamDeltaTransformation_multi
Path:   /common/test/nutter/test_StreamDeltaTransformation_multi

Name:   test_RefineryFromJsonMerge_single
Path:   /common/test/nutter/test_RefineryFromJsonMerge_single

Name:   test_StreamDeltaTransformation_single
Path:   /common/test/nutter/test_StreamDeltaTransformation_single

-------------------------------------------------------

Total: 3

А теперь запустим какой-то конкретный тест:

 $ nutter run /common/test/nutter/test_StreamDeltaTransformation_single --cluster_id '0000-099999-abcdabcd'

Эта команда запускает конкретный тест-класс на определённом кластере (на кластере должна быть установлены библиотека nutter)

Чтобы запустить все тест-классы, можно использовать следующую команду

$ nutter run /common/test/nutter/ --cluster_id 0123-12334-tonedabc --recursive 

Имена тестов должны начинаться с test_

Флаг --recursiveобеспечивает поиск тестовых классов рекурсивно во всех поддиректориях.

Передавать какие-то параметры можно через опцию --notebook_params:

$ nutter run /common/test/nutter/* --cluster_id 0123-12334-tonedabc --notebook_params "{\"example_key_1\": \"example_value_1\", \"example_key_2\": \"example_value_2\"}"

Пример реального тест-класса

Спасибо @raukasky за написание конкретных тест-классов для сервисов платформы.

# команда ниже импортирует длинные yaml-конфиги, которые отправляются на вход нашим сервисам
%run ../configs/stream_delta_transformation_test_cases
#...
from runtime.runner import NutterFixtureParallelRunner
from runtime.nutterfixture import NutterFixture
from cds_utils.platform import ConfigRunner # это класс, который может запускать сервисы платформы
#...
class TestCase_01(NutterFixture):
    def __init__(self):
        self.test_uc_path = 'dataservices_nonprod.test_nonprod'
        self.s3_path = 's3://super_bucket.data'
        NutterFixture.__init__(self)
    def before_all(self):
        # так как у нас много проверок, то мы выполняем код в этом методе
        ConfigRunner(dbutils, spark).run_yaml_config(param_yaml=test_case_01, notebook_path='/../../../cds_platform/stream_delta_transformation')
    def assertion_isExistTable(self):
        # проверяем, что сервис вообще создал таблицу
        assert(spark.catalog.tableExists(f'{self.test_uc_path}.tindata_expected')==True)
    def assertion_table_location(self):
        # проверяем, что таблица созадан в нужном месте (как external)
        location = sql(f"describe detail {self.test_uc_path}.tindata_expected").select('location').collect()[0]['location']
        assert(location == f'{self.s3_path}/test/hive/test_nonprod/tindata_expected')
    def assertion_checkpoint_location(self):
        # проверяем, что после работы сервиса чекпоинт есть, и он в правильном месте
        location = [file.path for file in dbutils.fs.ls(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected')][0]
        assert(location == f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected/tindata/')
    def assertion_partitions(self):
        # проверяем, что конфиг прочитался правильно и таблица имеют верные партиции (как указано в yaml-конфиге)
        partitions = sql(f"describe detail dataservices_nonprod.test_nonprod.tindataxref").select('partitionColumns').collect()[0]['partitionColumns']
        assert(partitions == ['data_source_date'])
    def assertion_dedupe_column(self):
        # проверем, что дедупликация входных данных по первичному ключу прошла успешно
        # и что количество записей в целевой таблице ожидаемо меньше, чем в исходной
        num_of_change = spark.sql(f"select * from {self.test_uc_path}.tindataxref where source_id = 27 and data_source_ts = '2022-11-04T00:00:11.129+0000'").count()
        count_source = spark.sql(f"select * from {self.test_uc_path}.tindataxref").count()
        count_target = spark.sql(f"select * from {self.test_uc_path}.tindataxref_expected").count()
        assert(count_target == count_source-(num_of_change-1))  
    def after_all(self):
        # чистим за собой таблицы и директории с данными и чекпоинтами
        spark.sql(f'drop table {self.test_uc_path}.tindata_expected')
        dbutils.fs.rm(f'{self.s3_path}/test/hive/test_nonprod/tindata_expected', True)
        dbutils.fs.rm(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected', True)

Такой тест-класс можно отнести к уровню интеграционного тестирования: сервис stream_delta_transformation проверяется в окружении, максимально приближенном к реальному. Мы не внедряем моки или «шпионские» объекты, проверки идут на реально созданных таблицах и данных. Тем не менее, мы можем проверить некоторые методы внутри сервиса по тем результатам, что они выдают по итогам работы сервиса. Например мы проверяем, что целевые таблицы создаются, что они находятся в правильном месте (потому что все таблицы у нас external), и так далее. Кроме проверок работы конкретных методов, мы можем прочитать данные из целевой таблицы и проверить, что трансформация данных прошла корректно.

Пока мы запускаем это вручную из командной строки, но это уже огромный шаг вперёд для того, чтобы обеспечить интеграционное тестирование. А так как это вполне себе обычный python-код, то это открывает возможности проверки всего, что делают сервисы. Например, вполне реально написать тест на проверку публикации данных из Delta Lake в MySql, потому что в тестовом классе мы можем обратить в MySql напрямую и проверить, что там оказалось.

Выводы

Итак, Nutter Framework позволил нашей команде:

  • проводить интеграционное тестирование сервисов, написанных как ноутбуки в Databricks

  • запускать тесты как прямо в ноутубках в Databricks, так и из командной строки

  • написать набор регрессионных тестов на отдельные методы сервисов. Не так удобно как чистые unit-тесты, но значительно лучше, чем ничего. Время проведения регрессионного тестирования значительно снизилось

В дальнейшем мы планируем внедрить запуск тестов в процесс сборки и деплоя платформы, непосредственно в GitLab CI/CD. Как это делать для Azure есть в официальной документации.

Для нашего проекта Nutter стал отличным выбором и очень удобным инструментом тестирования.

В конце этой статьи хочу порекомендовать вам полезный вебинар от OTUS, где расскажут о профессии автоматизатора тестирования, об актуальных технологиях, пользе использования автотестов, а так же о нужных навыках и особенностях собеседований.

© Habrahabr.ru