Запуск регулярных задач на кластере или как подружить Apache Spark и Oozie
Давно уже витала в воздухе необходимость реализовать запуск регулярных Spark задач через Oozie, но всё руки не доходили и вот наконец свершилось. В этой статье хочу описать весь процесс, возможно она упростит Вам жизнь.
Содержание
- Задача
- Оборудование и установленное ПО
- Написание Spark задачи
- Написание workflow.xml
- Написание coordinator.xml
- Размещение проекта на hdfs
- Запуск регулярного выполнения
- Заключение
Задача
Мы имеем следующую структуру на hdfs:
hdfs://hadoop/project-MD2/data
hdfs://hadoop/project-MD2/jobs
hdfs://hadoop/project-MD2/status
В директория data ежедневно поступают данные и раскладываются по директориям в соответствие с датой. Например, данные за 31.12.2017 запишутся по следующему пути: hdfs://hadoop/project/data/2017/12/31/20171231.csv.gz
.
Формат входных данных
- Разделитель строк:»\n»
- Разделитель столбцов:»;»
- Способ сжатия: gzip
- Количество столбцов: 5 (device_id; lag_A0; lag_A1; flow_1; flow_2)
- Заголовок в первой строке отсутствует
- Данные за предыдущие сутки гарантированно записывается в соответствующую директория в интервал времени с 00:00 до 03:00 следующих суток.
В директории jobs
располагаются задачи, которые имеют непосредственное отношение к проекту. Нашу задачу мы также будем размещать в этом каталоге.
В директорию status
должна сохраняться статистика по количеству пустых полей за каждый день в формате json. Например, для данных за 31.12.2017 должен будет появиться файл hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json
Примет json файла:
{
"device_id_count_empty" : 0,
"lag_A0_count_empty" : 10,
"lag_A1_count_empty" : 0,
"flow_1_count_empty" : 37,
"flow_2_count_empty" : 100
}
Оборудование и установленное ПО
В нашем распоряжение есть кластер из 10 машин, каждая из которых имеет 8-и ядерный процессор и оперативную память в размере 64 Гб. Общий объём жёстких дисков на всех машинах 100 Тб. Для запуска задач на кластере отведена очередь PROJECTS
.
Установленное ПО:
- Apache Hadoop 2.7.3 (Hortonworks)
- Apache Spark 2.0.0
- Apache Oozie 4.2.0
- Scala 2.11.11
- Sbt 1.0.2
Написание Spark задачи
Создадим структуру проекта, это можно очень просто сделать в любой среде разработки, поддерживающей scala или из консоли, как показано ниже:
mkdir -p daily-statistic/project
echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties
echo "" > daily-statistic/project/plugins.sbt
echo "" > daily-statistic/build.sbt
mkdir -p daily-statistic/src/main/scala
Замечательно, теперь добавил плагин для сборки, для этого в файле daily-statistic/project/plugins.sbt
добавляем следующую строку:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
Добавил описание проекта, зависимости и особенности сборки в файл daily-statistic/build.sbt
:
name := "daily-statistic"
version := "0.1"
scalaVersion := "2.11.11"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
)
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
Перейдём в директорию daily-statistic
и выполнил команду sbt update
, для обновления проекта и подтягивания зависимостей из репозитория.
Создаём Statistic.scala
в директории src/main/scala/ru/daily
Код задачи:
package ru.daily
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
object Statistic extends App {
// инициализация
implicit lazy val spark: SparkSession = SparkSession.builder()
.appName("daily-statistic")
.getOrCreate()
import spark.implicits._
val workDir = args(0)
val datePart = args(1)
val saveDir = args(2)
try {
val date = read(s"$workDir/$datePart/*.csv.gz")
.select(
'_c0 as "device_id",
'_c1 as "lag_A0",
'_c2 as "lag_A1",
'_c3 as "flow_1",
'_c4 as "flow_2"
)
save(s"$saveDir/$datePart", agg(date))
} finally spark.stop()
// чтение исходных данных
def read(path: String)(implicit spark: SparkSession): DataFrame = {
val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip")
spark.read
.options(inputFormat)
.csv(path)
}
// построение агрегата
def agg(data: DataFrame):DataFrame = data
.withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0))
.withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0))
.withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0))
.withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0))
.withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0))
.agg(
sum('device_id_empty) as "device_id_count_empty",
sum('lag_A0_empty) as "lag_A0_count_empty",
sum('lag_A1_empty) as "lag_A1_count_empty",
sum('flow_1_empty) as "flow_1_count_empty",
sum('flow_2_empty) as "flow_2_count_empty"
)
// сохранение результата
def save(path: String, data: DataFrame): Unit = data.write.json(path)
}
Собираем проект командой sbt assembly
из директории daily-statistic
. После успешного завершения сборки в директории daily-statistic/target/scala-2.11
появится пакет с задачей daily-statistic-0.1.jar
.
Написание workflow.xml
Для запуска задачи через Oozie нужно описать конфигурацию запуска в файле workflow.xml
. Ниже привожу пример для нашей задачи:
oozie.launcher.mapred.job.queue.name
${queue}
${jobTracker}
${nameNode}
yarn-client
project-md2-daily-statistic
ru.daily.Statistic
${nameNode}${jobDir}/lib/daily-statistic-0.1.jar
--queue ${queue}
--master yarn-client
--num-executors 5
--conf spark.executor.cores=8
--conf spark.executor.memory=10g
--conf spark.executor.extraJavaOptions=-XX:+UseG1GC
--conf spark.yarn.jars=*.jar
--conf spark.yarn.queue=${queue}
${nameNode}${dataDir}
${datePartition}
${nameNode}${saveDir}
Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]
В блоке global
устанавливается очередь, для MapReduce задачи которая будет находить нашу задачи и запускать её.
В блоке action
описывается действие, в нашем случае запуск spark задачи, и что нужно делать при завершении со статусом ОК
или ERROR
.
В блоке spark
определяется окружение, конфигурируется задача и передаются аргументы. Конфигурация запуска задачи описывается в блоке spark-opts
. Параметры можно посмотреть в официальной документации
Если задача завершается со статусом ERROR
, то выполнение переходит в блок kill
и выводится кратное сообщение об ошибки.
Параметры в фигурных скобках, например ${queue}
, мы будем определять при запуске.
Написание coordinator.xml
Для организации регулярного запуска нам потребуется ещё coordinator.xml
. Ниже приведу пример для нашей задачи:
${workflowPath}
datePartition
${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}
Здесь из интересного, параметры frequency
, start
, end
, которые определяют частоту выполнения, дату и время начала выполнения задачи, дату и время окончания выполнения задачи соответственно.
В блоке workflow
указывается путь к директории с файлом workflow.xml
, который мы определим позднее при запуске.
В блоке configuration
определяется значение свойства datePartition
, которое в данном случае равно текущей дате в формате yyyy/MM/dd
минус 1 день.
Размещение проекта на hdfs
Как уже было сказано ранее нашу задачу мы будем размещать в директории hdfs://hadoop/project-MD2/jobs
:
hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar
hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml
hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml
hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib
Здесь в принципе всё понятно без комментариев за исключением директории sharelib
. В эту директорию мы положим все библиотеки, которые использовались в процессе создания зашей задачи. В нашем случае это все библиотеки Spark 2.0.0, который мы указывали в зависимостях проекта. Зачем это нужно? Дело в том, что в зависимостях проекта мы указали "provided"
. Это говорит системе сборки не нужно включать зависимости в проект, они будут предоставлены окружением запуска, но мир не стоит на месте, администраторы кластера могут обновить версию Spark. Наша задача может оказаться чувствительной к этому обновлению, поэтому для запуска будет использоваться набор библиотек из директории sharelib
. Как это конфигурируется покажу ниже.
Запуск регулярного выполнения
И так сё готово к волнительному моменту запуска. Мы будем запускать задачу через консоль. При запуске нужно задать значения свойствам, которые мы использовали в xml
файлах. Вынесем эти свойства в отдельный файл coord.properties
:
# описание окружения
nameNode=hdfs://hadoop
jobTracker=hadoop.host.ru:8032
# путь к директории с файлом coordinator.xml
oozie.coord.application.path=/project-MD2/jobs/daily-statistic
# частота в минутах (раз в 24 часа)
frequency=1440
startTime=2017-09-01T07:00Z
endTime=2099-09-01T07:00Z
# путь к директории с файлом workflow.xml
workflowPath=/project-MD2/jobs/daily-statistic
# имя пользователя, от которого будет запускаться задача
mapreduce.job.user.name=username
user.name=username
# директория с данными и для сохранения результата
dataDir=/project-MD2/data
saveDir=/project-MD2/status
jobDir=/project-MD2/jobs/daily-statistic
# очередь для запуска задачи
queue=PROJECTS
# использовать библиотеке из указанной директории на hdfs вместо системных
oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib
oozie.use.system.libpath=false
Замечательно, тереть всё готово. Запускаем регулярное выполнение командой:
oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run
После запуска в консоль выведется id задачи. Используя это id можно посмотреть информацию о статусе выполнения задачи:
oozie job -info {job_id}
Остановить задачу:
oozie job -kill {job_id}
Если Вы не знаете id задачи, то можно найти его, показав все регулярные задачи для вашего пользователя:
oozie jobs -jobtype coordinator -filter user={user_name}
Заключение
Полечилось немного затянуто, но на мой взгляд лучше подробная инструкция чем квест-поиск по интернету. Надеюсь описанный опыт будет Вам полезен, спасибо за внимание!