[Перевод] Дежурный data-инженер: рабочие хроники

pw6p2zkno6nth4ea-fsgyeigtlm.jpeg

Команда VK Cloud перевела серию статей о том, как data-инженер принимает вызовы технической команды по работе с платформой данных. Он рассказывает о реальных неисправностях, которые возникают в процессе работы, и о том, как команда работы с данными их устраняет.

От Pandas к pandas_on_spark


Spark-приложение было запущено из Airflow, но airflow task, выполнявший Spark-приложение, упал. В логе задач не было никаких сообщений об ошибках. Я проверил Driver Pod в Kubernetes и выяснил, что он не работает.

State: Terminated
Reason: OOMKilled
Exit Code: 137


Потом я посмотрел на код и выяснил, что в кластере Spark выполнялось Spark-приложение Pandas. Пользователь считывал данные в датафрейме Spark и запускал команду toPandas, преобразуя их в датафрейм Pandas. Поскольку при этом он собирал все данные от executors и направлял их в driver, у driver заканчивалась память.

Я попросил пользователя использовать API pandas_on_spark, а не Pandas. После незначительных правок некоторых функций Spark-приложение завершилось без ошибок.

Исключение Class not found (ошибка сериализации Spark)


При выполнении нового Spark-приложения в Spark 3.2 возникло исключение class not found. Я посмотрел код и выяснил, что в нём содержится определение класса, не поддерживающего сериализацию. Попросил пользователя подправить код или исправить ссылку на объект так, чтобы класс стал сериализуемым.

Проблема совместимости PyArrow


В продакшене Spark-приложение завершилось ошибкой, когда в среде conda пользователь обновил версию numpy. Выполняемое Spark-приложение завершилось следующей ошибкой:

pyarrow.lib.check_status pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object


Попросил пользователя обновить версию PyArrow в пакете conda до версии 4.0.1, и проблему совместимости удалось решить.

Insert Overwrite в ту же партицию


Spark-приложение, которое корректно выполнялось в Spark 2.3, завершилось с ошибкой в версии Spark 3.2. Сообщение об ошибке:

Analysis Exception: Cannot overwrite a path that is also being read from.


Мы выяснили, что пользователь пытался выполнить insert overwrite из одной партиции в другую той же таблицы. Поэтому Spark проверяет путь ввода и вывода в расположении таблицы, а не партиции — это и привело к сообщению об ошибке. Это нерешённая проблема в Spark. Я попросил пользователя указать значение false для convertMetastoreParquet, чтобы для insert overwrite Spark использовал Hive Serde, а не встроенный инструмент. С ошибкой удалось справиться, поскольку Hive Serde использует подход staging dir.

Коммит Spark для информации:  https://github.com/apache/spark/pull/35608

Неоднозначные столбцы


Выполняемое Spark-приложение завершилось следующей ошибкой:

org.apache.spark.sql.AnalysisException: Reference is ambiguous


Изучая код, я выяснил, что в одну из таблиц, используемую в join, добавлен новый столбец, который имелся и в другой таблице. Код использовал select($"joined_df1.*"). Это проблема Spark: он не удаляет дублирующиеся столбцы в случае star-расширения alias вложенного запроса. Я попросил пользователя удалить дублирующийся столбец в одном из двух датафреймов и создал в бэклоге тикет с требованием устранить проблему в исходном коде.

Коммит Spark для информации:


Падение Spark-приложения в момент записи


Spak-приложение выполнялось нормально, но завершилось сбоем. В логах я увидел следующую ошибку:

An Error occurred while calling o5331.saveAsTable
java.lang.OutOfMemoryError: Java Heap Space


Изучив сведения об executors на history server, я выяснил, что мы имеем дело со случаем неравных партиций: в одних партициях было больше данных, чем в других. Поскольку перед записью, которую сделал executor, в коде пользователя не выполнялся метод repartitioning датафрейма, в «перекошенной» партиции возникала ошибка OOM. Чтобы выполнить Spark-приложение в продакшен-среде, я попросил пользователя изменить память executor; когда объём памяти увеличился, удалось выполнить Spark-приложение без ошибок. Но я попросил пользователя дополнительно выполнить repartition.

