[Перевод] Ray: Распределенная система для использования ИИ

Здравствуйте, коллеги.

Надеемся еще до конца августа приступить к переводу небольшой, но поистине базовой книги о реализации возможностей ИИ на языке Python.

n1bie-mblp_vav_j89bejegoj8m.jpeg

Господин Гифт, пожалуй, в дополнительной рекламе не нуждается (для любопытствующих — профиль мэтра на GitHub):

9nl03xcz2j1mpum2a8_c2y4gy-y.jpeg

В предлагаемой сегодня статье будет коротко рассказано о библиотеке Ray, разработанной в Калифорнийском университете (Беркли) и упомянутой в книге Гифта мелким петитом. Надеемся, что в качестве раннего тизера — то, что надо. Добро пожаловать под кат
По мере развития алгоритмов и приемов машинного обучения, все больше и больше приложений для машинного обучения требуется запускать сразу на множестве машин, и они не могут обойтись без параллелизма. Однако инфраструктура для выполнения машинного обучения на кластерах по-прежнему формируется ситуативно. Сейчас уже существуют хорошие решения (например, серверы параметров или поиск гиперпараметров) и высококачественные распределенные системы (например, Spark или Hadoop), исходно создававшиеся не для работы с ИИ, но практикующие специалисты часто создают инфраструктуру для собственных распределенных систем с нуля. На это тратится масса лишних усилий.

В качестве примера рассмотрим концептуально простой алгоритм, скажем, Эволюционные стратегии для обучения с подкреплением. На псевдокоде этот алгоритм укладывается примерно в дюжину строк, и его реализация на Python немногим больше. Однако, для эффективного использования этого алгоритма на более крупной машине или кластере требуется значительно более сложная программная инженерия. В реализации этого алгоритма от авторов данной статьи — тысячи строк кода, здесь необходимо определять протоколы связи, стратегии сериализации и десериализации сообщений, а также различные способы обработки данных.

Одна из целей Ray — помочь специалисту-практику превратить алгоритм-прототип, запускаемый на ноутбуке, в высокопроизводительное распределенное приложение, эффективно работающее на кластере (или на отдельно взятой многоядерной машине), добавив относительно немногочисленные строки кода. Такой фреймворк должен по части производительности обладать всеми преимуществами системы, оптимизированной вручную, и не требовать, чтобы пользователь сам задумывался о планировании, передаче данных и машинных сбоях.

Свободный фреймворк для ИИ

Связь с другими фреймворками глубокого обучения: Ray полностью совместим с такими фреймворками глубокого обучения как TensorFlow, PyTorch и MXNet, поэтому во многих приложениях совершенно естественно использовать вместе с Ray один или более других фреймворков глубокого обучения (например, в наших библиотеках для обучения с подкреплением активно применяются TensorFlow и PyTorch).

Связь с другими распределенными системами: Сегодня используется много популярных распределенных систем, однако, большинство из них проектировалось без учета задач, связанных с ИИ, поэтому не обладают требуемой производительностью для поддержки ИИ и не имеют API для выражения прикладных аспектов ИИ. В современных распределенных системах отсутствуют (те или иные, в зависимости от системы) такие нужные возможности:

  • Поддержка задач на уровне миллисекунд и поддержка выполнения миллионов задач в секунду
  • Вложенный параллелизм (распараллеливание задач внутри задач, например, параллельные имитации при поиске гиперпараметров) (см. на следующем рисунке)
  • Произвольные зависимости между задачами, динамически во время исполнения (например, чтобы не приходилось ожидать, подстраиваясь под темп медленных работников)
  • Задачи, оперирующие разделяемым изменяемым состоянием (например, весовые коэффициенты в нейронных сетях или симулятор)
  • Поддержка неоднородных ресурсов (CPU, GPU, т.д.)

9-0ojeen_te_v9xyudtqh59aesq.png

Простой пример вложенного параллелизма. В нашем приложении параллельно выполняется два эксперимента (каждый из них — это долгосрочная задача), и в каждом эксперименте моделируется несколько параллельных процессов (каждый процесс — это тоже задача).

Существует два основных способа использования Ray: через его низкоуровневые API и через высокоуровневые библиотеки. Высокоуровневые библиотеки надстраиваются поверх низкоуровневых API. В настоящее время к их числу относятся Ray RLlib (масштабируемая библиотека для обучения с подкреплением) и Ray.tune, эффективная библиотека для распределенного поиска гиперпараметров.

Низкоуровневые API Ray

Цель Ray API — обеспечить естественное выражение самых общих вычислительных паттернов и прикладных задач, не ограничиваясь при этом такими фиксированными паттернами как MapReduce.

Динамические графы задач

Базовый примитив в приложении (задании) Ray — динамический граф задач. Он очень отличается от вычислительного графа в TensorFlow. Тогда как в TensorFlow вычислительный граф представляет нейронную сеть и выполняется по множеству раз в каждом отдельном приложении, в Ray граф задач соответствует целому приложению и выполняется всего один раз. Граф задач заранее не известен. Он строится динамически, пока работает приложение, и выполнение одной задачи может инициировать выполнение многих других задач.

swu8xsuyr5gtqoghtk5gkfgihme.png

Пример вычислительного графа. В белых овалах показаны задачи, а в голубых прямоугольниках — объекты. Стрелками указано, что одни задачи зависят от объектов, а другие — создают объекты.

Произвольные функции Python можно выполнять как задачи, причем, они в произвольном порядке могут зависеть от вывода других задач. См. пример ниже.

