[Перевод] Утилиты командной строки могут быть в 235-раз быстрее вашего Hadoop кластера
Примечания tsafin:
Перед публикацией своего цикла статей по MapReduce в Caché, мне показалось важным озвучить данную прошлогоднюю точку зрения из статьи Адама Дрейка «Command-line tools can be 235x faster than your Hadoop cluster». К сожалению оригинальная статья Тома Хайдена, на которую он ссылается стала уже недоступна на сайте Тома, но её, по-прежнему, можно найти в архивах. Для полноты картины предлагаю ознакомиться и с ней тоже.
Введение
Посещая в очередной раз свои любимые сайты, я нашел крутую статью Тома Хайдена об использовании Amazon Elastic Map Reduce (EMR) и mrjob для вычисления статистики отношения выигрыш/проигрыш в наборе данных со статистикой по шахматным матчам, которую он скачал с сайта millionbase archive, и c которой он начал играться используя EMR. Так как объем данных был всего 1.75GB, описывающий 2 миллиона шахматных партий, то я скептически отнесся к использованию Hadoop для данной задачи, хотя были и понятны его намерения просто поиграться и изучить плотнее, на реальном примере, утилиту mrjob и инфраструктуру EMR.
Изначально формулировка задачи (найти строку результата в файле и подсчитать расклад) больше подходила для поточной обработки посредством команд shell. Я попытался применить такой подход с похожим объемом данных на моем лаптопе, и получил результат примерно через 12 секунд (т.е. эффективная скорость обработки данных около 270МБ/сек), тогда как обработка в Hadoop заняла 26 минут (т.е. эффективная скорость обработки данных 1.14МБ/сек).
При рассказе об обработке такого объема данных на кластере из 7 машин c1.medium Том сказал, что это заняло 26 минут в кластере и «возможно это время лучше чем если бы я делал это последовательно на моей машине, но все-таки медленнее чем если бы обрабатывал это посредством хитрого, многопоточного приложения локально.»
Это абсолютно верно, хотя заметим, что даже локальная последовательная обработка может легко побить эти 26 заявленных минут. Да, Том делал этот проект просто чтобы развлечься и получить удовольствие, но часто многие другие используют инструменты Big Data (tm) для обработки и анализа такого, «не очень большого», объема данных, с которым получить результаты можно применяя инструменты и технике и попроще и побыстрее.
Один, наиболее недооцененный подход для обработки данных — это использование старых добрых инструментов и конструкций shell. Преимущества такого подхода огромны, т.к. создавая конвейер данных из shell команд, вы все шаги обработки запускаете в параллель с минимальными накладными расходами. Это, как если бы у вас был свой локальный кластер Storm. Можно даже попытаться перевести концепции Spouts, Bolts и Sinks в простые команды и конвейеры в shell. Вы можете легко конструировать конвейер для обработки данных из простых команд, и работать они будут быстрее большинства инструментов из арсенала Big Data (tm).
Также давайте поговорим об отличии поточного и пакетного подходов в обработке данных. Том упомянул в начале статьи, что если он загружал больше 10000 результатов игр локально, то его [Python] программа очень быстро вылетала по памяти. Это происходило из-за того, что все данные об играх загружались сначала в память для последующего анализа. Тем не менее, при более плотном исследовании задачи, можно создать конвейер по поточной обработке данных, который фактически не будет потреблять памяти.
Результирующий конвейер, который мы создали, был более чем 236 раза быстрее аналогичной Hadoop реализации и почти не съедал память.
Исследование данных
Первым шагом упражнения будет выкачивание этих файлов PGN. Т.к. я не представлял, как выглядит этот формат, то я заглянул в Wikipedia.
[Event "F/S Return Match"]
[Site "Belgrade, Serbia Yugoslavia|JUG"]
[Date "1992.11.04"]
[Round "29"]
[White "Fischer, Robert J."]
[Black "Spassky, Boris V."]
[Result "1/2-1/2"]
(moves from the game follow...)
Нас интересуют только результаты игр, с 3 возможными вариантами. 1-0 будет означать что выиграли белые, 0-1 – выиграли черные и 1/2-1/2 означает ничью в партии. Там так же есть результат - , который означает, что игра продолжается или была прервана, но мы игнорируем этот случай в нашем упражнении.
Получение набора данных
Первым делом постараемся скачать данные об играх. Это оказалось не так легко, но после короткого поиска в сети я нашел Git репозиторий rozim в котором было множество различных источников данных о шахматных играх за различные промежутки времени. Я использовал код из этого репозитория для компиляции набора данных объемом в 3.46ГБ, что почти вдвое больше объема данных, используемых Томом в его тесте. Следующим шагом поместим все эти данные в наш конвейер.
Построение конвейера по обработке данных
Если вы будете следить за повествованием и замерять каждый шаг, то не забудьте очищать кеш страниц вашей операционной системы для получения более правильных времянок при замерах обработки данных.
Использование команд из shell удобно для обработки данных, т.к. вы можете получить параллелизм исполнения команд «задарма». Проверьте, например, как исполнится данная команда в терминале.
sleep 3 | echo "Привет, мир"
Интуитивно может показаться, что здесь мы сначала задерживаемся на 3 секунды на первой команде и потом выдаем на терминал «Привет, мир». Но, на самом деле, обе команды запускаются на исполнение одновременно. Это один из основных факторов, почему мы можем получить такое ускорение на одной машине для простых команд, не нагружающих IO.
Перед тем как начать конструировать конвейер по обработки данных, давайте определим верхний поток производительности посредством простого копирования данных в /dev/null.
В этом случае операция занимает 13 секунд, и для массива 3.46ГБ это означает 272МБ/сек. Это будет нашей верхней границей скорости чтения с диска.
Теперь мы можем начать построение конвейера, и на первом шаге будем использовать cat для генерации потока данных:
cat *.pgn
Так как нас интересуют только строки с результатами, то мы можем сканировать файлы и выбирать только строки содержащие 'Result' посредством утилиты grep:
cat *.pgn | grep "Result"
[Непонятно, почему он сразу не запускал это как `grep Result *.pgn`?]
Это даст нам только строки содержащие Result. Теперь если хотите, то можете использовать команды sort и uniq для того, чтобы получить список уникальных элементов и для подсчета их количества.
cat *.pgn | grep "Result" | sort | uniq -c
Это очень простой подход для анализа, и он предоставляет нам результат за 70 секунд. Мы, конечно же, можем это ускорить, но и сейчас заметим, что с учетом линейного масштабирования это же самое заняло бы на кластере Hadoop примерно 52 минуты на обработку.
Чтобы ускориться, мы можем убрать шаги sort | uniq из конвейера и заместить их вызовом AWK, который является замечательным инструментом для обработки данных событий.
cat *.pgn | grep "Result" | awk '{ split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print white+black+draw, white, black, draw }'
Здесь мы возьмем всю запись, разделим по знаку дефис, и возьмем символ непосредственно ему предшествующий слева, если там будет 0, то выиграли черные, 1 – белые, и 2 – ничья. Заметим, что здесь $0 это встроенная переменная, представляющая всю запись.
Это уменьшает время исполнения до примерно 65 секунд, и т.к. мы обрабатываем примерно в 2 раза больше данных, то это ускорение примерно в 47 раз.
Даже на текущем шаге мы имеем ускорение в 47 раз при локальном исполнении. Более того, размер используемой памяти почти нулевой, т.к. хранятся данные только текущей записи, и инкрементирование 3х переменных целого типа — это ничтожная с точки зрения потребления памяти операция. Тем не менее, htop показывает, что grep в данной последовательности является узким местом, и полностью использует процессорное время одного ядра.
Параллелизация узкого места
Проблема недоиспользования дополнительных ядер может быть решена при помощи замечательной команды xargs, которая может запустить grep параллельно. Т.к. xargs ожидает вход в определенном формате, то будем использовать find с аргументом -print0 , для того чтобы имя передаваемого в xargs файла заканчивалось нулем. Соответственно передадим -0, чтобы xargs на своей стороне ожидал завершенные нулем строки. Опция -n управляет количеством строк передаваемым в один вызов, и -P означает количество параллельных запускаемых команд. Важно заметить, что такой параллельный конвейер не гарантирует порядка доставки, но это не проблема, если вы рассчитывали на распределенную обработку изначально. Опция -F в grep означает, что мы ищем по простому совпадению строки, и не спрашиваем никаких замороченных регулярных выражений, что в теории может дать дополнительный выигрыш по скорости, что не было замечено в экспериментах
[это утверждение не совсем верно для нашего примера, gnu grep строит детерминированный конечный автомат, что в данном простом выражении будет эквивалентно простому совпадению строки. Единственное что выигрываем, это время компиляции регулярного выражения, чем в данном случае можно пренебречь].
find . -type f -name '*.pgn' -print0 | xargs -0 -n1 -P4 grep -F "Result" | gawk '{ split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print NR, white, black, draw }'
В результате получаем время 38 секунд, что дало «привар» в 40% только за счет параллелизации запуска grep в нашем конвейере. Теперь мы примерно в 77 раз быстрее реализации на Hadoop.
Хотя мы и улучшили производительность «драматически» за счет параллелизации шага с grep, но мы можем вообще избавиться от него, заставляя сам awk искать нужные записи и работать только с теми, что содержат строку «Result».
find . -type f -name '*.pgn' -print0 | xargs -0 -n1 -P4 awk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print white+black+draw, white, black, draw }'
Это может показаться правильным решением, но это выдает результаты о каждом файле в отдельности. Тогда как нам нужен общий, агрегированный результат. Корректная реализация агрегации концептуально очень похожа на MapReduce:
find . -type f -name '*.pgn' -print0 | xargs -0 -n4 -P4 awk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ } END { print white+black+draw, white, black, draw }' | awk '{games += $1; white += $2; black += $3; draw += $4; } END { print games, white, black, draw }'
Добавляя второй вызов awk, в итоге мы получаем желаемую агрегатную информацию об играх.
Это еще более увеличивает финальную скорость, сокращая время запуска до 18 секунд, что 174 раз быстрее Hadoop реализации.
Тем не менее, мы можем это еще немного ускорить, применяя mawk, который полностью заменяет gawk, при сохранении тех же опций запуска, но предоставляя лучшую производительность.
find . -type f -name '*.pgn' -print0 | xargs -0 -n4 -P4 mawk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ } END { print white+black+draw, white, black, draw }' | mawk '{games += $1; white += $2; black += $3; draw += $4; } END { print games, white, black, draw }'
Таким образом, конвейер с применением перенаправления вывода между процессами find | xargs mawk | mawk позволил выполнить обработку за 12 секунд, что составляет 270 МБ/сек, и что более чем в 235 раз быстрее реализации на Hadoop.
Заключение
Надеюсь, нам удалось аргументировать несколько наших идей относительно порочной практики по использованию инструментов Hadoop для обработки не очень большого набора данных, вместо обработки их же на одном компьютере с применением обычных shell команд и инструментов. Если у вас есть реально гигантский массив данных или действительно требуются распределенные вычисления, то, конечно же, может потребоваться использование [тяжелых] инструментов Hadoop, но чаще мы видим ситуацию, когда Hadoop используют вместо традиционных реляционных баз данных, или там, где другие решения будут и быстрее и дешевле, и легче в поддержке.