Значение Null в запросе Spark при наличии данных в таблице Hive


Пользователь сообщил, что видит нулевые значения, когда выполняет запрос к таблице в Spark SQL. Но при выполнении запроса к той же таблице в Hive выводились другие значения.
Проблема заключается в том, что Spark и Hive по-разному обрабатывают файлы ORC. Я попросил пользователя запустить Spark Query, указав значение false для convertMetastoreORC. После этого Spark стал выдавать такой же результат, что и Hive.

Проблема «Spark local dir full»


Одно Spark-приложение завершилось со сбоем. Посмотрев логи, я нашёл следующую ошибку:

java.nio.file.FileSystemException: No space left on device


Такое бывает, когда переполняется Scratch space (spark.local.dir). Это произошло потому, что одно из Spark-приложений записало слишком много данных в диск, настроенный как локальная директория. Так как Spark у нас работает в кластере Kubernetes, то на каждой ноде у нас есть daemonset для выполнения задач на уровне ноды. Я использовал тот же daemonset, чтобы очистить диск на этой ноде, потом перезапустил Spark-приложение, и на этот раз оно завершилось без ошибок.

Не работает broadcast hint


Пользователь сообщил, что в одном из Spark-приложений не применяетсяjoin hint. Размер таблицы был около 800 Мбайт. Из кода было видно, что пользователь создал alias для таблицы, но в hintупоминал первоначальноеимя. Я попросил его использовать в hint alias, после чего план Spark показал, что broadcast заработал.

PySpark не работает с Python 3.8 в Livy


При выполнении PySpark с Python 3.8 через Livy возникла следующая ошибка.

TypeError: required field "type_ignores" missing from Module


Python 3.8 внёс изменение в API и тем самым вызвал эту ошибку. Чтобы исправить ситуацию, нужно было включить изменения с Livy в этот PR: LIVY-795

Null Pointer Exception в PySpark


Пользователь сообщил об исключении в одном из Spark-приложений в продакшен-среде.

java.lang.NullPointerException at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.project_subExpr_5$(Unknown Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)


Проблема заключалась в том, что схема Parquet (в случае с именем столбца) различалась в двух разных партициях.

  • Когда запросы к этим двум партициям выполнялись по отдельности, всё отлично работало. Проблему можно решить с помощью выполнения запроса к данным по партициям, с дальнейшим объединением (union).
  • Следите, чтобы в разных партициях имена столбцов были согласованными.
  • В самом общем смысле:

    — Изучите путь и получите схему каждой партиции.

    — Преобразуйте столбцы в строки, чтобы обеспечить совместимость типов данных между схемами.

    — Преобразуйте датафрейм в JSON RDD, после этого переходите к объединению партиций.


Сбой executor из-за OOM


Один executor Spark-приложения выходил из строя из-за OOM. Изучив код, я выяснил, что в нём сокращали число партиций с помощью coalesce(8), а затем выполняли дальнейшую обработку. После перехода на метод repartition(8) партиции стали более сбалансированными и у executor исчезла ошибка OOM.

Несоответствие партиций на диске и в Hive Metastore


Сбой Spark-приложения из-за следующей ошибки:

>> spark.sql("select max() from schema.table_name")

Caused by: java.util.concurrent.ExecutionException: 
           org.apache.hadoop.mapred.InvalidInputException: 
           Input path does not exist:


Эта ошибка указывает на несоответствие между Metastore и фактическим файлом данных в хранилище. Spark получает метаданные (директорию хранения  партиции в файловой системе) из HMS, а затем считывает файлы, обращаясь непосредственно к ожидаемой директории хранения партиции. Если партиция там отсутствует, Spark выводит эту ошибку. Устранить проблему можно несколькими способами:

1. Запустить MSCK REPAIR в таблице Hive. Он синхронизирует метаданные таблицы с партициями, имеющимися на диске. Обратите внимание: этот метод не поддерживает отслеживание, так как Hive не хранит историю расположения партиций. Так что если по какой-то причине (например, из соображений комплаенса) вам нужно сохранить информацию о расположении партиций, сделайте резервную копию актуальной информации о нём.

MSCK REPAIR TABLE table_identifier [{ADD|DROP|SYNC} PARTITIONS]

# ADD - adds new partitions (DEFAULT)
# DROP - drops all partitions from the session catalog that have non-existing locations
# SYNC - combination of DROP and ADD.


Примечание. Также можно использовать ALTER TABLE table_identifier RECOVER PARTITIONS.


2. verifyPartitionPath. Это нерекомендуемый флаг; если отметить его, выводится предупреждение. Он говорит Spark отфильтровать полученные из HMS расположения партиций, сравнив их с расположениями на диске, и сохранить только те, которые совпадают и на диске, и в Hive. Если у нас много партиций, это существенно снизит производительность. 

spark.sql.hive.verifyPartitionPath=true


3. ignoreMissingFiles. Это новый флаг, который разрешает Spark игнорировать отсутствующие на диске файлы. Безотказное средство. Когда стоит этот флаг, Spark отбирает все исключения file not found и обрабатывает их, просто печатая предупреждение. spark.files.ignoreMissingFiles=true.

4. spark.sql.optimizer.metadataOnly. Этот флаг можно поставить, если мы выполняем совокупную операцию только со столбцом партиции. Он разрешает Spark просто выполнить вычисления с метаданными; при этом он не проверяет файлы в этих партициях. Если у вас нет данных в определённых партициях, но в Hive есть информация о партициях, можно получить неверные результаты. spark.sql.optimizer.metadataOnly=true

Объединение датафрейма в цикле


Пользователь пожаловался, что Spark-приложение начало сбоить с ошибкой GC overhead.

INFO yarn.ApplicationMaster: 
    Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: 
        java.lang.OutOfMemoryError: 
            GC overhead limit exceeded


Объём памяти составлял 32 Гбайт. Увеличивать память драйвера дальше было некуда, поэтому я начал изучать код, чтобы выяснить, откуда пошла проблема. Нашёл вот такой фрагмент (преобразовал его в Pyspark из Scala):

df = Empty DF
for files_metadata in file_metadata_list: # 10 items 
    for files_location in files_metadata: # 60 locations
        cur_df = spark.read.parquet(files_location)
        cur_df = cur_df.withColumn("new col", "logic based on files_metadata")
        df = df.union(cur_df)


Этот код выполняется циклично и создаёт слишком много ссылок на датафреймы (600). Драйверу трудно выдержать такую нагрузку. То же подтвердилось и в JVM heap dump.

