Leatherman для разработчика в Big Data

?v=1

Экосистема Big Data, а для определенности — Hadoop, достаточно большая, и включает в себя множество продуктов. Какие-то применяются чаще, какие-то реже. Но один из них в нашей команде мы выбрали для себя в качестве универсального инструмента «на все случаи жизни» — на нем пишутся как одноразовые скрипты, так и постоянно работающие приложения (в первую очередь — отчеты).

Этот инструмент — Spark Shell. Обычно такую штуку называют швейцарский нож, но лично я предпочитаю мультитулы Leatherman.
Фактически, Spark Shell — это Scala REPL, то есть обычная для scala интерактивная среда прототипирования и отладки, плюс Apache Spark. Тут можно вводить код с клавиатуры, при этом законченные предложения сразу вычисляются. А если при запуске указать параметр -i , то сразу будет прочитан (и интерпретирован) этот файл , написанный на scala. Можно загрузить код из файла и в интерактивном режиме, для этого есть команды, начинающеся с символа :. При этом написанный вами код может пользоваться средствами спарка для работы с большими данными, то есть например, выполнять SQL запросы к Hive, или читать/писать данные в HDFS, или работать с чем-то типа HBase, например.

Spark Shell — часть дистрибутива Apache Spark. Про то, как установить Spark на машине разработчика, во-первых уже есть много описаний, а во-вторых, возможно много вариантов. Поэтому эту часть опустим, и начнем сразу с запуска.

spark-shell [-i ]


Посмотрим, что там за версия:

...
Spark context Web UI available at http://192.168.1.12:4040

scala> scala.util.Properties.versionString
res1: String = version 2.11.8


А версия самого спарк?

scala> spark.version
res0: String = 2.2.0


Вы можете вводить построчно код на scala, или вставлять целые многострочные куски (для этого есть например команда : paste):

2+2
res0: Int = 4


Соответственно, мы ввели выражение, оно вычислилось, и мы получили ответ, который называется res0, и имеет тип Int. Дальше мы можем его использовать в следующих выражениях. Или же можно было сразу завести переменную/константу, названную так, как мы хотим:

val four= 2+2
four: Int = 4


Наконец, можно воспользоваться командой : load, чтобы загрузить код из подготовленного файла.

Как обычно, справка доступна по команде: help

scala> :help
All commands can be abbreviated, e.g., :he instead of :help.
:edit |        edit history
:help [command]          print this summary or command-specific help
:history [num]           show the history (optional num is commands to show)
:h?              search the history
:imports [name name ...] show import history, identifying sources of names
:implicits [-v]          show the implicits in scope
:javap       disassemble a file or class name
:line |        place line(s) at the end of history
:load              interpret lines in a file
:paste [-raw] [path]     enter paste mode or paste a file
:power                   enable power user mode
:quit                    exit the interpreter
:replay [options]        reset the repl and replay all previous commands
:require           add a jar to the classpath
:reset [options]         reset the repl to its initial state, forgetting all session entries
:save              save replayable session to a file
:sh        run a shell command (result is implicitly => List[String])
:settings       update compiler options, if possible; see reset
:silent                  disable/enable automatic printing of results
:type [-v]         display the type of an expression without evaluating it
:kind [-v]         display the kind of expression's type
:warnings                show the suppressed warnings from the most recent line which had any


Доступ к средствам Spark


Ну хорошо, на Scala мы посмотрели, а где Spark? Оказывается, сразу при запуске для нас определены две переменные, spark и sc:

Spark context available as 'sc' (master = local[*], app id = local-1576608607085).
Spark session available as 'spark'.

То есть, это спарк контекст, и сессия. Они используются для выполнения специфичных для Spark функций.

Для начала — word count


Одно из популярных применений MapReduce, которое приводят во всех курсах для начинающих.
Подсчитаем, сколько раз каждое из слов встретилось в файле.

scala> var hFile = sc.textFile("file:///d:/tmp/inp")
scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> val cnt= wc.count
cnt: Long = 3
scala> val arr= wc.collect
arr: Array[(String, Int)] = Array((bbb,1), (ccc,1), (aaa,1))

scala> val ds= wc.toDS
ds: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

scala> ds.show
+---+---+
| _1| _2|
+---+---+
|bbb|  1|
|ccc|  1|
|aaa|  1|
+---+---+

scala> val sch= ds.schema
sch: org.apache.spark.sql.types.StructType = StructType(StructField(_1,StringType,true), StructField(_2,IntegerType,false))

scala> val fldn= ds.schema.fieldNames
fldn: Array[String] = Array(_1, _2)


В итоге, в несколько строк мы: прочитали текстовый файл с локального диска, разбили его на слова (по пробелам), преобразовали каждое слово в кортеж (слово, 1), и выполнили подсчет числа слов операцией reduceByKey (в данном случае ключом является само слово).

