Apache Spark: оптимизация производительности на реальных примерах

526e7c75eeae84df49a845b10e4fb2ab.jpg

Apache Spark — фреймворк для обработки больших данных, который давно уже стал одним из самых популярных и часто встречаемых во всевозможных проектах, связанных с Big Data. Он удачно сочетает в себе скорость работы и простоту выражения своих мыслей разработчиком.

Разработчик работает с данными на достаточно высоком уровне и, кажется, что нет ничего сложного в том, чтобы, например, соединить два набора данных, написав всего одну строку кода:

 ordersDF.join(customersDF, 
               ordersDF ["customer_id"] == customersDF["id"], 
               "left_outer")

Но только задумайтесь: что происходит в кластере при соединении двух наборов данных, которые могут и не находится целиком на каком-либо из узлов кластера? Обычно Spark со всем справляется быстро, но иногда, а особенно, если данных действительно много, необходимо все-таки понимать — что происходит уровнем ниже и использовать это знание, чтобы помочь Spark работать в полную силу.

Сегодня мы поговорим о том, как сделать так, чтобы ваше приложение работало быстро и использовало все ресурсы, которые вы запросили для него. В этой статье рассмотрим в основном модуль Spark SQL, запуск приложения Spark в кластере с Yarn со статическим выделением ресурсов. Но общие идеи можно применять и с другими начальными данными. Мы рассматриваем здесь Spark 2.3/2.4, чтобы лучше понять все нововведения Spark 3.X с его Adaptive Query Execution (AQE) (хотя некоторые функции уже присутствуют и в Spark 2.4) и почему они так важны.

Данные и где они обитают

Начнем с абстракции, которую нам предоставляет Spark для работы с данными — это RDD (Resilient Distributed Dataset). Для цели нашей статьи не важно, что мы работаем с DataFrame или DataSet.

RDD (Resilient Distributed Dataset)RDD (Resilient Distributed Dataset)

Таким образом, для разработчиков набор данных представлен в виде единого объекта, а обрабатывается он порциями (блоками) отдельно в каком-то потоке на каком-то исполнителе (executor) в кластере. Блок является минимальной единицей обработки, исполнитель получает блок и инструкцию, которая сообщает ему о том, что необходимо сделать с этим блоком данных.

Как работает приложение Spark в кластере

На высоком уровне каждое приложение Spark в момент своей работы состоит из драйвера — программы, которая исполняет функцию main () и исполнителей, которые работают на узлах кластера. Исполнители — универсальные солдаты, они получают порцию данных (блок) и инструкцию, исполняют ее и докладывают драйверу о завершении, чтобы получить следующую инструкцию. В каждом исполнителе может быть запущено более одного потока обработки, и в этом случае каждый поток обрабатывает свой блок данных независимо от других. Таким образом, если мы при запуске нашего приложения заказали у менеджера кластера пять исполнителей по четыре ядра (потока), то в каждый момент времени мы располагаем 5×4 = 20 потоками и в лучшем случае можем обрабатывать 20 блоков данных одновременно.

Итак, каждая задача получает для выполнения:

  • num_executors — к-во отдельных процессов JVM, в которых будут запущена потоки обработки данных (они могут быть расположены как на одной узле кластера, так и на разных). Процессы будут работать до конца работы приложения;

  • executor_cores — количество параллельных потоков, выполняемых в каждом executor. Один поток в каждый момент времени обрабатывает один блок данных.

Схема работы приложения SparkСхема работы приложения Spark

В Spark History (Web сервер для отображения логов выполнения приложений Spark в удобном виде) это выглядит так:

Spark History: StageSpark History: Stage

Мы здесь видим два исполнителя, в каждом из которых работает по четыре потока обработки.

Перетасовка (Shuffle)

Итак, мы разобрались в том, что у нас есть N блоков данных и P потоков (работников), которые эти блоки данных могут перерабатывать параллельно.

И все у нас было бы хорошо, если бы эти блоки жили до конца приложения, но почти в любом приложении будет обработка, которая влечет за собой полную перетасовку наших блоков. Это, например, соединение двух таблиц по ключу (JOIN), группировка по ключу (GROUP BY). В этом случае работает всем хорошо известный паттерн MapReduce, при котором происходит перераспределение данных всего набора по ключу на новые блоки данных, так, чтобы строки с одним и тем же ключом находились только в одном блоке. Этот процесс в Spark называется Shuffle. Почему я написал его с большой буквы? Потому что это очень сложный и дорогостоящий процесс, при котором увеличивается потребление памяти на исполнителях, потребление дисковой памяти на узлах кластера и сетевой обмен между узлами кластера. Очень напоминает превращение гусеницы в бабочку — все разваливается на куски, чтобы потом собраться в новом обличии, и так же энергозатратно.

