Количество партиций в Spark при инициализации Data API: DataFrame, RDD, DataSet
Luxoft DXC Technology, Scala Big Data разработчик
Любое Spark Data API: DataFrame, RDD, DataSet состоит из партиций. Партиция — это часть данных, аллоцированных в оперативной памяти воркеров (жестком диске при кэшировании) для параллельных преобразований. Например, можно одновременно прибавить какое-то число к каждому элементу из партиции. Распределение элементов по партициям и их количество может происходить по принципу round-robin либо по хэшу от какой-то колонки в результате различных операций и зависит от типа операции.
Как узнать количество партиций?
В Apache Spark, например, если зайти в консоль spark-shell или pyspark и каким-то способом создать DataFrame, DataSet или RDD, количество партиций можно вывести в консоль следующим образом:
val exampleDF = Seq(1 to 500000: _*).toDF()
rdd.getNumPartitions
Давайте рассмотрим примеры, как будут распределяться элементы по партициям при создании нового DataFrame, RDD, DataSet. Как известно, новый инстанс такой структуры можно получить из памяти, с HDFS, FS, из Kafka topic, реляционной базы данных и других систем.
У DataFrame, DataSet, созданного из коллекции, будет количество партиций, равное следующему параметру:
sc.defaultParallelism*^1 => res19: Int = 2
val fromSeqDF = Seq(1 to 500000: _*).toDF()
fromSeqDF.rdd.getNumPartitions => 2
У DataFrame, DataSet, созданного из памяти, будет ровно столько партиций, сколько в момент создания было доступных ядер или следующим параметром:
sc.defaultParallelism => res19: Int = 2
val fromMemoryDF = spark.range(0,100).toDF()
fromMemoryDF.rdd.getNumPartitions => 2
В случае RDD, созданного из коллекции, количество партиций будет равно «default parallelism» или параметру метода parallelize:
val fromRangeRDD = sc.parallelize(0 to 100).toDF()
fromRangeRDD.getNumPartitions => 2
val fromRangeRDD = sc.parallelize(0 to 100, 4).toDF()
fromRangeRDD.getNumPartitions => 4
DataFrame, DataSet, созданный из RDD, будет иметь столько партиций, сколько исходный RDD:
val schema = StructType(StructField(value,IntegerType,false))
val fromRddDF = spark.createDataFrame(fromRangeRDD, schema))
fromRddDF.rdd.getNumPartitions => 4
То есть, например, 4 партиции, в случае RDD с 4-мя партициями.
У DataFrame, DataSet, созданного из файла на HDFS, будет столько партиций, сколько блоков на HDFS имеет исходный файл. Либо в зависимости от параметра
spark.sql.files.maxPartitionBytes which defaults to 128MB
val fromHdfsFileDF = spark
.read
.format("parquet”)
.load("hdfs://user/crimes_csv/part-00000-f8e2d087-fd31-4cb5-b96e-62d36ee7074b-c000.parquet”)
fromHdfsFileDF.rdd.getNumPartitions => 7
У DataFrame, DataSet, созданного на файла из S3:
val fromS3FileDF = spark
.read
.format("csv”)
.load("s3://user/crimes_csv/part-00000-f8e2d087-fd31-4cb5-b96e-62d36ee7074b-c000.csv”)
fromS3FileDF.rdd.getNumPartitions => 7
У DataFrame, DataSet, созданного в результате джойна, группировки или другой операции, которая вызывает shuffle:
val joinedShuffleDF = leftDF.join(rightDF, col("leftKey") === col("rightKey"), "inner")
val groupedShuffleDF = inputDF.orderBy(col("total trips").desc)
spark.conf.get("spark.sql.shuffle.partitions") => 200 *2
Остальные кейсы будут рассмотрены на занятиях курса Spark Developer. Какие кейсы рассмотрим:
DataFrame, DataSet, созданного на основе группы файлов.
DataFrame, DataSet, созданного на основе реляционной базы данных.
DataFrame, DataSet, созданного на таблицы в Hive.
DataFrame, DataSet, созданного на основе топика в кафке.
DataFrame, DataSet, созданного на основе таблицы из Cassandra, Scylla DB.
DataFrame, созданном на основе кастомного коннектора DataSource v2.
*1 spark.default.parallelism — Параметр спарк конфига с версии 0.5.0, который для таких распределенных shuffle операций как reduceByKey and join задает количество партиций в родительском RDD. Для таких операций, как parallelize без парент RDD, количество будет зависеть от ресурсного менеджера, например:
Local mode: количество ядер на локальной машине;
Mesos fine grained mode: 8;
Others: общее количество всех ядер на всех экзекьютерах или 2, смотря что больше.
*2 spark.sql.shuffle.partitions — Параметр спарк конфига с версии 1.1.0, который определяет количество партиций для DataFrame, DataSet которые получаются в результате джойнов или операций агрегаций. Для
structured streaming эта конфигурация не может быть изменена в рантайме между двумя рестартами из одной и той же контрольной точки.
Скоро состоится открытое занятие «Дата инженер и Spark в новых реалиях». На этом открытом уроке разберем:
— Как изменятся источники и получатели данных, объемы данных, языки для ETL, кластера, облака и IDE.
— Как изменится потребность на рынке в дата инженере и к чему нужно быть готовым.
— Обсудим open source технологии, примеры миграционных проектов.
Регистрируйтесь по ссылке.