Масштабируемая Big Data система в Kubernetes с использованием Spark и Cassandra

В предыдущей статье я рассказал, как организовать систему распределенного машинного обучения на GPU NVidia, используя язык Java с фреймворками Spring, Spark ML, XGBoost, DML в standalone кластере Spark. Особенностью поставленной задачи являлось организация системы под управлением ОС Windows 10 Pro, в Docker-контейнерах. Эксперимент оказался не вполне успешным. В данной статье я покажу, как воспользоваться имеющимися наработками и запустить Spark Jobs в Kubernetes в режимах client и cluster, опишу особенности работы с Cassandra в Spark, покажу пример обучения модели и ее дальнейшего использования. В этот раз буду использовать язык Kotlin. Репозиторий с кодом доступен на GitLab.

Данная статья представляет интерес для тех, кто интересуется системами Big Data и стремится создать систему, позволяющую, в том числе, выполнять задачи распределенного машинного обучения на Spark в Kubernetes, используя GPU NVidia и Cassandra для хранения данных.

Версии библиотек и фреймворков

Первое, что нужно принять во внимание при построении подобной системы — она довольно сложная, с рядом элементов, которые не совместимы друг с другом. Так как используется Rapids, использовать Boot 3 не получится — для него требуется Java 17, и, хоть Spark и поддерживает данную версию, ускоритель GPU-вычислений Rapids на момент написания статьи не поддерживает данную версию в полной мере. Скорее всего, Java 17 будет добавлена в версии, которая также обзаведется поддержкой Spark 3.4.0. По этой причине, последний Spark (на момент написания статьи 3.4.0) брать за основу не стоит. К тому же, DataStax так же не обновила свой cassandra connector, и данная версия не поддерживается.

Версии компонентов, с которыми система заработала:

  • JDK 8

  • Spring Boot 2.7.11

  • Spark 3.3.2

  • NVidia Rapids 23.04.0

  • Cassandra 4.1.1

  • PostgreSQL 15.3–1.pgdg110+1

  • scala-library 2.12.15

  • spark-cassandra-connector_2.12 3.3.0

  • com.fasterxml.jackson.core 2.13.5

На уровне инфраструктуры:

  • Kubernetes 1.26.3

  • ContainerD 1.7.0

  • NVidia GPU Operator 0.13.0

  • NVidia Driver 530.30.02–1 (ставится с CUDA)

  • CUDA — желательно не ниже 11.8 в базовом образе NVidia, в последних же драйверах по умолчанию 12+

  • Ubuntu 22.04.2

Конфигурация стенда:

Узел

CPU

RAM

GPU

Адрес в ЛВС

master1

Intel i7–2700k

16 Gb

NVidia GTX 1650 4 Gb

192.168.0.150

worker1

AMD 3800x

32 Gb

NVidia RTX 2600 6 Gb

192.168.0.125

Особенности

  1. Не использовать spring-boot-starter-parent. Он тянет за собой множество библиотек, которые конфликтуют с библиотеками Spark. В том числе, для последующего перехода на Java 17 (выпустят же когда-нибудь разработчики Rapids библиотеку, поддерживающую данную версию) лучше не использовать Tomcat, и использовать Undertow (на момент написания статьи были проблемы с classloader’ами Rapids и Tomcat, а на Undertow успешно запустилось). Но и тут есть особенности: необходимо подключить корректную версию Jackson (указана выше) и ряд библиотек jakarta и javax servlet:

pom.xml


  org.glassfish.web
  jakarta.servlet.jsp.jstl
  3.0.1


  jakarta.servlet
  jakarta.servlet-api
  6.0.0
  provided

  
  jakarta.servlet.jsp.jstl
  jakarta.servlet.jsp.jstl-api
  3.0.0


  javax.servlet
  javax.servlet-api
  4.0.1
  provided


  ch.qos.logback
  logback-classic
  1.2.9
  1. Подключить scala-library 2.12.15. Зависимость spark-cassandra-connector_2.12 3.3.0 использует версию 2.12.11, а Spark 3.3.2 — 2.12.15. Если не заменить, будут конфликты SerialVersionUID. Чтобы заменить, в pom.xml просто нужно поставить scala-library перед spark-cassandra-connector.

  2. Могут быть другие нюансы совместимости компонентов, следует быть готовым к отладке. Полностью рабочую конфигурацию можно посмотреть в репозитории.

Подготовка инфраструктуры

В этот раз постараюсь этот вопрос изложить покороче, но на важных моментах остановлюсь подробнее. Подготовка образа executor подробно описана в предыдущей статье, но в этой сам метод сборки будет отличаться. В первую очередь нужно подготовить кластер Kubernetes. Описывать установку не буду, скажу лишь, что взял KubeSpray и раскатал 1.26.3.

Nvidia drivers

Выбрать последнюю Cuda, драйвер поставится автоматом:

install CUDA and NVidia driver

wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-ubuntu2204.pin
sudo mv cuda-ubuntu2204.pin /etc/apt/preferences.d/cuda-repository-pin-600
wget https://developer.download.nvidia.com/compute/cuda/12.1.0/local_installers/cuda-repo-ubuntu2204-12-1-local_1
sudo dpkg -i cuda-repo-ubuntu2204-12-1-local_12.1.0-530.30.02-1_amd64.deb
sudo cp /var/cuda-repo-ubuntu2204-12-1-local/cuda-*-keyring.gpg /usr/share/keyrings/
sudo apt-get update
sudo apt-get -y install cuda

Выполнить шаги установки, перезагрузить машину, проверить:

nvidia-smi

nvidia-smi
Sun Mar 19 20:56:18 2023       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 530.30.02              Driver Version: 530.30.02    CUDA Version: 12.1     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                  Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf            Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|=========================================+======================+======================|
|   0  NVIDIA GeForce GTX 1650         On | 00000000:02:00.0 Off |                  N/A |
|  0%   47C    P2               N/A /  75W|    332MiB /  4096MiB |      2%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                                         
+---------------------------------------------------------------------------------------+
| Processes:                                                                            |
|  GPU   GI   CI        PID   Type   Process name                            GPU Memory |
|        ID   ID                                                             Usage      |
|=======================================================================================|
|    0   N/A  N/A      1346      G   /usr/lib/xorg/Xorg                           62MiB |
|    0   N/A  N/A      1905    C+G   ...libexec/gnome-remote-desktop-daemon      132MiB |
|    0   N/A  N/A      2013      G   /usr/bin/gnome-shell                        131MiB |
+---------------------------------------------------------------------------------------+

Донастроить:

nvidia-container-toolkit

sudo -i
apt-get update && apt-get install -y nvidia-container-toolkit

distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list

sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit
sudo systemctl restart docker

nvidia-ctk runtime configure --runtime=docker

Не обязательно: запустить для проверки контейнеры со спарк воркерами (можно взять из прошлой статьи).

Containerd

Выполнить:

nano /etc/containerd/config.toml

Отредактировать:

config.toml

version = 2
root = "/var/lib/containerd"
state = "/run/containerd"
oom_score = 0

[grpc]
  max_recv_message_size = 16777216
  max_send_message_size = 16777216

[debug]
  level = "info"

[metrics]
  address = ""
  grpc_histogram = false