Задание делится на этапы

В Spark обработка блоков от одной перетасовки (Shuffle) до другой называется этапом (Stage). Заметим, что до перетасовки все блоки обрабатываются параллельно, после перетасовки они тоже обрабатываются параллельно, но новый этап не начнется пока этот процесс не пройдут все блоки в конце предыдущего этапа. Таким образом, граница между этапами — это место ожидания при параллельной обработке блоков. Заметим также, что в рамках одного этапа все задачи (task) над одним блоком происходят последовательно в рамках одного потока. То есть блок никуда не передается по сети, но все блоки обрабатываются параллельно. Получается, что количество блоков в границах этапа неизменно.

Задание делится на этапыЗадание делится на этапы

Мы пришли к следующей картине: все задания делятся на этапы, а в рамках каждого этапа количество блоков неизменно и равно …. И вот здесь начинается самое интересное. Количество работников у нас известно (P = executors*cores), а вот сколько блоков будет на каждом этапе — это вопрос, от которого напрямую зависит производительность нашего приложения. Ведь если блоков много, а исполнителей мало, то каждый исполнитель будет обрабатывать последовательно несколько блоков и наоборот: если блоков мало, а исполнителей больше, то некоторые исполнители будут простаивать, когда остальные трудятся не покладая рук. Самое интересное здесь — то, что, когда приложение работает медленно, пытаются выдать ему больше исполнителей, но производительность в этом случае не увеличивается.

Давайте начнем выяснять объем работы по этапам. Здесь и далее для простоты картины будем рассматривать блоки только одного набора данных. В каждый момент времени исполнители могут обрабатывать несколько несвязанных друг с другом этапов. Например перед JOIN два набора данных будут обрабатываться независимо друг от друга и, таким образом, делить исполнителей между собой. В этом случае количество блоков обработки будет их суммой. Но для наших целей необходимо понять — что происходит с одним набором данных. На первом этапе все будет зависеть от того, откуда произошел ваш набор данных. Например, если вы читаете директорию файлов parquet из HDFS, то количество блоков на первом этапе в общем случае будет равно (количеству блоков HDFS, из которых состоят все файлы .parquet из загружаемой директории). То есть в этом случае каждый блок HDFS будет представлять собой отдельный блок данных для обработки. Но не забываем, что такое распределение блоков будет сохраняться до конца этапа. Вот отличный пример.

Мы читаем небольшой файл из HDFS, в котором 150 тысяч записей. Весь файл помещается в один блок HDFS. Таким образом, на первом этапе у нас всего один блок данных, поэтому работать с ним сможет только один исполнитель. Но по логике работы трансформации в каждой строке находится поле duration (количество секунд просмотра), и нам необходимо размножить каждую строку на выходе на столько строк, сколько секунд смотрения у нас в этой строке.

viDF = spark.read.parquet("/tst/vi/")
viDF.createOrReplaceTempView("ViewingInterval")
spark.sql("""select t.*, 
                    explode(get_list_of_seconds(duatation)) as secondNumber 
               from ViewingInterval""")

Трансформация на тестовых данных работает не быстро.  Глядя в Spark History, видим:

На первом этапе один блок данныхНа первом этапе один блок данных

Tasks = 1 значит что на этом этапе — всего одна задача, так как блок данных только один. Видим на входе 2 Мб данных, а на выходе уже развернутый набор 1 Гб данных. И все это делает один поток, остальные простаивают, так как на этом этапе больше нет задач. Как же нам поступить, ведь explode — узкая зависимость и по этой причине не прерывает этап, а выполняется на том же этапе, на котором происходит чтение данных. В рамках этапа, как мы уже знаем, количество блоков неизменно. В этом случае нам можно легко (так как набор данных на входе маленький, и перетасовка пройдет быстро) разорвать этот этап на два, используя функцию repartition(N), которая приводит к перетасовке в случайном порядке, создавая на выходе N блоков данных, примерно равных по размеру. А так как она производит перетасовку (Shuffle), значит, начинается новый этап.

viDF = spark.read.parquet("/tst/vi/")
viDF.repartition(60).createOrReplaceTempView("ViewingInterval")
spark.sql("""select t.*, 
                    explode(get_list_of_seconds(duatation)) as secondNumber 
               from ViewingInterval""")

Смотрим Spark History:

Теперь обработка идет параллельноТеперь обработка идет параллельно

