Распределенное выполнение Python-задач с использованием Apache Mesos. Опыт Яндекса

Подготовка релиза картографических данных включают в себя запуск массовой обработки данных. Некоторые задачи хорошо ложатся на идеологию Map-Reduce. В этом случае задача инфраструктуры традиционно решается использованием Hadoop или YT


В реальности часть задач таковы, что разбиение их на маленькие подзадачи невозможно, или нецелесообразно (из-за наличия существующего решения и дорогой разработки, например). Для этого мы в Яндекс.Картах разработали и используем свою систему планирования и выполнения взаимосвязанных задач. Одним из элементов такой системы является планировщик, запускающий задачи на кластере с учетом доступных ресурсов.
Workflow Graph


Эта статья о том как мы решили эту задачу с использованием Apache Mesos.


Для простоты предположим, что существующей реализацией продиктован следующий интерфейс на Python:


class Task(object):
    # "Базовый" класс для всех задач. Предполагается, что аргументы
    # сохранены внутри объекта задачи.
    def consumption(self):
        # Возвращает список dict <имя ресурса> -> <необходимое количесво>
        # типичные ресурсы: "cpu" (например в штуках), "ram" в байтах, "db_connections" в штуках
        pass
    def run(self):
        # Выполняет задачу, и возвращает результат
        # может бросить exception -- это интерпертируется как невыполненная задача
        # которая может быть перезапущена (в ручную или автоматически).
        pass

class TaskExecuter(object):
    def execute(self, task):
        # Запланировать выполнение задачи `task`
        pass

    def cancel(self, task):
        # Отменить выполнение задачи `task`, по возможности убить если она уже запущена
        # быть готовым, к тому что задача может быть тут же перезапущена.
        pass

    def pop_finished(self, task):
        # Получить список завершившихся задач с их результатами.
        # Каждый элемент списка это tupple `(task, return_value, exception)`
        # одновременно быть установленным может быть только return_value или exception.
        pass

Терминология


Разберем основные концепции используемые в Mesos, необходимые для выполнения задач Mesos-master — координатор кластера, собирает информацию о имеющихся хостах и их ресурсах и предлагает приложениям.


  • Mesos-agent, Mesos-slave — программа, запущенная на каждом воркере, сообщает о своих ресурсах мастеру, запускает задачи.
  • Scheduler — планировщик задач, программа или часть программы-супервайзера. Знает какие задачи нужно выполнить. Принимает решения какие задачи выполнить с учетом имеющихся ресурсов.
  • Executor — «исполнитель» задач, отдельная программа, запускаемая на agent-хостах. Получая задачу, отправленную Scheduler'ом выполняет ее. Сообщает статус ее выполнения и отправляет результат.
  • Протокол общения между Scheduler’ом и Executor’ом — protobuf-сообщения, описанные в Mesos. Каждая задача описывается фактически строковым полем в этом сообщении. Интерпретация этой строки внутренне дело Scheduler’а и Executor’а. Как видно Scheduler и Executor тесно связанны, вместе они в терминологии Mesos называются Framework.

Схема работы при этом такая:


  • Приложение желающее запускать задачи в Mesos, создает объект Scheuler и регистрирует его в Mesos-master
  • Mesos-мастер собирает доступные ресурсы и предлагает их Scheduler’ам в виде resourceOffer'а.
  • Если Scheduler имеет подходящую задачу, то он отправляет ее на мастер, вместе ID resourceOffer'а.
  • Mesos доставляет задачу на slave, и запускает там Executor (если еще не запускал).
    И передает задачу в него.
  • Executer выполняет задачу, и сообщает результат Slave
  • Результат доставляется в Scheduler в виде сообщения StatusUpdate.

Установка локальной версии Mesos


Вообще говоря, рекомендованная установка Mesos включает 3 хоста с запущенным процессом Mesos-мастера и использование Zookeeper для их синхронизации.


Но для разработки достаточно одного, запущенного на локальной машине. На данный момент проще всего установить Mesos, собрав его из исходников. Установка для различных платформ описана в разделе Getting Started в документации по Mesos.


Вот как это выглядело для Mac OS (с учетом того, что все девелоперские утилиты у меня уже есть):


