Запуск регулярных задач на кластере или как подружить Apache Spark и Oozie

59cd511673a9c343087598.png


Давно уже витала в воздухе необходимость реализовать запуск регулярных Spark задач через Oozie, но всё руки не доходили и вот наконец свершилось. В этой статье хочу описать весь процесс, возможно она упростит Вам жизнь.


Содержание


  • Задача
  • Оборудование и установленное ПО
  • Написание Spark задачи
  • Написание workflow.xml
  • Написание coordinator.xml
  • Размещение проекта на hdfs
  • Запуск регулярного выполнения
  • Заключение


Задача


Мы имеем следующую структуру на hdfs:


hdfs://hadoop/project-MD2/data
hdfs://hadoop/project-MD2/jobs
hdfs://hadoop/project-MD2/status


В директория data ежедневно поступают данные и раскладываются по директориям в соответствие с датой. Например, данные за 31.12.2017 запишутся по следующему пути: hdfs://hadoop/project/data/2017/12/31/20171231.csv.gz.


Формат входных данных

  • Разделитель строк:»\n»
  • Разделитель столбцов:»;»
  • Способ сжатия: gzip
  • Количество столбцов: 5 (device_id; lag_A0; lag_A1; flow_1; flow_2)
  • Заголовок в первой строке отсутствует
  • Данные за предыдущие сутки гарантированно записывается в соответствующую директория в интервал времени с 00:00 до 03:00 следующих суток.


В директории jobs располагаются задачи, которые имеют непосредственное отношение к проекту. Нашу задачу мы также будем размещать в этом каталоге.
В директорию status должна сохраняться статистика по количеству пустых полей за каждый день в формате json. Например, для данных за 31.12.2017 должен будет появиться файл hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json


Примет json файла:

{
   "device_id_count_empty" : 0, 
   "lag_A0_count_empty" : 10, 
   "lag_A1_count_empty" : 0, 
   "flow_1_count_empty" : 37, 
   "flow_2_count_empty" : 100
}


Оборудование и установленное ПО


В нашем распоряжение есть кластер из 10 машин, каждая из которых имеет 8-и ядерный процессор и оперативную память в размере 64 Гб. Общий объём жёстких дисков на всех машинах 100 Тб. Для запуска задач на кластере отведена очередь PROJECTS.


Установленное ПО:

  • Apache Hadoop 2.7.3 (Hortonworks)
  • Apache Spark 2.0.0
  • Apache Oozie 4.2.0
  • Scala 2.11.11
  • Sbt 1.0.2


Написание Spark задачи


Создадим структуру проекта, это можно очень просто сделать в любой среде разработки, поддерживающей scala или из консоли, как показано ниже:


mkdir -p daily-statistic/project
echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties
echo "" > daily-statistic/project/plugins.sbt
echo "" > daily-statistic/build.sbt
mkdir -p daily-statistic/src/main/scala


Замечательно, теперь добавил плагин для сборки, для этого в файле daily-statistic/project/plugins.sbt добавляем следующую строку:


addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")


Добавил описание проекта, зависимости и особенности сборки в файл daily-statistic/build.sbt:


name := "daily-statistic"

version := "0.1"

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"


Перейдём в директорию daily-statistic и выполнил команду sbt update, для обновления проекта и подтягивания зависимостей из репозитория.
Создаём Statistic.scala в директории src/main/scala/ru/daily


Код задачи:

package ru.daily

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object Statistic extends App {

   // инициализация   
   implicit lazy val spark: SparkSession = SparkSession.builder()
     .appName("daily-statistic")
     .getOrCreate()

   import spark.implicits._

   val workDir = args(0)
   val datePart = args(1)
   val saveDir = args(2)

   try {

      val date = read(s"$workDir/$datePart/*.csv.gz")
         .select(
            '_c0 as "device_id",
            '_c1 as "lag_A0",
            '_c2 as "lag_A1",
            '_c3 as "flow_1",
            '_c4 as "flow_2"
         )

         save(s"$saveDir/$datePart", agg(date))

   } finally spark.stop()

   // чтение исходных данных   
   def read(path: String)(implicit spark: SparkSession): DataFrame = {

      val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip")

      spark.read
         .options(inputFormat)
         .csv(path)
   }

   // построение агрегата
   def agg(data: DataFrame):DataFrame = data
      .withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0))
      .withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0))
      .withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0))
      .withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0))
      .withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0))
      .agg(
         sum('device_id_empty) as "device_id_count_empty",
         sum('lag_A0_empty) as "lag_A0_count_empty",
         sum('lag_A1_empty) as "lag_A1_count_empty",
         sum('flow_1_empty) as "flow_1_count_empty",
         sum('flow_2_empty) as "flow_2_count_empty"
      )

   // сохранение результата
   def save(path: String, data: DataFrame): Unit = data.write.json(path)

} 


Собираем проект командой sbt assembly из директории daily-statistic. После успешного завершения сборки в директории daily-statistic/target/scala-2.11 появится пакет с задачей daily-statistic-0.1.jar.


Написание workflow.xml


Для запуска задачи через Oozie нужно описать конфигурацию запуска в файле workflow.xml. Ниже привожу пример для нашей задачи:




   
      
         
            oozie.launcher.mapred.job.queue.name
            ${queue}
         
      
   

   

   
      
         ${jobTracker}
         ${nameNode}
         yarn-client
         project-md2-daily-statistic
         ru.daily.Statistic
         ${nameNode}${jobDir}/lib/daily-statistic-0.1.jar
         
            --queue ${queue}
            --master yarn-client
            --num-executors 5
            --conf spark.executor.cores=8
            --conf spark.executor.memory=10g
            --conf spark.executor.extraJavaOptions=-XX:+UseG1GC
            --conf spark.yarn.jars=*.jar
            --conf spark.yarn.queue=${queue}
         
         ${nameNode}${dataDir}
         ${datePartition}
         ${nameNode}${saveDir}
       

       
       

   

   
      Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]
   

   


В блоке global устанавливается очередь, для MapReduce задачи которая будет находить нашу задачи и запускать её.
В блоке action описывается действие, в нашем случае запуск spark задачи, и что нужно делать при завершении со статусом ОК или ERROR.
В блоке spark определяется окружение, конфигурируется задача и передаются аргументы. Конфигурация запуска задачи описывается в блоке spark-opts. Параметры можно посмотреть в официальной документации
Если задача завершается со статусом ERROR, то выполнение переходит в блок kill и выводится кратное сообщение об ошибки.
Параметры в фигурных скобках, например ${queue}, мы будем определять при запуске.


Написание coordinator.xml


Для организации регулярного запуска нам потребуется ещё coordinator.xml. Ниже приведу пример для нашей задачи:



    
        
            ${workflowPath}
            
                
                    datePartition
                    ${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}
                
            
        
    


Здесь из интересного, параметры frequency, start, end, которые определяют частоту выполнения, дату и время начала выполнения задачи, дату и время окончания выполнения задачи соответственно.
В блоке workflow указывается путь к директории с файлом workflow.xml, который мы определим позднее при запуске.
В блоке configuration определяется значение свойства datePartition, которое в данном случае равно текущей дате в формате yyyy/MM/dd минус 1 день.


Размещение проекта на hdfs

Как уже было сказано ранее нашу задачу мы будем размещать в директории hdfs://hadoop/project-MD2/jobs:


hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar
hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml
hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml
hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib


Здесь в принципе всё понятно без комментариев за исключением директории sharelib. В эту директорию мы положим все библиотеки, которые использовались в процессе создания зашей задачи. В нашем случае это все библиотеки Spark 2.0.0, который мы указывали в зависимостях проекта. Зачем это нужно? Дело в том, что в зависимостях проекта мы указали "provided". Это говорит системе сборки не нужно включать зависимости в проект, они будут предоставлены окружением запуска, но мир не стоит на месте, администраторы кластера могут обновить версию Spark. Наша задача может оказаться чувствительной к этому обновлению, поэтому для запуска будет использоваться набор библиотек из директории sharelib. Как это конфигурируется покажу ниже.


Запуск регулярного выполнения


И так сё готово к волнительному моменту запуска. Мы будем запускать задачу через консоль. При запуске нужно задать значения свойствам, которые мы использовали в xml файлах. Вынесем эти свойства в отдельный файл coord.properties:


# описание окружения
nameNode=hdfs://hadoop
jobTracker=hadoop.host.ru:8032

# путь к директории с файлом coordinator.xml
oozie.coord.application.path=/project-MD2/jobs/daily-statistic

# частота в минутах (раз в 24 часа)
frequency=1440
startTime=2017-09-01T07:00Z
endTime=2099-09-01T07:00Z

# путь к директории с файлом workflow.xml
workflowPath=/project-MD2/jobs/daily-statistic

# имя пользователя, от которого будет запускаться задача
mapreduce.job.user.name=username
user.name=username

# директория с данными и для сохранения результата
dataDir=/project-MD2/data 
saveDir=/project-MD2/status
jobDir=/project-MD2/jobs/daily-statistic 

# очередь для запуска задачи
queue=PROJECTS

# использовать библиотеке из указанной директории на hdfs вместо системных
oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib
oozie.use.system.libpath=false


Замечательно, тереть всё готово. Запускаем регулярное выполнение командой:


oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run


После запуска в консоль выведется id задачи. Используя это id можно посмотреть информацию о статусе выполнения задачи:


oozie job -info {job_id}


Остановить задачу:


oozie job -kill {job_id}


Если Вы не знаете id задачи, то можно найти его, показав все регулярные задачи для вашего пользователя:


oozie jobs -jobtype coordinator -filter user={user_name}


Заключение


Полечилось немного затянуто, но на мой взгляд лучше подробная инструкция чем квест-поиск по интернету. Надеюсь описанный опыт будет Вам полезен, спасибо за внимание!

© Habrahabr.ru