На втором этапе — на который теперь попал explode после repartition, у нас 60 заданий (блоков данных) и все исполнители теперь работают, а не простаивают. Время работы трансформации сократилось почти вдвое. Наша задача заключается в том, чтобы не было простоя, и все исполнители работали, иначе зачем мы отбираем у кластера ресурсы, которые потом не используем.

С первым этапом разобрались и даже узнали — как разорвать любой этап на два с помощью repartition(N). Разберемся с внутренними этапами, которые находятся между двумя перетасовками. Здесь все решает параметр spark spark.sql.shuffle.partitions (default 200). Точнее решал, так как с введением AQE Spark научился сам регулировать это количество. Итак, любой внутренний этап будет состоять из spark.sql.shuffle.partitions блоков данных. Но здесь тоже не все так гладко: если у вас данных не много, то необходимо уменьшать этот параметр, а если много — увеличивать. И в случае с Spark 2.3 вам необходимо искать какую-то золотую середину, в зависимости от ваших данных.

Приведу пример, когда у нас данных мало, а spark.sql.shuffle.partitions = 200 по умолчанию. Вырожденный случай и не используем broadcast. Глядя на Spark History, видим, что наш набор данных состоит всего из 185 строк, и был поделен при перетасовке на 200 блоков (но тут и на 200 блоков не хватит). Заметим, что реальная полезная работа исполнителя окрашена здесь в зеленый цвет. То есть получается, что из всего времени работы исполнителя на обработку одного блока данных из одной записи полезное время составило <10%. Остальное время – это ожидание и десериализация.

image-loader.svgimage-loader.svg

Что же у нас происходит на последнем этапе? Это опять зависит от того, куда мы выводим данные нашей трансформации. Например, мы хотим записать все в директорию в виде parquet файлов. Если мы сделаем это после перетасовки, ничего не предприняв, то обнаружим 200 файлов в этой директории после выполнения нашей программы. Почему? Потому что после перетасовки у нас получилось по умолчанию spark.sql.shuffle.partitions = 200 блоков, а так как один блок обрабатывается одним потоком, то и записывать его он будет сам в отдельный файл.

Обычно в этом месте разработчики хотят контролировать количество файлов в HDFS и при сохранении на DataFrame вызывают метод coalesce(N). Этот метод просто зачисляет каждый блок нашего набора в один из N новых блоков. То есть реально coalesce(), в отличие от repartition(), не приводит к перетасовке, а следовательно не разрывает этап, просто делает так, что на нашем этапе будет N блоков данных. Но к чему это приводит — к тому, что на этом этапе будет работа только для N исполнителей. Что, если мы решили сохранить все в один файл — будет работать только один поток. Вспомним рассуждения про первый этап и, если последний этап достаточно серьезный по вычислениям, то непосредственно перед сохранением вместо coalesce(N) имеет смысл сделать repartition(N), чтобы разбить последний этап на два: предпоследний, который будет выполнять тяжелые вычисления в spark.sql.shuffle.partitions потоков параллельно (если до этого был join, например) и последний, в котором непосредственно произойдет сохранение в нужное нам количество файлов (N) уже без ресурсоемких вычислений. Здесь необходимо подумать, что будет быстрее — оставить все как есть или, добавив repartition(N), произвести перетасовку, что тоже не бесплатно, но потенциально возможно распараллелить сложные вычисления.

dataDF.repartition(1) \
.write \
.format("parquet") \
.mode("overwrite") \
.option("compression","snappy") \
.save("/tst/problem_4/result")

Теперь, когда мы разобрались с отношениями между количеством блоков на этапе и количеством исполнителей, приведу небольшой пример. На входном этапе у нас 20 блоков данных, а исполнителей всего 10 (5 executors * 2 cores). Видим, что почти каждый исполнитель после обработки одного блока вынужден взять на обработку еще один блок, так как в среднем на одного исполнителя приходится два блока данных, нуждающихся в обработке. Но, вспомнив, что все блоки данных на одном этапе можно обработать параллельно запросим 20 исполнителей для нашей задачи (5 executors * 4 cores), получим, что каждый исполнитель теперь обработает только один блок и время работы всего этапа в идеале сократится в два раза. Как раз тот случай когда увеличение ресурсов работает и увеличивает скорость, но до определенного момента — после порога в 20 потоков для этого случая увеличение ресурсов работать уже не будет.

Увеличили ресурсы - работает быстрееУвеличили ресурсы — работает быстрее

Кстати, один из интересных моментов применения метода разрыва последнего этапа при сохранении, описанного в предыдущем параграфе:

dataDF.repartition(N).write. …

