Знакомимся с PySpark

Фреймворк с открытым исходным кодом Apache Spark, входящий в экосистему проектов Apache Hadoop, используется для реализации распределённой обработки данных. Для работы в Spark могут использоваться различные языки программирования: Scala, Java, Python и R.

В рамках данной статьи мы не будем рассуждать о преимуществах использования того или иного языка, на эту тему достаточно публикаций. Мы рассмотрим PySpark — фреймворк с открытым исходным кодом, построенный на базе Apache Spark и предназначенный для упрощения и ускорения решения масштабных задач обработки данных и аналитики. PySpark предлагает высокоуровневый API для языка программирования Python, что позволяет легко интегрироваться с существующими экосистемами Python.

Но для начала рассмотрим архитектуру Apache Spark.

Архитектура Spark

Приложения, работающие в Spark запускаются как независимые наборы процессов на кластере, тем самым обеспечивая их распределенное выполнение. Для контроля работы этих процессов используется объект SparkContext в вашей основной программе (также называемой программой‑драйвером).

Драйвер SparkContext может подключаться к нескольким типам менеджеров кластеров: автономному менеджеру кластеров Spark, Mesos или YARN. Менеджер распределяет ресурсы между приложениями.

После подключения Spark назначает исполнителей на узлах кластера — процессы, которые выполняют вычисления и хранят данные для вашего приложения.

Затем SparkContext отправляет код приложения, в нашем случае написанный на Python исполнителям для выполнения.

ad4c295b49ef978ca63f542508857d06.png

Как видно, принцип работы Apache Spark достаточно прост. Далее давайте посмотрим из каких основных компонентов состоит Spark.

6e70e6c274c3a6d61bda3bad10f9bee0.png

Spark Core отвечает за все основные функции ввода‑вывода, планирование и мониторинг заданий на кластерах Spark, диспетчеризацию, сетевое взаимодействие с различными системами хранения данных, восстановление после сбоев и эффективное управление памятью.

Spark SQL — это модуль Spark для обработки структурированных данных. В основном он используется для выполнения SQL‑запросов.

Spark Streaming — это расширение основного интерфейса Spark API, обеспечивающее масштабируемую, высокопроизводительную и отказоустойчивую обработку потоков данных в реальном времени. Данные могут поступать из различных источников, таких как Kafka, Flume, Kinesis или TCP‑сокеты.

MLlib — это библиотека машинного обучения в Spark. Ее цель — сделать практическое машинное обучение масштабируемым и простым.

И, наконец, GraphX — это компонент для графов и графо‑параллельных вычислений.

Для получения общего представления об архитектуре и компонентах Spark, полагаю, информации будет достаточно. Теперь перейдем непосредственно к рассмотрению PySpark.

Используем PySpark

PySpark — это фреймворк с открытым исходным кодом, построенный на базе Apache Spark и предназначенный для упрощения и ускорения решения задач обработки данных и аналитики. Он предлагает высокоуровневый API для языка программирования Python, что позволяет легко интегрироваться с существующими экосистемами Python.

PySpark позволяет быстро обрабатывать значительные объемы данных. При этом, он может работать с различными форматами данных, поддерживает сложные аналитические операции.

В целом, различные возможности PySpark представлены на следующем рисунке:

05c6999c9bc0d94c53c949578ecccf4c.png

Но мы не будем долго останавливаться на функционале, а перейдем сразу к практической части.

Установка PySpark

Для установки Spark нужно перейти на страницу с загрузками, выбрать релиз и тип пакета и загрузить архив. Установить PySpark можно с помощью pip, выполнив:

pip install pyspark

Ну, а те, кто по‑настоящему ленив ценит свое время и просто хочет поэкспериментировать с фреймворком, могут воспользоваться готовым контейнером, в котором уже есть все необходимое, для работы PySpark.

docker container run -it ghcr.io/dbca-wa/pyspark-docker /opt/spark/bin/pyspark

В образе этого контейнера, пожалуй, самое большое количество слоев, которое мне приходилось видеть, да и размер довольно внушительный, но при этом сам контейнер прекрасно работает.

