[Перевод] Что нового в Apache Spark 3.4.0 — Spark Connect — Доработки для Shuffle

Spark Connect

Spark Connect — это, пожалуй, самая ожидаемая фича Apache Spark 3.4.0. Она была анонсирована на конференции Data+AI Summit 2022 и сейчас широко освещается в социальных сетях. Вот и я решил приобщиться и внести свой небольшой вклад в это дело, продемонстрировав вам несколько интересных деталей реализации.

Зачем это нужно?

Как обычно, хорошо бы начать с вопроса «а зачем это вообще нам нужно?». Мартин Грунд (Martin Grund), возглавляющий проект и определивший его первоначальный дизайн, приводит некоторые причины в SPARK-39375:

  • Стабильность: в кластерах с общим доступом OOM одного пользователя может вывести из строя весь кластер.

  • Обновляемость: зависимости в classpath создают достаточно неприятные проблемы, такие как ад зависимостей.

  • Developer experience: подключение вашего локального кода к кластерам уже давно перешло в разряд насущных потребностей. Несмотря на множество плагинов, стандартного способа сделать это не существует.

  • Встроенное удаленное подключение: требуются сторонние инструменты, например Apache Livy.

Честно говоря, самым интересным пунктом в этом списке для меня является developer experience. Запускать задачу с локальной машины, в перспективе с дополнительной отладочной информацией, без необходимости устанавливать сторонние плагины или специально настраивать для этого IDE — это просто замечательно. В одной из своих недавних презентаций Мартин привел довольно убедительный пример того, насколько Spark Connect может упростить нам жизнь:

— Написали один раз, выполняется везде.— Настройка конечной точки Spark Connect на основе переменных окружения)

— Написали один раз, выполняется везде.
— Настройка конечной точки Spark Connect на основе переменных окружения)

За этой простотой все-таки скрывается некоторая сложность. Но, к счастью, вы, как конечный пользователь, не по большей части будете от нее ограждены!

Высокоуровневая архитектура

В общем виде Spark Connect представляет собой клиент-серверную архитектуру с взаимодействием на основе gRPC:

0b6efe5de92d4d0ad92f3ba01f989189.png

Выглядит просто, не правда ли? Если вы настроены использовать Spark Connect, то этой картинки в принципе должно быть достаточно. Но если вы хотите узнать больше, давайте увеличим схему и посмотрим на некоторые детали:

4ae8bae15ae7d9692407fa836e97d830.png

Низкоуровневые детали

Должен признать, что схема достаточно детальная, но я не смог больше ничего из нее вынести, чтобы не потерять возможность наглядно продемонстрировать реализацию Spark Connect. Надеюсь, с помощью следующих подробностей вы сможете лучше понять, как Spark Connect устроен под капотом!

Для начала, когда вы создаете Spark Connect SparkSession, вы вызываете метод SparkSession.builder.remote("..."). Внутри он преобразуется в SparkSession, расположенный в модуле pyspark.sql.connect. Эта выделенная для Spark Connect сессия имеет общий API-контракт с SparkSession, но предоставляет немного другую реализацию. Вместо того чтобы взаимодействовать с физическими узлами Spark напрямую, она делает это посредством gRPC-вызовов.

В нашем примере, когда мы вызываем метод collect(), мы фактически выполняем gRPC-вызов типа execute plan. Здесь стоит отметить, что план является логическим и локальным, то есть он не имеет никакого отношения к плану Catalyst. Он является частью запроса с полезной нагрузкой на основе Protobuf, и сервер Spark Connect знает, как с ним работать.

После перехвата первоначального запроса в конечной точке SparkConnectService#executePlan начинается этап обработки. После этого метод handlePlan() вызывает функцию processAsArrowBatch:

processAsArrowBatches(
  	sessionId: String,
  	dataframe: DataFrame,
  	responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
	val spark = dataframe.sparkSession
// ...
	SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
  	val rows = dataframe.queryExecution.executedPlan.execute()
  	val numPartitions = rows.getNumPartitions
  	var numSent = 0

  	if (numPartitions > 0) {
    	type Batch = (Array[Byte], Long)

    	val batches = rows.mapPartitionsInternal(
      	SparkConnectStreamHandler
        	.rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId))

// ...
      	partition.foreach { case (bytes, count) =>
        	val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
        	val batch = proto.ExecutePlanResponse.ArrowBatch
          	.newBuilder()
          	.setRowCount(count)
          	.setData(ByteString.copyFrom(bytes))
          	.build()
        	response.setArrowBatch(batch)
        	responseObserver.onNext(response.build())
        	numSent += 1
      	}

      	currentPartitionId += 1
    	}
  	}

