Варианты использования Java ML библиотек совместно с Spring, Docker, Spark, Rapids, CUDA

Введение

В данной статье рассматривается способ использования GPU nVidia с технологией CUDA в Docker-контейнерах для распределенной тренировки моделей машинного обучения на нескольких машинах. Цель статьи — показать вариант использования Big Data Tool Apache Spark в Docker-контейнерах, совместно с акселератором GPU вычислений Rapids на устройствах nVidia CUDA, с применением библиотек DJL, Spark ML, XGBoost, в приложении Spring Boot на Java 8 (требование Rapids), на нескольких машинах под управлением ОС Windows 10 Pro для решения задачи тренировки моделей машинного обучения в распределенной системе. Те же контейнеры в дальнейшем можно использовать в Kubernetes.

Важное условие, из которого вытекают все болезненные решения — все действия выполняются в ОС Windows 10 Pro. Далее в статье будет рассмотрен рассмотрен вариант использования WSL2 (Linux Subsystem for Windows), но прежде важное требование — новейшие версии Rapids не работают с видеокартами на архитектуре Pascal, т.е. для запуска примеров в среде Windows требуется видеокарта nVidia на архитектуре Turing (серии 1600, 2000) и выше. Под OC Linux карты на архитектуре Pascal работать будут, рекомендуется запускать примеры ниже на Ubuntu 20.04 (наверняка, Debian 10 так же будет работать), но не выше — требование Rapids.

Еще одним важным условием является реализация всех примеров именно на Java. В мире Spark (вместе со Spark ML) более распространен вариант использования Scala. Scala я не знаю, и особого желания изучать нет, а вот желание изучить Spark и ML для собственного развития и решения широкого круга задач имеется. Учитывая, что Scala и Java равнозначны в среде Spark, в отличие от того же Python, и что на Java существует ряд библиотек ML, которые можно использовать совместно со Spark, и принимая во внимание уже имеющийся опыт работы с ним, решение попробовать реализовать несколько примеров не заставило себя ждать.

Код статьи проверен на Windows 10 Pro, GeForce RTX 2060 и 1080 Ti (с последней на Windows не заработало), часть скриншотов сделано во время настройки второй машины с картой GeForce 1650. Предваряя вопрос читателя, почему бы все не сделать на той же Ubuntu 20.04, отвечаю: а) так каждый сможет, вы на винде попробуйте; б) нет технической возможности (Cloud не вариант — дорогие машины с GPU).

Целевая схема запуска представлена на рисунке ниже:

941aaaa0cd251cf631bc5ae36c410571.png

Т.е. имеется от двух до n нод, на каждой из которых от 1 до m GPU устройств, docker runtime с контейнером Spark worker, из которого доступны GPU.

Hardware и software слои описываются схемой:

9a180aa4bfd5d1cab3c486c339b807e1.png

Докер позволяет запускать множество контейнеров с различными приложениями:

6ac0c7e919b9438c05f5d9c69bf51082.png

Это подходит для задач распределенной тренировки моделей ML в инфраструктуре Apache Spark: в настоящей статье рассматривается пример запуска Standalone кластера Apache Spark с одним Master узлом, двумя Worker узлами на разных машинах, и Spring Boot Java 8 приложением с использованием библиотек DJL, Spark ML и XGBoost в отдельном контейнере (спойлер — заработало не все, и не заработает без машины с Linux).

Интересным является возможность использования Embedded устройств nVidia для IoT-устройств.

a49bc917c440911f34fd79739c19910f.png

Весь нижеописанный код доступен в репозитории GitLab.

Подготовка окружения

Все нижеследующие действия выполняются на Windows 10 Pro. Важно выполнять именно на конфигурации не ниже Pro, и сборке Windows 10 Build 19043.1263 (21H1).

WSL, Docker и CUDA будут установлены в рамках данной статьи.

Рекомендуемая версия WSL 5.10.16.3+;

Docker 19.03+.

Рекомендуется установить Windows Terminal для открытия множества вкладок терминала: PowerShell, cmd, Ubuntu.

nVidia driver, CUDA

Убедитесь, что версия nVidia CUDA не ниже 11.7. Драйвер, который содержит данную версию, на момент написания статьи имеет версию 516.40.

Проверить версию драйвера и CUDA можно, открыв Powershell (лучше сразу открывать от имени администратора, но это требование для будущих действий) и выполнив команду

nvidia-smi

618480bbb1f9dfebef368a524baba970.png

WSL — Windows Subsystem for Linux

Для того, чтобы использовать GPU в Docker-контейнерах, необходимо установить ПО от nVidia (см ниже), которое требует установки WSL2.

Если на ПК пользователя WSL не установлена, то можно установить командой ниже:

wsl --install

Если WSL уже установлена, лучше обновиться до последней версии и проверить версию Ubuntu, должна быть 2.

Требуется перезагрузка. После перезапуска установится Ubuntu для Windows в отдельном окне

8b67595e212c7191823329b5938b8403.png

По окончанию установки можно проверить версию WSL в Powershell

wsl -l -v

9d89ca993d270aae13f3db60fc81d878.png

В случае, если версия Ubuntu 1, следует ее обновить

wsl --set-version Ubuntu-20.04 2

Docker Desktop

Следует установить Docker Desktop, если еще не установлен. Если установлен, рекомендуется обновить.

