[Перевод] Уроки, извлеченные из масштабирования до многотерабайтных датасетов
В этой статье я расскажу об уроках, которые вынес из работы с многотерабайтными датасетами. Объясню, с какими сложностями столкнулся при увеличении масштабов датасета и как их удалось решить.
Отмечу, что это не строгое руководство. Я познакомлю вас с основными концепциями и объясню, почему их следует применять. Не исключаю, что у описанных в статье инструментах могут быть более эффективные аналоги — рекомендую вам проявить инициативу и изучить их самостоятельно. Активное исследование — ключ к профессиональному росту.
Я разделил статью на две части: первая посвящена масштабированию на отдельной машине, вторая — масштабированию на множестве машин. Наша цель — максимизировать доступные ресурсы и как можно быстрее выполнить поставленные задачи.
Наконец, я бы хотел подчеркнуть, что никакие оптимизации или масштабирования не могут компенсировать изъяны алгоритма. Прежде чем выполнять масштабирование, обязательно оцените показатели алгоритма. Всегда начинайте с этого шага, чтобы создать надежный фундамент для последующей работы.
Масштабирование на одной машине
Joblib
Первое узкое место, которое приходит на ум при размышлениях о масштабировании — это вычислительные ресурсы. Масштабирование вычислений можно выполнить несколькими способами. Если вы дата-сайентист или ML-инженер, вы наверняка знакомы с Joblib — библиотекой, применяемой для параллельного выполнения кода (а также других задач). Ее часто используют в других библиотеках, например, в scikit-learn или в XGBoost.
Процесс распараллеливания операций при помощи Joblib выглядит достаточно просто (чуть упрощенный пример из документации Joblib):
>>> from joblib import Parallel, delayed
>>> from math import sqrt
>>> parallel_mapper = Parallel(n_jobs=-1)
>>> delayed_func = delayed(sqrt)
>>> jobs = [
delayed_func(x**2)
for x in range(10)
]
>>> parallel_mapper(jobs)
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Joblib — это отличный способ масштабирования параллельных нагрузок. Эта библиотека используется в scikit-learn и других инструментах, она доказала свою надежность при работе с различными нагрузками. И это даже без учета других ее потрясающих функций, связанных с мемоизацией и Fast Compressed Persistence. Joblib отлично помогает распараллелить функцию на все ядра CPU.
GNU Parallel
GNU Parallel — это мощный инструмент препроцессинга или извлечения данных в CLI. Он отличается от Joblib своей универсальностью, а также тем, что его можно использовать вне скрипта. Параллельно с ним даже можно выполнять другие скрипты на Python. Один из наиболее популярных сценариев использования этой утилиты — одновременная распаковка множества файлов. Вот как это делаю я:
> ls
random_0.zip random_2.zip random_4.zip random_6.zip random_8.zip
random_1.zip random_3.zip random_5.zip random_7.zip random_9.zip
...
> mkdir output
> ls | parallel --eta --bar "unzip -q {} -d output/"
100% 10:0=0s random_9.zip
> ls output/
random_0.txt random_2.txt random_4.txt random_6.txt random_8.txt
random_1.txt random_3.txt random_5.txt random_7.txt random_9.txt
...
Если вы уже работали с терминалом Linux, вам должны быть понятны эти команды. Самое главное здесь — это передача имен файлов в parallel, чтобы unzip могла их распаковать.
Если вы уже настроили команду bash на работу с одним файлом, то достаточно немного изменить ее, чтобы распараллелить выполнение. По умолчанию parallel использует все доступные ядра CPU и может исполнять команды на нескольких машинах при помощи SSH; таким образом, эту систему можно использовать в качестве импровизированного вычислительного кластера.
Еще один сценарий использования — скачивание большого количества файлов. Чтобы параллельно загрузить все файлы, достаточно написать однострочный скрипт с wget, parallel и списком скачиваемых файлов. С этой задачей могут справиться и другие инструменты, например, axel и aria2c, но я выяснил, что при скачивании множества мелких файлов мой вариант работает лучше.
Важное замечание
Хотя данную систему можно использовать для скачивания большого количества файлов, учтите, что это может повысить нагрузку на серверы из-за создания множественных соединений, а следовательно — привести к перегрузке сети и снижению производительности для других пользователей. В таком случае ваши действия даже могут принять за DOS-атаку. Это особенно актуально для маленьких веб-сайтов или серверов с ограниченной пропускной способностью.
Известно, что разработчики aria2c отказались от предложений по увеличению максимального количества соединений с шестнадцати, несмотря на увеличение скорости компьютеров и существенное расширение пропускной способности сетей. Я разделяю их позицию и прошу вас подходить к скачиванию ответственно.
Добавлю, что несмотря на ускорение работы при использовании parallel, управление командами bash может быть сложной задачей. Это особенно актуально для новичков, работающих в командах, где остальные тиммейты в основном используют Python или другие традиционные языки программирования. Поэтому я рекомендую применять parallel для единичных задач, а не писать сложные конвейеры ETL на bash. Лучше хорошо поддерживаемого кода только полное отсутствие кода.
Масштабирование на множестве машин
Как понять, что пора использовать несколько машин
Ключевой признак того, что вам нужно переходить на использование нескольких машин (вспомним о Spark или о моем любимом Dask) — это когда вычисления в вашем сценарии занимают слишком много времени. Такая ситуация может возникнуть в контексте экспериментов, при обработке данных и т. д. В худших случаях вычисления под некоторые мои задачи заняли бы месяцы или даже год, если бы я работал на одном инстансе, даже если это AWS u-24tb1.112xlarge (настоящий зверь). Я против пустой траты чего бы то ни было и считаю, что чем оптимальнее вы сможете использовать имеющиеся ресурсы, тем лучше.
Распределив нагрузку среди нескольких более слабых машин, вы обретете множество преимуществ производительности. Некоторые решения при горизонтальном масштабировании обеспечивают практически линейное масштабирование CPU, памяти и сетевых ресурсов с увеличением количества инстансов.
Большинство относительно крупных инстансов EC2 обеспечивают скорость интернета до 10 Гбит/с, что позволяет устранить узкие места ввода-вывода, особенно если вы быстро передаете потоки данных в S3 и из него. Если для вашей рабочей нагрузки требуется, чтобы данные поступали со скоростью 50 Гбит/с, то у вас есть выбор: или использовать инстанс m7i.48xlarge ценой $9,67 за час со скоростью 50 Гбит или четыре инстанса m7i.8xlarge стоимостью $1,61 в час (суммарно $6,45) с той же пропускной способностью сети.
В качестве двух основных метрик я выбрал скорость сети и цену, но если вы стремитесь максимизировать объем памяти и ресурсы CPU, то можете провести сравнение с вышеупомянутым u-24tb1.112xlarge. По той же цене вы можете арендовать 135 инстансов m7i.8xlarge. Так вы получите 4320 CPU, 17,28 ТБ ОЗУ и 1687,5 Гбит скорости интернета (примерно в 17 раз больше)! Хотя ОЗУ получается меньше, я использовал здесь для масштабирования инстанс общего назначения, а не оптимизированный по памяти. Выбрав эквивалент, оптимизированный по памяти, мы получим 34,56 ТБ ОЗУ со всеми остальными преимуществами работы с несколькими машинами (запас мощности, более гибкий контроль за размером инстансов и так далее).
Более того, при подходящем бэкенде я могу масштабироваться на любое количество инстансов, приемлемых для моего сценария, инструмента оркестрации или бухгалтерского отдела. Такой уровень масштабируемости — это критически важное преимущество, позволяющее удовлетворить требованиям вашей рабочей нагрузки, не ограничиваясь возможностями одного инстанса.
Как обычно, у разных подходов есть свои преимущества. Ваша цель — оценить плюсы и минусы каждого решения и определить, что лучше всего подойдет для вашего сценария использования. Чтобы сделать правильный выбор, держите в голове основную цель: минимизировать затраты и максимизировать производительность.
Несмотря на эти невероятные преимущества, я рекомендую использовать множественные инстансы только в случаях, когда вы понимаете, с какими узкими местами вам предстоит столкнуться. Мне знакомы команды, которые прибегали к масштабированию, еще не поняв до конца свой сценарий использования, и в результате слишком усложняли подход к вычислениям. Поработав таких командах, я вынес для себя простой урок: в некоторых случаях правильно написанные CLI-инструменты могут обрабатывать данные быстрее, чем целый кластер Spark.
Различные модели вычислений
Для сильно параллелизуемых рабочих нагрузок
Сильно параллелизуемые нагрузки обычно проще всего масштабировать по сравнению с другими типами нагрузок. Мы уже вкратце говорили о том, как масштабировать вычисления с помощью Joblib или Parallel. А как насчет масштабирования на несколько машин?
Для масштабирования вычислений есть довольно много инструментов. Для одиночных сильно параллелизируемых рабочих нагрузок я бы рекомендовал использовать AWS Batch или AWS Lambda. Batch хорошо масштабируется и обходится намного дешевле, чем инстансы по запросу — при этом он справляется с задачами гораздо быстрее, чем инстансы, выполняемые параллельно на одной машине. Можно использовать и другие инструменты (например, GCP Cloud Run), но для долговременных задач я могу посоветовать только AWS Batch, потому что сам пользовался им раньше.
Так как настройка кластера может быть длительным процессом и не относится к теме нашей статьи, оставлю здесь эту ссылку на случай, если вам захочется изучить вопрос самостоятельно.
Стоит упомянуть одну тонкость: общая производительность вашей задачи будет больше ограничена скоростями чтения и записи, чем скоростью вычислений. Если вы выполняете чтение/запись в базу данных, то с большой вероятностью узким местом (а то и местом отказа) будет база данных. S3 — вполне подходящий вариант для чтения и записи, ведь он спроектирован под хорошую масштабируемость, но и у него есть свои ограничения: 3,5 тысячи операций чтения и 5,5 тысячи операций чтения на секционированный префикс. S3 спроектирован так, чтобы быть невидимым для пользователя при масштабировании, так что вы слабо сможете управлять тем, как он адаптируется к возросшей производительности.
После попадания данных в S3 (или в другой используемый вами сервис) можно передать их куда угодно.
Подготовка этой системы довольно утомительна, но она хорошо масштабируется в случае единичных задач. После нескольких итераций вы сможете снизить время настройки до считанных минут (это зависит от степени автоматизации процесса и потребностей вашей команды). В общем случае время, потраченное на настройку, позволяло сэкономить время вычислений и разработки, но я точно не планирую использовать ее для всех задач.
Аналитические рабочие нагрузки
Аналитические нагрузки масштабировать чуть сложнее. Это обусловлено тем, что мы обычно работаем с одним датасетом и пытаемся выполнить с ним множество операций. Также может присутствовать элемент интерактивности: например, что-то может работать в Jupyter Notebook. Для меня лучшим инструментом для масштабирования аналитических нагрузок стал Dask, а альтернативой ему — Spark. Dask и Spark — это опенсорсные инструменты, позволяющие масштабировать рабочие нагрузки на множество машин; они имеют свои плюсы и минусы. Кроме того, оба этих инструмента можно использовать локально, а их реализации DataFrame (Dask DataFrame и Spark Dataframe) можно использовать для масштабирования уже имеющихся рабочих нагрузок.
Dask гораздо проще в настройке и установке. Я могу запустить Dask локально за несколько минут одной командой (pip install "dask[complete]"
). Spark требует чуть более длительной настройки, и запустить его на моей локальной машине оказалось чуть сложнее. Кроме того, Dask удобен тем, что его может быстро освоить любой дата-сайентист, работавший с Pandas или Numpy, в то время как изучение Spark требует совершенно иного набора навыков. К тому же Dask лучше интегрирован со множеством инструментов PyData, то есть начать использовать его преимущества можно практически мгновенно.
Тем не менее, Spark и его экосистема гораздо более развиты; существует вероятность того, что ваша команда уже имела дело с настройкой и запуском кластера Spark. При работе с Dask у меня иногда возникают баги или проблемы с производительностью, а Spark благодаря своей зрелости обладает гораздо большей стабильностью. А еще Dask не подходит для долговременных вычислений.
Вот мои рекомендации с учетом этих доводов:
Если вы работаете в небольшой команде или стартапе без инфраструктуры для big data или распределенных вычислений, то рекомендую хотя бы поэкспериментировать с Dask вне зависимости от уровня опыта команды в Spark. За время, которое было бы потрачено на локальный запуск Spark, вы уже сможете проверить свой сценарий использования с помощью Dask, а ваша команда сможет применять другие инструменты из мира PyData.
Если вы работаете в большой организации, уже использующей Spark или другую крупную инфраструктуру обработки данных, то разумно будет использовать ее, если только у вас нет веских причин от нее отказаться. Я рекомендую посмотреть доклад Эрика Дилла Is Spark Still Relevant? — в нем рассказывается о том, почему крупные организации предпочитают использовать Spark вместо более современных инструментов. Докладу уже пять лет, так что некоторые аргументы могли устареть. Я все равно рекомендую попробовать Dask, тем более что оба фреймворка можно использовать одновременно.
Заключение
Масштабирование многотерабайтных датасетов и управление ими требуют глубокого понимания данных и инструментов работы с ними. Используя Joblib и GNU Parallel для масштабирования на одной машине, вы сможете максимизировать эффективность вычислительных ресурсов. Если необходимо масштабироваться на множество машин, то AWS Batch, Dask и Spark позволят создать надежные решения под разные рабочие нагрузки, от сильно параллелизуемых задач до сложных аналитических операций.
Самое главное — начинать оптимизацию алгоритмов до масштабирования. В противном случае оно будет только увеличивать неэффективность. Успешное масштабирование — это в первую очередь стратегическое планирование и менеджмент ресурсов, а не грубая вычислительная мощь. Попробуйте разобраться во всем на собственном опыте, и вы будете готовы мастерски справляться даже с самыми крупными датасетами.