Apache Spark: что там под капотом?
Вступление В последнее время проект Apache Spark привлекает к себе огромное внимание, про него написано большое количество маленьких практических статей, он стал частью Hadoop 2.0. Плюс он быстро оброс дополнительными фреймворками, такими, как Spark Streaming, SparkML, Spark SQL, GraphX, а кроме этих «официальных» фреймворков появилось море проектов — различные коннекторы, алгоритмы, библиотеки и так далее. Достаточно быстро и уверенно разобраться в этом зоопарке при отсутсвие серьезной документации, особенно учитывая факт того, что Spark содержит всякие базовые кусочки других проектов Беркли (например BlinkDB) — дело непростое. Поэтому решил написать эту статью, чтобы немножко облегчить жизнь занятым людям.Небольшая предыстория: Spark — проект лаборатории UC Berkeley, который зародился примерно в 2009 г. Основатели Спарка — известные ученые из области баз данных, и по философии своей Spark в каком-то роде ответ на MapReduce. Сейчас Spark находится под «крышей» Apache, но идеологи и основные разработчики — те же люди.Spoiler: Spark в 2-х словах Spark можно описать одной фразой так — это внутренности движка массивно-параллельной СУБД. То есть Spark не продвигает свое хранилище, а живет сверх других (HDFS — распределенная файловая система Hadoop File System, HBase, JDBC, Cassandra, …). Правда стоит сразу отметить проект IndexedRDD — key/value хранилище для Spark, которое наверное скоро будет интегрировано в проект.Также Spark не заботится о транзакциях, но в остальном это именно движок MPP DBMS.RDD — основная концепция Spark Ключ к пониманию Spark — это RDD: Resilient Distributed Dataset. По сути это надежная распределенная таблица (на самом деле RDD содержит произвольную коллекцию, но удобнее всего работать с кортежами, как в реляционной таблице). RDD может быть полностью виртуальной и просто знать, как она породилась, чтобы, например, в случае сбоя узла, восстановиться. А может быть и материализована — распределенно, в памяти или на диске (или в памяти с вытеснением на диск). Также, внутри, RDD разбита на партиции — это минимальный объем RDD, который будет обработан каждым рабочим узлом.Все интересное, что происходит в Spark, происходит через операции над RDD. То есть обычно приложения для Spark выглядит так — создаем RDD (например достаем данные из HDFS), мусолим его (map, reduce, join, groupBy, aggregate, reduce, …), что-нибудь делаем с результатом — например кидаем обратно в HDFS.
Ну и уже исходя из этого понимания следует Spark рассматривать как параллельную среду для сложных аналитических банч заданий, где есть мастер, который координирует задание, и куча рабочих узлов, которые участвуют в выполнении.
Давайте рассмотрим такое простое приложение в деталях (напишем его на Scala — вот и повод изучить этот модный язык):
Пример Spark приложения (не все включено, например include) Мы отдельно разберем, что происходит на каждом шаге. def main (args: Array[String]){
// Инициализация, не особо интересно val conf = new SparkConf ().setAppName (appName).setMaster (master) val sc = new SparkContext (conf)
// Прочитаем данные из HDFS, сразу получив RDD val myRDD = sc.textFile («hdfs://mydata.txt»)
// Из текстового файла мы получаем строки. Не слишком интересные данные. // Мы из этих строк сделаем кортежи, где первый элемент (сделаем его потом // ключем) — первое «слово» строки val afterSplitRDD = myRDD.map (x => (x.split (» »)(0), x))
// Сделаем группировку по ключу: ключ — первый элемент кортежа val groupByRDD = afterSplitRDD.groupByKey (x=>x._1)
// Посчитаем кол-во элементов в каждой группе val resultRDD = groupByRDD.map (x => (x._1, x._2.length))
// Теперь можно записать результат обратно на HDFS resultRDD.saveAsTextFile («hdfs://myoutput.txt») } А что же там происходит? Теперь пробежимся по этой программе и посмотрим что происходит.Ну во-первых программа запускается на мастере кластера, и прежде чем пойдет какая-нибудь параллельная обработка данные есть возможность что-то поделать спокойно в одном потоке. Далее — как уже наверное заметно — каждая операция над RDD создает другой RDD (кроме saveAsTextFile). При этом RDD все создаются лениво, только когда мы просим или записать в файл, или например выгрузить в память на мастер — начинается выполнение. По есть выполнение происходит как в плане запроса, конвеером, где элемент конвеера — это партиция.
Что происходит с самой первой RDD, которую мы сделали из файла HDFS? Spark хорошо синтегрирован с Hadoop, поэтому на каждом рабочем узле будет закачиваться свое подмножество данных, и закачиваться будет по партициям (которые в случае HDFS совпадают с блоками). То есть все узлы закачали первый блок, и пошло выполнение дальше по плану.
После чтения с диска у нас map — он выполняется тривиально на каждом рабочем узле.
Дальше идет groupBy. Это уже не простая конвеерная операция, а настоящая распределенная группировка. По хорошему, лучше этот оператор избегать, так как пока он реализован не слишком умно — плохо отслеживает локальность данных и по производительности будет сравним с распределенной сортировкой. Ну это уже информация к размышлению.
Давайте задумаемся о состоянии дел в момент выполнения groupBy. Все RDD до этого были конвеерными, то есть они ничего нигде не сохраняли. В случае сбоя, они опять бы вытащили недостающие данные из HDFS и пропустили через конвеер. Но groupBy нарушает конвеерность и в результате мы получим закэшированный RDD. В случае потери теперь мы вынуждены будем переделать все RDD до groupBy полностью.
Чтобы избежать ситуации, когда из-за сбоев в сложном приложении для Spark приходится пересчитывать весь конвеер, Spark позволяет пользователю контролировать кэширование оператором persist. Он умеет кэшировать в память (в этом случае идет пересчет при потере данных в памяти — она может случится при переполнении кэша), на диск (не всегда достаточно быстро), или в память с выбросом на диск в случае переполнения кэша.
После, у нас опять map и запись в HDFS.
Ну вот, теперь более менее понятно что происходит внутри Spark на простом уровне.
А как же подробности? Например хочется знать как именно работает операция groupBy. Или операция reduceByKey, и почему она намного эфективнее, чем groupBy. Или как работает join и leftOuterJoin. К сожалению большинство подробностей пока легче всего узнать только из исходников Spark или задав вопрос на их mailing list (кстати, рекомендую подписаться на него, если будете что-то серьезное или нестандартное делать на Spark).Еще хуже с понимаем, что творится в различных коннекторах к Spark. И насколько ими вообще можно пользоваться. Например нам на время пришлось отказаться от идеи интегрироваться с Cassandra из-за их непонятной поддержки коннектора к Spark. Но надежда есть что документация качественная в скором будущем появится.
А что у нас интересного есть сверху Spark? SparkSQL: SQL движок сверху Spark. Как мы видели уже, в Sparke уже практически все для этого есть, кроме хранилища, индексов и своей статистики. Это серьезно затрудняет оптимизацию, но команда SparkSQL утверждает, что они пилят свой новый фреймворк оптимизации, а также AMP LAB (лаборатория, откуда вырос Spark) не собирается отказывать и от проекта Shark — полное замещение Apache HIVE Spark MLib: Это по сути замещение Apache Mahaout, только намного серьезнее. Помимо эффективного параллельного машинного обучения (не только средствами RDD, но и дополнительными примитивами) SparkML еще намного качественнее работает с локальными данными, используя пакет нативной линейной алгебры Breeze, который притянет к вам в кластер Фортрановский код. Ну и очень хорошо продуманный API. Простой пример: параллельно обучаемся на кластере с кросс-валидацией. BlinkDB: Очень интересный проект — неточные SQL запросы сверху больших объемов данных. Хотим подсчитать average по какому-нибудь полю, но хочется сделать это не дольше чем за 5 секунд (потеряв в точности) — пожалуйста. Хотим результат с погрешностью не больше заданной — тоже годится. Кстати куски этой BlinkDB можно встретить внутри Spark (это можно рассматривать как отдельный квест). Ну и много-много всего сейчас пишется сверху Spark, я только самые интересные с моей точки зрения проекты перечислил