$ git clone https://github.com/apache/mesos.git
Cloning into 'mesos'...
remote: Counting objects: 90921, done.
remote: Compressing objects: 100% (13/13), done.
remote: Total 90921 (delta 3), reused 0 (delta 0), pack-reused 90908
Receiving objects: 100% (90921/90921), 281.56 MiB | 5.06 MiB/s, done.
Resolving deltas: 100% (65917/65917), done.
Checking connectivity... done.
$ cd mesos/
$ git checkout 0.28.2 # последняя стабильная версия на момент написания
$ ./bootstrap
$ mkdir build && cd build
$ ../configure --prefix=$HOME/opt/usr --with-python
$ make -j6 # собираем в 6 потоков, меньше слишком долго собирается,
           #больше невозможно параллельно работать
$ make install

Для удобства можно добавить пути до Mesos в переменные окружения.


export PATH=$PATH:$HOME/opt/usr/bin 
export PYTHONPATH=$HOME/opt/usr/lib/python/site-packages/
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/opt/usr/lib/

Запускаем локальный вариант


$ mesos-local

Теперь Mesos установлен и запущен. Его состояние можно посмотреть по адресу localhost:5050


Mesos UI


Первый Framework


Для начала импортируем необходимые библиотеки:


import mesos.interface
from mesos.interface import mesos_pb2
import mesos.native

Для запуска нам нужен Scheduler, для начала сделаем просто заглушку:


class SimpleScheduler(mesos.interface.Scheduler):
    pass

Опишем наш фреймворк:


framework = mesos_pb2.FrameworkInfo()
framework.user = "" # По умолчанию имя пользователя запустившего framwork
framework.name = "Simple Scheduler"

Создадим инстанс планировщика:


scheduler = SimpleScheduler()

Запустим driver через который происходит общение планировщика с Mesos-мастером.


driver = mesos.native.MesosSchedulerDriver(
    scheduler,
    framework,
    "localhost:5050"
)
driver.run()

Ура! Мы создали фреймворк, который бесконечно получает предложения ресурсов, и никогда их не использует.


Давайте попробуем позапускать задачи. Начнем с простого, с исполнения shell-команд. Для таких задач в Mesos уже есть встроенный Executor.


Чтобы запускать задачу в SimpleScheduler нужно описать функцию resourceOffers. Эта функция принимает на вход объект драйвера, который мы уже создали, и список предложений ресурсов. Мы для простоты будем всегда принимать первый.


class SimpleScheduler(mesos.interface.Scheduler):
    #...
    def resourceOffers(self, driver, offers):
        # создаем описание задачи

        task = mesos_pb2.TaskInfo()
        # Обязятельное поле
        task.name = "Simple Scheduler Task"
        # Обязательное поле, дожно быть уникально среди
        # запущенных фреймворком задач
        task.task_id.value = str(self._next_id)
        self._next_id +=1

        # Чтобы показать, где запустить задачу, передаем slave_id из offer'a
        task.slave_id.value = offers[0].slave_id.value

        # Если поле command заполнено, то Mesos будет использовать 
        # встроенный CommandExecutor, который выполнит эту команду в
        # shell'е
        task.command.value = "echo Hello Mesos World"

        # Теперь нужно заполнить информацию о потребляемых ресурсах
        # она обязательная.
        cpus = task.resources.add()
        cpus.name = "cpus"
        cpus.type = mesos_pb2.Value.SCALAR
        cpus.scalar.value = 1 # Декларируем один используемый процессор

        mem = task.resources.add()
        mem.name = "mem"
        mem.type = mesos_pb2.Value.SCALAR
        mem.scalar.value = 1 # Декларируем 1 Мегабайт

        # Запускаем задачи
        # Первый параметр описывает список офферов, которые мы приняли
        # для запуска задач.
        # Второй параметр -- список задач. Списки не должны соответствовать 
        # друг-другу. Все ресурсы, предложенные офферами суммируются.
        driver.launchTasks([offer.id for offer in offers], [task])

В принципе этого достаточно, для запуска задачи (если нас не интересует ее судьба). Можно запустить наш скрипт, и увидеть в логах mesos-local заветные строчки «Hello Mesosphere World»


Видимо одной статьи слишком мало чтобы решить поставленную задачу имплементации распределенной очереди. Продолжим ее решение во второй части.


Материалы по теме


Официальная документация Apache Mesos, http://mesos.apache.org/documentation/latest/
Книга David Greenberg, Building Applications on Mesos, http://shop.oreilly.com/product/0636920039952.do

Комментарии (0)

© Habrahabr.ru