Apache Spark: настройка и отладка

Большинство современных приложений содержат в себе набор настроек по умолчанию, позволяющий обеспечить достаточно эффективную работу разворачиваемого приложения что называется «из коробки». Есть конечно критики данного подхода, но в целом он позволяет автоматизировать процесс установки и базовой настройки целевой системы.

Однако, при серьезном использовании любой системы рано или поздно возникает необходимость в гибкой настройке. А необходимость в отладке как правило возникает гораздо раньше. Apache Spark в этом плане не является исключением и в этой статье мы поговорим о механизмах настройки Spark и некоторых параметрах, которые пользователям может понадобиться отрегулировать под свои нужды. Также мы рассмотрим механизмы журналирования.

Класс SparkConf

Начнем рассмотрение вопросов настройки Spark с изменения конфигурации среды выполнения приложения. Основным механизмом настройки в Spark является класс SparkConf. При создании нового объекта SparkContext нам потребуется экземпляр SparkConf.

В примере ниже мы объявляем новый экземпляр SparkConf, c именем My Spark APP.

Использование параметра local[4] означает, что приложение Spark запускается в локальном режиме, а не в кластере, где находятся данные. Далее мы переопределяем порт по умолчанию на 36 000. И в завершении создаем SparkContext.

# Создать объект conf

conf = new SparkConf()
conf.set("spark.app.name", "Му Spark Арр")
conf.set("spark.master", "local[4]")
conf.set("spark.ui.port", "36000") # Переопределить порт по умолчанию

# Создать SparkContext с данной конфигурацией

sc = SparkContext(conf)

В целом, класс SparkConf устроен довольно просто, каждый экземпляр SparkConf состоит из пар ключ/значение, представляющих параметры конфигурации, которые пользователь может переопределить. Каждый параметр в Spark определяется строковым ключом и значением.

Чтобы воспользоваться объектом SparkConf, его сначала нужно создать, для этого с помощью метода set (} мы определяем параметры конфигурации, и затем передать этот объект в вызов конструктора SparkContext.

Стоит отметить, что в этом примере мы устанавливаем значения параметров SparkConf программно, в коде приложения, что может оказаться очень удобным при динамическом изменении конфигурации приложения.

Однако, если следовать лучшим практикам разработки приложений, например методологии 12 факторов, то нельзя жестко указывать конфигурационные параметры в коде самого приложения.

И здесь в Spark имеется инструмент spark‑submit. При запуске приложения с помощью сценария spark‑submit, данный скрипт внедряет в окружение параметры настройки по умолчанию, которые затем определяются и переписываются во вновь созданный объект SparkConf. Благодаря этому функционалу пользовательские приложения могут просто конструировать <<Пустые>> объекты SparkConf и передавать их непосредственно в вызов конструктора SparkContext.

При работе с данным конструктором также важно учитывать, что все решения по значениям параметров конфигурации должны быть приняты до создания объекта SparkContext.

В примере ниже при запуске spark‑submit мы используем универсальный флаг ‑conf, который может принимать любые конфигурационные значения Spark.

b3b053077ef7778d82694a5d58b6d86d.png

Также с помощью spark‑submit можно загружать конфигурационные параметры из файла. Использование файлов с готовыми конфигурациями позволяет упростить развертывание приложения в различных средах.

По умолчанию spark‑submit ищет настройки в conf/spark‑defaults.conf и пытается прочитать пары ключ/значение, разделенные пробельными символами. При необходимости можно указать иное расположение файла с помощью флага ‑properties‑file.

$ bin/spark-submit \

    --class com.example.MyApp \

    --properties-file my-conf.conf \

Предлагаемый файл с параметрами конфигурации будет иметь следующий вид:

spark.master local[4]
spark.app.name "Му Spark Арр"
spark.ui.port 36000

Веб интерфейс Spark

Удобным инструментом для выполнения отладки и исследования производительности приложения является веб интерфейс. По умолчанию он доступен на порту 4040 машины, на которой запущен драйвер Spark. Однако в кластерных средах этот порт может отличаться.

В зависимости от используемой версии Spark содержимое веб интерфейса может несколько отличаться, однако обычно на веб странице можно увидеть по нодам, запущенным и завершенным приложениям, потребляемых ресурсах и многое другое.

76233970687c6c95977c5b65c49360c7.png

С помощью веб-интерфейса можно не только выявлять задачи, выполняющиеся медленно, но также и смотреть, сколько времени тратят задачи на каждом этапе своего жизненного цикла: чтение, вычисление и запись. Так, к примеру, если задача тратит мало времени на чтение/запись данных, но выполняется слишком долго, это может быть обусловлено неоптимальной работой программного кода. Хотя иногда бывают обратные ситуации, когда некоторые задачи тратят много времени на чтение данных из внешних источников, и в этом случае оптимизация кода не даст ощутимых результатов, потому что узким местом являются операции сохранения данных.