На момент установки на систему без Docker Desktop, моему выборы были представлены следующие настройки:

3ad1e6fe4320b9440bf12c3908b94c86.png

Я оставил оба чекбокса активированными. По окончанию установки требуется перезагрузка. В настройках необходимо убедиться, что чекбокс «Use WSL 2 based engine» активирован.

0d8faa75c6568c9a3579c5bdd2cb7159.png

Apply & Restart.

Можно проверить, что в wsl появились новые записи в списке

932437c61209582d9860977d2776c52c.png

Проверить работу Docker можно командой

docker run -d -p 5000:5000 --restart=always --name registry registry:2

Установится локальный docker registry, который будет полезен в последующей работе.

docker ps
CONTAINER ID   IMAGE        COMMAND                  CREATED         STATUS         PORTS                    NAMES
745b50d66906   registry:2   "/entrypoint.sh /etc…"   2 minutes ago   Up 2 minutes   0.0.0.0:5000->5000/tcp   registry

CUDA Support for WSL 2

Ключевым моментом является поддержка работы CUDA в докер-контейнерах, для этого у nVidia есть решение.

Выполнить следующие шаги в PowerShell под именем администратора:

wsl
sudo -i
apt-key del 7fa2af80
wget https://developer.download.nvidia.com/compute/cuda/repos/wsl-ubuntu/x86_64/cuda-wsl-ubuntu.pin
mv cuda-wsl-ubuntu.pin /etc/apt/preferences.d/cuda-repository-pin-600
wget https://developer.download.nvidia.com/compute/cuda/11.7.0/local_installers/cuda-repo-wsl-ubuntu-11-7-local_11.7.0-1_amd64.deb
# see the output of the previosly command
cp /var/cuda-repo-wsl-ubuntu-11-7-local/cuda-B81839D3-keyring.gpg /usr/share/keyrings/
apt-get update
apt-get -y install cuda

Теперь нужно проверить в отдельном окне PowerShell работу тестового контейнера nVidia с флагом benchmark:

docker run --gpus all nvcr.io/nvidia/k8s/cuda-sample:nbody nbody -gpu -benchmark

Если вывод содержит нечто подобное, все шаги выполнены корректно и можно продолжать работу.

eb6db33bf9e4629095f28b032d7e56c1.png

Если имели место ошибки, рекомендуется обратиться к страницам документации nVidia здесь и здесь для их решения.

Подготовка образов и запуск контейнеров

Учитывая, что для работы Rapids необходимо использовать Java 8, следующие шаги по подготовке всех необходимых Docker образов, а в последующем и самих приложений, будут выполнены исходя из данного требования.

Базовый образ для приложений и Spark Workers

Первоначально необходим самый базовый образ. Ниже листинг Dockerfile.

Используется базовый образ Ubuntu 20.04 с CUDA 11.7.0 из репозитория образов nVidia. Доступный образ с Ubuntu версии 22.04 не подошел по причине совместимости всех компонентов системного ПО, необходимого для запуска прикладного ПО.

FROM nvcr.io/nvidia/cuda:11.7.0-devel-ubuntu20.04

ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8'

ARG DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools

RUN apt-get update && \
	apt-get install -y openjdk-8-jdk && \
	apt-get install -y ant && \
	apt-get clean && \
	rm -rf /var/lib/apt/lists/* && \
	rm -rf /var/cache/oracle-jdk8-installer;

# Fix certificate issues
RUN apt-get update && \
	apt-get install -y ca-certificates-java && \
	apt-get clean && \
	update-ca-certificates -f && \
	rm -rf /var/lib/apt/lists/* && \
	rm -rf /var/cache/oracle-jdk8-installer;

# Setup JAVA_HOME, this is useful for docker commandline
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME

CMD ["tail", "-f", "/dev/null"]

Используемым в образе JDK является openjdk8, что соответствует требованиям Rapids и не испытывает проблем с лицензионным соглашением Oracle JDK. Также в образ устанавливается набор приложений для дебага.

Инструкция CMD не обязательна, но удобна для отладки.

Стоит заметить, что первоначально используется базовый образ nVidia с пометкой «devel» — тестирование происходило именно на нем, чтобы исключить возможные ошибки, связанные с недостаточностью компонентов.

786b777eda1f592538cca2364e91bb63.png

При этом, имеется образ:

98b751362e02d9bbfd449544143ab841.png

отличие от devel — отсутствие «nvcc».

Собирается образ командой:

docker build -f Dockerfile-cuda-java8 -t localhost:5000/cuda-jdk8:v1 .

Обращаю внимание, что на моей локальной машине имеется контейнер с репозиторием образов Docker, мне удобно при работе с локальным кластером Kubernetes указывать в манифесте свои образа из localhost:5000, и загружать их, не используя внешние репозитории.

Запускается контейнер командой:

docker run --gpus all --name=cuda-jdk8 -it -d localhost:5000/cuda-jdk8:v1

Примечание: важным флагом является »--gpus», которому передается значение «all» — благодаря данному флагу контейнеру доступны все ресурсы gpu локальной машины.

Проверить работоспособность базового образа можно путем выполнения в контейнере двух команд:

$ nvidia-smi
Sun Jul 10 13:58:20 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 515.48.07    Driver Version: 516.40       CUDA Version: 11.7     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  NVIDIA GeForce ...  On   | 00000000:0B:00.0  On |                  N/A |
|  0%   42C    P8    20W / 250W |   1241MiB / 11264MiB |      4%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+ 

$ nvcc --version
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2022 NVIDIA Corporation
Built on Tue_May__3_18:49:52_PDT_2022
Cuda compilation tools, release 11.7, V11.7.64
Build cuda_11.7.r11.7/compiler.31294372_0

При использовании базового образа «runtime» nvcc --version выведет ошибку, так как nvcc отсутствует в данном образе.

Если нет похожего вывода какой-либо из команд, следует вернуться к предыдущим разделам и проверить корректность выполнения всех шагов.

Образ Spark Worker

Следующий шаг — подготовка образа Spark Worker.

Здесь следует отметить, что в данной статье рассматривается запуск кластера Spark как Standalone кластера, без менеджера ресурсов. Spark Master запускается на локальной виртуальной машине (у меня уже имелся настроенный мастер на виртуальной машине для тестирования работы Spark с Cassandra в рамках другой задачи, которая в настоящей статье не рассматривается), и к ней подключается Spark Worker в Docker контейнере. Полезность данного теста состоит в том, что:

а) тестируется работоспособность GPU-нагрузки в контейнерах;

б) для последующей работы остается пример Standalone кластера и Docker-образ для Kubernetes кластера.

Следует так же отметить, что образ Spark может быть использован и для запуска контейнера с Spark Master.

Кратко обо всех способах запуска Spark, как в локальном и standalone режимах, так с использованием Kubernetes, можно прочесть здесь, а о различиях менеджеров Yarn и Mesos можно прочесть здесь, или изучить вопрос самостоятельно.

Подготовка

Необходимо загрузить архив со Spark с официального сайта. Ввиду проблем с совместимостью именно в моей программно-аппаратной конфигурации, мне пришлось использовать версию 3.2.1, хотя, на момент тестирования (и написания данной статьи) уже доступна версия 3.3.0.

Распаковать содержимое в директорию spark (или воспользоваться подготовленными примерами из репозитория).

После распаковки архива директория spark должна иметь следующий вид:

a1c2559b5a8547b7cfee6311c994b9ae.png

Rapids resources

За исключением директории rapids. Ее нужно создать и загрузить в нее файлы *.jar с сайта Rapids. На момент написания статьи доступен релиз 22.06.0, который совмещает в себе два представленных на скриншоте файла. Но на момент тестирования свежей версией была 22.04.0.

Сначала я хотел написать, что оставляю этот момент без изменений, однако, когда я тестировал Spring сервис перед публикацией статьи на 1080 Ti, я все же попробовал использовать 22.06.0. На 1080 Ti все равно не заработало, однако последняя версия вывела сообщение для дебага, благодаря которому я узнал, что новая версия Rapids в связке с Pascal и WSL2 работать не будет. Читатель может использовать любую версию из упомянутых, а в репозитории с примером остается 22.06.0.

Скрипт getGpusResources.sh нужен для обнаружения GPU ресурсов:

#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This script is a basic example script to get resource information about NVIDIA GPUs.
# It assumes the drivers are properly installed and the nvidia-smi command is available.
# It is not guaranteed to work on all setups so please test and customize as needed
# for your environment. It can be passed into SPARK via the config
# spark.{driver/executor}.resource.gpu.discoveryScript to allow the driver or executor to discover
# the GPUs it was allocated. It assumes you are running within an isolated container where the
# GPUs are allocated exclusively to that driver or executor.
# It outputs a JSON formatted string that is expected by the
# spark.{driver/executor}.resource.gpu.discoveryScript config.
#
# Example output: {"name": "gpu", "addresses":["0","1","2","3","4","5","6","7"]}

ADDRS=`nvidia-smi --query-gpu=index --format=csv,noheader | sed -e ':a' -e 'N' -e'$!ba' -e 's/\n/","/g'`
echo {\"name\": \"gpu\", \"addresses\":[\"$ADDRS\"]}

Datasets

Еще одна директория — datasets. В ней хранятся файлы *.csv и *.parquet, которые в последующем будут использованы в приложениях как обучающие и валидирующие датасеты. Взять можно здесь.

Spark config files

Пройдемся по всем директориям, в которых нужно внести изменения.

Все рабочие конфиги представлены в репозитории примера.

Директория conf

Директория содержит шаблоны конфигов. Задействовать каждый можно путем копирования шаблона в ту же директорию и удаления ».template» в имени файла:

Таким образом, редактируется файл spark-defaults.conf:

spark.master                     spark://192.168.5.129:7077
spark.executor.memory            2g
spark.executor.cores             4

spark.worker.resource.gpu.amount 1
spark.worker.resource.gpu.discoveryScript /opt/sparkRapidsPlugin/getGpusResources.sh

spark.executorEnv.NCCL_DEBUG     INFO

spark-env.sh:

#!/usr/bin/env bash

# Options for the daemons used in the standalone deploy mode
SPARK_MASTER_HOST="192.168.5.129"
SPARK_MASTER_PORT="7077"
SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=1 -Dspark.worker.resource.gpu.discoveryScript=/opt/sparkRapidsPlugin/getGpusResources.sh -Dspark.rapids.memory.pinnedPool.size=2G -Dspark.executor.resource.gpu.amount=1 -Dspark.executorEnv.NCCL_DEBUG=INFO""

Docker

Следуя документации Spark, следующим шагом должен быть запуск скрипта для создания Docker-образов:

$ ./bin/docker-image-tool.sh -r  -t my-tag build

Подготовленный Dockerfile необходимо изменить до вида:

ARG java_image_tag=11-jre-slim

# проставляется в команде docker-build
FROM ${java_image_tag} 

ARG spark_uid=1001
ARG UID_GID=1001
ENV UID=${UID_GID}
ENV GID=${UID_GID}

ENV SPARK_RAPIDS_DIR=/opt/sparkRapidsPlugin
ENV SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-22.06.0.jar
# old
#ENV SPARK_CUDF_JAR=${SPARK_RAPIDS_DIR}/cudf-22.04.0-cuda11.jar
#ENV SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-22.04.0.jar

RUN set -ex && \
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools iptables sudo \
    wget software-properties-common build-essential libnss3-dev zlib1g-dev libgdbm-dev libncurses5-dev \
    libssl-dev libffi-dev libreadline-dev libsqlite3-dev libbz2-dev python3 && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/conf && \
    mkdir -p /opt/spark/work-dir && \
    mkdir -p /opt/sparkRapidsPlugin && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd
RUN apt-get install libnccl2 libnccl-dev -y --allow-change-held-packages && rm -rf /var/cache/apt/*

COPY jars /opt/spark/jars
COPY rapidsNew /opt/sparkRapidsPlugin
# old
#COPY rapids /opt/sparkRapidsPlugin
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
COPY datasets /opt/spark/

ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh

# USER
RUN groupadd --gid $UID appuser && useradd --uid $UID --gid appuser --shell /bin/bash --create-home appuser
RUN mkdir /var/logs && chown -R appuser:appuser /var/logs
RUN mkdir /opt/spark/logs && chown -R appuser:appuser /opt/spark/
RUN chown -R appuser:appuser /tmp

RUN ls -lah /home/appuser
RUN touch /home/appuser/.bashrc

RUN echo -e '\
export SPARK_HOME=/opt/spark\n\
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin\
' > /home/appuser/.bashrc

RUN chown -R appuser:appuser /home/appuser

# Specify the User that the actual main process will run as
USER ${spark_uid}

EXPOSE 4040
EXPOSE 8081

ENTRYPOINT [ "/opt/entrypoint.sh" ]

entrypoint.sh:

#!/bin/bash

SPARK_DRIVER_BIND_ADDRESS=192.168.5.129:7077 # spark master address
NCCL_DEBUG=INFO

source ~/.bashrc

start-worker.sh spark://$SPARK_DRIVER_BIND_ADDRESS
tail -f /dev/null

SPARK_DRIVER_BIND_ADDRESS — адрес Spark Master, в моем случае — это адрес локальной виртуальной машины. Для дебага возможных неисправностей в ходе работы с библиотекой nccl следует выставить уровень дебага INFO. Командой start-worker.sh spark://$SPARK_DRIVER_BIND_ADDRESS запускается воркер, подключаясь к мастеру.

Исходники kubernetes/dockerfiles/Dockerfile и содержимое той же директории можно найти в репозитории примера.

Сборка образа и запуск контейнера:

cd spark
docker build -f kubernetes/dockerfiles/spark/Dockerfile --build-arg java_image_tag=localhost:5000/cuda-jdk8:v1 -t localhost:5000/cuda-jdk8-spark:v1 .
docker run --memory="6g" --cpus="4" --gpus all --name=cuda-jdk8-spark -p 8081:8081 -it -d localhost:5000/cuda-jdk8-spark:v1

Следует убедиться, что в образ установилась библиотека nccl корректной версии, для чего нужно экзекнуться в созданный контейнер Spark Worker и выполнить:

$ dpkg -l | grep nccl
ii  libnccl-dev                     2.12.12-1+cuda11.7                amd64        NVIDIA Collective Communication Library (NCCL) Development Files
ii  libnccl2                        2.12.12-1+cuda11.7                amd64        NVIDIA Collective Communication Library (NCCL) Runtime

на момент написания статьи и тестирования корректной версией является 2.12.12-1+cuda11.7. В версиях ниже может встречаться проблема с запуском задач XGBoost, т.к. nccl не может найти сетевое устройство по причине того, что в докер-контейнере оно является виртуальным.

Проверяем доступность воркера, путем открытия его WEB GUI по адресу localhost:8081 (в соответствии с командой docker run выше):

c338d44dc8521c40052c907b6ee4e1d9.png

Видим, что помимо Cores и Memory, доступен ресурс Resources: gpu. На моей локальной машине одно устройство, и его id обозначен в массиве как »0».

Проверяем WEB GUI мастера (адрес моей локальной виртуальной машины http://192.168.5.129:8080/):

209c069674b131c516ffb44ee2ad5391.png

Spark Worker, запущенный в контейнере, появился в списке Workers. Можно переходить к приложению.

Разработка и запуск приложения

В текущем разделе рассматривается пример работы простого веб-сервиса, являющимся так же Spark Driver. Приложение будет иметь 3 HTTP Endpoint«a, на каждом будет доступен пример одной из библиотек: DJL, Spark ML, XGBoost.

Каркас приложения

В виде каркаса приложения используется Spring Boot с зависимость spring-boot-starter-web, используемый JDK — OpenJDK 8 (держим в уме требование Rapids). Я создаю новый проект в тот момент, когда пишу эту статью, поэтому финальный результат также должен заработать у читателя при условии выполнения предварительных шагов, описанных выше.

380af36f522d3a4e9253f51a00a5c0d9.png

Структура проекта:

d8a0606f79b0c64580d211297da25732.PNG

Файл pom.xml можно посмотреть в репозитории, я остановлюсь на важном моменте. Для работы XGBoost на Windows с WSL2 в Docker контейнере проведено детальное обследование проблемы в GitHub Issue.

На данный момент версия библиотеки XGBoost, используемая в данном примере, не имеет релизной версии, поэтому в Maven central она отсутствует. Для загрузки библиотеки нужно добавить в pom.xml репозиторий с версиями SNAPSHOT:


    
        XGBoost4J Snapshot Repo
        XGBoost4J Snapshot Repo
        https://s3-us-west-2.amazonaws.com/xgboost-maven-repo/snapshot/
    

Однако, есть нюанс. Доступа с Российских и Казахстанских IP (с других не проверялось) к данному репо с недавнего времени нет. Варианты: либо VPN, либо воспользоваться репозиторием проекта и загрузить джарники xgboost4j-gpu_2.12–2.0.0-SNAPSHOT.jar и xgboost4j-spark-gpu_2.12–2.0.0-SNAPSHOT.jar в локальный m2 репозиторий:

a813f8e38f6c78b97b7cca26b2c45c93.pngf19fd9508f177c10e333b224f27be997.png36734a168f4a2ca534b66741da6eefab.png

Также эти джарники необходимо загрузить в директорию jars проекта (см. скрин выше). Данные *.jar файлы будут переданы в Spark Executor как зависимости для запуска кода драйвера. Список таких файлов описывается в SparkConfiguration:

package com.mlwebservice.config;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Configuration
public class SparkConfiguration {
    @Value("${spring.application.name}")
    private String appName;
    @Value("${spark.masterHost}")
    private String masterHost;

    @Bean
    public JavaSparkContext javaSparkContext() throws UnknownHostException {
        String host = InetAddress.getLocalHost().getHostAddress();
        SparkConf sparkConf = new SparkConf(true)
                .setAppName(appName)
                .setMaster("spark://" + masterHost)
                .setJars(new String[]{
                        "service.jar",
                        "jars/config-1.4.1.jar",
                        "jars/rapids-4-spark_2.12-22.06.0.jar",
                        "jars/spark-nlp_2.12-3.4.1.jar",
                        "jars/xgboost4j-gpu_2.12-2.0.0-SNAPSHOT.jar",
                        "jars/xgboost4j-spark-gpu_2.12-2.0.0-SNAPSHOT.jar"})
                // Spark settings
                .set("spark.worker.cleanup.enabled", "true")
                // executors
                .set("spark.executor.cores", "4")
                .set("spark.executor.memory", "2g")
                .set("spark.executor.resource.gpu.amount", "1")
                .set("spark.executorEnv.NCCL_DEBUG", "INFO")
                .set("spark.task.resource.gpu.amount", "1")
                // driver
                .set("spark.ui.enabled", "true")
                .set("spark.ui.port", "4040")
                .set("spark.driver.host", host)
                .set("spark.driver.bindAddress", host)
                .set("spark.driver.blockManager.port", "45029")
                .set("spark.driver.port", "33139")
                .set("spark.port.maxRetries", "16")
                .set("spark.driver.maxResultSize", "2g")
                .set("spark.executor.heartbeatInterval", "200000")
                .set("spark.network.timeout", "300000")
                // rapids
                .set("spark.rapids.memory.gpu.pooling.enabled", "false")
                .set("spark.rapids.memory.gpu.minAllocFraction", "0.0001")
                .set("spark.rapids.memory.gpu.reserve", "2")
                .set("spark.rapids.sql.enabled", "true")
                .set("spark.sql.adaptive.enabled", "false")
                .set("spark.rapids.sql.explain", "ALL")
                .set("spark.rapids.sql.hasNans", "false")
                .set("spark.rapids.sql.csv.read.float.enabled", "true")
                .set("spark.rapids.sql.castFloatToString.enabled", "true")
                .set("spark.rapids.sql.csv.read.double.enabled", "true")
                .set("spark.rapids.sql.castDoubleToString.enabled", "true")
                .set("spark.rapids.sql.exec.CollectLimitExec", "true")
                .set("spark.locality.wait", "0s")
                .set("spark.sql.files.maxPartitionBytes", "512m")
                .set("spark.plugins", "com.nvidia.spark.SQLPlugin")
                .set("spark.driver.extraClassPath", "/opt/sparkRapidsPlugin/rapids-4-spark_2.12-22.06.0.jar");
        return new JavaSparkContext(sparkConf);
    }

    @Bean
    public SparkSession sparkSession(JavaSparkContext context) {
        return SparkSession.builder()
                .master("spark://" + masterHost)
                .appName(appName)
                .config(context.getConf())
                .config("spark.executorEnv.NCCL_DEBUG", "INFO")
                .getOrCreate();
    }
}

Параметров конфигурации Spark очень много, подробнее с ними можно ознакомиться на странице Configuration — Spark 3.3.0 Documentation .

Контроллер максимально простой, он содержит три сервиса, каждый сервис реализует по 1–2 метода каждой библиотеки. Обращаю внимание, что данный контроллер является инструментом запуска соответствующего примера, сделанный в угоду скорости и самому факту, что несколько технологий можно объединить в приложении Spring, и никак не претендует на использование приложения в продуктивной среде. Для реального приложения здесь должны быть как минимум другие HTTP глаголы, обработчики сообщений, информативные DTO, асинхронные операции, брокеры сообщений для потоков данных, реактивщины, вебсокеты и вот это вот все.

package com.mlwebservice.controller;

import ai.djl.translate.TranslateException;
import com.mlwebservice.service.DJLService;
import com.mlwebservice.service.RapidsService;
import com.mlwebservice.service.SparkMLService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

@RestController
@RequestMapping("/")
@RequiredArgsConstructor
public class MLController {
    private final DJLService djlService;
    private final SparkMLService sparkMLService;
    private final RapidsService rapidsService;

    @GetMapping("/djl")
    public ResponseEntity djl() {
        try {
            djlService.mlWork();
        } catch (TranslateException | IOException e) {
            return ResponseEntity.status(500).body(e.getMessage());
        }
        return ResponseEntity.ok().build();
    }

    @GetMapping("/forest")
    public ResponseEntity sparkML() {
        sparkMLService.randomForestTest();
        return ResponseEntity.ok().build();
    }

    @GetMapping("/gpu_test")
    public ResponseEntity rapidsGpuTest() {
        rapidsService.testRapids();
        return ResponseEntity.ok().build();
    }

    @GetMapping("/xgboost")
    public ResponseEntity rapidsXGBoost() {
        rapidsService.xgBoost();
        return ResponseEntity.ok().build();
    }
}

Deep Java Library — DJL

Первая библиотека на очереди — DJL. Это удобная библиотека машинного обучения для языка Java, особенностью которой является зоопарк моделей (Model Zoo), позволяющий получить готовую модель по описываемым параметрам из списка доступных моделей. Также имеется возможность создать свою модель, сохранить на диск и загрузить для дальнейшего использования.

В данном примере рассматривается реализация модели линейной регрессии. К сожалению, в виду архитектурной особенности данной модели, распараллелить процесс ее обучения довольно сложно, и, вероятно, решается в определенных случаях определенными движками, такими как PyTorch. По крайней мере, распараллеливание обучения с помощью Spark модели линейной регрессии мне не попалось, и быстро сам придумать реализацию также не смог. Однако, есть распространенный пример применения Spark в паре с DJL для классификации изображений с использованием модели из Model Zoo, например данная статья.

Реализация модели линейной регрессии сделана на основе статей 3.2. Linear Regression Implementation from Scratch — Dive into Deep Learning 0.1.0 documentation и 3.3. Concise Implementation of Linear Regression — Dive into Deep Learning 0.1.0 documentation и отображена в сервисе DJLService.

В целях отладки в main методе приложения логируется вызов нескольких методов, с помощью которых легко опознать некорректность конфигурации приложения. При корректной конфигурации должен вывестись лог вида:

2022-07-18 19:38:45.346  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : Initializing DJL lib...
2022-07-18 19:38:45.349  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : CPU: cpu()
2022-07-18 19:38:45.349  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : GPU: gpu(0)
2022-07-18 19:38:45.439  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : CUDA available: true
2022-07-18 19:38:45.440  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : CUDA GPU count: 1
OpenJDK 64-Bit Server VM warning: You have loaded library /root/.djl.ai/pytorch/1.11.0-20220510-cu113-linux-x86_64/libtorch_cpu.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c ', or link it with '-z noexecstack'.
2022-07-18 19:38:45.739  INFO 1 --- [           main] ai.djl.pytorch.engine.PtEngine           : Number of inter-op threads is 8
2022-07-18 19:38:45.740  INFO 1 --- [           main] ai.djl.pytorch.engine.PtEngine           : Number of intra-op threads is 8
2022-07-18 19:38:45.740  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : GPU count: 1
2022-07-18 19:38:45.741  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : Engine: PyTorch:1.11.0, capabilities: [
CUDA,
CUDNN,
OPENMP,
MKL,
MKLDNN,
]
PyTorch Library: /root/.djl.ai/pytorch/1.11.0-20220510-cu113-linux-x86_64

Код примера:

package com.mlwebservice.service;

import ai.djl.Model;
import ai.djl.metric.Metrics;
import ai.djl.ndarray.NDArray;
import ai.djl.ndarray.NDManager;
import ai.djl.ndarray.types.Shape;
import ai.djl.nn.Block;
import ai.djl.nn.ParameterList;
import ai.djl.nn.SequentialBlock;
import ai.djl.nn.core.Linear;
import ai.djl.training.DefaultTrainingConfig;
import ai.djl.training.EasyTrain;
import ai.djl.training.Trainer;
import ai.djl.training.dataset.ArrayDataset;
import ai.djl.training.dataset.Batch;
import ai.djl.training.listener.TrainingListener;
import ai.djl.training.loss.Loss;
import ai.djl.training.optimizer.Optimizer;
import ai.djl.training.tracker.Tracker;
import ai.djl.translate.TranslateException;
import com.mlwebservice.model.DataPoints;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

@Slf4j
@Service
public class DJLService {

    public void mlWork() throws TranslateException, IOException {
        // Generating the Dataset
        NDManager manager = NDManager.newBaseManager();

        NDArray trueW = manager.create(new float[]{2, -3.4f});
        float trueB = 4.2f;

        DataPoints dp = DataPoints.syntheticData(manager, trueW, trueB, 1000);
        NDArray features = dp.getX();
        NDArray labels = dp.getY();

        // Reading dataset
        int batchSize = 10;
        ArrayDataset dataset = loadArray(features, labels, batchSize, false);

        // mini test
        Batch testBatch = dataset.getData(manager).iterator().next();
        NDArray X = testBatch.getData().head();
        NDArray y = testBatch.getLabels().head();
        log.info("X = {}", X);
        log.info("y = {}", y);
        testBatch.close();

        // Defining the model
        Model model = Model.newInstance("lin-reg");

        SequentialBlock net = new SequentialBlock();
        Linear linearBlock = Linear.builder().optBias(true).setUnits(1).build();
        net.add(linearBlock);

        model.setBlock(net);

        // Defining the Loss function
        Loss l2loss = Loss.l2Loss();

        // Defining the Optimization Algorithm
        Tracker lrt = Tracker.fixed(0.03f);
        Optimizer sgd = Optimizer.sgd().setLearningRateTracker(lrt).build();

        // Instantiate Configuration and Trainer
        DefaultTrainingConfig config = new DefaultTrainingConfig(l2loss)
                .optOptimizer(sgd) // Optimizer (loss function)
                .optDevices(manager.getEngine().getDevices(1)) // single GPU
//                .addTrainingListeners(TrainingListener.Defaults.logging()); // Logging
                .addTrainingListeners(TrainingListener.Defaults.basic()); // Without logging for increase speed

        Trainer trainer = model.newTrainer(config);

       log.info("Trainer devices: {}", Arrays.toString(trainer.getDevices()));

        // Initializing Model Parameters
        // First axis is batch size - won't impact parameter initialization
        // Second axis is the input size
        trainer.initialize(new Shape(batchSize, 2));

        // Metrics
        Metrics metrics = new Metrics();
        trainer.setMetrics(metrics);

        // Training
        int numEpochs = 30;

        long startTime = System.currentTimeMillis();

        for (int epoch = 1; epoch <= numEpochs; epoch++) {
            // Iterate over dataset
            for (Batch batch : trainer.iterateDataset(dataset)) {
                // Update loss and evaulator
                EasyTrain.trainBatch(trainer, batch);

                // Update parameters
                trainer.step();

                batch.close();
            }
            // reset training and validation evaluators at end of epoch
            trainer.notifyListeners(listener -> listener.onEpoch(trainer));
        }

        Block layer = model.getBlock();
        ParameterList params = layer.getParameters();
        NDArray wParam = params.valueAt(0).getArray();
        NDArray bParam = params.valueAt(1).getArray();

        long endTime = System.currentTimeMillis();

        float[] w = trueW.sub(wParam.reshape(trueW.getShape())).toFloatArray();
        log.info("Error in estimating w: [{} {}]", w[0], w[1]);
        log.info("Error in estimating b: {}", trueB - bParam.getFloat());

        log.info("Training time: " + (endTime - startTime) + " ms");

        // Save the model
        Path modelDir = Paths.get("models/lin-reg");
        Path savedDir = Files.createDirectories(modelDir);
        model.setProperty("Epoch", Integer.toString(numEpochs)); // save epochs trained as metadata
        model.save(modelDir, "lin-reg");
       log.info("Model saved in " + savedDir.toAbsolutePath());
    }

    // Save in the file for later use
    public ArrayDataset loadArray(NDArray features, NDArray labels, int batchSize, boolean shuffle) {
        return new ArrayDataset.Builder()
                .setData(features) // set the features
                .optLabels(labels) // set the labels
                .setSampling(batchSize, shuffle) // set the batch size and random sampling
                .build();
    }
}

Модель DataPoints:

package com.mlwebservice.model;

import ai.djl.ndarray.NDArray;
import ai.djl.ndarray.NDManager;
import ai.djl.ndarray.types.DataType;
import ai.djl.ndarray.types.Shape;

public class DataPoints {
    private final NDArray x;
    private final NDArray y;

    public DataPoints(NDArray x, NDArray y) {
        this.x = x;
        this.y = y;
    }

    public NDArray getX() {
        return x;
    }

    public NDArray getY() {
        return y;
    }

    // Generate y = X w + b + noise
    public static DataPoints syntheticData(NDManager manager, NDArray w, float b, int numExamples) {
        NDArray x = manager.randomNormal(new Shape(numExamples, w.size()));
        NDArray y = x.matMul(w).add(b);
        // Add noise
        y = y.add(manager.randomNormal(0, 0.01f, y.getShape(), DataType.FLOAT32));
        return new DataPoints(x, y);
    }
}

Результат выполнения:

2022-07-18 20:29:27.461  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : X = ND: (10, 2) gpu(0) float32
[[ 0.7017, -0.7652],
 [ 2.495 , -0.3341],
 [-2.175 , -0.452 ],
 [ 1.1075,  0.8347],
 [-1.8369, -0.7469],
 [ 0.5647,  2.1323],
 [-0.2754,  0.3807],
 [ 0.2902,  1.5136],
 [-0.5902,  0.6777],
 [ 0.4059, -1.0304],
]
2022-07-18 20:29:27.473  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : y = ND: (10) gpu(0) float32
[ 8.1976, 10.324 ,  1.3922,  3.5564,  3.0556, -1.9248,  2.3501, -0.361 ,  0.7023,  8.4904]
2022-07-18 20:29:27.491  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : Trainer devices: [gpu(0)]
2022-07-18 20:29:34.665  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : Error in estimating w: [-4.7445297E-5 -1.2493134E-4]
2022-07-18 20:29:34.670  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : Error in estimating b: 1.9073486E-4
2022-07-18 20:29:34.670  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : Training time: 7112 ms
2022-07-18 20:29:34.676  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : Model saved in /usr/src/app/models/lin-reg

Spark ML

Существует замечательная документация для начинающих от nVidia по работе со Spark ML на примере модели Random Forest. Учитывая специфику данной модели, процесс обучения можно распараллелить на несколько исполнителей, а затем пользоваться либо средним значением в случае решения задач регрессии, либо голосованием по большинству в случае решения задач классификации. Подробнее можно почитать на хабре, в документации Spark, примеры кода Spark ML также можно посмотреть в документации.

В данном примере понадобятся датасеты для тренировки и валидации, можно взять отсюда, либо воспользоваться кодом репозитория. Отмечу, в данном разделе не полностью переписан пример из статьи nVidia по Spark ML, а скорее является реализацией задачи из статьи nVidia по XGBoost, но с применением Random Forest из Spark ML. Датасеты копируются в сценарии Dockerfile, а в сервисе пути к ним хардкодятся (пример же, можно себе позволить).

Обращаю внимание: в статье по Spark ML говорится, что только XGBoost поддерживает GPU-ускорение в Spark ML. Вполне может быть, что документация устарела (как писали в одном из Issue на GitHub) и в данный момент, так как в документации Rapids указывается репозиторий с еще как минимум одним примером для алгоритма Principal component analysis (PCA).

Код сервиса:

package com.mlwebservice.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;

import static org.apache.spark.sql.functions.col;

@Slf4j
@Service
@RequiredArgsConstructor
public class SparkMLService {
    private final SparkSession session;

    public void randomForestTest() {
        String trainPath = "/opt/spark/train/train.parquet";
        //test
        String evalPath = "/opt/spark/eval/eval.parquet";

        Dataset tdf = session.read().parquet(trainPath);
        Dataset edf = session.read().parquet(evalPath);

        String labelName = "fare_amount";
        String[] featureColumns = {"passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "rate_code",
                "dropoff_longitude", "dropoff_latitude", "hour", "day_of_week", "is_weekend", "h_distance"};

        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(featureColumns)
                .setOutputCol("rawfeatures");

        StandardScaler standardScaler = new StandardScaler()
                .setInputCol("rawfeatures")
                .setOutputCol("features")
                .setWithStd(true);

        RandomForestRegressor regressor = new RandomForestRegressor()
                .setLabelCol(labelName)
                .setFeaturesCol("features");

        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{assembler, standardScaler, regressor});

        ParamMap[] paramGrid = new ParamGridBuilder()
                .addGrid(regressor.maxBins(), new int[]{100, 200})
                .addGrid(regressor.maxDepth(), new int[]{2, 7, 10})
                .addGrid(regressor.numTrees(), new int[]{5, 20})
                .build();

        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setLabelCol(labelName)
                .setPredictionCol("prediction")
                .setMetricName("rmse");

        CrossValidator crossvalidator = new CrossValidator()
                .setEstimator(pipeline)
                .setEvaluator(evaluator)
                .setEstimatorParamMaps(paramGrid)
                .setNumFolds(3);

        CrossValidatorModel pipelineModel = crossvalidator.fit(tdf);

        ParamMap[] bestEstimatorParamMap = pipelineModel.getEstimatorParamMaps();

        log.info("best params map {}", bestEstimatorParamMap);

        Dataset predictions = pipelineModel.transform(edf);
        Dataset result = predictions.withColumn("error", col("prediction").minus(col(labelName)));
        result.select(labelName, "prediction", "error").show();
        result.describe(labelName, "prediction", "error").show();

        RegressionEvaluator maevaluator = new RegressionEvaluator()
                .setLabelCol(labelName)
                .setMetricName("mae");
        log.info("mae evaluation: {}", maevaluator.evaluate(predictions));

        RegressionEvaluator rmsevaluator = new RegressionEvaluator()
                .setLabelCol(labelName)
                .setMetricName("rmse");
        log.info("rmse evaluation: {}", rmsevaluator.evaluate(predictions));
    }
}

Rapids и XGBoost

Последним примером является реализация примера из статьи nVidia по XGBoost, который использует и Spark, и Rapids вместе. Данный пример является самым интересным, так как обеспечивает действительно лучшую скорость вычислений по сравнению со Spark ML Random Forest.

Кроме того, в документации Rapids первым примером рассматривается операция Join двух датафреймов из 10 млн чисел. Данный пример также реализован в тестовом методе сервиса RapidsService:

@Slf4j
@Service
@RequiredArgsConstructor
public class RapidsService {
    private final SparkSession session;

    public void testRapids() {
        int capacity = 1000000;
        List list = new ArrayList<>(capacit
    
            

© Habrahabr.ru