[plugins]
  [plugins."io.containerd.grpc.v1.cri"]
    sandbox_image = "registry.k8s.io/pause:3.8"
    max_container_log_line_size = -1
    enable_unprivileged_ports = false
    enable_unprivileged_icmp = false
    [plugins."io.containerd.grpc.v1.cri".containerd]
      default_runtime_name = "nvidia"
      snapshotter = "overlayfs"

      [plugins."io.containerd.grpc.v1.cri".containerd.runtimes]
        [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia]
          privileged_without_host_devices = false
          runtime_engine = ""
          runtime_root = ""
          runtime_type = "io.containerd.runc.v2"

          [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia.options]
            BinaryName = "/usr/bin/nvidia-container-runtime"

        [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc]
          runtime_type = "io.containerd.runc.v2"
          runtime_engine = ""
          runtime_root = ""
          base_runtime_spec = "/etc/containerd/cri-base.json"

          [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options]
            systemdCgroup = true

    [plugins."io.containerd.grpc.v1.cri".registry]
      [plugins."io.containerd.grpc.v1.cri".registry.mirrors]
        [plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io"]
          endpoint = ["https://registry-1.docker.io"]
      [plugins."io.containerd.grpc.v1.cri".registry.configs]
        [plugins."io.containerd.grpc.v1.cri".registry.configs."registry-1.docker.io".auth]
            username = "login"
            password = "password"
            auth = ""
            identitytoken = ""

Обращаю внимание на последнюю секцию — в ней указывается ваш репозиторий, здесь я привел пример с публичным Docker Hub "registry-1.docker.io".

Перезапустить сервис:

sudo systemctl restart containerd

Kubernetes

Начиная с версии Spark 3.1.1, поддержка Spark в Kubernetes доведена production-ready. Это означает, что появлилась возможность использовать Kubernetes как менеджер кластера Spark, указывая в качестве мастера Kube API Server:

k8s://https://${KUBE_API_SERVER}:6443

или

k8s://https://kubernetes.default.svc

Теперь, при запуске Spark Job, в кластере K8S будут запускаться Spark Executors с указанным Docker образом, ресурсами и прочими необходимыми конфигами. Если образ уже находится в локальном репозитории машины, POD поднимается довольно быстро, на моих машинах в течении 5 секунд после старта приложения поднимались 2 экзекуктора, полностью готовых к работе.

В связке со Spring приложением есть возможность создать Spring Bean с JavaSparkContext/SparkSession. Однако, есть другой способ: создавать SparkSession под каждую Spark Job, чтобы высвобождать ресурсы кластера. Это хорошо для редких задач по расписанию / запросу, в том числе запросам с различающимся количеством необходимых ресурсов (количество Spark Executors, CPU, память, видеопамять и т.п.), но имеет накладные расходы на поднятие экзекуторов и время выполнения задач. На моей практике все команды для работы со Spark требовали «прогрева», т.е. каждая первая операция выполнялась дольше, чем все последующие. Стоит иметь это в виду, и, если необходимо постоянно запускать одинаковые задачи по расписанию, имеет смысл зарезервировать ресурсы кластера под спарковые экзекуторы.

Nvidia GPU Operator

Для использования ресурсов GPU внутри кластера Kubernetes необходимо установить Nvidia GPU Operator. Следует выполнить:

Install GPU Operator

kubectl create -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.13.0/nvidia-device-plugin.yml

helm repo add nvidia https://nvidia.github.io/gpu-operator
helm repo update

kubectl create ns gpu-operator
     
helm install --wait gpu-operator \
     -n gpu-operator \
     nvidia/gpu-operator \
     --set operator.defaultRuntime=containerd

Проверить работу можно следующим образом:

kubectl run gpu-test \
     --rm -t -i \
     --restart=Never \
     --image=nvcr.io/nvidia/cuda:10.1-base-ubuntu18.04 nvidia-smi

В случае успеха вернется результат nvidia-smi:

1ff34ad5c576efa259426b5a8879378e.png

В целях проверки можно запустить deployment с двумя репликами заготовленного образа Spark executor:

executor deployment

---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: cuda-example
spec:
  replicas: 2
  selector:
    matchLabels:
      app: cuda-example
  template:
    metadata:
      labels:
        app: cuda-example
    spec:
      containers:
        - name: cuda-jdk8
          image: repo/cuda-jdk8-spark-3.3.2:v2
          resources:
            limits:
              cpu: 2
              memory: 4Gi
              nvidia.com/gpu: 1
            requests:
              cpu: "200m"
              memory: 0.5Gi
              nvidia.com/gpu: 1

на каждой машине должно подняться по одному поду, и, если экзекнуться в них, команда nvidia-smi должна вывести результат. Обращаю внимание, что для запуска экзекуторов не нужно поднимать их вручную, K8S Scheduler сделает это сам. Манифест приведен для примера и проверки работоспособности GPU Operator, после проверки его можно удалить.

Образ Spark executor

Следует изменить spark/kubernetes/dockerfiles/Dockerfile:

Dockerfile

ARG java_image_tag=17-jre
FROM ${java_image_tag}

ARG spark_uid=1001
ARG UID_GID=1001
ENV UID=${UID_GID}
ENV GID=${UID_GID}

ENV SPARK_RAPIDS_DIR=/opt/sparkRapidsPlugin
ENV SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-23.04.0.jar

RUN set -ex && \
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools iptables sudo \
    wget software-properties-common build-essential libnss3-dev zlib1g-dev libgdbm-dev libncurses5-dev \
    libssl-dev libffi-dev libreadline-dev libsqlite3-dev libbz2-dev python3 && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/conf && \
    mkdir -p /opt/spark/work-dir && \
    mkdir -p /opt/sparkRapidsPlugin && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/*
RUN apt-get install libnccl2 libnccl-dev -y --allow-change-held-packages && rm -rf /var/cache/apt/*

COPY jars /opt/spark/jars
COPY rapids ${SPARK_RAPIDS_DIR}
COPY extraclasspath /opt/spark/extraclasspath
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
COPY datasets /opt/spark/

ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh
RUN chmod a+x /opt/entrypoint.sh
RUN chmod a+x /opt/sparkRapidsPlugin/getGpusResources.sh
RUN ls -lah /opt

RUN groupadd --gid $UID appuser && useradd --uid $UID --gid appuser --shell /bin/bash --create-home appuser
RUN mkdir /var/logs && chown -R appuser:appuser /var/logs
RUN mkdir /opt/spark/logs && chown -R appuser:appuser /opt/spark/
RUN chown -R appuser:appuser /tmp

RUN ls -lah /home/appuser
RUN touch /home/appuser/.bashrc

RUN echo -e '\
export SPARK_HOME=/opt/spark\n\
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin\
' > /home/appuser/.bashrc

RUN chown -R appuser:appuser /home/appuser

EXPOSE 4040
EXPOSE 8081

USER ${spark_uid}

ENTRYPOINT [ "/opt/entrypoint.sh" ]

entrypoint.sh из прошлой статьи пригодится, но для локального standalone кластера из одного воркера (подходит для отладки в Windows). spark/kubernetes/dockerfiles/entrypoint.sh следует изменить до стандартного вида:

entryoint.sh

#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# echo commands to the terminal output
set -ex

# Check whether there is a passwd entry for the container UID
myuid=$(id -u)
mygid=$(id -g)
# turn off -e for getent because it will return error code in anonymous uid case
set +e
uidentry=$(getent passwd $myuid)
set -e

# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
    if [ -w /etc/passwd ] ; then
	echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
    else
	echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
    fi
fi

SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt

if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
  SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi

if ! [ -z ${PYSPARK_PYTHON+x} ]; then
    export PYSPARK_PYTHON
fi
if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
    export PYSPARK_DRIVER_PYTHON
fi

# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
if [ -n "${HADOOP_HOME}"  ] && [ -z "${SPARK_DIST_CLASSPATH}"  ]; then
  export SPARK_DIST_CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath)"
fi

if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
  SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
fi

if ! [ -z ${SPARK_CONF_DIR+x} ]; then
  SPARK_CLASSPATH="$SPARK_CONF_DIR:$SPARK_CLASSPATH";
elif ! [ -z ${SPARK_HOME+x} ]; then
  SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH";
fi

case "$1" in
  driver)
    shift 1
    CMD=(
      "$SPARK_HOME/bin/spark-submit"
      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
      --deploy-mode client
      "$@"
    )
    ;;
  executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
      --resourceProfileId $SPARK_RESOURCE_PROFILE_ID
    )
    ;;

  *)
    echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..."
    CMD=("$@")
    ;;
esac

# Execute the container CMD under tini for better hygiene
exec /usr/bin/tini -s -- "${CMD[@]}"

Файл spark/kubernetes/dockerfiles/decom.sh должен выглядеть следующим образом:

decom.sh

#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


set -ex
echo "Asked to decommission"
# Find the pid to signal
date | tee -a ${LOG}
WORKER_PID=$(ps -o pid -C java | tail -n 1| awk '{ sub(/^[ \t]+/, ""); print }')
echo "Using worker pid $WORKER_PID"
kill -s SIGPWR ${WORKER_PID}
# For now we expect this to timeout, since we don't start exiting the backend.
echo "Waiting for worker pid to exit"
# If the worker does exit stop blocking the cleanup.
timeout 60 tail --pid=${WORKER_PID} -f /dev/null
date
echo "Done"
date
sleep 1

Скрипт сборки образов, который собирает два образа — для чистого JVM и PySpark (на всякий случай):

build-java-8-spark-3.3.2.sh

#!/bin/bash
echo "=========== 1st stage ==========="
echo "Docker repo: $DOCKER_REPO"
echo "### build base cuda11.8 java8 image ###"
docker build -f Dockerfile-cuda-java8 -t localhost:5000/cuda11.8-jdk8:v1 .

echo "=========== 2nd stage ==========="
echo "### build Spark and PySpark images ###"
cd spark-3.3.2

./bin/docker-image-tool.sh -r localhost:5000 \
      -t jdk8-3.3.2 -b java_image_tag=localhost:5000/cuda11.8-jdk8:v1 \
      -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile \
      -n build

cd ..

docker tag localhost:5000/spark-py:jdk8-3.3.2 $DOCKER_REPO/cuda-jdk8-spark-py-3.3.2:v2
docker tag localhost:5000/spark:jdk8-3.3.2 $DOCKER_REPO/cuda-jdk8-spark-3.3.2:v2

echo "=========== 3rt stage ==========="
echo "### push Spark and PySpark images ###"
docker push $DOCKER_REPO/cuda-jdk8-spark-py-3.3.2:v2
docker push $DOCKER_REPO/cuda-jdk8-spark-3.3.2:v2

, где $DOCKER_REPO — ваш репозиторий Docker.

Для сборки образа executor необходим образ localhost:5000/cuda-jdk8:v1. Его можно собрать со следующим Dockerfile:

Dockerfile localhost:5000/cuda11.8-jdk8: v1

FROM nvcr.io/nvidia/cuda:11.8.0-runtime-ubuntu22.04

ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8'

ARG DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools

RUN apt-get update && \
	apt-get install -y openjdk-8-jdk && \
	apt-get install -y ant && \
	apt-get clean && \
	rm -rf /var/lib/apt/lists/* && \
	rm -rf /var/cache/oracle-jdk8-installer;

RUN apt-get update && \
	apt-get install -y ca-certificates-java && \
	apt-get clean && \
	update-ca-certificates -f && \
	rm -rf /var/lib/apt/lists/* && \
	rm -rf /var/cache/oracle-jdk8-installer;

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME

CMD ["tail", "-f", "/dev/null"]

Команда сборки приведена в скрипте выше:

docker build -f Dockerfile-cuda-java8 -t localhost:5000/cuda-jdk8:v1 .

Требования для запуска Spark Driver

Service account

Для драйвера необходим K8S Service Account с полными правами на неймспейс (в данном примере namespace default, но в продуктивном кластере, разумеется, следует выделять отдельный namespace). Следующий манифест создает необходимы SA и CRB:

Service Account и ClusterRoleBinding

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-driver
  namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-driver-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
  - kind: ServiceAccount
    name: spark-driver
    namespace: default

Имя SA указывается в конфиге SparkContext:

.set("spark.kubernetes.authenticate.driver.serviceAccountName", driverSa)

В манифесте приложения так же указывается SA:

serviceAccountName: spark-driver

В идеале, для Executor необходим отдельный SA с ограниченными правами (ClusterRole «edit»), но в данном примере он не конфигурировался.

Headless service

Для того, чтобы экзекуторы могли делать запросы к драйверу, необходимо сделать headless service:

kubectl expose pod $SPARK_DRIVER_NAME --port=$SPARK_DRIVER_PORT \
  --type=ClusterIP --cluster-ip=None

Манифест такого сервиса выглядит следующим образом:

headless service

kind: Service
apiVersion: v1
metadata:
  labels:
    app: ml-app
  name: ml-app
spec:
  type: ClusterIP
  clusterIP: None
  clusterIPs:
    - None
  internalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  ports:
    - name: driver-port
      port: 33139
      targetPort: 33139
      protocol: TCP
    - name: block-manager-port
      port: 45029
      targetPort: 45029
      protocol: TCP
  selector:
    app: ml-app
  sessionAffinity: None

Запуск Spring приложения

На ряду с указанием мастера, необходимо указать параметр

spark.submit.deployMode

Значением cluster или client.

Cluster mode

В режиме работы Cluster приложение (spark driver) существует вне кластера K8S, его под не создается внутри кластера. Хорошо подходит для отладки, либо когда приложение запускается в отдельном кластере/сервере.

Client mode

В режиме работы Client приложение (spark driver) существует в кластере, поднимается как POD. При этом следует указать дополнительные настройки (будет рассмотрено ниже, в разделе с конфигурацией приложения).

Манифест приложения

application.yml

---
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: ml-app
  name: ml-app
spec:
  serviceAccountName: spark-driver
  containers:
    - name: ml-app
      image: repo/ml:cuda11.8-jdk8
      imagePullPolicy: Always
      env:
        - name: "SPARK_EXECUTORS"
          value: "2"
        - name: "SPARK_MODE"
          value: "client"
        - name: "POSTGRES_HOST"
          value: "192.168.0.125"
        - name: "CASSANDRA_HOST"
          value: "192.168.0.125,192.168.0.150"
          # add your own env vars from ConfigMaps/Secrets
      resources:
        limits:
          cpu: "2"
          memory: 4Gi
        requests:
          cpu: "200m"
          memory: 0.5Gi
      ports:
        - containerPort: 4040
        - containerPort: 9090
        - containerPort: 33139
        - containerPort: 45029
---
kind: Service
apiVersion: v1
metadata:
  labels:
    app: ml-app
  name: ml-app
spec:
  type: ClusterIP
  clusterIP: None
  clusterIPs:
    - None
  internalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  ports:
    - name: driver-port
      port: 33139
      targetPort: 33139
      protocol: TCP
    - name: block-manager-port
      port: 45029
      targetPort: 45029
      protocol: TCP
  selector:
    app: ml-app
  sessionAffinity: None

После применения манифеста автоматически поднимаются два Spark Executor в отдельных подах:

183f8d54c12feb64d680888832512316.png

Cassandra

Cassandra — великолепный инструмент для хранения больших объемов данных. Это OLTP БД, те. рассчитанная на частые записи. Также особенностью этой БД является то, что модель данных нужно строить исходя из того, как будут запрашиваться данные: при не корректном задании ключей партиционирования и ключей кластеризации таблиц легко прийти к запросам с ALLOW FILTERING или организовать неравномерное заполнение узлов кластера Cassandra.

Для частных запросов на чтение больше подходят аналитические (OLAP) БД. Если посмотреть в сторону wide‑column DB, то аналогом Cassandra может быть Clickhouse. Однако,  бывают различные случаи принципиального выбора Cassandra как источника аналитических данных и невозможности использования Clickhouse: недостаток вычислительных ресурсов, компетенций команд разработки и администрирования, и прочие ограничения.

Результатом такого решения может быть сложность выборки необходимых данных, так как, в отличие от реляционных БД, в Cassandra нет механизма отношений и поддержки join‑операций. В таком случае можно использовать Apache Spark, который позволяет выполнять запросы к Cassandra как к реляционной базе данных. Если имеется несколько Cassandra датацентров, один из них можно настроить для аналитических операций, разгрузив при этом датацентр (ы) для транзакций записи.

Связкой Spark‑Cassandra можно реализовать ETL‑процессы, как между таблицами Cassandra, так и в другие БД, например, в тот же Clickhouse, используя различные коннекторы Spark.

Сущности Big Data системы

Случайно или нет, статья публикуется во время сезона Big Data на Habr. Поскольку статья является руководством и ориентирована в том числе для новичков в этой области, кратко рассмотрим некоторые ключевые понятия: что такое ETL, Data Lake, DWH и Data Mart.

Data Lake — это централизованное хранилище данных, которое позволяет хранить данные в исходном формате без предварительной структуризации или преобразования. Data Lake предоставляет возможность хранить большие объемы разнородных данных, включая структурированные, полуструктурированные и неструктурированные данные. Это позволяет проводить различные виды анализа, исследований и обработки данных в будущем, когда появится необходимость.

DWH — это хранилище данных, и это специально организованная структура, предназначенная для хранения и управления данными. DWH интегрирует данные из разных источников, таких как транзакционные базы данных, файлы, веб‑сервисы и другие источники данных. Он предоставляет единое и консолидированное представление данных, которое удобно для анализа и отчетности.

Data Mart (Витрина данных) представляет собой сегментированное подмножество данных из хранилища данных (т. е. является подмножеством DWH), ориентированное на конкретные потребности бизнеса или отделов компании. Data Mart содержит данные, специально организованные и структурированные для поддержки аналитических запросов и принятия решений в конкретной области или функциональном подразделении компании.

ETL (Извлечение, Трансформация и Загрузка) — это процесс, который обеспечивает передачу данных из исходных источников в целевые системы хранения данных. В процессе ETL данные извлекаются из различных источников, затем подвергаются трансформации, включающей очистку, фильтрацию, преобразование и объединение данных, и, наконец, загружаются в целевую систему хранения данных, такую как DWH или Data Mart. Процесс ETL играет важную роль в обеспечении актуальности, целостности и качества данных в аналитической системе.

Пример процесса работы с данными в рассматриваемой системе

Рассмотрим, как можно использовать получившуюся систему на конкретном примере. Предметная область — аналитика цен финансовых инструментов, торгующихся на фондовых рынках. Так система строится на основе вычислений на GPU, попробуем построить модель машинного обучения, которая предсказывает цену курса акций NVidia (тикер NVDA) на несколько периодов вперед. Анализ проводился на данных дневного таймфрейма (1 day) с предсказанием на 2 периода (2 дня) вперед.

Схематично, система выглядит следующим образом:

4bfb87f288b074b4b5e9f773b2ea24e1.png

Красной пунктирной линией показаны ETL процессы из источников данных, которые в рассматриваемой системе можно принять за Data Lakes. Данные из них забираются сервисом Data Extractor, который развернут на k8s node pool только с CPU, и записываются в OLTP DWH кластер Cassandra.

По необходимости/расписанию/событию Analytics Service, который развернут на k8s node pool с GPU, забирает данные из нескольких таблиц DWH кластера, трансформирует их в единую структуру и записывает в Data Mart кластер Cassandra, который может быть настроен для OLAP операций. Это тоже ETL процесс, он обозначен черными сплошными линиями, выполняется посредством Spark Executors, которые назначены Analytics Service — в терминологии Spark это driver.

Говоря об инфраструктуре решения, не будет лишним упомянуть еще пару инструментов для работы в с Big Data системой:

Apache Zeppelin — мощный инструмент визуализации и анализа данных. Он предоставляет интерактивную среду для разработки, выполнения и представления результатов вычислений на основе больших объемов данных. С помощью Apache Zeppelin можно создавать и запускать ноутбуки, которые содержат код на различных языках программирования, включая Java, Scala, Kotlin и многие другие. Zeppelin обладает широким набором интегрированных визуализаций и возможностей интерактивного анализа данных, что делает его полезным инструментом для работы с результатами аналитики и машинного обучения. Zeppelin имеет возможность работы с интерпретатором Spark.

Spring Cloud Data Flow (SCDF) — это распределенная система управления потоками данных (Data Flow) в облачной среде. Она предоставляет инфраструктуру и инструменты для развертывания, управления и мониторинга сложных потоков данных между различными источниками и приемниками данных. SCDF позволяет создавать и конфигурировать потоки данных в виде графа, состоящего из различных компонентов обработки, таких как источники, преобразования, фильтры и назначения. С помощью SCDF можно управлять и масштабировать потоки данных в распределенной среде.

Оба эти инструмента, Apache Zeppelin и Spring Cloud Data Flow, могут быть полезными в контексте рассматриваемой системы для аналитики цен финансовых инструментов. Apache Zeppelin предоставит удобную среду для визуализации и анализа данных, а Spring Cloud Data Flow поможет в управлении потоками данных и обработке информации между различными компонентами системы.

Конфигурация приложения

Стоит подробнее рассмотреть конфигурацию приложения для работы со Spark.

public static JavaSparkContext sparkContext(SparkSettings sparkSettings) {
        String host = sparkSettings.getHost();
        SparkConf sparkConf = new SparkConf(true)
                .setAppName(sparkSettings.getAppName())
                .setMaster(sparkSettings.getMasterHost())
                .setJars(sparkSettings.getJars())
                ...
                .set("spark.driver.resource.gpu.amount", "0") // (1)
                ...
                // Cassandra // (2)
                .set("spark.jars.packages", "datastax:spark-cassandra-connector:3.3.0-s_2.12")
                .set("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
                .set("spark.cassandra.auth.username", sparkSettings.getCassandraUser())
                .set("spark.cassandra.auth.password", sparkSettings.getCassandraPass())
                .set("spark.cassandra.connection.host", sparkSettings.getCassandraHosts());

        if (sparkSettings.isKubernetesEnabled()) { // (3)
            sparkConf.set("spark.kubernetes.container.image", sparkSettings.getExecutorImage());

            if (sparkSettings.getDeployMode().equals("client")) {
                sparkConf
                    .set("spark.kubernetes.driver.pod.name", sparkSettings.getHostName())
                    .set("spark.kubernetes.namespace", sparkSettings.getNamespace())
                    .set("spark.kubernetes.authenticate.driver.serviceAccountName", sparkSettings.getDriverSa());
            }
        }
        return new JavaSparkContext(sparkConf);
    }

(1) — если приложение (spark driver) не использует ресурсы GPU напрямую, следует указать в данной настройке »0», чтобы под него не резервировался необходимый Spark Executor’ам ресурс.

(2) — настройки cassandra spark connector. Для подключения экзекуторов к кластеру Cassandra следует указать данные настройки. Коннектор cassandra имеет большое число настроек, все можно найти здесь;

(3) — так как приложение может работать не только в k8s, настройки, которые предназначены только для него, не стоит указывать. Здесь должен быть указан, как минимум, необходимый для работы приложения образ spark executor, имя driver k8s Pod (можно получить с помощью InetAddress.getLocalHost().getHostName();), namespace, в котором будут создаваться executor Pods, и Service Account. Все это было создано ранее.

JavaSparkContext может быть передан в объект SparkSession, который далее будет использоваться для работы с executor’ами.

Работа с Cassandra в Spark

Получение данных

В сервисе будем использовать Dataset как объект с данными. Существуют так же RDD и Dataframe, и, почему в Java/Kotlin стоит использовать Dataset, можно почитать здесь и здесь.

Базовый метод получения данных из кассандра:

AbstractCassandraRepository

abstract class AbstractCassandraRepository constructor(
    private val sparkSession: SparkSession
) {

    companion object {
        internal const val keyspace: String = "instrument_data"
    }

    fun cassandraDataset(keyspace: String, table: String): Dataset {
        val cassandraDataset: Dataset = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", keyspace)
            .option("table", table)
            .load()

        cassandraDataset.createOrReplaceTempView(table)
        return cassandraDataset
    }
}

Здесь мы работаем со спарковой сессией: указываем ей, что Dataset должен быть прочитан из кассандры, указываем кейспейс и таблицу, указываем имя датасета, чтобы в последующих запросах можно было обращаться к колонкам таблицы по ее имени.

Естественно, никого не интересует получать все данные и делать full scan таблицы — у каждой есть Partition Key, по которому нужно искать необходимые данные.

Рассмотрим пример получения данных из таблицы instrument_data.time_series_history:

instrument_data.time_series_history

create table instrument_data.time_series_history
(
    ticker      text,
    task_number uuid,
    datetime    timestamp,
    timeframe   text,
    close       decimal,
    high        decimal,
    low         decimal,
    open        decimal,
    volume      bigint,
    primary key (ticker, task_number, datetime, timeframe)
    )
    with compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};

create index time_series_history_datetime_index
    on instrument_data.time_series_history (datetime);

create index time_series_history_task_number_index
    on instrument_data.time_series_history (task_number);

create index time_series_history_timeframe_index
    on instrument_data.time_series_history (timeframe);

У нее имеется Partition Key ticker и три Clustering Column:

task_number — номер задания на получение и анализ данных в смежной системе;

datetime — дата и время записи;

timeframe — таймфрейм временного ряда (1day в рассматриваемом случае).

Задача — доставать данные по ticker, task_number и datetime between (dateStart, dateEnd) из всех таблиц.

AbstractIndicatorRepository

abstract class AbstractIndicatorRepository constructor(
    sparkSession: SparkSession,
    private val table : String
) : AbstractCassandraRepository(sparkSession) {

    fun getBaseDataSet(
        ticker: String,
        taskNumber : UUID,
        dateStart : LocalDate,
        dateEnd : LocalDate
    ): Dataset {
        return cassandraDataset(table)
            .filter(
                functions.col("ticker").equalTo(ticker)
                    .and(functions.col("task_number").equalTo(taskNumber.toString()))
                    .and(functions.col("datetime").between(dateStart, dateEnd)))
    }
}

Таким образом, можно обратиться к предыдущему методу и отфильтровать значения. Функционал данного метода будет аналогичен запросу данных из таблицы с указанными полями. При этом Spark не будет доставать весь датасет, затем его фильтровать — в экзекуторы попадут уже отфильтрованные данные.

Данный метод аналогичен запросу:

select *
from instrument_data.time_series_history
where ticker = ?1
  AND task_number = ?2
  AND datetime > ?3 and datetime < ?4;

Если не передавать partition key, увеличение времени запроса можно отследить на DAG запроса в Spark UI. Это лежит на поверхности, можно очень быстро разобраться, на получении каких данных запрос тормозит работу.

Чтобы выполнить select по определенным столбцам, уже следует создать репозиторий для отдельной таблицы и выполнить запрос следующим образом:

TimeSeriesRepository

@Component
class TimeSeriesRepository(
    sparkSession: SparkSession
) : AbstractIndicatorRepository(sparkSession, "time_series_history") {

    fun getDataset(
        ticker: String,
        taskNumber : UUID,
        dateStart : LocalDate,
        dateEnd : LocalDate
    ) : Dataset {
        val dataset = getBaseDataSet(ticker, taskNumber, dateStart, dateEnd)
            .selectExpr(
                "to_date(datetime) as dateTime",
                "CAST(close AS Double) as close"
            )

        dataset.createOrReplaceTempView("ts")
        return dataset
    }
}

Тогда конечный датасет вернет таблицу из двух столбцов с датой и временем и ценой закрытия, преобразованной в Double. Double необходим при передаче данных на вычисление в GPU.

Получаем все необходимые данные:

getMainDataset

private fun getMainDataset(
    ticker : String,
    taskNumber : UUID,
    dateStart : LocalDate,
    dateEnd : LocalDate,
    currentOffset : Int,
    batchSize : Int
) : Dataset {
    val timeSeries = timeSeriesRepository
        .getDataset(ticker, taskNumber, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("ts")
    val emaDataSet = emaRepository
        .getEmaDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("ema")
    val stochasticDataset = stochasticRepository
        .getStochasticDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("stoch")
    val bBandsDataset = bBandIndicatorRepository
        .getBBandsDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("bb")
    val macdDataset = macdRepository
        .getMacdDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("macd")
    val rsiDataset = rsiRepository
        .getRsiDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("rsi")
    val smaDataset = smaRepository
        .getSmaDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("sma")
    val willrDataset = willrRepository
        .getWillrDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("willr")

    return combineDatasets(
        timeSeries, emaDataSet, stochasticDataset, bBandsDataset, macdDataset, rsiDataset, smaDataset, willrDataset
    )
}

Здесь у Java/Kotlin разработчика может возникнуть желание сделать запрос данных через CompletableFuture> и выполнить их все в одном тредпуле. Однако, не стоит этого делать. Спарк сам распараллеливает работу и создает FutureTask’s, программисту не стоит распараллеливать код на разные потоки.

и комбинируем:

combineDatasets

private fun combineDatasets(
    timeSeries : Dataset,
    emaDataSet : Dataset,
    stochasticDataset : Dataset,
    bBandsDataset : Dataset,
    macdDataset : Dataset,
    rsiDataset : Dataset,
    smaDataset : Dataset,
    willrDataset : Dataset
): Dataset {

    val result = timeSeries
        .join(emaDataSet, timeSeries.col("datetime")
                .equalTo(emaDataSet.col("datetime")), "leftouter")
        .join(stochasticDataset, timeSeries.col("datetime")
                .equalTo(stochasticDataset.col("datetime")), "leftouter")
        .join(bBandsDataset, timeSeries.col("datetime")
                .equalTo(bBandsDataset.col("datetime")), "leftouter")
        .join(macdDataset, timeSeries.col("datetime")
                .equalTo(macdDataset.col("datetime")), "leftouter")
        .join(rsiDataset, timeSeries.col("datetime")
                .equalTo(rsiDataset.col("datetime")), "leftouter")
        .join(smaDataset, timeSeries.col("datetime")
                .equalTo(smaDataset.col("datetime")), "leftouter")
        .join(willrDataset, timeSeries.col("datetime")
                .equalTo(willrDataset.col("datetime")), "leftouter")
        .selectExpr(
            "ts.dateTime as dateTime",
            "ema.emaTimePeriod as emaTimePeriod",
            "ema.ema as ema",
            "stoch.fastKPeriod as fastKPeriod",
            "stoch.slowKPeriod as slowKPeriod",
            "stoch.slowDPeriod as slowDPeriod",
            "stoch.slowD as slowD",
            "stoch.slowK as slowK",
            "bb.timePeriod as bbTimePeriod",
            "bb.sd as bbSd",
            "bb.lowerBand as lowerBand",
            "bb.middleBand as middleBand",
            "bb.upperBand as upperBand",
            "macd.signalPeriod as signalPeriod",
            "macd.fastPeriod as fastPeriod",
            "macd.slowPeriod as slowPeriod",
            "macd.macd as macd",
            "macd.macdHist as macdHist",
            "macd.macdSignal as macdSignal",
            "rsi.timePeriod as rsiTimePeriod",
            "rsi.rsi as rsi",
            "sma.timePeriod as smaTimePeriod",
            "sma.sma as sma",
            "willr.timePeriod as willrTimePeriod",
            "willr.willr as willr",
            "ts.close as close"
        )

    val windowSpec = Window.orderBy(functions.asc("dateTime"))
    return result
        .withColumn("id", functions.row_number().over(windowSpec))
        .filter(functions.col("dateTime").isNotNull)
}

В методе combineDatasets происходит объединение таблиц, которое Cassandra не поддерживает, а Spark предоставляет такую возможность.

В результирующий датасет добавляется колонка id, которая содержит номер строки. Здесь же предварительно сортируются данные по колонке dateTime и отфильтровываются строки с пустыми ячейками.

Стоит отметить, что есть иной способ работы с Dataset — можно обратиться к объекту sparkSession, вызвать метод sql () и передать в него обычный SQL запрос на объединение данных, и он будет работать.

Если все‑таки предпочтительно использовать SQL запросы, можно это делать, различий в скорости работы в моих без учета предварительных фильтров для датасетов не было замечено. Spark поддерживает все основные функции, с единственной оговоркой — не стоит писать свои собственные функции (UDF) и стараться выполнить запрос, это плохо влияет на производительность. За помощью в этом вопросе следует обратиться к оф. документации. Spark понимает ANSI формат. Если нужно перевести запрос, написанный на другом диалекте, можно воспользоваться инструментом.

В абзаце выше не просто так выделено полужирным с подчеркиванием одно важное условие. Допустим, мы получили датасет timeSeries, и сразу же хотим через sparkSession.sql () нативным запр

© Habrahabr.ru