Обзор фреймворка Luigi для построения последовательностей выполнения задач

Доброго времени суток! У нас открылось совершенно новое направление обучения — BigData, а это значит, что немного расширяется горизонт материалов, которыми мы будем делиться с вами. Сегодня рассмотрим Luigi, как часть того, что раскрывается на нашем курсе.

Luigi — фреймворк на языке Python для построения сложных последовательностей по выполнению зависимых задач. Довольно большая часть фреймворка направлена на преобразования данных из различных источников (MySql, Mongo, redis, hdfs) и с помощью различных инструментов (от запуска процесса до выполнения задач разных типов на кластере Hadoop). Разработан в компании Spotify и открыт в виде open source инструмента в 2012 году.

Самое главное преимущество фреймворка — возможность выстраивать последовательности зависимых задач. Фреймворк разрешает зависимости, отслеживает граф выполнения, управляет запуском задач, обрабатывает ошибки с возможностью перезапуска нужных задач, распределяет ресурсы рабочих процессов с возможностью параллельной работы независимых частей графа задач.

Для выполнения всех этих задач существуют и другие инструменты. Это Oozie, Pinball, Airflow (находится в статусе инкубации в Apache — проходит различные проверки, недавно вышел обзор на хабре). В данной статье рассмотрим только Luigi.

59de7ef4aa4ad086175249.jpeg

Установка и документация

Для установки можно воспользоваться командой:

pip install luigi


Документация доступна тут

Задача (Task)

В файле luigi_demo_tasks.py определяем класс, наследуемый от luigi.Task. Добавляем вызов run для возможности запуска из консоли.

from luigi import Task, run

class MyTask(Task):
   pass


if __name__ == '__main__':
   run()


Запускаем. Дополнительно указываем опцию --local-scheduler, чтобы пока что не обращаться к центральному планировщику задач.

python -m luigi_demo_tasks MyTask --local-scheduler


Примечание. В документации указан другой способ запуска без вызова run и с добавлением директории в PYTHONPATH.

Видим следующий результат:

DEBUG: Checking if MyTask() is complete
/usr/local/lib/python3.4/dist-packages/luigi/worker.py:334: UserWarning: Task MyTask() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   MyTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) running   MyTask()
INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host username=username, pid=5369) done      MyTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   MyTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 MyTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====


В сообщениях видим, что задача MyTask ставится на выполнение и успешно выполняется. Повторный запуск дает точно такой же результат.

Сделаем теперь так, чтобы MyTask выполнял некоторую работу. Для этого переопределим метод run из базового класса:

from luigi import Task, run

class MyTask(Task):
   def run(self):
       print("Hello world!")


if __name__ == '__main__':
   run()


В информации о выполнении задачи увидим следующее:

INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) running   MyTask()
Hello world!
INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) done      MyTask()


Гарантия однократного выполнения задачи

Часто бывает так, что необходимо выполнить некую задачу единожды. Например из-за того, что её выполнение является ресурсоёмким. В Luigi задача считается сделанной, если сгенерирован некий объект (файл на машине, файл в hdfs, артефакт в MySql, таблица в Hive и другие), и можно проверить его существование. Для указания объекта необходимо переопределить в задаче метод output и в нем вернуть любого наследника или наследников класса Target. Для примера будем использовать LocalTarget — файл в локальной файловой системе.

from luigi import Task, run, LocalTarget

class MyTask(Task):

   filename = "hello_file.txt"

   def run(self):
       with open(self.filename, 'w') as f:
           f.write("Hello world!")

   def output(self):
       return LocalTarget(self.filename)


if __name__ == '__main__':
   run()


Первый запуск задачи генерирует файл hello_file.txt. Повторный запуск задачи сообщает нам, что все задачи выполнены.

Зависимые задачи

В luigi задачи могут зависеть от других задач. Для указания зависимости от другой задачи необходимо переопределить метод requires. В нем вернуть объект класса любой другой задачи. Определим две зависимые задачи, каждая из которых пишет файл.

from luigi import Task, run, LocalTarget

class MyTaskFirst(Task):

   filename = "first.txt"

   def run(self):
       with open(self.filename, 'w') as f:
           f.write("first!")

   def output(self):
       return LocalTarget(self.filename)


class MyTaskSecond(Task):

   filename = "second.txt"

   def run(self):
       with open(self.filename, 'w') as f:
           f.write("second!")

   def requires(self):
       return MyTaskFirst()

   def output(self):
       return LocalTarget(self.filename)


if __name__ == '__main__':
   run()


Запуск задачи немного изменился. Указываем для запуска самую последнюю задачу, фреймворк сам определит и выполнит все зависимости.

python -m luigi_demo_tasks MyTaskSecond --local-scheduler


Внешние зависимости

Иногда для выполнения задачи необходимы данные, генерируемые внешними системами. Так как в зависимостях в методе requires можно указывать только другие задачи, то нам понадобится задача-обертка для внешних данных. Для примера рассмотрим задачу подсчета частоты каждого символа в файле hello_file.txt:

from collections import defaultdict

from luigi import Task, run, LocalTarget, ExternalTask