Логирование в Spark

Помимо информации, которую можно получить с помощью веб интерфейса, для решения проблем можно также воспользоваться журналами событий, которые заполняют непосредственно драйверы и исполнители. Журналы событий в силу в силу своей структуры содержат более полные следы аномальных событий, такие как внутренние предупреждения или сведения об исключениях в пользовательском коде. Эти данные могут помочь при решении проблем или устранении неожиданных аномалий в поведении.

В Spark журналы событий распологаются в различных местах в зависимости от режима развертывания. Так в режиме Standalone логи отображаются непосредственно в веб‑интерфейсе ведущего узла. Также по умолчанию события сохраняются в подкаталоге ~/work/ рабочего узла Spark. При использовании режима YARN проще всего получить доступ к журналам с помощью инструмента выборки информации из журналов с помощью:

yarn logs -applicationid <арр ID>

Данная команда возвращает отчет с журналами для указанного приложения.

Также журналы событий можно посмотреть с помощью веб интерфейса:

3c86acb1cc90ef10a49271f95bfc3daf.png

Что влияет на производительность?

Далее мы поговорим об общих проблемах производительности, с которыми можно столкнуться в приложениях, а также посмотрим, как можно настроить приложение с целью повышения его производительности.

Напомним, что RDD (Resilient Distributed Dataset) в Spark — это неизменяемая распределённая совокупность элементов данных. На логическом уровне набор RDD является единой коллекцией объектов и в процессе выполнения RDD делится на множество разделов, каждый из которых содержит подмножество всех данных.

Когда Spark планирует и выполняет задачи, для каждого раздела создается по одной задаче, и каждая задача будет выполняться по умолчанию на одном ядре. В большинстве случаев такой степени параллелизма вполне достаточно для быстрой обработки наборов RDD. Кроме того, параллелизм для исходных RDD обычно зависит от используемой системы хранения. Например, в HDFS исходные наборы RDD делятся на разделы по блокам файла HDFS. Для наборов, полученных в результате обработки других наборов, степень параллелизма определяется размерами родительских наборов RDD.

В зависимости от степени параллелизма мы можем по разному влиять на производительность. Так, если у нас недостаточно высокая степень параллелизма, некоторые ресурсы Spark могут простаивать в ожидании своей задачи. Например, если в распоряжение приложения передана 1000 ядер, а оно выполняет стадию, состоящую всего из 30 задач, можно было бы увеличить степень параллелизма и задействовать большее число ядер.

И, напротив, если степень параллелизма слишком высока, небольшие накладные расходы, связанные с выполнением каждой задачи, в сумме могут оказаться существенными. Признаком такой ситуации может служить почти мгновенное — в течение нескольких миллисекунд — выполнение задач или когда задачи не читают и не записывают данных.

Посмотрим способы настройки параллелизма. Во‑первых, мы можем передать степень параллелизма в виде параметра в операции, которые производят новые наборы RDD. Во‑вторых — любой имеющийся набор можно перераспределить между большим или меньшим числом разделов.

Мы можем перераспределить RDD случайным образом с помощью операторов repartition () или coalesce (). Если у вас сложилось мнение, что степень параллелизма слишком высокая или слишком низкая, попробуйте перераспределить свои данные с помощью этих операторов.

В качестве примера можно рассмотреть ситуацию, когда приложение читает большой объем данных из облака и сразу после этого выполняет операцию filter (), которая скорее всего исключит какую‑то часть из набора данных. По умолчанию набор RDD, возвращаемый функцией filter (), получит тот же размер, что и родительский, и может включать множество пустых или маленьких разделов. И здесь мы можем увеличить производительность приложения путем объединения маленьких разделов RDD.

Рассмотрим пример кода на Python, который выполняет необходимые действия.

>>> input = sc.textFile("\cloud_store\*.log")
>>> input.getNumPartitions()
35154
# Фильтр, исключающий почти все данные
>>> lines = input.filter(lamЬda line: line.startswith("2024-10-14"))
>>> lines.getNumPartitions()
35154
>>> lines = lines.coalesce(S) .cache()
>>> lines.getNumPartitions()
4
>>> lines.count()

В первых двух строках мы загружаем данные, далее осуществляем фильтрацию. Затем производим объединение строк в RDD перед кэшированием. Все последующие операции будут выполняться с объединенным набором RDD.

Заключение

В этой статье мы поговорили о том, как можно работать с настройками и логами в Apache Spark, а также о некоторых аспектах оптимизации работы с данными.

Кстати, Spark предлагает несколько API. Приглашаю вас на бесплатный вебинар, где рассмотрим чем они отличаются и когда какой стоит использовать. Зарегистрироваться.

Также на странице курса вы можете посмотреть записи прошедших вебинаров и зарегистрироваться на курс.

© Habrahabr.ru