# Определяем две удаленные функции. При вызове этих функций создаются задачи,
# выполняемые удаленно.

@ray.remote
def multiply(x, y):
    return np.dot(x, y)

@ray.remote
def zeros(size):
    return np.zeros(size)

# Параллельно запускаем две задачи. Они сразу же возвращают футуры,
# и эти задачи выполняются в фоновом режиме.
x_id = zeros.remote((100, 100))
y_id = zeros.remote((100, 100))

# Запускаем третью задачу. Она не будет назначена, пока первые две задачи  
# не завершатся.
z_id = multiply.remote(x_id, y_id)

# Получаем результат. Он останется заблокирован до тех пор, пока не завершится третья задача.
z = ray.get(z_id)

Акторы

При помощи одних только удаленных функций и вышеописанного обращения с задачами невозможно добиться, чтобы несколько задач одновременно работали над одним и тем же разделяемым изменяемым состоянием. Такая проблема при машинном обучении возникает в разных контекстах, где разделяемым может быть состояние симулятора, весовые коэффициенты в нейронной сети, что-нибудь совершенно иное. Абстракция актора используется в Ray для инкапсуляции изменяемого состояния, разделяемого между множеством задач. Вот иллюстративный пример, демонстрирующий, как это делается с симулятором Atari.

import gym

@ray.remote
class Simulator(object):
    def __init__(self):
        self.env = gym.make("Pong-v0")
        self.env.reset()

    def step(self, action):
        return self.env.step(action)

# Создаем симулятор, он запускает удаленный процесс, который, в свою очередь,
# запустит все методы этого актора
simulator = Simulator.remote()

observations = []
for _ in range(4):
    # Совершаем в симулятор действие 0. Этот вызов не приводит к блокировке 
    # и возвращает футуру
    observations.append(simulator.step.remote(0))

При всей простоте актор очень гибок в использовании. Например, в акторе может инкапсулироваться симулятор или политика нейронной сети, также он может использоваться для распределенного обучения (как, например, с сервером параметров) или для обеспечения политик в «живом» приложении.

hrorfevzvb-zqne47z4xa1octgu.png

Слева: Актор выдает прогнозы/действия некоторому количеству клиентских процессов. Справа: Множество акторов сервера параметров выполняют распределенное обучение множества рабочих процессов.

Пример с сервером параметров

Сервер параметров можно реализовать в виде актора Ray следующим образом:

@ray.remote
class ParameterServer(object):
    def __init__(self, keys, values):
        # Эти значения будут изменяться, поэтому необходимо создать локальную копию.
        values = [value.copy() for value in values]
        self.parameters = dict(zip(keys, values))

    def get(self, keys):
        return [self.parameters[key] for key in keys]

    def update(self, keys, values):
        # Эта функция обновления выполняет сложение с имеющимися значениями, но 
        # функцию обновления можно определять произвольно
        for key, value in zip(keys, values):
            self.parameters[key] += value

Вот более полный пример.

Чтобы инстанцировать сервер параметров, поступим так.

parameter_server = ParameterServer.remote(initial_keys, initial_values)

Чтобы создать четыре долгоиграющих работника, постоянно извлекающих и обновляющих параметры, сделаем так.

@ray.remote
def worker_task(parameter_server):
    while True:
        keys = ['key1', 'key2', 'key3']
        # Получаем наиболее актуальные параметры
        values = ray.get(parameter_server.get.remote(keys))
        # Вычисляем некоторые обновления параметров
        updates = …
        # Обновляем параметры
        parameter_server.update.remote(keys, updates)

# Запускаем 4 долгосрочные задачи
for _ in range(4):
    worker_task.remote(parameter_server)

Высокоуровневые библиотеки Ray

Ray RLlib — это масштабируемая библиотека для обучения с подкреплением, созданная для использования на множестве машин. Ее можно задействовать при помощи приведенных для примера обучающих сценариев, а также через API на Pytho. В настоящее время он включает реализации алгоритмов:

  • A3C
  • DQN
  • Эволюционных стратегий
  • PPO

Идет работа и над реализацией других алгоритмов. RLlib полностью совместима с OpenAI gym.

Ray.tune — эффективная библиотека для распределенного поиска гиперпараметров. В ней предоставляется API на Python для решения задач глубокого обучения, обучения с подкреплением и других задач, требующих большой вычислительной мощности. Вот иллюстративный пример такого рода:

from ray.tune import register_trainable, grid_search, run_experiments

# Функция для оптимизации. Гиперпараметры находятся в аргументе config
def my_func(config, reporter):
    import time, numpy as np
    i = 0
    while True:
        reporter(timesteps_total=i, mean_accuracy=(i ** config['alpha']))
        i += config['beta']
        time.sleep(0.01)

register_trainable('my_func', my_func)

run_experiments({
    'my_experiment': {
        'run': 'my_func',
        'resources': {'cpu': 1, 'gpu': 0},
        'stop': {'mean_accuracy': 100},
        'config': {
            'alpha': grid_search([0.2, 0.4, 0.6]),
            'beta': grid_search([1, 2]),
        },
    }
})

Текущие результаты можно динамически визуализировать при помощи специальных инструментов, например, Tensorboard и VisKit от rllab (либо напрямую читать логи JSON). Ray.tune поддерживает поиск по сетке, случайный поиск и более нетривиальные алгоритмы раннего останова, например, HyperBand.

Подробнее о Ray

© Habrahabr.ru