Запускаем локальный ML-процесс в облаке с помощью DataSphere Jobs
В сообществе ML‑инженеров и дата‑сайентистов популярны инструменты с быстрой обратной связью наподобие JupyterLab — они помогают легко и без лишних обвязок проверять гипотезы или создавать прототипы. Но довольно часто бывает, что при разработке ML‑пайплайна, будь то инференс или обучение модели, хочется пользоваться установленной локально полноценной IDE, в которой открыт проект со многими зависимостями, окружением, сложной структурой. При написании кода и его отладке хочется пользоваться дебагером и уметь быстро менять код, а при запуске — скейлить ресурсы исполнения и не думать о том, как перенести код и окружение на продакшн‑сервера. Всех этих возможностей в Jupyter‑экосистеме из коробки нет, поэтому разработчикам часто приходится создавать костыли.
Помочь в решении этих задач могут инструменты для удалённого исполнения кода в ML. Сегодня на конкретном примере покажу, как устроен и как работает один из таких инструментов, созданный нами для пользователей облака, — DataSphere Jobs. А в следующий раз вместе с моими коллегами рассмотрим опенсорс‑инструменты для подобных задач.
Чего нам не хватало в экосистеме
Всем пользователям облака важно следить за потреблением и внимательно распределять ресурсы, особенно для энергозатратных проектов, к которым машинное обучение относится в первую очередь. Поэтому новый инструмент должен был помочь ML‑инженерам с запуском локального кода в облаке за пределами JupyterLab и решить сразу несколько связанных задач:
Запуск тяжелых процессов: обучение моделей, расчёт статистики.
Инференс моделей не в режиме реального времени. А также возможность поставить обучение в очередь, например, на исполнение ночью.
Простой скейлинг исполнения — когда легко можно указать более мощную или менее мощную конфигурацию ВМ.
Запуск скриптов по крону или в качестве шагов в пайплайне Airflow.
Запуск любых бинарных файлов или других языков программирования.
Использование в своём коде отдельных облачных ресурсов DataSphere, таких как датасеты или S3.
В идеале хотелось избавить пользователя от ручных операций: чтобы ему оставалось написать простой конфиг в несколько строк и запустить задание. По заданию было важно сразу автоматически собирать окружение, зависимости, входные файлы, переносить необходимые библиотеки с локального компьютера в облако, запускать подготовленные виртуальные машины — и потом возвращать результат на локальный компьютер пользователя, с записью всех логов в реалтайме.
На уровне архитектуры это выглядит так:
Но гораздо интереснее посмотреть, как это работает вживую. Поэтому перейдём к практическому примеру.
Запускаем создание нейрокартинок с помощью Jobs
В качестве примера возьмем несложный скрипт, который генерирует изображения через Stable Diffusion XL. Эта задача довольно простая, но ресурсоёмкая для локального запуска: у меня, например, довольно слабая видеокарта и мне попросту не хватает памяти для запуска.
Берём модель с huggingface.co и пишем код на основе их примера:
import argparse
import torch
import shutil
from diffusers import DiffusionPipeline
from diffusers import LMSDiscreteScheduler
import os
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='test args')
parser.add_argument('-i', '--input', required=True, type=argparse.FileType('r'))
args = parser.parse_args()
queries = args.input.readlines()
base = DiffusionPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16, variant="fp16", use_safetensors=True
)
base.to("cuda")
base.scheduler = LMSDiscreteScheduler.from_config(base.scheduler.config)
refiner = DiffusionPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-refiner-1.0",
text_encoder_2=base.text_encoder_2,
vae=base.vae,
torch_dtype=torch.float16,
use_safetensors=True,
variant="fp16",
)
refiner.to("cuda")
n_steps = 100
high_noise_frac = 0.8
if not os.path.exists('images'):
os.makedirs('images')
i = 0
for prompt in queries:
print(f'готовимся генерировать картинку {i}')
image = base(
prompt=prompt,
num_inference_steps=n_steps,
denoising_end=high_noise_frac,
output_type="latent",
).images
print(f'Запускаем refine model')
image = refiner(
prompt=prompt,
num_inference_steps=n_steps,
denoising_start=high_noise_frac,
image=image,
).images[0]
print(f'картинка {i} сгенерирована')
image.save(f"images/{i}.png")
i += 1
print("Создаем архив")
shutil.make_archive('output_images', 'zip', 'images')
print("Архив создан")
Тут всё довольно просто — скрипт ожидает на вход текстовый файл со списком промтов, после чего генерирует по ним картинки и сохраняет их в архив по имени output_images.zip. Архив создаётся как раз для того, чтобы в конфигурации DataSphere Jobs было проще указать выходные файлы и проще принести их назад на мой локальный компьютер.
Создадим новое виртуальное окружение для Python — это поможет перенести только необходимые нашему коду зависимости при автоматическом переносе.
В папке проекта запустим:
python -m venv venv
source venv/bin/activate
После этого установим все необходимые зависимости:
pip install diffusers transformers accelerate scipy safetensors torch
В файл main.py положим сам скрипт с генерацией, а в файл queries.txt положим текст следующего содержания:
a full-length photo of women with red hair and green eyes in botanical garden, monstera plant in background, style dark botanical, 4k, soft light
a photo of cybernetic white eastern dragon with golden eyes in the sky, futuristic eastern skyscraper in the background, 4k, futurism
a picture of japanese yokai with horns in style of cyberpunk anime, clouds in the background
a picture cyberpunk pab with augmented punks with red and pink mohawk, one of the punks is holding a laptop on his lap, in style of Ian McQue
В итоге получим такую структуру файлов:
В целом, код уже можно запускать, если есть достаточно мощный компьютер с мощной видеокартой и настроенным окружением (например, установлены драйвера nvidia и cuda toolkit). Но мы пойдём дальше.
Добавим файл конфигурации для задания:
name: sd-xl-generate-new
desc: generate SD-xl image
cmd: python main.py --input ${QUERIES}
env:
python: auto
inputs:
- queries.txt: QUERIES
outputs:
- output_images.zip
cloud-instance-type: g1.1
Рассмотрим основные части файла конфигурации из документации:
namе и desc — описательные поля для интерфейса и cli.
cmd — параметризованная строка, которая говорит, как именно нужно запускать код. В данном примере в виде шаблона ${QUERIES} передаётся ссылка на входной файл, который описан ниже.
env — тут описано, как собирать окружение. В данном случае мы пользуемся авторежимом для Python —
datasphere‑cli
проанализирует окружение и перенесёт его.inputs — список входных файлов, где объявлено значение шаблона QUERIES.
outputs — список выходных файлов, которые так же можно использовать в шаблонах.
cloud‑instance‑type — мощность ВМ, на которой будет исполняться задание: в данном случае ВМ с одной ГПУ Nvidia V100.
Теперь установим библиотеку
datasphere
в это же виртуальное окружение Python:
pip install datasphere
Установим yc — утилиту для работы с Yandex Cloud — в соответствии с документацией облака.
Для MacOS:
curl -sSL https://storage.yandexcloud.net/yandexcloud-yc/install.sh | bash
yc init
Заведём бесплатный проект в DataSphere, в рамках которого можно запускать задания и получать статистику потребления: https://datasphere.yandex.ru/
Создаём комьюнити, если его нет, и привязываем биллинг-аккаунт:
И создаём проект в этом сообществе:
Наконец-то все приготовления сделаны и можно запускать код! Пишем в терминале:
datasphere project job execute -p ID_проекта_в_датасфере -c config.yaml
После этого шага библиотека datasphere соберёт окружение и инициирует исполнение задания, c записью логов в терминале в режиме реального времени. После исполнения их можно будет скачать в интерфейсе DataSphere.
Пример аутпута:
[SYSTEM] 2023-12-18 22:18:39,350 - [INFO] - creating job ...
[SYSTEM] 2023-12-18 22:18:40,252 - [INFO] - uploading 1 files (2.5KB) ...
[SYSTEM] 2023-12-18 22:18:40,440 - [INFO] - files are uploaded
[SYSTEM] 2023-12-18 22:18:40,442 - [INFO] - created job `bt1a528feopd3qebf11g`
[SYSTEM] 2023-12-18 22:18:41,075 - [INFO] - executing job ...
[PROGRAM] diffusion_pytorch_model.fp16.safetensors: 12%|█▏ | 608M/5.14G [00:07<00:54, 83.0MB/s] hon-39-darwin.so'}), is_binary=True)]
[PROGRAM] model.fp16.safetensors: 100%|██████████| 246M/246M [00:03<00:00, 73.1MB/s]02<01:10, 70.0MB/s]
[PROGRAM] Fetching 19 files: 21%|██ | 4/19 [00:03<00:16, 1.12s/it]B/s]MB/s]
[PROGRAM] diffusion_pytorch_model.fp16.safetensors: 4%|▎ | 189M/5.14G [00:02<01:06, 74.8MB/s]
[PROGRAM] model.fp16.safetensors: 9%|▊ | 21.0M/246M [00:00<00:04, 52.1MB/s]]
[PROGRAM] tokenizer_2/tokenizer_config.json: 100%|██████████| 725/725 [00:00<00:00, 266kB/s]
[PROGRAM] tokenizer_2/special_tokens_map.json: 0%| | 0.00/460 [00:00, ?B/s]your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.
[PROGRAM] tokenizer_2/tokenizer_config.json: 0%| | 0.00/725 [00:00, ?B/s]
[PROGRAM] 90%|█████████ | 18/20 [00:05<00:00, 3.17it/s]
[PROGRAM] 95%|█████████▌| 19/20 [00:05<00:00, 3.18it/s]
[PROGRAM] 100%|██████████| 20/20 [00:06<00:00, 3.17it/s]
[PROGRAM] 100%|██████████| 20/20 [00:06<00:00, 3.18it/s]
[PROGRAM] картинка 3 сгенерирована
[PROGRAM] Создаем архив
[PROGRAM] Архив создан
[SYSTEM] 2023-12-18 22:29:52,387 - [INFO] - job link: https://datasphere.yandex.ru/communities/.../projects/..../job/bt1a528feopd3qebf11g
[SYSTEM] 2023-12-18 22:29:52,387 - [INFO] - downloading 1 files (5.4MB) ...
[SYSTEM] 2023-12-18 22:29:53,244 - [INFO] - files are downloaded
[SYSTEM] 2023-12-18 22:29:53,245 - [INFO] - job completed successfully
После выполнения задания архив с картинками должен появиться на моём локальном компьютере.
В интерфейсе DataSphere это выглядит так:
Cама страница запущенного задания:
Результаты работы:
Что планируем дальше
Сейчас мы значительно оптимизировали скорость запуска задания за счёт кеширования зависимостей, кода и входных файлов. Таким образом, если что‑то уже загружено к нам, то загружать ещё раз мы это не будем, если хеш файла неизменен. Заодно экономим время на регулярной перезагрузке.
DataSphere‑cli научился обходить окружение и собирать необходимые зависимости, после чего отправлять в публичное API DataSphere запросы для конфигурации и исполнения задания.
В ближайших планах у нас добавить оператор для Apache Airflow. Также буду рад обсудить предложения по развитию фичи в нашем комьюнити DataSphere.