Затем мы полученный RDD посчитали, получили его в массив, преобразовали в Dataset, и посмотрели на этот самый Dataset с разных сторон, включая схему.

Можно было сразу построить Dataset, используя новый API и spark:

scala> var ds= spark.read.text("file:///d:/tmp/inp")


Казалось бы, что тут особенного? Ну, для начала это довольно удобно. Скала, по крайней мере до определенного предела, язык лаконичный и простой. А во-вторых, что более важно, вы можете точно так же, не меняя ничего в коде (ну, или почти ничего) запустить этот скрипт для подсчета слов в 1 или в 100 терабайтах текста. Разумеется, для этого потребуется железо, потому что выполнить такое можно только на множестве машин, распределив между ними нагрузку.

И еще для этого потребуется некоторая настройка, потому что спарк далеко не всегда может правильно выбрать, каким образом ему параллелить задачу. В ваши функции, как разработчика или администратора кластера, входит указать ему, сколько и каких ресурсов он может употребить на решение задачи. И для 100 терабайт это совершенно отдельная история, которая заслуживает даже не статьи, но книги.

Чем вы еще можете спарку помочь?

И чего, не подумав, делать не следует


В сети достаточно много описаний работы с данными в Spark, как через RDD API, так и с более новым Dataset API. Вряд ли тут можно дополнить чем-то существенным, но некоторые замечания все-же хотелось бы озвучить.

Чуть ли не самый частый пример, который вы можете в сети увидеть — это подсчет числа строк в датасете. И я его тоже выше показал. Ну то есть:

val ds= spark.read.text("путь к файлу")
... обработка, например фильтрация
println ds.count
... делаем что-то еще с ds


Хотелось бы заметить, что эта операция может быть довольно дорогой. А точнее, до этого момента, работая в интерактивном режиме, вы получаете ответы достаточно быстро, а вот тут ответ может быть получен весьма не сразу.

Почему? Начнем издалека. Dataset — это ленивая структура. Пока вы не выполните над ним операцию, которой реально нужны данные — он представляет собой скорее не данные, а инструкции по их обработке. То есть, что-то примерно такое:

  • прочитать данные из файла такого-то, папки или таблицы
  • отфильтровать их по такому-то условию
  • отобрать колонки (в том числе в виде выражений)
  • выполнить группировку, сортировку и т.п.


Можно посмотреть на все это, запросив план датасета (логический или физический). В нем вы увидите доступ к таблицам Hive или папкам, разного рода UNION, проекции (выборка подмножества колонок), фильтрацию и т.п.

Spark (как в общем-то и Hadoop) работает с данными в разных форматах. Например, с данными в формате Parquet, которые хранятся в HDFS, то есть в распределенной системе, на разных узлах кластера. То есть, это данные, которые в общем случае не содержат в себе статистик, указывающих, что тут, допустим, датасет из 100 млн записей. Схема обычно есть —, а вот статистик зачастую нет.

Когда вы «читаете» датасет, выполняя spark.read…, вы на самом деле еще не считываете сами данные, хранящиеся в указанном файле (или папке, или таблице Hive), которые разбросаны по неизвестному числу узлов кластера, и размер которых мы еще не знаем. А в общем случае — и не можем узнать, потому что многие из форматов, с которыми Spark (и Hadoop) умеет работать, не содержат метаданных, говорящих нам о числе строк данных. Чтобы узнать число — нужно прочесть сами данные, и подсчитать.

То есть, нужно физически прочитать все файлы (которых может быть много терабайт, и у которых в общем случае нет даже таких метаданных, где было бы записано, что файл пуст). А дальше — выполнить инструкции, заложенные в датасет (например, отфильтровать данные).

В итоге, попросив у спарка число строк, вы фактически вынудили его вычислить для вас терабайтовый датасет. Именно в этот момент — до этого ленивый датасет был лишь набором инструкций по его построению.

Если же дальше, узнав это число, вы захотите что-то еще с датасетом проделать, его, вполне возможно, придется вычислить повторно (потому что промежуточные результаты сами по себе не сохраняются).

С другой стороны, если сделать например вот так:

val ds= spark.read.text("путь к файлу")
println ds.toRDD.isEmpty


вы можете узнать, пуст ли датасет. Очевидная разница состоит в том, что для определения непустоты датасета не нужно будет читать все файлы и блоки данных, а достаточно найти первый непустой. Таким образом, эта, на первый взгляд, аналогичная операция, может быть значительно дешевле предыдущей.

Почти такая же ситуация с операцией show над датасетом. Да, это чрезвычайно удобно, что вы можете посмотреть промежуточные результаты, просто выполнив ds.show, но надо понимать, что если ваш датасет, допустим, был получен изначально как-то так:

val ds= spark.read.jdbc("параметры доступа к реляционной базе")
ds.show
...
прочие операции над тем же датасетом.


то выполнение .show приведет к выполнению SQL-запроса к базе данных, а прочие операции далее — к выполнению еще одного такого же запроса. В какой-то степени проблему может снять кеширование (ds.cache), или сохранение датасета (ds.persist), но оба способа не гарантируют вас от повторного вычисления. Почему? Да все потому же — чтобы кешировать или сохранить датасет для повторного использования, нужно где-то иметь место, куда его сохранять. Для датасета большого у вас может просто этого места не быть, и в любом случае это сохранение — это тоже работа.

В тоже время, иногда может быть выгодно просто прочитать датасет из источника, сохранить его в HDFS в некотором хорошо оптимизированном формате (типа паркета), а потом прочитать снова. В чем тут причина?

Одна из причин в том, что источник данных может иметь намного меньшую производительность, чем ваш кластер Hadoop. Если источник — что-то вроде базы данных Oracle, то одно ядро для этой базы стоит как одно ядро Hadoop + стоимость лицензии Oracle. Поэтому зачастую бывает выгоднее наращивать ядра в кластере, чем покупать новые лицензии для СУБД. Соответственно, повторное чтение данных из HDFS и базы примерно одинаково эффективно, при условии что у вас одинаковое число ядер и/или дисковых шпинделей —, но обычно это условие не выполняется, кроме того, чтение из HDFS параллелится обычно намного лучше, за счет того что данные распределены по узлам, и реплицированы — то есть, один блок доступен более чем на одном узле.

И вторая причина — в том, что вы можете сохранять уже частично вычисленный датасет. То есть, возможно отфильтрованный, с меньшим числом колонок, и существенно меньшего размера. Для базы это что-то вроде материализованной версии View.

Заметим так же, что колоночно ориентированные форматы хранения, такие как паркет, очень эффективны при выборке подмножества колонок. Так что прочитать из них лишь одну колонку сильно дешевле, чем прочитать все. И вы можете выбирать, в каком формате (строчно или колоночно ориентированном) вам сохранить данные для каждой отдельной таблицы, в зависимости от типа операций над ней.

Менее стандартные применения


Посмотрим, как в Spark Shell можно работать например с Hive MetaStore.

Базы данных Hive, кто они, и где они?


Для начала, получим свой экземпляр ExternalCatalog:

scala> val cat= spark.sharedState.externalCatalog
cat: org.apache.spark.sql.catalyst.catalog.ExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog@4a4979bf


В нашей команде прижился идентификатор cat для этой переменной, поэтому мы между собой зовем его «котик». Это API для доступа к метаданным Hive о базах, таблицах, партициях и т.п.

Посмотрим, что за базы у нас есть в Hive:

scala> cat.listDatabases
res0: Seq[String] = Buffer(default)


А вот нет у нас никаких баз кроме default, у нас и Hive-то не имеется, мы на локальном ноуте все это делаем…

Но в тоже время, мы видим, что список баз — это Seq[String], т.е. возвращается нам коллекция названий.

Дальше мы можем сделать что-то вот такое:

scala> val db= cat.getDatabase("default")
db: org.apache.spark.sql.catalyst.catalog.CatalogDatabase = CatalogDatabase(default,Default Hive database,file:./spark-warehouse,Map())

val loc= db.locationUri
loca: java.net.URI = file:.../spark-warehouse


База данных под названием default существует всегда, по умолчанию она пустая.

Что мы можем сказать про базу? Не так уж и много — это достаточно простая конструкция для Hive. У нее есть имя (default), описание (Default Hive database), location (т.е. расположение файлов таблиц по умолчанию), и свойства (набор key-value пар). Ну и еще в ней хранятся таблицы. У Hive двухуровневая структура, т.е. база→таблица, поэтому термин «база» (database) используется наравне и термином «схема» (schema), и означает одно и тоже. Далее я постараюсь применять термин база, так как у термина схема есть и другое значение, и оно нам понадобится ниже.

А теперь все тоже самое, но одновременно:

scala> cat.listDatabases.map{db=>(db, cat.getDatabase(db).locationUri)}


Что мы тут проделали? Мы получили список баз данных Hive, потом сами базы, и для каждой из них — location, т.е. расположение базы в файловой системе. И вернули мы Seq[(String, java.net.URI)], то есть последовательность кортежей, где _1 это имя базы, а _2 это location.

Посмотрим на таблицы