Как видно из этого фрагмента, есть еще одна (на этот раз физическая) сессия SparkSession, которая после преобразования логического плана Spark Connect в физическое представление работает непосредственно с данными и возвращает их клиенту в виде пакетов (батчей).

Расширяемость

Кроме того, Spark Connect является расширяемым. Вы можете реализовать собственные расширения для отношений, команд и выражений в составе библиотек Spark Server. Все, что вам нужно сделать в этом случае, это:

  1. Определите плагин. Он должен реализовать один из этих интерфейсов: CommandPlugin, ExpressionPluginили RelationPlugin. Плагин преобразует логический план Protobuf, полученный от клиента, в его эквивалент в Catalyst.

Зарегистрируйте плагины команд, выражений и отношений в конфигурационных записях spark.connect.extensions.command.classes, spark.connect.extensions.expression.classesи spark.connect.extensions.relation.classesсоответственно. Таким образом, зарегистрированные классы будут загружены SparkConnectPluginRegistryи преобразованы на этапе преобразования плана из Protobuf в Catalyst. Ниже приведен пример для отношений:

private def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
	SparkConnectPluginRegistry.relationRegistry
  	// Лениво перебираем коллекцию.
  	.view
  	// Применяем трансформацию.
  	.map(p => p.transform(extension, this))
  	// Находим первую непустую трансформацию или вызываем  throw.
  	.find(_.nonEmpty)
  	.flatten
  	.getOrElse(throw InvalidPlanInput("No handler found for extension"))
  }
  1. Определите клиентскую часть. Она должна быть способна кодировать сообщение Protobuf, используемое для описания плагина на стороне сервера.

Резюме из презентации Мартина, упоминавшейся ранее:

0454085c57bc8065c79cb90f2dae9d1a.png

Для полноты картины — еще один материал из выступления Мартина, демонстрирующий всю мощь реализации Spark Connect на базе Protobuf:

Вы можете использовать его [плагин] на стороне клиента на любом языке. И это еще один ключевой момент. Если вы создадите свою серверную библиотеку, вы сможете использовать ее из Rust, из Go, откуда угодно, если только вы знаете, как закодировать в ней сообщение Protobuf.

Databricks on LinkedIn: Use Spark from anywhere: A Spark client in Python powered by Spark Connect… | 92 comments

www.linkedin.com

Пок это все, что касается Spark Connect в контексте Apache Spark 3.4.0, но это только начало истории Apache Spark. Недавняя активность в подкаталоге на Github дает надежду на то, что Spark Connect может стать постоянной рубрикой в моей серии «Что нового в Apache Spark»!

— Доработки для Shuffle

Shuffle (перетасовка) — постоянно повторяющаяся тема в серии «Что нового в Apache Spark». Почему? Зачастую это одна из самых трудоемких частей задач, а знание об улучшениях в этом процессе попросту помогает писать лучшие конвейеры.

Хранилище состояний shuffle-сервиса RocksDB

Автором первой фичи, связанной с shuffle, является Ян Цзе (Yang Jie). Он расширил хранилище состояний (state store) shuffle-сервиса реализацией для RocksDB. Но прежде всего, зачем здесь RocksDB? Shuffle-сервис представляет собой прокси-сервер на каждом исполнителе в кластере. При чтении shuffle-данных исполнители общаются с прокси, а не напрямую между собой. Вы можете подумать, что в этой конфигурации shuffle-блоки хранятся в RocksDB, но это не так. Сервис отслеживает только файлы, поэтому ему необходимо поддерживать состояние управляемых исполнителей. А если точнее, то вот это:

public class ExecutorShuffleInfo implements Encodable {
/** Базовый набор локальных директорий, в которых исполнитель хранит свои shuffle-файлы. */
  public final String[] localDirs;
  /** Number of subdirectories created within each localDir. *//** Количество подкаталогов, созданных в каждом localDir. */
  public final int subDirsPerLocalDir;
  /**
   * Shuffle-менеджер (SortShuffleManager), который использует исполнитель.
   * Если эта строка содержит точку с запятой, она также будет включать метаинформацию
   * для push-based shuffle в формате JSON. Пример строки с точкой с запятой:
   * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
   */
  public final String shuffleManager;
// ...
}