class ExternalData(ExternalTask):
   def output(self):
       return LocalTarget("hello_file.txt")


class TaskWithExternalData(Task):

   filename = "char_counts.txt"

   def run(self):
       frequencies = defaultdict(int)

       with open(self.requires().output().path) as f_in:
           for line in f_in:
               for c in line:
                   frequencies[c] += 1

       with open(self.filename, 'w') as f_out:
           for c, count in frequencies.items():
               f_out.write('{}\t{}\n'.format(c, count))

   def requires(self):
       return ExternalData()

   def output(self):
       return LocalTarget(self.filename)


if __name__ == '__main__':
   run()


Планировщик

Для централизованного выполнения задач можно использовать центральный планировщик. Его основные задачи: обеспечить отсутствие одновременного выполнения одинаковых задач, предоставить визуализацию всех запущенных задач с их зависимостями.

Из документации запуск планировщика:

$ luigid --background --pidfile  --logdir  --state-path 


Адрес планировщика по умолчанию:
localhost:8082/

Запустим предыдущую задачу с использованием планировщика, предварительно удалив файл char_counts.txt:

python -m luigi_demo_tasks TaskWithExternalData


В планировщике увидим обе задачи:

59de81fe3bb22538110452.png

А так же граф зависимостей:

59de82247c23f948022540.png

Параллельный запуск задач

Реализуем несколько зависимых задач, зависимость оформим в виде песочных часов: корневая задача типа 1 зависит от десяти одинаковых задач типа 2. Эти 10 задач типа 2 зависят от одной типа 3. она, в свою очередь, зависит от 10 задач типа 4, и все эти 10 зависят от одной задачи типа 5.

Каждая задача пишет в результате работы файл со своим именем и номером, а так же спит 10 секунд. Это нужно для того, чтобы в планировщике можно было проследить порядок выполнения задач. Обратите внимание на то, что в задачу можно передать параметр. Это позволяет запускать разные по сути задачи с одинаковым кодом.

Для реализации будем использовать наследование, так как это позволит сократить код. Запустим одновременно 5 процессов указав опцию --workers=5

python -m luigi_demo_tasks Task1 --Task1-task-index=0 --workers=5


В планировщике обновляя страницу увидим следующую последовательность:

59de82495b457558795603.png
59de828b94e72880941352.png
59de829bb932e549855535.png
59de82baa4de7668076374.png
59de82c613a5e091505594.png
59de82ecf372c358974421.png
59de83008124e999800860.png
59de830f26c33275128065.png

Одновременно выполняется не более пяти задач, а иногда только одна, так как от нее зависят все остальные. При этом выделенные воркеры простаивают.

Запуск задач

Для запуска задач необходимо использовать какой-либо внешний планировщик, например cron. Соответственно необходимо самостоятельно настраивать получение актуального кода для запуска, логирование и конфигурирование всех задач.

Дополнительные возможности

В случае возникновения ошибок в работе luigi может отправить email.

Для каждого набора задач можно указать внешний файл с настройками, настроив перед запуском переменную LUIGI_CONFIG_PATH.

Каждая задача может быть запущена с некоторым приоритетом, для указания приоритета необходимо в классе указать поле priority.

Реализовано довольно много классов задач, связанных с типичными примерами обработки данных на кластере — hadoop streaming задача на Python, hadoop jar задача, spark задача и другие. При этом часто они требуют существенной доработки.

Возможно выполнение любой задачи в виде запуска консольной команды с отслеживанием процесса выполнения.

Код самой библиотеки чаще всего довольно прост. Для понимания того, как выстроить зависимые задачи, достаточно посмотреть исходный код базовых классов в luigi. У них довольно простой интерфейс и неплохая документация.

Разработан крупной компанией. На данный момент стабильно есть несколько коммитов в мастер каждую неделю. Скорее всего будет и дальше поддерживаться.

Недостатки

Проверка того, выполнена ли задача, происходит только во время построения графа зависимостей. Из-за этого, если запускать выполнение набора задач чаще, чем он весь успевает выполниться, может возникнуть ситуация, когда планировщик запускает две одинаковые задачи.

Нет встроенного планировщика.

Нет способа получить метаинформацию о выполняемых задачах. Нельзя без вспомогательных средств получить документацию по запускаемым задачам, процессам обработки данных. Нет способа, например, получить общий список задач, которые зависят от данной задачи.

Веб-интерфейс планировщика полезен только для того, чтобы увидеть, почему тот или иной набор задач не выполняется. Обычно можно посмотреть на граф зависимостей и увидеть, каких именно данных не хватает.

Не совсем очевидна настройка логирования выполнения задач.

Довольно часто встречается неожиданное поведение.

Поддержка и развитие менее активное, чем, например у Airflow. Для сравнения в luigi и в airflow

Вывод

Luigi хорошо подходит для построения процессов обработки данных. Однако прежде, чем начинать использовать эту библиотеку, есть смысл попробовать реализовать несколько типичных задач на разных фреймворках и выбрать такой, который лучше подойдет именно для ваших задач.

THE END

Как всегда рады мнениям, вопросам и тапкам.

© Habrahabr.ru