Apache Spark в «боевых» проектах — опыт выживания
Предлагаем вашему вниманию материалы по мотивам выступления Александра Сербула на конференции BigData Conference. Я, как автор и докладчик, текст немного отредактировал и добавил современных мыслей и актуальных проблем, поэтому надеюсь пост принесет вам как дополнительные практические полезные знания в отрасли, так и пищу для размышлений — куда податься со своими знаниями. Итак — в бой!
В моем понимании, BigData — это нечто безумное, т.е. термин есть, а извилин внутри него — нет. С одной стороны, администраторы ломают голову, куда складывать значительный объем информации. С другой стороны, это высоконагруженная разработка. Необходимо создавать кластер, искать хардкорных разработчиков, очень опытных. Кроме того, необходимо использовать высшую математику: дифференциальные вычисления, линейную алгебру, теорию вероятности, машинное обучение, графы и обучение на них, логистическую регрессию, линейный дискриминантный анализ и так далее. Как выжить в этой ситуации, какие инструменты взять, какие еще сырые (к сожалению, большинство), как принести фирме деньги. Ведь это основная цель использования BigData, а всё остальное — популизм.
MapReduce — это парадигма свертки функций для выполнения больших и сложных запросов по данным, предложенная Google. Она позволяет параллельно обрабатывать информацию в духе data parallelizm. В идеале, нужно все алгоритмы, используемые для работы с большими и не только большими данными, переделать на MapReduce. Но никто не делает это бесплатно ;-) В старой книжке, у бабушки на полке в паутине вы довольно быстро найдете хороший алгоритм кластеризации K-means и он будет надежный работать. Но вдруг, перед релизом, или когда данных окажется больше, чем вы ожидали, вы обнаружите, что этот алгоритм не работает в «параллельном режиме», не работает через MapReduce — им можно загрузить только одно ядро процессора. Поэтому вам нужно будет экстренно и заново изобрести еще один алгоритм, который умеет работать параллельно, и придумать, как он должен работать в парадигме MapReduce. А это удается далеко не всем — это удел Computer Science (на самом деле иногда удается без PhD знаний переделать алгоритм на MapReduce алгоритм методом крепкого кофе и доски с маркерами).
Мы в своих проектах начали с использования алгоритмов на платформе Hadoop MapReduce, но затем перешли на Spark, потому что он оказался более разумно и практично устроенным. Классический Hadoop MapReduce работает очень просто: выполнил задание, положил результат в файл. Взял, выполнил, положил. Spark — берет, выполняет все задания, а затем выгружает результат. На мой взгляд и не только мой, Hadoop MapReduce, как не крути — устаревший конгломерат, который постоянно и конвульсионно пытается измениться, из-за чего разработчикам и сисадминам постоянно приходится переучиваться, а бизнесу — играть в русскую рулетку с «сырыми технологиями». Но… выбора у нас почти нет (да, мы смотрели Apache Storm —, но он совсем из другой области: task parallel computng).
Альтернатив Apache Spark, если честно, пока не видно. Это и самый активный open-source проект в инфраструктуре Apache, это и объект для подражания — посмотрите хотя бы на Prajna от Microsoft.
Можно кинуть в ответ Apache Tez или отыскать что-нибудь мелкое в зоопарке Apache — но, поверьте, для снижения рисков лучше использовать mainstream-технологии, которые развиваются в ногу с рынком.
Где-то рядом, не совсем из этой области, но из интересного и если очень хочется — посмотрите также на Apache Giraph и TensorFlow. И задайте вопрос: это TaskParallel или DataParallel технологии — и все станет понятно.
Мы используем технологии параллельной обработки данных примерно так. На MySQL-серверах крутятся сотни тысяч баз данных клиентов-компаний, со штатом от единиц до тысяч сотрудников. Spark используется в основном в сервисе персональных рекомендаций, о котором рассказывалось в одной из прошлых публикаций.
Cобираем события — просмотры заказов, добавления в корзину, оплаты заказов, — обрабатываем, кладем в Amazon Kinesis, обрабатываем worker’ами, сохраняем в NoSQL (DynamoDB) и, наконец, отправляем в Spark для генерации моделей и аналитики.
На первый взгляд, схема выглядит излишне усложненной, но постепенно приходит понимание для чего всё это нужно. Это — минимум необходимых инструментов, которые нужны для выживания.
Полученные результаты мы выгружаем в Apache Mahout и на выходе получаем конкретные рекомендации для клиента. Общий объем событий, регистрируемых сервисом рекомендаций, достигает десятков миллионов в день.
Пока мы интенсивно используем и развиваем алгоритмы коллаборативной фильтрации, но видим примерно такую дорожную карту по развитию алгоритмов:
• Мультимодальность
• Content-based рекомендации
• Кластеризация
• Machine learning, deep learning
• Таргеттинг
Сейчас, как никогда раньше, ценится мультимодальность — т.е. использование нескольких, разных алгоритмов, для ответа на вопрос (в нашем случае — выдачи персональной рекомендации). Недостаточно просто запустить сервис на базе Apache Mahout, это не даст никакого выигрыша перед конкурентами. Коллаборативной фильтрацией сегодня уже никого не удивишь. Нужно учитывать и облако тегов пользователя, когда он ходил по магазину и получал какую-то информацию. Кластеризация позволяет более гибко организовать таргетирование информации. Здесь, конечно, не обойтись без machine learning. Deep learning — это, простыми словами, «качественное» машинное обучение, подразумевающее очень детальное изучение проблемы машиной и, часто, использование многослойной рекуррентной нейронной сети. При грамотном применении это помогает еще больше увеличивать конверсию, более эффективно работать с клиентами.
Сегодня на рынке существует множество программных сред, средств, инструментов и продуктов для разработки и анализа данных. Спасибо opensource (да, там полно сырых глюковатых вурдалаков, но есть и отличные решения)! С одной стороны, разнообразие — это несомненное благо. С другой — процесс разработки может стать довольно хаотичным и бестолковым. Например, сначала пробуют использовать Apache Pig, когда что-то не получается, обращаются к Apache Hive. Посмотрели, поигрались, начинают писать на Spark SQL. Когда запросы начинают падать — кидаются к Impala (а там все еще хуже). Под угрозой суицида некоторые проклинают мир BigData и возвращаются к старым добрым РСУБД. Такое иногда впечатление, что создана куча игрушек для «специалистов», часто такими же «специалистами». А бизнес не понимает всех этих исканий, ему нужно зарабатывать деньги, он требует конкретику в срок.
Сегодня, пожалуй, лишь Apache Hive можно считать классическим и надежным инструментом для работы с SQL-запросами по распределенным данным, как и HDFS является классикой среди кластерных файловых систем (да, есть конечно еще GusterFS, CEPH). Тем не менее, многие переходят на Spark SQL, потому что этот фреймворк написан (как хочется верить) с учетом интересов бизнеса. Также, все более активно используются HBase, Casandra, Mahout, Spark MLLib. Однако требовать, чтобы каждый разработчик и/или сисадмин свободно ориентировался во всех этих инструментах — глупо. Это профанация. Технологии — глубокие, с кучей настроек и каждая требует глубокого погружения на месяца. Не спешите — из-за гонки за количеством неизбежно будет страдать качество.
В первую очередь хочу порекомендовать всем, кто работает или намеревается начать работать с параллельными алгоритмами и MapReduce, прочитать книгу «Mining of Massive Datasets», находящуюся в открытом доступе. Ее надо прочитать несколько раз, с блокнотиком и карандашом, иначе каши в голове не избежать. Сначала ничего не будет скорее всего понятно (мне стало открываться раза с 3). Но это одна из базовых и, что важно, доступных для инженеров не обладающих черным поясом по математике, книга по алгоритмам работы с большими данными. В частности, глава 2.3.6 посвящена реляционной алгебре и способам проецирования ее операций на MapReduce. Очень полезный материал, по сути, здесь представлены готовые советы для разработчиков, достаточно их только внимательно реализовать.
Читая литературу, полную математических деталей, вспоминайте анекдот и улыбайтесь :-)
Двое летят на воздушном шаре, попали в туман, заблудились. Вдруг их прижимает к земле, и они видят человека. Один из них кричит вниз: "Где мы?". Человек, подумав, отвечает: "Вы на воздушном шаре...". Очередным порывом ветра шар уносится ввысь. - Он идиот? - Нет, математик. - ?????? - Во-первых, он подумал, прежде чем ответить. Во-вторых, дал абсолютно точный ответ. А в-третьих, этот ответ совершенно никому не нужен.
• DAG (directed acyclic graph) vs Hadoop MapReduce vs Hadoop Streaming. Можно написать большой SQL-запрос, представляющий собой несколько MapReduce-операций в цепочке, который будет корректно выполняться. Streaming реализован в Spark гораздо лучше, чем в Hadoop, им гораздо удобнее пользоваться и работает часто эффективнее, за счет кэширования данных в памяти.
• Spark Programming Guide + API. Документация весьма толковая и полезная.
• Можно программировать на Java. С++ считался сложным языком, но Scala гораздо … нет, не сложнее, скорее высокоумнее на мой взгляд. Scala — классный язык, несмотря на некую академическую протухшесть и неорганическую связь с живыми мертвецами с широко выпученными глазами типа Haskel. Я очень люблю Scala, но от него можно сойти с ума, а время компиляции оставляет желать лучшего себе и своим детям. Поэтому, при желании, к Spark можно подключиться и из Java, и из Python и из R.
• Удобная абстракция: Resilient Distributed Dataset (RDD). Прекрасная, просто божественная концепция из мира функционального программирования, позволяющая распараллеливать файлы огромного объема — на сотни гигабайт, или даже терабайты.
• Удобные коллекции: filter, map, flatMap. Удобный подход в Spark — коллекции и операции над ними: filter, map, flatMap. Откуда это взялось? Это пришло из функционального программирования, которое сейчас активно проповедуется Scala (и не только в нем).
Исторически сложилось так, что мы пишем в Spark на Java 7, а не на Java 8. К сожалению, в «семёрке» нет нормальной поддержки функциональных объектов, поэтому мы вынуждены заниматься садомазохизмом и создавать объекты типа PairFunction, Tuple2, TupleN. В общем готовьтесь — когда Java7 интегрируется со Scala, то получается жутко нечитаемый код. Я, конечно, немного утрирую, но в нем все перемешано и хочется надеть очки с 13 окулярами.
JavaRDD combinedRowsOrdered = combinedRows.mapToPair(new PairFunction() {…
public Tuple2 call( String row ) {
…return new Tuple2…
Если вы не хотите лезть в дебри Scala, то лучше используйте Java 8. Код получается более читаемый и короче.
Название Scala произошло от английского scalable. Это язык для разработки больших компонентных приложений, созданный учеными-математиками. У меня лично складывается впечатление, что Scala — это тренд. Тренд оживления функционального программирования (снова привет Haskel, F#, …). Как то «вдруг» оказалось (хотя ученые об этом догадывались гораздо раньше), что обрабатывать массивы данных в парадигме Data-Parallel — удобнее в функциональном стиле, вау! :-) Spark активно использует и Scala Actors (здравствуй Erlang). Они позволяют писать простой, читаемый, однопоточный код, который выполняется на большом количестве серверов. Вы избавляетесь от рисков многопоточного программирования, которым вынуждены заниматься в Java — это сложно, долго и дорого (зато круто). Кроме того, из-за сложности многопоточного программирования возникает немало ошибок. А благодаря акторам жизнь «вдруг» упрощается.
Для развертывания в Amazon Spark предлагает нам скрипт под названием Spark-eс2. Он скачивает половину репозитория Беркли, что-то творит на Ruby под капотом (здравствуй Япония) и устанавливает кучу какого-то софта. Получившаяся система довольно хрупка, чувствительна к изменениям конфигурации машин. Также есть нарекания на логирование и обновление компонентов системы.
Какое-то время мы существовали со скриптом Spark-ec2, но оказалось, что лучше самостоятельно написать инсталлятор Spark. Кроме того, инсталлятор сможет учитывать возможность подключения новых spot-машин.
Всё это для нас болезненно, поскольку у нас нет большого штата сисадминов — мы больше программисты Если бы у нас было 30 сисадминов, я бы сказал: «Ребята, я буду программировать на Scala, а вы тут, пожалуйста, не спите ночами и занимайтесь кластерами Spark». Куда более привлекательным вариантом оказалась связка из Spark и Elastic MapReduce. Также коллеги хвалят решения со Spark от Cloudera и HortonWorks — может они вам тоже окажутся полезными.
Амазон предлагает нам не терять время и развернуть кластер Spark у них с использованием сервиса ElasticMapReduce. Тут почти все будет работать из коробки. Spark интегрирован в Yarn-кластер, есть куча софта, есть подглючивающий мониторинг, можно добавлять машины, масштабировать HDFS, менять размер кластера, увеличивать и уменьшать количество задач за счет spot-машин. Spot-машины в Amazon стоят в 5–10 раз дешевле. Мы их используем всегда, потому что это удобно, дешево и быстро.
Spark в EMR профессионально интегрирован с S3. Это правильно, ведь именно там вы скорее всего будете хранить файлы в Amazon. Мы сравнивали хранение больших файлов в S3 и HDFS, и оказалось, что скорость доступа примерно одинаковая. Тогда зачем связываться с HDFS, мучиться с кластерной файловой системой, если есть готовый сервис. Также в Elastic MapReduce к потрохам Sparlk/Hadoop можно прокинуть через ssh-туннелинг веб-админки и привыкнуть к ним (хотя я так и не привык).
Получается немного дороже, чем обычные машины, разница около 10%. При этом вы получаете «в комплекте» много готового, правда немного глючащего софта (Hue глючит больше всех) и возможность масштабировать кластер. При этом вам даже админ не нужен — вы, как разработчики, там цари и боги.
Типы машин
Машины здесь бывают трех типов:
• Master-машины, которые контролируют вообще весь кластер. На них установлен Spark Master.
• Core-машины, на которых развернута файловая система — HDFS. Их может быть несколько штук. Правда, рекомендуется только увеличивать количество core-машин, а не уменьшать, иначе теряются данные.
• Для всего остального используются task-машины. Это обычные Spark-серверы, на которых работают воркеры. Количество spot-машин можно свободно менять, создавая парк хоть из сотен машин.
Софт
• Spark. В предыдущих версиях образов Amazon пока не поддерживается spark.dynamicAllocation.enabled, так что вы должны сами говорить ему, сколько нужно машин для выполнения задачи. Если кластер частично простаивает, то Spark не займет оставшиеся машины для выполнения. Вы должны жестко прописать, сколько машин ему нужно. Это неудобно. Но начиная с AMI 4 эта функция уже работает.
• Hadoop/Yarn/HDFS. В Yarn-кластерах, как и в Oracle, используется множество настроек, и, по хорошему, нужен админ, который в этом очень хорошо разбирается. Но, несмотря на боль, Hadoop-кластеры уверенно решают свои задачи. Больше всего мне не нравится в Yarn и Hadoop то, что там бардак с логированием. В логи пишется абсолютно всё, настройки уровней логирования раскиданы по разным частям кластерных потрохов и потому их количество очень быстро разрастается. И нормального решения этой проблемы нет. Неплохо бы поучиться у старых добрых обитателей unix — например у mysql, apache.
• Ganglia. Это time-series софт, который строит графики по различным метрикам: нагрузка, количество задач и т.д. Помогает получить представление о состоянии кластера. Удобная вещь, но есть недостатки — «убитые» спот машины продолжают висеть и загромождать графики.
• Hive. Это поддержка команд SQL, которая работает на файлах в HDFS и S3. Неплохой инструмент, но иногда его возможностей не хватает. Используем. Но когда нужно больше — заходим в Spark и напрямую решаем задачи по реляционной алгебре.
• Pig. Мы его не используем, поэтому дать какую-то оценку затрудняюсь.
• Hbase. Вариант NoSQL, пока не используем.
• Impala. Очень интересная вещь, про которую можно написать отдельный пост.Пока производит впечатление сырого софта. Так что используйте на свой страх и риск.
• Hue. Это админка к «бигдате», написанная на Python. Ее GUI позволяет объединить вместе и Impala, и Hbase, и Pig, и Hive. То есть можно сделать свой уголок аналитика в вебе:-) Я им попользовался неделю, он стал глючить, зависать, потом открываться перестал вообще — в общем, недоделанный софт
Падения по памяти
Что такое Map? Мы берем что-то, раскидываем на ключи, а их уже раскидываем по кластеру. Тут ничего не должно упасть — алгоритмически.
Что такое Reduce? Когда в один worker собираются сгруппированные по одному ключу данные. Если хорошо запрограммировать, то можно порционно передавать в reducer все значения внутри одного ключа и ничего не будет падать. Но на практике оказалось, что Spark может упасть в разных точках — то буфера не хватило для сериализации, то воркеру памяти не хватило. И, на мой взгляд, это основная проблема. Но всё же можно аккуратно пристроиться. У нас Spark сейчас не падает, хотя добились мы этого с помощью магии.
Обязательно нужно поставить разумное Executor Memory: --executor-memory 20G, --conf spark.kryoserializer.buffer.mb=256, --conf spark.default.parallelism=128, --conf spark.storage.memoryFraction=0.1
KryoSerializer позволяет сжимать объекты (spark.serializer org.apache.spark.serializer.KryoSerializer). Без этого они потребляют гораздо больше памяти. Также не рекомендую уменьшать значение константы spark.default.parallelism=128, иначе может часто падать по памяти. Что касается memoryFraction, то мы не используем кэширование.
Выгрузка результатов
Допустим, вам нужно выгрузить из Spark данные в модель. Если объем велик, то это будет выполняться очень долго.
• Благодаря --driver-memory 10G вы понимаете, что выгружаете с драйвера.
• При использовании Colleсt () весь результат собирается в память в драйвере и он может упасть. Поэтому рекомендую использовать toLocalIterator (). Увы, его производительность очень очень низка. Поэтому нам пришлось написать код для сборки партиций. Кому интересно, расскажу подробнее.
Логирование
Этот код — единственное, что помогло нам справиться с проблемой логирования:
export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
export SPARK_WORKER_DIR="/mnt/spark_worker"
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=1800 -Dspark.worker.cleanup.appDataTtl=259200"
#Worker executor stdout/stderr logs
spark.executor.logs.rolling.maxRetainedFiles 2
spark.executor.logs.rolling.strategy time
spark.executor.logs.rolling.time.interval daily
Надеюсь было и полезно и интересно. Все больше и активнее в нашей жизни рулят параллельные алгоритмы на MapReduce. Их мало, их ищут, но иного выхода похоже нет (ну может что-то получится посчитать быстрее на Apache Giraph и TensorFlow и через парадигму Task-Parallel). Платформа, ставшая классикой — Hadoop MapReduce, уступает место функционально написанной и на современном языке и математиками платформе Apache Spark. Скорее всего вы будете вынуждены начать разбираться, хотя бы на уровне терминов, в обитателях Hadoop-зоопарка: Hive, Pig, HDFS, Presto, Impala. Но постоянно учиться — наше все и чтобы опережать конкурентов нужно знать больше, писать быстрее и думать — ярче. Всем удачи и с наступающим Новым Годом!