Работа с PySpark

Рассмотрим несколько примеров работы с PySpark. Начнем с самого простого — работы с RDD. В PySpark устойчивые распределенные наборы данных (Resilient Distributed Datasets, RDD) являются основной абстракцией, представляющей коллекцию объектов. RDD устойчивы к сбоям, распределены по кластеру и могут работать параллельно.

Создадим RDD вручную, посчитаем число элементов и выведем значение на экран:

rdd = sc.parallelize([1, 2, 3, 4, 5])
count = rdd.count()
print(count)
f6ec9d23515d9ba11bd22603f4efacf9.png

Конечно, помещать данные в RDD можно не только вручную. Например, если мы хотим загрузить данные из файла, то можно воспользоваться следующей функцией:

text_file_rdd = sc.textFile("path_to_file.txt")

При этом каждая строка из текстового файла станет элементом RDD.

Также может оказаться полезным кэширование RDD, когда один и тот же RDD необходимо использовать несколько раз. Кэшированные RDD хранятся в памяти, что снижает необходимость их повторного вычисления.

Доработаем наш пример:

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.cache()
count = rdd.count()
print(count)

Теперь RDD кэшируется, и все последующие действия выполняются быстрее.

Но это были довольно простые примеры и теперь давайте посмотрим более сложные случаи.

Работа с данными

Перед началом работы с данными, познакомимся с DataFrame. DataFrame — это распределенный набор данных, состоящий из данных, расположенных в строках и столбцах с именованными атрибутами. Он имеет сходство с таблицами реляционных баз данных или фреймами данных Python, но включает в себя сложные оптимизации.

Использование списка — один из самых простых способов создания DataFrame. Если у вас уже есть RDD, вы можете легко преобразовать его в DataFrame, используя функцию createDataFrame().

Для начала создадим DataFrame:

data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

Для того, чтобы понять, что у нас получилось, выполним df.show()

6d9a29d106d78c8172afe69124825fd6.png

У нас есть таблица со столбцами и значениям в соответствующих столбцах. Для того, чтобы выполнить к примеру, сортировку по полю firstname нам достаточно выполнить следующую команду:

df.sort("firstname").show(truncate=False)
7d13ae7ee9248f3a8da3481ff4e13fbb.png

Если мы хотим отсортировать значения по какому‑либо полю, то необходимо воспользоваться df.filter.

Так, в следующем примере мы отфильтруем записи по значению поля gender равному M:

 df.filter(df.gender == "M").show(truncate=False)
4f67e60869e5bf2db42ec9d93d588bd1.png

Это основные функции для работы с DataFrame. Далее мы поговорим о работе с SQL.

Работаем с SQL

Фреймворк PySpark позволяет использовать SQL без особых сложностей. Мы можем выполнять SQL запросы на DataFrame и обрабатывать получаемые данные с помощью модуля PySpark SQL.

Для выполнения SQL‑запросов используется метод sql() объекта SparkSession, который возвращает возвращает DataFrame.

Прежде всего нам необходимо создать таблицу, к которой мы затем будем делать запросы.

df.createOrReplaceTempView("PERSON_DATA")

Выполним запрос по всем полям к нашей созданной таблице, затем выведем схему и результаты запроса:

df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()
bf41f4a0c74ba95e63aba5f39f898d59.png

Попробуем немного усложнить запрос, посчитав количество записей по значениям поля gender:

groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()
a87269ffbbd6a8b5e975b03f52dfd521.png

Заключение

Фреймворк PySpark обладает достаточно широким функционалом, позволяющим эффективно работать с Apache Spark обрабатывая большие объемы данных с высокой скоростью. В этой статье мы рассмотрели основные функции, входящие в PySpark.

В завершение рекомендуем обратить внимание на открытые уроки, которые пройдут в рамках онлайн-курса Otus «Spark developer»:

  • 3 марта: «Обработка графов на Spark». Подробнее

  • 11 марта: «Обработка геопространственных и временных данных на Spark». Подробнее

  • 19 марта: «Обзор Spark API». Подробнее

© Habrahabr.ru