Если сравнивать показатели перед и после разрыва последнего этапа, то все кажется отлично: время работы трансформации сократилось в несколько раз (так как последние вычисления выполнялись параллельно всеми исполнителями), исчез Shuffle Spill (это когда исполнителю не хватает памяти и он устраивает своеобразный swap с локальным диском. Конечно, в этом случае все данные пришли несколькими большими блоками и исполнители с трудом их переварили).

СТОП! Приглядимся к размеру файлов, полученных при сохранении. Было 5.9 Гб, стало 10.3 Гб, количество записей одно и то же, состав данных тот же. Почему? Вот и ложка дегтя!

Обратите внимание на размер OutputОбратите внимание на размер Output

Добавили только repartition().Мы уже узнали, что он распределяет данные случайным образом. То есть взамен частично упорядоченных данных по ключу после последней перетасовки (JOIN был в нашем случае) мы получаем данные, распределенные случайно. Вспоминаем, что parquet — колоночный формат хранения файлов, причем данные в нем сжимаются, извлекая пользу из того, что в колонке они могут быть частично упорядочены. Получается, мы привнесли случайность в распределение строк и таким образом ухудшили сжимаемость данных почти в два раза. Что с этим можно сделать? Можно вернуть порядок, но внутри каждого блока данных.

dataDF.repartition(20). sortWithinPartitions(asc("id")).write. …

Функция sortWithinPartitions() производит сортировку по полю или нескольким полям внутри каждого блока, т.е. никакой перетасовки не происходит все работает в рамках одного исполнителя в его памяти. После применения этой функции к нашей трансформации для сортировки по нескольким полям, общий размер файлов на выходе стал даже немного меньше, чем изначально. Теперь у нас все работает быстро, размер файлов на выходе нас устраивает. Кроме того, в этом случае мы записали в HDFS файлы примерно одинакового размера (это следствие repartition()), что может быть удобно для дальнейшей обработки.

О работе оптимизатора

Раз уж мы коснулись файла формата parquet, на нем и посмотрим, как работает оптимизатор Spark на примере такого правила оптимизатора, как predicate pushdown и projection pushdown.

В случае с projection pushdown колоночный форма parquet особенно выигрывает. Напомню, что реальное выполнение дерева запроса начинается только в момент, когда выполняется действие, то есть операция, которая выводит данные: передает в основную программу (драйвер) (collect, count,…), сохраняет в файл, передает в базу данных и т.д. При этом Spark строит дерево запроса и оптимизирует его. Таким образом, при построении запроса оптимизатор уже знает — какие поля необходимы, чтобы получить результат, и будет читать из файла только эти поля. Так как в колоночном формате файлов данные хранятся в разрезе колонок, то из файла прочитаются только эти поля.

Рассмотрим правило оптимизатора — спуск условия (predicate pushdown). Принцип этой оптимизации достаточно прост: данных у нас много и ни к чему их обрабатывать, если они в конце концов не пригодятся, например должны быть отфильтрованы в конце выполнения нашего дерева запроса. Все условия и фильтры оптимизатор старается по возможности спустить на уровни ниже — ближе к источникам данных, в идеале до непосредственного чтения файла (или, например, запроса к RDBMS).

Рассмотрим пример:

image-loader.svg

Вот физический план выполнения запроса, который при этом генерируется:

image-loader.svg

Обратим внимание на блок непосредственного чтения из файла (FileScan parquet) и блок PushedFilters — это те условия, которые будут накладываться при физическом чтении файла. Видим, что сюда попали три условия:

  • для ValueDate условия IsNotNull и LessThanOrEqual — с последним понятно, это отражено в нашем SQL. Откуда взялось IsNotNull? Ясно, что у нас в запросе стоит условие ValueDate <= константа и NULL значения не удовлетворяют этому условию, то есть логически все правильно. Но зачем оптимизатор для файла parquet вынес это условие отдельно?  Об этом — в следующем параграфе;

  • для SubjectID условие IsNotNull. Но у нас нет такого условия в запросе и вообще нет условия на SubjectID. Есть только LEFT JOIN по этому полю, где наша таблица присоединяется к основной. Да, точно:  при таком JOIN все строки, где SubjectID is NULL, не попадут в результирующую выборку. Мы видим, что оптимизатор это учитывает и в самом начале даже не читает такие строки из файла.

Давайте все-таки разберемся что же такого интересного в условии IsNotNull, что оптимизатор его добавляет отдельно. Для этого заглянем в структуру файла parquet. Для этого можно использовать parquet-tools. Все дело в том, что файл parquet наряду со схемой хранит так же некоторую статистику по полям в разрезе групп строк.

Файл parquet внутриФайл parquet внутри

