К порядку: правила создания конвейеров обработки данных
К 2020 году вы не могли не заметить, что миром правят данные. И, как только речь заходит о работе с ощутимыми объёмами, появляется необходимость в сложном многоэтапном конвейере обработки данных.
Сам по себе конвейер обработки данных — это комплект преобразований, которые требуется провести над входными данными. Сложен он, например, потому, что информация всегда поступает на вход конвейера в непроверенном и неструктурированном виде. А потребители хотят видеть её в лёгкой для понимания форме.
В наших приложениях Badoo и Bumble конвейеры принимают информацию из самых разных источников: генерируемых пользователями событий, баз данных и внешних систем. Естественно, без тщательного обслуживания конвейеры становятся хрупкими: выходят из строя, требуют ручного исправления данных или непрерывного наблюдения.
Я поделюсь несколькими простыми правилами, которые помогают нам в работе с преобразованием данных и, надеюсь, помогут и вам.
Правило наименьшего шага
Первое правило сформулировать легко: каждое отдельное взятое преобразование должно быть как можно проще и меньше.
Допустим, данные поступают на машину с POSIX-совместимой операционной системой. Каждая единица данных — это JSON-объект, и эти объекты собираются в большие файлы-пакеты, содержащие по одному JSON-объекту на строку. Пускай каждый такой пакет весит около 10 Гб.
Над пакетом надо произвести три преобразования:
Проверить ключи и значения каждого объекта.
Применить к каждому объекту первую трансформацию (скажем, изменить схему объекта).
Применить вторую трансформацию (внести новые данные).
Совершенно естественно всё это делать с помощью единственного скрипта на Python:
python transform.py < /input/batch.json > /output/batch.json
Блок-схема такого конвейера не выглядит сложной:
Проверка объектов в transform.py занимает около 10% времени, первое преобразование — 70%, на остальное уходит 20% времени.
Теперь представим, что ваш стартап вырос и вам уже приходится обрабатывать сотни, а то и тысячи пакетов. И тут вы обнаружили, что в финальный этап логики обработки данных (занимающий 20% времени) закралась ошибка, — и вам нужно всё выполнить заново.
В такой ситуации рекомендуется собирать конвейеры из как можно более мелких этапов:
python validate.py < /input/batch.json > /tmp/validated.json
python transform1.py < /input/batch.json > /tmp/transformed1.json
python transform2.py < /input/transformed1.json > /output/batch.json
Блок-схема превращается в симпатичный паровозик:
Выгоды очевидны:
конкретные преобразования проще понять;
каждый этап можно протестировать отдельно;
промежуточные результаты отлично кешируются;
систему легко дополнить механизмами обработки ошибок;
преобразования можно использовать и в других конвейерах.
Правило атомарности
К правилу наименьшего шага прилагается второе — правило атомарности. Оно звучит так: каждый шаг-преобразование либо должен случиться, либо нет. Никаких промежуточных состояний данных быть не должно.
Давайте вернёмся к первому примеру. Есть входные данные, над которыми мы проводим преобразование:
python transform.py < /input/batch.json > /output/batch.json
Что будет, если в процессе работы скрипт упадёт? Выходной файл будет повреждён. Или, что ещё хуже, данные окажутся преобразованы лишь частично, а следующие этапы конвейера об этом не узнают. Тогда на выходе вы получите лишь частичные данные. Это плохо.
В идеале данные должны быть в одном из двух состояний: готовые к преобразованию или уже преобразованные. Это называется атомарностью: данные либо переходят в следующее правильное состояние, либо нет:
Если какие-то этапы конвейера расположены в транзакционной базе данных, то атомарность легко достигается использованием транзакций. Если вы можете использовать такую базу данных, то не пренебрегайте этой возможностью.
В POSIX-совместимых файловых системах всегда есть атомарные операции (скажем, mv или ln), с помощью которых можно имитировать транзакции:
python transform.py < /input/batch.json > /output/batch.json.tmp
mv /output/batch.json.tmp /output/batch.json
В этом примере испорченные промежуточные данные окажутся в файле *.tmp, который можно изучить позднее при проведении отладки или просто удалить.
Обратите внимание, как хорошо это правило сочетается с правилом наименьшего шага, ведь маленькие этапы гораздо легче сделать атомарными.
Правило идемпотентности
В императивном программировании подпрограмма с побочными эффектами является идемпотентной, если состояние системы не меняется после одного или нескольких вызовов.
Википедия
Наше третье правило более тонкое: применение преобразования к одним и тем же данным один или несколько раз должно давать одинаковый результат.
Повторюсь: если вы дважды прогоните пакет через какой-то этап, результаты должны быть одинаковы. Если прогоните десять раз, результаты тоже не должны различаться. Давайте скорректируем наш пример, чтобы проиллюстрировать эту идею:
python transform.py < /input/batch.json > /output/batch1.json
python transform.py < /input/batch.json > /output/batch2.json
diff /input/batch1.json /output/batch2.json
# файлы те же
python transform.py < /input/batch.json > /output/batch3.json
diff /input/batch2.json /output/batch3.json
# никаких изменений
На входе у нас /input/batch.json, а на выходе — /output/batch.json. И вне зависимости от того, сколько раз мы применим преобразование, мы должны получить одни и те же данные:
Так что если только transform.py не зависит от каких-то неявных входных данных, этап transform.py является идемпотентным (своего рода перезапускаемым).
Обратите внимание, что неявные входные данные могут проявиться самым неожиданным образом. Если вы слышали про детерминированную компиляцию, то главные подозреваемые вам известны: временные метки, пути в файловой системе и другие разновидности скрытого глобального состояния.
Чем важна идемпотентность? В первую очередь это свойство упрощает обслуживание конвейера. Оно позволяет легко перезагружать подмножества данных после изменений в transform.py или входных данных в /input/batch.json. Информация будет идти по тем же маршрутам, попадёт в те же таблицы базы данных, окажется в тех же файлах и т. д.
Но помните, что некоторые этапы в конвейерах по определению не могут быть идемпотентными. Например, очистка внешнего буфера. Однако, конечно же, подобные процедуры всё равно должны оставаться маленькими™ и атомарными™.
Правило избыточности
Четвёртое правило: насколько возможно откладывайте удаление промежуточных данных. Зачастую это подразумевает использование дешёвого, медленного, но ёмкого хранилища для входных данных:
Пример:
python transform1.py < /input/batch.json > /tmp/batch-1.json
python transform2.py < /tmp/batch-1.json > /tmp/batch-2.json
python transform3.py < /tmp/batch-2.json > /tmp/batch-3.json
cp /tmp/batch-3.json /output/batch.json.tmp # не атомарно!
mv /output/batch.json.tmp /output/batch.json # атомарно
Сохраняйте сырые (input/batch.json) и промежуточные (/tmp/batch-1.json, /tmp/batch-2.json, /tmp/batch-3.json) данные как можно дольше — по меньшей мере до завершения цикла работы конвейера.
Вы скажете мне спасибо, когда аналитики решат поменять алгоритм вычисления какой-то метрики в transform3.py и вам придётся исправлять данные за несколько месяцев.
Другими словами: избыточность избыточных данных — ваш лучший избыточный друг.
Заключение
Давайте подведём итоги:
разбивайте конвейер на изолированные маленькие этапы;
стремитесь делать этапы атомарными и идемпотентными;
сохраняйте избыточность данных (в разумных пределах).
Так обрабатываем данные и мы в Badoo и Bumble: они приходят через сотни тщательно подготовленных этапов преобразований, 99% из которых атомарные, небольшие и идемпотентные. Мы можем позволить себе изрядную избыточность, поэтому держим данные в больших холодном и горячем хранилищах, а между отдельными ключевыми преобразованиями имеем и сверхгорячий промежуточный кеш.
Оглядываясь назад, могу сказать, что эти правила выглядят очевидными. Возможно, вы даже интуитивно уже следуете им. Но понимание лежащих в их основе причин помогает видеть границы применимости этих правил и выходить за них при необходимости.
А у вас есть свои правила обработки данных?