Проблема с внешним shuffle-сервисом, как и с любым другим, заключается в риске сбоя. Исторически эта проблема была решена в ExternalShuffleService, который должен быть устойчив к перезапускам NodeManager в yarn, в релизе 1.6.0. Пул-реквест, предложенный Имраном Рашидом (Imran Rashid) для привязанной JIRA, очень хорошо описывает назначение хранилища состояний shuffle-сервиса:

В целом, Yarn-приложения должны быть устойчивы к перезапускам NodeManager. Однако если запустить spark с включенным внешним shuffle-сервисом, то после перезапуска NodeManager все перетасовки не работают, поскольку shuffle-сервис потерял часть состояния с информацией о каждом исполнителе. (Обратите внимание, что shuffle-данные прекрасно сохраняются на диске после перезапуска NodeManager, проблема в том, что мы потеряли небольшой фрагмент состояния, позволяющий нам найти эти файлы). Решение, предлагаемое здесь, заключается в том, что внешний shuffle-сервис может записывать свое состояние в файл каждый раз, когда добавляется исполнитель. При работе с yarn этот файл находится в локальном каталоге NodeManager. При каждом запуске сервиса он ищет этот файл, и если он существует, то считывает его и перерегистрирует в нём всех исполнителей.

Каждый раз, когда запускается новый исполнитель, он регистрируется во внешнем shuffle-сервисе здесь:

public class ExternalShuffleBlockResolver {

  public void registerExecutor(
  	String appId, String execId, ExecutorShuffleInfo executorInfo) {
	AppExecId fullId = new AppExecId(appId, execId);
	logger.info("Registered executor {} with {}", fullId, executorInfo);
	try {
  	if (db != null) {
    	byte[] key = dbAppExecKey(fullId);
    	byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
    	db.put(key, value);
  	}
	} catch (Exception e) {
  	logger.error("Error saving registered executors", e);
	}
	executors.put(fullId, executorInfo);
  }

До выхода версии 3.4.0 Apache Spark поддерживал в качестве бэкенда «db» только LevelDB. Теперь вы можете настраивать это в spark.shuffle.service.db.backendи использовать RocksDB.

В этой связи стоит упомянуть о проблемах, с которыми вы можете столкнуться при работе компонентов на базе LevelDB на Apple Silicon, и внешнее хранилище состояний shuffle-сервиса — одна из них.

Push-based shuffle

В этом разделе мы начнем с одного исправления, предложенного Ваном Куном (Wan Kun), для удаления файлов данных слияния (merge) для shuffle на основе push-модели. Начиная с этого момента драйвер отправляет сообщение RemoveShuffleMergeво все места слияния, что приводит к удалению всех связанных с ними файлов данных слияния.

Второе исправление, на этот раз реализованное gaoyajun02, решает проблему чтения блоков нулевого размера, которые могут поступать от узлов с аппаратными проблемами. В этом случае, если это возможно, программа поиска shuffle-блоков будет откатываться к получению оригинальных shuffle-блоков.

Кроме того, есть и улучшения, связанные с отказоустойчивостью. Йе Чжоу (Ye Zhou) реализовала возможность для push-based shuffle-сервиса хранить свое состояние в инстансе LevelDB. Это позволяет избежать потери состояния в случае перезапуска NodeManager, что могло бы привести к невозможности обслуживать запросы на выборку объединенных данных.

Наконец, были внесены некоторые изменения, связанные с метриками. Тейдип Гудивада (Thejdeep Gudivada) добавил метрики чтения на стороне клиента. Благодаря им вы можете узнать о таких вещах, как количество поврежденных объединенных блоков, количество возвратов к нормальному чтению блоков и многое другое.

Я прекрасно понимаю, что эта статья в блоге гораздо более низкоуровневая, чем предыдущие из этой серии. Тем не менее, она служит хорошим напоминанием о дополнительных компонентах shuffle, включая shuffle-сервис и push-based shuffle, добавленных не так давно (в Apache Spark 3.2.0).

Как успешно пройти интервью на позицию Data Engineer с уклоном на Spark?
Разберемся на завтрашнем открытом уроке. Преподаватель OTUS также поделится практическими кейсами и ответит на вопросы участников. Записаться на урок можно на странице курса «Spark Developer».

© Habrahabr.ru