Видим, что для всех целочисленных типов есть количество значений (Values), количество NULL в данном блоке (Null Values), а также Min и Max значения столбца в этой группе строк. Сразу вспоминаем наше условие на поле IsNotNull. То есть, если бы у поля SubjectID в этой группе было бы Values = Null Values, то мы могли бы сделать вывод, что все значения в данной группе строк являются NULL и не читать этот блок вовсе. То же самое относится и к условиям больше, меньше, равно — здесь можно использовать Min и Max значения столбца и делать вывод — нужно ли вообще читать эту группу строк.

Важно понимать, что условие можно спустить на уровни ниже только, если оно заранее известно перед началом выполнения запроса.

Пример из реальной жизни.

Директория файлов parquet партицирована по полю. В трансформацию передавалась строка со значениями фильтра по этому полю, разделенными запятой. Разработчик сделал explode(split(фильтр)), то есть маленькую таблицу из этой строки со значениями и со спокойной совестью сделал INNER JOIN с основной таблицей, которую нужно отфильтровать. Трансформация работала медленно. Посмотрим на план запроса:

Из HDFS читаются все партицииИз HDFS читаются все партиции

Странно, но на первом этапе Spark вычитывает все партиции (PartitionCount =121), хотя мы передаем фильтр, который состоит только из одного значения. Это как раз тот случай, когда при построении дерева запроса Spark не знает о фильтре вообще, ведь он скрыт за JOIN.

Вместо процедуры построения таблицы со значениям фильтра просто используем стандартную функцию Spark SQL — find_in_set (). Она ищет положение подстроки в строке, представляющей из себя список, разделенный запятыми.

То есть фильтр у нас теперь представляет простое выражение:

where find_in_set(surveyprogectid, <строка фильтра>) 

И если посмотреть на план выполнения запроса, так как при его построении оптимизатор уже знает строку фильтра и условие, он спускает это условие на уровень чтения из файла. Кроме того, зная, что это партицирующее поле, применяет правило partition pruning, то есть выкидывает из рассмотрения партиции, не удовлетворяющие фильтру.

Читается из HDFS только одна партицияЧитается из HDFS только одна партиция

Обратите внимание, что наше условие теперь находится в блоке PartitionFilters, так как поле партицирующее, из HDFS вычитывается только нужная нам партиция (PartitionCount = 1).

Поэтому, если у вас есть большая таблица с партицированием и вы отбираете некоторые партиции через JOIN, то возможно будет лучше отдельным действием сформировать список значений из этой таблицы фильтра в виде строки и передать его в виде константы в условие основного запроса.

Когда оптимизатор может навредить

Отличная работа оптимизатора… Но иногда его стремление спустить условие как можно ниже к источнику может навредить. На сцену выходит UDF (user defined function). Функция, определенная пользователем, это черный ящик для оптимизатора Spark.

Рассмотрим следующий пример:

Имеем большой файл с несколькими миллиардами строк. Мы хотим отобрать только уникальные id и применить к ним нашу UDF, далее отобрать только те результаты, которые будут Null. Последовательность запросов:

T1=> select distinct id from T
T2=> select UDF(ID) as newID from T1
T3=> select * from T2 where newID is null

Уникальных значений id в таблице всего несколько тысяч, а наша UDF работает не быстро — ходит в HBase. То есть мы, построив такое дерево запроса, рассчитываем, что наша UDF будет вызвана несколько тысяч раз. Запускаем запрос и долго-долго ждем.

Смотрим план выполнения запроса:

Условие с UDF спустилось почти на самы нижний уровеньУсловие с UDF спустилось почти на самы нижний уровень

… ой! Оптимизатор постарался на славу: он честно спустил наше условие isNull(UDF(id)) на уровень сразу после непосредственного чтения файла, даже до того момента, когда мы отбираем только уникальные id. А это значит, что наша тяжелая UDF пыталась выполниться миллиарды раз вместо тысяч.

Что здесь можно придумать? Например, сделать cache(persist) после вычисления уникальных id (T1). Или использовать lateral view, через который оптимизатор не пропустит условие дальше.

select udf_result as newID 
  from T1 lateral view explode (array(UDF(ID))) as udf_result	

Получили то, что и хотели в начале — UDF вычисляется только для уникальных id:

image-loader.svg

Заключение

За рамками этой статьи остались вопросы, связанные с оптимизацией JOIN: broadcast, data skew, с плюсами и минусами coalesce и repartition. Некоторые моменты достаточно подробно описаны на Хабр, а некоторые — нет. Возможно, это будет темой следующей статьи. Спасибо за внимание.

© Habrahabr.ru