Как благодаря переезду хранилища данных прокачать стек, архитектуру и скиллы команды
Приветствую всех читателей! Меня зовут Николай Самсонов. Я являюсь руководителем платформы данных в команде Учи.ру. В своей работе часто сталкиваюсь с ситуацией, когда бизнесу нужны метрики и показатели здесь и сейчас, в то время как автоматизация получения и обработки терабайт данных для их расчета может занимать значительное количество времени.
Правильный стек, правильная архитектура и правильное видение процесса ELT — залог успешной аналитики, с этим никто не спорит. Но как прийти к ним и как найти баланс между затратами времени на исследование и поддержкой уже сделанного в бесконечном потоке A/B-тестирований, дашбордов, метрик и Ad hoc-запросов?
Точного ответа у меня нет, но хочу рассказать про наш опыт переездов хранилища данных, за счет которых мы смогли качнуть баланс равновесия между задачами операционными (Run) и связанными с изменениями (Change) в пользу вторых. В наше хранилище грузятся обезличенные данные пользователей, необходимые для расчета показателей и метрик. Наш опыт показывает, что переезды не «пожар», а возможность прокачать технологии и скиллы людей, которые занимаются построением DWH.
Переезд начинается
На протяжении долгого времени весь стек DP работал достаточно беспроблемно, но при этом SLA доезда прода был размыт и с ручным подключением дежурного дата-инженера занимал время с 13:00 до 15:00 ежедневно.
Как мы поняли в дальнейшем (и это небольшой спойлер всей истории), мощности железа позволяли сгладить многие шероховатости и неоптимальность кода. На нашем объеме, (если измерять ежедневной дельтой, то это цифры 14–16 TB), все сложности были связаны исключительно с этой неоптимальностью и человеческим фактором (последствиями внедрений).
Впоследствии для устранения этих неоптимальностей мы приняли решение о замене провайдера. Переезд к новому поставщику услуг дался нам непросто. Сначала сетевое окружение и виртуальные роутеры просто не были готовы к нашим объемам данных при сохранении текущего стека. Когда мы от тестовых ограниченных запусков перешли к полноценному нагрузочному тестированию, то стали постоянно ловить S3 throttling алерты. Затем нам перенастроили сеть, дали более мощную ноду под виртуальный роутер — наша Big Data наконец-то поехала.
Однако появились рандомные ошибки при записи файлов в S3-бакеты. При этом, если мы ловили их на Core-таблицах нижнего слоя, то ломались все витрины верхних слоев, и восстановление согласованных данных было возможно только полной перегрузкой прода.
По итогам множества встреч с представителями облачного хранилища и постоянных пересылок многогигабайтных логов мы получили достаточно честный диагноз происходящего от самого провайдера: «Нашли косвенные доказательства того, что наш сервис вернул неполный листинг в момент интенсивной записи в контейнер. Мы храним листинги в реплицированной БД и такое поведение возможно, если при создании объекта не обновилась одна из реплик. А при чтении листинга запрос оказался именно на ней. Это соответствует гарантиям Eventual Consistency, которые дает наше хранилище».
Мы стали размышлять, как с этим справиться и какие действия стоит предпринять в дальнейшем.
Начали с того, что проапгрейдили нашу систему data quality, , а именно:
фреймворк сравнения количества записываемых в моменте файлов Spark с количеством файлов в S3 после записи вместе со связкой ретраев (на практике вышли на достаточность трех);
систему ранних алертов, которая позволяла понять, что в данный конкретный день все пропало система ретраев по той или иной причине не дала результат (с дальнейшими действиями на выбор дежурного DE — от точечного перезапуска дагов до остановки прогрузки в CH в состоянии целостности данных в нем на Т-2 и одновременным перезапуском всего прода).
Затем мы сформулировали план по глобальному изменению инженерной архитектуры. Он был связан с внедрением одного из S3-коммитеров, которые должны были снизить нагрузку на хранилище при записи, либо должен был осуществить переход от листинга файлов из папки S3 к использованию меты записанных файлов (фактическая реализация с помощью Apache Iceberg). Внедрение такого инструмента, как Iceberg, требует множества изменений, ведь меняется сам подход работы с файлами. Как следствие, требуется перерасчет всех слоев DWH. В дальнейшем в этой статье я опишу, как и с каким эффектом мы его внедрили.
Для решения проблемы частичного недоезда части данных мы пробовали применять несколько различных коммитеров:
Дефолтный S3-коммитер. Он медленный и требует строгой консистентности данных. В случае с нашим провайдером он только увеличил время сохранения файлов в S3 и не решил проблему.
Staging directory-коммитер. С ним запись осуществляется быстрее, чем с дефолтным, но его нельзя использовать в партицированных таблицах. Он значительно сократил время записи файлов по сравнению с дефолтным, но мы продолжали периодически ловить ошибки.
Magic-коммитер. Гораздо бодрей и надежней, однако при его использовании остались проблемы с партициями (их постоянный пересчет и пуш в Hive Metastore в больших таблицах мог занимать до 10–15 минут).
Итак, после использования коммитеров наша проблема неполной записи данных в хранилище, не поддерживающем строгую согласованность, уменьшилась, но полностью не исчезла. Это было связано с тем, что коммитер должен хранить информацию о том, что записывают другие воркеры в S3. И если при команде ls драйвер не видит чего-либо, значит, для него это и не существует.
В итоге частичные проблемы неполной записи данных не удалось исключить полностью, поэтому приняли решение перейти к новому провайдеру. К моменту второго переезда мы уже знали «что делать», какие тесты запускать, на какие показатели и алерты смотреть, какие вопросы задавать облачному саппорту.
Далее я подробней остановлюсь на всех изменениях в разрезе архитектуры и технологий, которые мы поменяли для оптимизации DWH при наших переездах.
Немного об архитектуре DWH Учи.ру
Когда в компании еще не было платформы данных, первые аналитики установили древнюю версию AirFlow. Таким образом, вся архитектура состояла из того самого первого подхода (сделать «здесь и сейчас») и последующих итераций на эту же тему.
При переезде мы оставили общий подход хранения данных с разделением его на «холодное хранение» (будь то наши parquet«ы в S3 или не часто используемые Jupyter Notebook) и «горячее хранение» в виде ClickHouse на SSD, на который смотрят BI-дашборды Tableau или техническая учетка А/Б-тестера.
Сам S3-бакет данных в облаке делится на RAW (грузим как есть из источников) и STORAGE (трансформируем и рассчитываем). Также в STORAGE выделяется слой data mart (из которого load-даги уже производят выгрузку в Clickhouse). Помимо data mart существуют продуктовые «песочницы» — следствие микросервисной архитектуры (созданы для данных, которые нет необходимости мерджить с данными других источников).
Что изменилось в архитектуре DWH
Во-первых, в Storage был невыраженный слой плоских данных и вырожденный слой произвольного набора агрегатов. Первому мы дали основной приоритет (сделали условным DDS) с вынесением в него многих плоских витрин из data mart, второй полностью заменили на слой основных переиспользуемых данных — CDM.
Во-первых, в Storage был невыраженный слой плоских данных и вырожденный слой произвольного набора агрегатов. Первому мы дали основной приоритет (сделали условным DDS) с вынесением в него многих плоских витрин из data mart, второй полностью заменили на слой основных переиспользуемых данных — CDM.
В-третьих, с помощью Grafana мы поигрались с очередями запуска слоев и распределением дагов между слоями. Итого получилось пять основных слоев DWH: raw, ushi (DDS), dict, cdm, data mart.
Кроме этого, мы привнесли на прод возможность делать версионированные таблицы (Slowly Changing Dimensions).
Что изменилось в стеке DWH
Наши инженеры и DevOPS-инженеры значительно видоизменили и улучшили многие инстансы и инструменты стека.
Заменили YARN на K8s
Эта идея была бы осуществлена нами и без переезда — в силу преимуществ, которые дает «кубер» по сравнению с «еще одним менеджером ресурсов».
Он позволяет:
подключать дополнительные ноды из квоты провайдера при высокой нагрузке с последующим их отключением;
запускать разные версии Spark и Python в одном кластере и обеспечивать беспроблемное совместное использование ресурсов;
поддерживать множество фреймворков, а также гибко их настраивать.
Дополнительно нужно сказать, что весь кейс с K8s явился триггером множества других доработок, о которых будет написано ниже.
Обновили ClickHouse до последней версии
А также отточили и оптимизировали настройки балансировщика, плюс — подняли небольшой тестовый ClickHouse рядом с основным (как расширенную песочницу для аналитиков и пробных обновлений версий самого CH).
Усовершенствовали Zeppelin
Мы сделали две ноды в нод-группе, которая предназначена для Zeppelin (вместо одной), а также установили лимиты на отдельные интерпретаторы, чтобы они не потребляли всю доступную память ноды (ранее при этом периодически страдала работоспособность самого сервера Zeppelin). В данный момент, если даже произойдет подобная ситуация на одной из нод, сервер должен пересоздаться на второй и продолжить работу бесшовно для пользователя этого инстанса.
Внедрили OpenLDAP-сервер
У нас достаточно много сервисов для работы с Big Data, и в каждом есть свои отдельные скрипты и файлы конфигурации для добавления новых пользователей. Это показалось нам неудобным: хотелось найти инструмент, который собрал бы всех пользователей, а сервисы могли обращаться к нему за аутентификацией. Удобным казался Google SSO, однако не все приложения его поддерживают (или нужно было доработать его какими-то сторонними решениями). Зато LDAP-интеграция была у всех приложений из коробки. Дело было за малым: написали Ansible, протестировали и внедрили OpenLDAP-сервер.Также с момента появления LDAP (а с ним пришла на прод и возможность заведения ролей с помощью SQL вместо config-файла) сделали автоматический revoke и grant для учеток аналитиков в ClickHouse в связи с тем, что CH по дефолту не поддерживает acid (кроме экспериментальных функций). Это существенно упростило жизнь аналитиков, которые могли получать все время разные данные днем в момент, когда мы шатали отдельные таблицы на проде по тикетам других аналитиков, а также жизнь дата-инженеров, которые обычно в этом случае писали предупреждающие сообщения.
Задеплоили Vault
Для работы с конфиденциальными данными внутри нашего Kubernetes-кластер мы развернули Hashicorp Vault в и используем плагин Vault-CRD для чтения секретов подами прямо из Vault DB. Также в отдельном разделе мы храним ключи, которые могут понадобиться нашим data и devops инженерам, куда есть доступ через веб-интерфейс с аутентификацией через LDAP, разграничением прав доступа к ключам и настройке хранилища секретов в зависимости от роли пользователя.
Усовершенствовали JupyterLab
После переезда в Kubernetes пользователям JupyterLab часто не хватало выделенного ресурса для хранения данных, а вечно делать ресайз тома не казалось удобным решением. Что хотят пользователи, хранящие гигабайтные архивы и не желающие в гит? Правильно — безразмерное объектное хранилище S3. Из коробки JupyterLab не позволял монтировать S3-объекты в корень рабочей директории, поэтому мы взяли драйвер S3 от Yandex, добавили StorageClass и в деплойменте описали новый PV. Теперь пользователи JupyterLab могут хранить свои ноутбуки в папке S3 безразмерно и вечно.
Заменили Thrift на Kyuubi
Для работы с данными в S3 первоначально использовался такой инструмент, как Thrift-сервер, поднятый на отдельных виртуалках. Но у него были критические недостатки: медленная скорость работы даже на простых запросах и катастрофическая отказоустойчивость. Запустив Kyuubi, мы получили масштабируемость и возможность автовосстановления сервиса, а также улучшили перфоманс до 3–4 раз на больших запросах.
Обновили AirFlow
Заменили старую версию на последнюю, что существенно отразилось как на скорости (таски стали запускаться сразу же, а не через 2–3 минуты), так и на основном изменении de-стека — появились возможности интеграции AF c «кубером». Изменение версии AF (как и его работа внутри K8s) повлекло за собой изменение в написании декораторов и подхода к деплою.
Как мы внедряли Iceberg
Apache Iceberg — это, по сути плагин к Spark, который позволяет работать с данными в особом формате Apache Iceberg Open Table Format. Его основными особенностями являются отдельное хранение меты и версионность (со своим Rollback и Time Travel). Внутри каждого каталога появляется два подкаталога — дата и метадата, где в последней лежат файлы json и avro, описывающие parquet«ы в первой.
Последнее обстоятельство делает этот инструмент необходимым в случаях, когда требуются быстрые откаты к предыдущему состоянию таблиц. Это важно аналитикам, работающим в Data Lake с A/B-тестированиями на больших объемах данных. Мы же в основном используем его, чтобы убедиться, что все наши данные «доедут до цели».
Одновременно хочу заметить, что коммитер, находящийся под капотом в Apache Iceberg, дал значительный прирост скорости. А его мета позволила хранить историю апдейтов и откатов. Из относительных сложностей стоит упомянуть необходимость дополнительных настроек хранения меты (забор актуальных данных из других сервисов) и периодической чистки истории (снэпшотов), это позволяет не раздувать S3 до бесконечности.
При сверке таблиц, загруженных традиционным способом в старое хранилище и с помощью Iceberg, мы заметили, что некоторые наши источники достаточно сильно меняют информацию задним числом. При том, что инструмент по версионированию у нас был разработан, но не получил большой поддержки снизу у непосредственных заказчиков, каждый случай такого расхождения был разобран как отдельный кейс с вариантами решения: обновить, перенести как есть, выявить причину у разработчиков.
Итоги
Итак, подвожу итоги и отвечаю на вопрос, вынесенный в заглавие статьи: почему переезд хранилища данных — не «пожар», а уникальный опыт для прокачки стека?
Большинство улучшений в стеке (появление новых инструментов и оптимизация существующих настроек), произведенных нами при переезде, были ответной реакцией на новую среду. Их можно условно разделить на две большие группы:
1. Раз переезжаем, то сразу берем самую новую версию инструмента.
2. Попробовали как раньше, не вышло, начали искать лучший вариант.
Очень сомневаюсь, что при обычном Run-процессе можно было бы бесшовно и безболезненно поменять столь многое.
Переезд потребовал большего перекоса в сторону Change-задач, а также более долгосрочного и прицельного планирования, чем просто работа по спринтам. Не скажу, что мы полностью перешли на каскадную разработку: скорее, стали использовать разные методологии для разных типов задач. А еще мы неожиданно поняли, что благодаря стоящим перед нами вызовам удалось привлечь специалистов высокого уровня, которым были интересны очень нетривиальные задачи.
Любой переезд DWH не может быть долгим, ведь это период двойных костов для компании, которая продолжает тратить ресурсы на старый прод, для непрерывного получения актуальной информации в старом хранилище, и на установку и перенос данных в новое.
Также при одновременном заборе данных из определенных внешних реплик мы были вынуждены выбирать окна и перестраивать процесс, чтобы не привести их к состоянию даунтайма. Когда что-то делаешь второй раз, то уже становится понятно, куда нужно подстелить соломку.
Важно отметить, что при переезде невозможно значительно оптимизировать архитектуру хранения данных. Да, мы сделали все для снижения SLA доезда прода, создали тестовые пространства с расширенными правами для аналитиков и внедрили SCD. Но большинство таблиц нашего хранилища пока далеки от третьей нормальной формы, о Data Vault говорить не приходится.
Чтобы этого достичь, нам еще предстоит:
на время отойти от переездов и изменения стека, чтобы перестраивать (согласно правильным подходам) давно существующие витрины, на которых держится большинство дашбордов;
после этого постепенно переключать их со старых витрин на новые.
То есть осуществить такой «внутренний переезд». Мы ждем окончания внешнего переезда, чтобы вплотную заняться этим вопросом.