Распределенное выполнение Python-задач с использованием Apache Mesos. Опыт Яндекса
Подготовка релиза картографических данных включают в себя запуск массовой обработки данных. Некоторые задачи хорошо ложатся на идеологию Map-Reduce. В этом случае задача инфраструктуры традиционно решается использованием Hadoop или YT
В реальности часть задач таковы, что разбиение их на маленькие подзадачи невозможно, или нецелесообразно (из-за наличия существующего решения и дорогой разработки, например). Для этого мы в Яндекс.Картах разработали и используем свою систему планирования и выполнения взаимосвязанных задач. Одним из элементов такой системы является планировщик, запускающий задачи на кластере с учетом доступных ресурсов.
Эта статья о том как мы решили эту задачу с использованием Apache Mesos.
Для простоты предположим, что существующей реализацией продиктован следующий интерфейс на Python:
class Task(object):
# "Базовый" класс для всех задач. Предполагается, что аргументы
# сохранены внутри объекта задачи.
def consumption(self):
# Возвращает список dict <имя ресурса> -> <необходимое количесво>
# типичные ресурсы: "cpu" (например в штуках), "ram" в байтах, "db_connections" в штуках
pass
def run(self):
# Выполняет задачу, и возвращает результат
# может бросить exception -- это интерпертируется как невыполненная задача
# которая может быть перезапущена (в ручную или автоматически).
pass
class TaskExecuter(object):
def execute(self, task):
# Запланировать выполнение задачи `task`
pass
def cancel(self, task):
# Отменить выполнение задачи `task`, по возможности убить если она уже запущена
# быть готовым, к тому что задача может быть тут же перезапущена.
pass
def pop_finished(self, task):
# Получить список завершившихся задач с их результатами.
# Каждый элемент списка это tupple `(task, return_value, exception)`
# одновременно быть установленным может быть только return_value или exception.
pass
Терминология
Разберем основные концепции используемые в Mesos, необходимые для выполнения задач Mesos-master — координатор кластера, собирает информацию о имеющихся хостах и их ресурсах и предлагает приложениям.
- Mesos-agent, Mesos-slave — программа, запущенная на каждом воркере, сообщает о своих ресурсах мастеру, запускает задачи.
- Scheduler — планировщик задач, программа или часть программы-супервайзера. Знает какие задачи нужно выполнить. Принимает решения какие задачи выполнить с учетом имеющихся ресурсов.
- Executor — «исполнитель» задач, отдельная программа, запускаемая на agent-хостах. Получая задачу, отправленную Scheduler'ом выполняет ее. Сообщает статус ее выполнения и отправляет результат.
- Протокол общения между Scheduler’ом и Executor’ом — protobuf-сообщения, описанные в Mesos. Каждая задача описывается фактически строковым полем в этом сообщении. Интерпретация этой строки внутренне дело Scheduler’а и Executor’а. Как видно Scheduler и Executor тесно связанны, вместе они в терминологии Mesos называются Framework.
Схема работы при этом такая:
- Приложение желающее запускать задачи в Mesos, создает объект Scheuler и регистрирует его в Mesos-master
- Mesos-мастер собирает доступные ресурсы и предлагает их Scheduler’ам в виде resourceOffer'а.
- Если Scheduler имеет подходящую задачу, то он отправляет ее на мастер, вместе ID resourceOffer'а.
- Mesos доставляет задачу на slave, и запускает там Executor (если еще не запускал).
И передает задачу в него. - Executer выполняет задачу, и сообщает результат Slave'у
- Результат доставляется в Scheduler в виде сообщения StatusUpdate.
Установка локальной версии Mesos
Вообще говоря, рекомендованная установка Mesos включает 3 хоста с запущенным процессом Mesos-мастера и использование Zookeeper для их синхронизации.
Но для разработки достаточно одного, запущенного на локальной машине. На данный момент проще всего установить Mesos, собрав его из исходников. Установка для различных платформ описана в разделе Getting Started в документации по Mesos.
Вот как это выглядело для Mac OS (с учетом того, что все девелоперские утилиты у меня уже есть):
$ git clone https://github.com/apache/mesos.git
Cloning into 'mesos'...
remote: Counting objects: 90921, done.
remote: Compressing objects: 100% (13/13), done.
remote: Total 90921 (delta 3), reused 0 (delta 0), pack-reused 90908
Receiving objects: 100% (90921/90921), 281.56 MiB | 5.06 MiB/s, done.
Resolving deltas: 100% (65917/65917), done.
Checking connectivity... done.
$ cd mesos/
$ git checkout 0.28.2 # последняя стабильная версия на момент написания
$ ./bootstrap
$ mkdir build && cd build
$ ../configure --prefix=$HOME/opt/usr --with-python
$ make -j6 # собираем в 6 потоков, меньше слишком долго собирается,
#больше невозможно параллельно работать
$ make install
Для удобства можно добавить пути до Mesos в переменные окружения.
export PATH=$PATH:$HOME/opt/usr/bin
export PYTHONPATH=$HOME/opt/usr/lib/python/site-packages/
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/opt/usr/lib/
Запускаем локальный вариант
$ mesos-local
Теперь Mesos установлен и запущен. Его состояние можно посмотреть по адресу localhost:5050
Первый Framework
Для начала импортируем необходимые библиотеки:
import mesos.interface
from mesos.interface import mesos_pb2
import mesos.native
Для запуска нам нужен Scheduler, для начала сделаем просто заглушку:
class SimpleScheduler(mesos.interface.Scheduler):
pass
Опишем наш фреймворк:
framework = mesos_pb2.FrameworkInfo()
framework.user = "" # По умолчанию имя пользователя запустившего framwork
framework.name = "Simple Scheduler"
Создадим инстанс планировщика:
scheduler = SimpleScheduler()
Запустим driver через который происходит общение планировщика с Mesos-мастером.
driver = mesos.native.MesosSchedulerDriver(
scheduler,
framework,
"localhost:5050"
)
driver.run()
Ура! Мы создали фреймворк, который бесконечно получает предложения ресурсов, и никогда их не использует.
Давайте попробуем позапускать задачи. Начнем с простого, с исполнения shell-команд. Для таких задач в Mesos уже есть встроенный Executor.
Чтобы запускать задачу в SimpleScheduler
нужно описать функцию resourceOffers
. Эта функция принимает на вход объект драйвера, который мы уже создали, и список предложений ресурсов. Мы для простоты будем всегда принимать первый.
class SimpleScheduler(mesos.interface.Scheduler):
#...
def resourceOffers(self, driver, offers):
# создаем описание задачи
task = mesos_pb2.TaskInfo()
# Обязятельное поле
task.name = "Simple Scheduler Task"
# Обязательное поле, дожно быть уникально среди
# запущенных фреймворком задач
task.task_id.value = str(self._next_id)
self._next_id +=1
# Чтобы показать, где запустить задачу, передаем slave_id из offer'a
task.slave_id.value = offers[0].slave_id.value
# Если поле command заполнено, то Mesos будет использовать
# встроенный CommandExecutor, который выполнит эту команду в
# shell'е
task.command.value = "echo Hello Mesos World"
# Теперь нужно заполнить информацию о потребляемых ресурсах
# она обязательная.
cpus = task.resources.add()
cpus.name = "cpus"
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = 1 # Декларируем один используемый процессор
mem = task.resources.add()
mem.name = "mem"
mem.type = mesos_pb2.Value.SCALAR
mem.scalar.value = 1 # Декларируем 1 Мегабайт
# Запускаем задачи
# Первый параметр описывает список офферов, которые мы приняли
# для запуска задач.
# Второй параметр -- список задач. Списки не должны соответствовать
# друг-другу. Все ресурсы, предложенные офферами суммируются.
driver.launchTasks([offer.id for offer in offers], [task])
В принципе этого достаточно, для запуска задачи (если нас не интересует ее судьба). Можно запустить наш скрипт, и увидеть в логах mesos-local
заветные строчки «Hello Mesosphere World»
Видимо одной статьи слишком мало чтобы решить поставленную задачу имплементации распределенной очереди. Продолжим ее решение во второй части.
Материалы по теме
Официальная документация Apache Mesos, http://mesos.apache.org/documentation/latest/
Книга David Greenberg, Building Applications on Mesos, http://shop.oreilly.com/product/0636920039952.do