Количество партиций в Spark при инициализации Data API: DataFrame, RDD, DataSet

187c0c0974b1b6284137d073eaff874a.png

6a76df80fff727e733f2fe7a6b83efce.pngАвтор: Вадим Опольский

Luxoft DXC Technology, Scala Big Data разработчик

Любое Spark Data API: DataFrame, RDD, DataSet состоит из партиций. Партиция — это часть данных, аллоцированных в оперативной памяти воркеров (жестком диске при кэшировании) для параллельных преобразований. Например, можно одновременно прибавить какое-то число к каждому элементу из партиции. Распределение элементов по партициям и их количество может происходить по принципу round-robin либо по хэшу от какой-то колонки в результате различных операций и зависит от типа операции.

Как узнать количество партиций?

В Apache Spark, например, если зайти в консоль spark-shell или pyspark и каким-то способом создать DataFrame, DataSet или RDD, количество партиций можно вывести в консоль следующим образом:

val exampleDF = Seq(1 to 500000: _*).toDF()
rdd.getNumPartitions

Давайте рассмотрим примеры, как будут распределяться элементы по партициям при создании нового DataFrame, RDD, DataSet. Как известно, новый инстанс такой структуры можно получить из памяти, с HDFS, FS, из Kafka topic, реляционной базы данных и других систем. 

  1. У DataFrame, DataSet, созданного из коллекции, будет количество партиций, равное следующему параметру:

sc.defaultParallelism*^1 => res19: Int = 2
val fromSeqDF = Seq(1 to 500000: _*).toDF()
fromSeqDF.rdd.getNumPartitions => 2
  1. У DataFrame, DataSet, созданного из памяти, будет ровно столько партиций, сколько в момент создания было доступных ядер или следующим параметром:

sc.defaultParallelism => res19: Int = 2
val fromMemoryDF = spark.range(0,100).toDF()
fromMemoryDF.rdd.getNumPartitions => 2
  1. В случае RDD, созданного из коллекции, количество партиций будет равно «default parallelism» или параметру метода parallelize:                                                 

val fromRangeRDD = sc.parallelize(0 to 100).toDF()
fromRangeRDD.getNumPartitions => 2
val fromRangeRDD = sc.parallelize(0 to 100, 4).toDF()
fromRangeRDD.getNumPartitions => 4
  1. DataFrame, DataSet, созданный из RDD, будет иметь столько партиций, сколько исходный RDD:

val schema = StructType(StructField(value,IntegerType,false)) 
val fromRddDF = spark.createDataFrame(fromRangeRDD, schema))
fromRddDF.rdd.getNumPartitions => 4

 То есть, например, 4 партиции, в случае RDD с 4-мя партициями.

  1. У DataFrame, DataSet, созданного из файла на HDFS, будет столько партиций, сколько блоков на HDFS имеет исходный файл. Либо в зависимости от параметра 

spark.sql.files.maxPartitionBytes which defaults to 128MB
val fromHdfsFileDF = spark
.read
.format("parquet”)
.load("hdfs://user/crimes_csv/part-00000-f8e2d087-fd31-4cb5-b96e-62d36ee7074b-c000.parquet”)
fromHdfsFileDF.rdd.getNumPartitions => 7
  1. У DataFrame, DataSet, созданного на файла из S3:

val fromS3FileDF = spark
.read
.format("csv”)
.load("s3://user/crimes_csv/part-00000-f8e2d087-fd31-4cb5-b96e-62d36ee7074b-c000.csv”)
fromS3FileDF.rdd.getNumPartitions => 7
  1. У DataFrame, DataSet, созданного в результате джойна, группировки или другой операции, которая вызывает shuffle:

val joinedShuffleDF = leftDF.join(rightDF, col("leftKey") === col("rightKey"), "inner")
val groupedShuffleDF = inputDF.orderBy(col("total trips").desc)
spark.conf.get("spark.sql.shuffle.partitions") => 200  *2

Остальные кейсы будут рассмотрены на занятиях курса Spark Developer. Какие кейсы рассмотрим:

  1. DataFrame, DataSet, созданного на основе группы файлов. 

  2. DataFrame, DataSet, созданного на основе реляционной базы данных.

  3. DataFrame, DataSet, созданного на таблицы в Hive.

  4. DataFrame, DataSet, созданного на основе топика в кафке. 

  5. DataFrame, DataSet, созданного на основе таблицы из Cassandra, Scylla DB.

  6. DataFrame, созданном на основе кастомного коннектора DataSource v2.

*1 spark.default.parallelism — Параметр спарк конфига с версии 0.5.0, который для таких распределенных shuffle операций как reduceByKey and join задает количество партиций в родительском RDD. Для таких операций, как parallelize без парент RDD, количество будет зависеть от ресурсного менеджера, например:

  • Local mode: количество ядер на локальной машине;

  • Mesos fine grained mode: 8;

  • Others: общее количество всех ядер на всех экзекьютерах или 2, смотря что больше.

*2 spark.sql.shuffle.partitions — Параметр спарк конфига с версии 1.1.0, который определяет количество партиций для DataFrame, DataSet которые получаются в результате джойнов или операций агрегаций. Для 

structured streaming эта конфигурация не может быть изменена в рантайме между двумя рестартами из одной и той же контрольной точки.  

Скоро состоится открытое занятие «Дата инженер и Spark в новых реалиях». На этом открытом уроке разберем:
— Как изменятся источники и получатели данных, объемы данных, языки для ETL, кластера, облака и IDE.
— Как изменится потребность на рынке в дата инженере и к чему нужно быть готовым.
— Обсудим open source технологии, примеры миграционных проектов.
Регистрируйтесь по ссылке.

© Habrahabr.ru