def assign_col_based_on_metadata(col_val):
    ‘'' 
    logic here 
    '''
    return new_val

helper_udf = udf(assign_col_based_on_metadata, returnType)

df = spark.read.parquet(glob_exp)
df = df.withColumn("file_name", input_file_name())
df = df.withColumn("new col", helper_udf(file_name))


Таким образом мы устранили ошибку драйвера GC overhead.

Поиск при проверке принадлежности


Пользователь пожаловался, что его Spark-приложение выполняется очень медленно. Изучив код, я выяснил, что он выполнял поиск всех строк одной таблицы, отсутствующих в другой таблице. В обеих таблицах были миллиарды записей. Запрос выглядел примерно так:

select a.* from huge_table a where a.id not in (select id from big_table)


Он выполнялся почти четыре часа. Чтобы решить проблему, я попросил пользователя использовать LEFT ANTI JOIN.

SELECT a.* FROM huge_table a LEFT ANTI JOIN big_table b ON a.id = b.id


На эту операцию ушло восемь минут.

Чрезмерное перераспределение данных при агрегировании в цикле


Пользователь пожаловался на низкую скорость выполнения Spark-приложения. То же Spark-приложение отлично выполнялось с небольшим объёмом данных. Посмотрев на метрики stage, я заметил огромные объёмы перераспределения данных (shuffle). Код, с которым работал пользователь, выглядел примерно вот так:

def aggregate_calls(df, t_age, agg_func):
    return df.filter(df['t_age'] <= t_age).groupBy('pos_type').agg(agg_func)

agg_funcs = [F.sum('t_amount'), F.count('*'), F.avg('t_amount')]

transaction_age = [1, 7, 15, 30, 90]

for t_age, agg_func in itertools.product(transaction_age, agg_funcs):
    result = aggregate_calls(df, t_age, agg_func)
    # other business logic


Вроде бы всё ясно и понятно. Пользователь предпочёл этот подход, потому что фильтрация данных уменьшает объём данных, которые попадают в операцию groupBy. Но из-за того, что мы неоднократно вызывали groupBy внутри цикла, перераспределение данных (shuffle) происходило несколько раз.

Я попросил изменить логику действий, переместив фильтр в группе самой операцией.

aggregations = []

for t_age in [1, 7, 15, 30, 90]:
    aggregations.extend([
        F.sum(F.when(df['t_age'] <= t_age, df['t_amount'])).alias(f'total_transcation_amount_last_{t_age}_days'),
        F.count(F.when(df['t_age'] <= t_age, True)).alias(f'num_transaction_last_{t_age}_days'),
        (F.sum(F.when(df['t_age'] <= t_age, df['t_amount'])) / F.count(F.when(df['t_age'] <= t_age, True))).alias(f'avg_tran_amount_last_{days}_days')
    ])
result = df.groupBy('pos_type').agg(*aggregations)


В этом коде мы используем цикл, чтобы генерировать выражение агрегирования для каждой агрегатной функции и каждого количества дней. Это сокращает количество перераспределений до 1.

Spark History Server перестал работать


С одного сервера перестала поступать история клиентов.

Spark History Server использует level_db/rocks_db для кеширования данных приложения, чтобы ускорить работу пользовательского интерфейса. Проблема заключалась в том, что для одновременного получения разных старых данных одна команда запустила много потоковых Spark-приложений. Приложение оказалось перегруженным, а диск, содержащий данные level_db, быстро переполнялся.

Чтобы это исправить, можно удалить настройку spark.history.store.path и перезапустить Spark History Server.

Низкая скорость выполнения Spark-приложения


Пользователь сообщил, что приложение, которому обычно хватало 30 минут, работает вот уже четыре часа и ещё не завершило Spark-приложение. Проверив Spark History Server, мы увидели, что за несколько минут запускается и выполняется множество Spark-приложений (больше 2,5 тысячи). Я сравнил это с данными за предыдущие дни: раньше эти Spark-приложения выполнялись примерно за 300 мс. Копнул глубже: был вызов БД с целью проверки и удаления дубликатов. Когда я проверил журнал Spark-приложения, выяснилось, что подключение к БД занимает 4–5 секунд. Кроме того, вызов этой проверки дубликатов выполняется в каждом Spark-приложении. Так что эта задержка объяснялась низкой скоростью работы базы данных.

Я попросил команду БД исправить проблему со скоростью, а пока что предложил владельцу приложения переместить логику подключения к БД в единую точку. После этого выполнение Spark-приложения уложилось в 40 минут.

Пролептический григорианский календарь

Caused by: org.apache.spark.SparkUpgradeException:
    Spark 3.0+'s Proleptic Gregorian calendar.


Здесь всё просто. Spark 3.2 считывал файл, записанный Spark 2.3, а в старой дате использовался другой формат. В Spark версии 2.4 и ранее используется гибридный календарь (юлианский + григорианский), а в Spark 3.2 используется пролептический григорианский календарь. Изменения касаются результатов, в которых фигурируют даты до 15 октября 1582 года (григорианское летосчисление). Spark 3.2 не удаётся выполнить чтение, если он видит старые даты или временные метки, и ему непонятно, к какому календарю они относятся.

--conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED 
--conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED


Если задать эту конфигурацию, Spark не переносит изменения и считывает даты и метки времени как есть.

Запись через df.write.insertInto


При выполнении записи в партиционированную таблицу с помощью df.write.insertInto не удаётся выполнить команду 

df.write.mode(‘append’).insertInto(‘mybigtable’)


Ошибка:

org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2562)
at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1238)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1234)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:436)


d3b_wobcywdamkvcha3zxolgb5m.png

Как видно на рисунке выше, Spark выполняет вызовы Hive Metastore, чтобы получить информацию обо всех партициях. Spark использует этот API getPartition() в Hive 2.3.9 для извлечения всех метаданных обо всех партициях за один вызов. Из-за огромного количества партиций (~6L) размер метаданных превышает скромный лимит в 2 Гбайт, установленный для буфера сервера. В результате получаем таймаут сокета. Эту проблему можно исправить двумя способами:

  1. Внедрить пакетный API в клиенте Hive (Hive 27505).
  2. Перейти на Hive Serde для операции записи датафрейма. Для этого устанавливаем значение false для spark.sql.hive.convertMetastoreOrc или spark.sql.hive.convertMetastoreParquet.


Проблема с обновлением каталога


Каталог Spark не обновляется новыми данными, сохраненными во внешнюю таблицу Hive.

У нас есть потоковое Spark-приложение, которое получает ту или иную информацию из Kafka topic и на её основании направляет запросы к таблице Hive. При выполнении запроса к таблице не удавалось получить её последнее представление.

oihurlnsmud129fehywd7lfrml8.png

Начиная с версии 2.4 Spark кеширует список файлов для таблицы; нужно выполнить REFRESH TABLE, если список файлов за пределами Spark изменился.

spark.sql.metadataCacheTTLSeconds 180s
spark.sql.catalogImplementation hive


Ссылка:  SPARK-30616

Переполнение стека


Spark-приложение завершилось следующей ошибкой:

org.apache.spark.SparkException: Job aborted due to stage failure:
ExecutorLostFailure (executor 4 exited caused by one of the running tasks) 
Reason: Container from a bad node: container_xxx_xx_000009 on host: xx.xx.com. Exit status: 50.


В логе executor находим ошибку:

ERROR Executor: Exception in task 102.0 in stage 884.0 (TID 31106)
java.lang.StackOverflowError
    at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:763)
    at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:600)
    at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:600)


8m0qylozrkdobnv-e4ronom9hzs.png

На рисунке выше видно, как партицирована память executor. Но это ошибка переполнения стека, она касается размера стека Java.

Spark позволяет установить свойства JVM с помощью extraJavaOptions, так что выполняем Spark-приложение со следующей конфигурацией Spark.

# set stack size to 1G
"spark.executor.extraJavaOptions=-Xss1G" 


Пустые ссылки


Spark-приложение завершилось следующей ошибкой:

Caused by: org.apache.spark.SparkException: Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations.


Когда Spark выполняет сериализацию датасетов, ссылки на SparkContext и SparkSession обнуляются (то есть отмечаются как @transient или обнуляются посредством Closure Cleaner). В результате методы датасета, которые ссылаются на эти объекты со стороны драйвера (то есть действия или преобразования), видят пустые ссылки и выводят вышеуказанное исключение.

В коде пользователь выполнял persist для датафрейма и потом, используя sparkContext.broadcast с broadcast переменной, выполнял некую операцию. Это и было причиной проблемы.

# order of method call
df.cache
df.broadcast
df.apply()


Мы попросили пользователя удалить broadcast операцию, и код заработал как надо.

Несогласованность типов данных

error: type mismatch;
found   : org.apache.spark.sql.types.DecimalType.type
required: org.apache.spark.sql.types.DataType
        StructField("col_name", DecimalType, true)


Обновление метаданных Hive и Parquet

С точки зрения обработки схемы таблицы в Hive и Parquet есть два основных различия.

  1. Parquet чувствителен к регистру, а Hive — нет.
  2. Hive считает, что все столбцы допускают значение null, а в Parquet допустимость значений null имеет принципиальное значение.


По этой причине, когда мы преобразуем таблицу Parquet из Hive Metastore в Spark SQL, нужно согласовать схему Hive Metastore и схему Parquet. 

Правила согласования:  

  • У полей с одинаковыми именами в обеих схемах должен быть одинаковый тип данных вне зависимости от допустимости значений null. 
  • Чтобы учитывать такую допустимость, у согласованного поля тип данных должен быть как в Parquet.


Согласованная схема содержит именно те поля, которые определены в схеме Hive Metastore.

  • Поля, которые фигурируют только в схеме Parquet, удаляются в согласованной схеме.
  • Поля, которые фигурируют только в схеме Hive Metastore, добавляются в согласованную схему как допускающие значение null.


Решение этой проблемы — переписать файл Parquet и указать в нём тип данных, как в таблице Hive.

Разница версий Scala 


После обновления Spark-приложения возникло следующее исключение:

java.lang.ClassCastException: cannot assign instance of scala.None$ to field 
org.apache.spark.scheduler.Task.appAttemptId of type scala.Option in instance of 
org.apache.spark.scheduler.ResultTask
.
.
.
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)


Оно вызвано тем, что версии Scala различаются в Spark и в приложении. Я изучил Spark окружение (вкладка Environment в пользовательском интерфейсе) и узнал версию Scala, используемую в classpath.

Java Home — /usr/mware/jdk8u352/jre
Java Version — 1.8.0_352
Scala Version — version 2.12.15


После этого я проверил версию Scala в App Jar. Обнаружилась транзитивная зависимость там, где в classpath подтягивали другую версию Scala. После удаления с помощью исключения в файле pom проблема исчезла.

Слишком большая партиция


Spark-приложение завершилось следующей ошибкой:

IllegalArgumentException: Cannot grow BufferHolder error. 
  java.lang.IllegalArgumentException: C
annot grow BufferHolder by size 95969 because the size after growing exceeds size limitation 2147483632


Как мы уже знаем, партиция  BufferHolder имеет максимальный размер 2 147 483 632 байта (примерно 2 Гбайт). Если партиция больше указанного размера и для него нужно перераспределить или буферизировать данные, возникает описанная ошибка. Я попросил пользователя выполнить repartition данных на основе двух ключей, а не одного, и это решило проблему.

Слишком много преобразований


Ещё одно Spark-приложение завершилось ошибкой:

java.lang.StackOverflowError at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$5466/589672638.get$Lambda(Unknown Source)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:777)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)


Изучили физический план Spark-приложения. Оказалось, он очень большой, со множеством повторений. Пользовательский код содержал много преобразований withColumn, и из-за этого переполнялся стек JVM.

Я попросил пользователя увеличить размер стека драйвера. По умолчанию задано значение 1024 Кбайт, но его можно увеличить до 4 Мбайт, указав для spark.driver.extraJavaOptions параметр -Xss4M.

Ещё одна проблема


Spark-приложение пользователя завершилось ошибкой OOM в Java Heap space. При этом на вкладке Stage система вывела следующее сообщение об ошибке:

ujquxcaymswbl_c9ktkimrf_i_4.png

Когда я включил дополнительные метрики и отсортировал задачи по признаку Failed, я заметил, что память выполнения была перегружена, что приводило к сбою.

rrsytoyfrgxcwxlcbuh97ij5vl8.png

Я попросил пользователя изменить следующие параметры, и Spark-приложение завершилось без сбоя:

spark.memory.fraction 0.8
spark.memory.storageFraction 0.4


Я предложил именно такую конфигурацию, поскольку размер executor составлял 28 Гбайт, а у нас executors не должны быть больше 32 Гбайт.

Присоединяйтесь к Telegram-каналу «Данные на стероидах». В нём вы найдёте всё об инструментах и подходах к извлечению максимальной пользы из работы с данными: регулярные дайджесты, полезные статьи, а также анонсы конференций и вебинаров.

© Habrahabr.ru