Таблица — штука немного интереснее. Кроме все тех же имени и описания, у таблицы также есть название базы, куда она входит. Естественно, у таблицы есть колонки, которые составляют ее схему данных. Колонки — это упорядоченный набор из имени, типа данных, и описания. Кроме обычных атомарных типов данных (числовых, строковых, бинарных, дата или timestamp), обычных для реляционных баз, в Hive есть и типы составные, а именно структуры (struct), которые эквивалентны таблице (т.е. вложенные таблицы), массивы (array), и словари (map), содержащие пары ключ→значение.

Так же, как у базы, у таблицы есть location — т.е. URI, указывающий на файлы данных. URI, в случае HDFS (а это обычно так и бывает), может указывать как на текущий кластер, так и на другой, т.е. в другой экземпляр файловой системы HDFS. location — это папка, а не файл, а файлы с данными уже лежат в ней. Также как и у базы, у таблицы есть свойства (тоже набор key-value пар), куда приложение может, например, складывать собранную статистику.

В отличие от типовой реляционной СУБД, для Hive у таблицы обязателен формат хранения данных. Абстрактного формата «по умолчанию» не бывает. Набор форматов расширяемый, фактически — формат хранения базы это набор из нескольких Java классов, которые умеют с ним работать. Например, набор классов, умеющих работать с CSV. А еще — их параметры (для CSV это разделители, наличие кавычек или апострофов, метасимвол для кодирования разделителей и ограничителей внутри данных колонки). Можно указать эти классы явно (когда мы работаем с API — то всегда так), а можно просто сообщить Hive, что таблица хранится в формате Parquet, например, или в виде текста (несколько стандартных форматов).

Общая схема преобразования данных при чтении и записи такова:

Файлы в HDFS –>
Hadoop InputFileFormat –>
–>
Deserializer –>
объект Row
объект Row –>
Serializer –>
–>
OutputFileFormat –>
файлы в HDFS

Тут важно помнить, что от key/value Hive использует только value. А так называемый интерфейс SerDe, или его конкретная реализация, отвечает за то, чтобы преобразовать это значение в набор колонок таблицы.

По сравнению с обычными реляционными СУБД Hive не умеет делать (или не делает) некоторые вещи самостоятельно. Например, не поддерживает ключи, и не создает индексы для них, не сортирует данные, и еще многое другое. Все это делает приложение, а Hive мы сообщаем, что данные таблицы будут например отсортированы или партиционированы определенным образом.

Побегаем по HDFS


API для доступа к файловой системе в Hadoop (как HDFS, так и локальной) представляет собой по большей части класс FileSystem:

import org.apache.hadoop.fs.FileSystem

val fs= FileSystem.get(sc.hadoopConfiguration)


Мы получили экземпляр FileSystem, построенный на основе конфигурации Hadoop, взятой из Spark. Это будет файловая система HDFS того кластера, на котором мы сейчас работаем.

Ну, а дальше начинается довольно обычная возня с методами FileSystem и вспомогательных классов типа Path:

import org.apache.hadoop.fs.{Path,FileStatus}
fs.exists(new Path(path))
val status= fs.getFileStatus(new Path(path))
val files = fs.listStatus(new Path(path))
val cs= fs.getContentSummary(new Path(path))


Ну то есть, проверить существование файла по пути, получить его статус (где, например, написаны даты модификации, или признак того, что это папка), посмотреть список файлов в папке (на самом деле статусов), или скажем summary (где есть занимаемое место, число файлов и папок внутри, и т.д.). Ну в общем обычный такой API, не похожий на Java IO, но вполне рабочий.

Если мы хотим работать с содержимым файла, то можно получить входной или выходной поток, обычный java.io.Stream:

val is = new BufferedInputStream(fs.open(new Path(path)))
Source.fromInputStream(is).getLines().foreach(mapper)


Таким образом мы читаем из файла в HDFS все строки, и применяем к каждой функцию mapper. Аналогично для записи: fs.create (p: Path): OutputStream.

Методов у FIleSystem много, поэтому за полной справкой отошлю к документации.


Итак, у нас есть инструмент, в котором можно интерактивно попрограммировать на скале, воспользоваться средствами спарка для анализа данных (насколько больших — зависит от имеющихся у нас ресурсов, и от того, как долго мы согласны подождать результата), а также посмотреть на разные аспекты Hadoop, Hive (на самом деле — почти всех продуктов экосистемы, так как в конечном счете это scala, и мы можем пользоваться любыми API, доступными для Java приложений.

Все это делает Spark Shell очень удобным инструментом для создания приложений самого разнообразного назначения, таких как отчеты, средства мониторинга, ну и ETL немножко. В первую очередь, конечно же, Spark Shell, как и другие похожие скриптовые языки, подходит на роль «клея», на котором мы можем собрать приложение из готовых компонентов. Как именно собирать, и как должны выглядеть компоненты в случае Spark — это уже другая история.

© Habrahabr.ru