Машинное обучение на Spark

Существует множество подходов к машинному обучению. Со стороны может показаться, что генеративные модели на архитектуре под названием «трансформер» заняли передовые позиции и ближайшее обозримое будущее именно за ними. Но существуют и другие подходы к машинному обучению, которые тиражируются в медийном поле не так широко.

В этой статье вы познакомитесь с таким классом алгоритмов, как ансамблевые методы машинного обучения. А именно — градиентный бустинг на решающих деревьях. В основе они представляют из себя деревья решений, которые являются очень простой структурой, позволяющей получить ответ на основе входных данных. А еще мы разберемся, при чем тут Spark, и посмотрим на эти алгоритмы на практике.

Введение для контекста

Генеративные текстовые модели вроде GPT-4 или Claude Sonnet 3.5 постепенно проникают во все сферы нашей жизни. Кому-то кажется, что эти нейросети смогут заменить большинство специалистов по работе с текстом. Кто-то считает, что для этого им пока что не хватает мощности. Но в медийном поле они закрепились как самые продвинутые инструменты на основе искусственного интеллекта. 

Отчасти это правда, но есть важный нюанс. Человек, не работающий с машинным обучением, может считать, что архитектура LLM — это самая продвинутая модель, которая подходит под любые задачи. Из-за этого другие подходы к машинному обучению игнорируются в попытках реализовать решение всех задач через сверточные нейронные сети, в частности через LLM. Даже многие специалисты машинного обучения иногда забывают, что более простые алгоритмы могут оказаться достаточно точными для решения определенного спектра задач. 

Ансамблевые методы — как раз один из таких вариантов простых алгоритмов. Одним из вариантов ансамблевых методов является градиентный бустинг. Он представляет из себя множество деревьев решений. Эти деревья формируют лес решений, а бустинг выстраивает линейную комбинацию из произвольного числа таких лесов. Как бы надстраивая один лес над другим, алгоритм градиентного бустинга минимизирует ошибку. 

Градиентный бустинг обучается быстро, показывает хорошие результаты и не требует таких высоких вычислительных мощностей, как обучение LLM. Поэтому это один из самых эффективных применяемых алгоритмов для обработки табличных и неоднородных данных. С моделями, обученными именно градиентным бустингом, мы сталкиваемся повсеместно: это и рекомендательные системы, и калькуляторы тарифов сотовых операторов, и многие другие обработчики персональных данных, о которых мы не знаем.

Предлагаю познакомиться с одной из самых эффективных реализаций градиентного бустинга — Catboost и Catboost for Spark.

При чем тут Spark

Так сложилось, что Apache Spark — это стандарт в индустрии, когда дело касается по-настоящему больших данных. Если вы хотите обрабатывать набор данных, которые невозможно поместить в один физический сервер, то вам необходим Spark. Однако это очень дорогое решение ввиду сложности его настройки. Помимо самого Spark, необходимо также настроить Hadoop как облачную файловую систему, планировщик заданий и всю сопутствующую ему обвязку. Именно поэтому стал набирать популярность Spark на кластерах Kubernetes. Мы в Cloud ML Platform реализовали поддержку этого инструмента именно на этих кластерах. 

Такой подход позволяет очень сильно экономить ресурсы, так как можно сделать небольшой кластер с поддержкой автомасштабирования. И уже в процессе работы Kubernetes будет запрашивать дополнительные ресурсы у облака при необходимости, затем отдавать их обратно, используя ресурсы, только когда это необходимо.

Какова же связь между Spark и градиентным бустингом? Достаточно прямая. Любой алгоритм машинного обучения работает тем лучше, чем больше данных ему доступно, при условии их неизменного качества. Градиентный бустинг — это один из тех алгоритмов, которые больше всего выигрывают за счет увеличения количества данных. Таким образом, возможность работать с неограниченным размером данных, которую предоставляет Spark, отлично соотносится с особенностями градиентного бустинга. Более того, одна из самых популярных и эффективных библиотек по работе с ним предоставляет свою реализацию под Spark.

Теперь предлагаю познакомиться поближе с возможностями градиентного бустинга на Spark на реальном примере.

Описание задачи

Было решено использовать машинное обучения для предсказания такого заболевания, как ИБС — ишемическая болезнь сердца. Медицина — многогранная область знаний, недостаточно хорошо представленная в машинном обучении. 

Как показывает статистика, ишемическая болезнь сердца является одной из самых частых причин смертей в мире. Основную опасность здесь представляет именно внезапность сердечных приступов, вызванных этой болезнью. Поэтому важно вовремя обнаружить, что пациент принадлежит к определенной группе риска, связанной с ИБС, и направить его на обследование.

Самым точным способом определения ИБС является коронарография. Однако это очень опасная и сложная инвазивная процедура, которая требует наличия соответствующего оборудования, лицензии и специалиста по эндоваскулярной диагностике. Все это приводит к тому, что назначение на эту процедуру получает далеко не каждый пациент, у которого есть диагноз ИБС или который находится в ее группе риска. Чаще всего коронарография производится непосредственно перед операцией, чтобы определить, в какой именно области сердца требуется вмешательство. При этом практически не применяется для первичной постановки диагноза ИБС.

Другим способом косвенного определения ишемической болезни сердца являются анализы, по которым кардиолог сможет поставить диагноз и записать пациента в группу риска. Опытный кардиолог сможет поставить этот диагноз на основании ЭКГ и жалоб пациента на давящую или сжимающую боль в области сердца. Однако у врача ограничено количество приемов в день и самих рабочих дней. Но обработать результаты анализов и ЭКГ может и компьютер. Возможность разгрузить медицинский персонал — это цель обучения модели, описанной в этой статье. В теории, получив диагноз от модели, пациент может сразу же записаться в группу риска и отправиться на обследование к кардиологу. Это позволило бы автоматизировать процесс и ускорить сам прием у врача, предоставив специалисту все необходимые данные вместе с прогнозом о возможном наличии ИБС.

Стоит заранее сделать оговорку: автор статьи не преследовал цели повлиять на медицинскую практику непосредственно. В данном вопросе у меня нет компетенции. Я лишь хотел продемонстрировать, как использовать современные подходы к машинному обучению, на примере медицинской тематики.

Разведывательный анализ данных

Из-за того что Spark работает в Kubernetes через Spark-operator, единственным способом выполнить любую задачу на Spark является Spark-submit, который каждый раз поднимает поды со Spark как с отдельной инсталляцией, выполняет расчет и отключает эти поды, таким образом создавая изолированную установку Spark для выполнения каждый конкретной Spark Job. В связи с этим довольно неудобно работать с датасетом в таком инкрементальном подходе.

Для целей разведывательного анализа данных существует инструмент Spark-connect, входящий в состав поставки  Cloud Spark (K8s) от Cloud ML Platform. С помощью него можно подключиться к серверу Spark и в привычной среде, например в Jupyter, работать с сессией Spark. Плюс такого подхода заключается в том, что данные обрабатываются сразу в нативной форме для Spark и можно перенести эту обработку сразу на обучение.

Поэтому воспользуемся именно им. Для начала создадим инстанс Jupyterhub (ML Platform → Инстансы → Новый Jupyterhub), а также инстанс Spark в K8s (ML Platform → Spark в K8s → Создать кластер). Дождемся завершения процесса.

Нам потребуются следующие данные из личного кабинета:

  • имя бакета, куда будет загружен датасет (ML Platform → Spark в K8s → Выбрать созданный кластер → Бакет);

  • токен доступа к кластеру (ML Platform → Токены);

  • ID созданного кластера (ML Platform → Spark в K8s → Выбрать созданный кластер → ID);

  • DNS-имя созданного кластера.

В качестве датасета для обучения модели воспользуемся Classification of Coronary Artery Disease (CAD). Это небольшой датасет, который отлично подойдет для proof-of-concept-задачи вроде этой. Датасет содержит всего 303 строки и 55 столбцов. Однако его небольшой размер не помешает нам получить хорошую модель. Нужно лишь грамотно его предобработать. Также стоит заметить, что датасет состоит из данных 2017 года и несколько отстает от современных представлений в медицине о факторах риска ИБС. В контексте этой статьи это не важно, но читателям стоит иметь это в виду.

Когда создание кластера будет завершено, возьмем S3-бакет, к которому привязан кластер, и поместим датасет туда. ML Platform → Spark в K8s → Выбрать созданный кластер → Бакет, далее идем в Объектное хранилище → Бакеты и кладем датасет в каталог datasets. У меня получилось datasets/CAD.csv

Также необходимо взять токен доступа. Идем в ML Platform → Токены и выписываем токен, далее сохраняем его.

Заходим в созданный инстанс Jupiter, создаем новый ноутбук и устанавливаем необходимые библиотеки:

%pip install pyspark

%pip install pandas

%pip install pyarrow

%pip install grpcio

%pip install protobuf

%pip install grpcio-status

После этого запускаем сессию в Spark-connect. Здесь нам пригодится токен доступа, который мы сохранили, и DNS-имя кластера. URL для Spark-connect следует следующему шаблону «sc://:15002/; spark-token=<токен доступа>»

6cdde5489ecef669d688d709ca9c2a5f.png

Выводим схему данных и видим, что у нас датасет, который состоит из 55 столбцов, один из которых называется Cath и является целевым для обучения. Всего значений 303, что может показаться недостаточным, однако это компенсируется количеством фичей (столбцов).

Датасет

root

 |-- Age: integer (nullable = true)

 |-- Weight: integer (nullable = true)

 |-- Length: integer (nullable = true)

 |-- Sex: string (nullable = true)

 |-- BMI: double (nullable = true)

 |-- DM: integer (nullable = true)

 |-- HTN: integer (nullable = true)

 |-- Current Smoker: integer (nullable = true)

 |-- EX-Smoker: integer (nullable = true)

 |-- FH: integer (nullable = true)

 |-- Obesity: string (nullable = true)

 |-- CRF: string (nullable = true)

 |-- CVA: string (nullable = true)

 |-- Airway disease: string (nullable = true)

 |-- Thyroid Disease: string (nullable = true)

 |-- CHF: string (nullable = true)

 |-- DLP: string (nullable = true)

 |-- BP: integer (nullable = true)

 |-- PR: integer (nullable = true)

 |-- Edema: integer (nullable = true)

 |-- Weak Peripheral Pulse: string (nullable = true)

 |-- Lung rales: string (nullable = true)

 |-- Systolic Murmur: string (nullable = true)

 |-- Diastolic Murmur: string (nullable = true)

 |-- Typical Chest Pain: integer (nullable = true)

 |-- Dyspnea: string (nullable = true)

 |-- Function Class: integer (nullable = true)

 |-- Atypical: string (nullable = true)

 |-- Nonanginal: string (nullable = true)

 |-- Exertional CP: string (nullable = true)

 |-- LowTH Ang: string (nullable = true)

 |-- Q Wave: integer (nullable = true)

 |-- St Elevation: integer (nullable = true)

 |-- St Depression: integer (nullable = true)

 |-- Tinversion: integer (nullable = true)

 |-- LVH: string (nullable = true)

 |-- Poor R Progression: string (nullable = true)

 |-- FBS: integer (nullable = true)

 |-- CR: double (nullable = true)

 |-- TG: integer (nullable = true)

 |-- LDL: integer (nullable = true)

 |-- HDL: double (nullable = true)

 |-- BUN: integer (nullable = true)

 |-- ESR: integer (nullable = true)

 |-- HB: double (nullable = true)

 |-- K: double (nullable = true)

 |-- Na: integer (nullable = true)

 |-- WBC: integer (nullable = true)

 |-- Lymph: integer (nullable = true)

 |-- Neut: integer (nullable = true)

 |-- PLT: integer (nullable = true)

 |-- EF-TTE: integer (nullable = true)

 |-- Region RWMA: integer (nullable = true)

 |-- VHD: string (nullable = true)

 |-- Cath: string (nullable = true)

Данные разделены на категориальные, численные и порядковые. Хорошим признаком является то, что данные не содержат пропусков или дублированных значений.

bd6f0c2e09439586e1c021f98c6b2647.png

Попробуем обнаружить выбросы:

917b537df61b9634946a2260410aadcc.png

Как видно из результатов вычислений, выбросов в датасете достаточно, однако они не помешают обучению модели, как можно будет убедиться далее.

Также хочется посмотреть на корреляцию между данными и какие данные наиболее сильно влияют на показатель принадлежности пациента к группе риска. Для этого применим обычный Pandas.

Взаимная корреляция:

762c2000ae8c1a84b9a6fe396e9a6ce1.png

Наибольший вклад в диагноз:

33ee1abcafcb92ac156eb46689e7a284.png

Как хорошо видно из графиков, наибольший вклад в подтверждение диагноза вносят следующие признаки, по мере убывания:

  1. Боль в груди (давящая или сжимающая).

  2. Возраст.

  3. Аномалия движения региональной стенки.

  4. Гипертензия.

  5. Диабет.

  6. Артериальное давление.

  7. Инверсия зубца Т.

  8. Повышенный сахар натощак.

  9. Скорость оседания эритроцитов.

  10. Содержание калия в крови.

Также видно, что есть и те значения, которые отрицательно влияют на диагноз, то есть уменьшают вероятность ИБС. По мере убывания отрицательного влияния:

  1. Атипичная стенокардия.

  2. Некардиогенная боль в груди.

  3. Фракция выброса (более 55%).

  4. Диастолический шум.

  5. Нормальное количество лимфоцитов.

  6. Отсутствие одышки.

Обучение модели Catboost for Spark

Обычный Catboost и Catboost for Spark очень сильно различаются. Те методы, что работают для обычного Catboost, не работают для Catboost for Spark. По сути, это совершенно разные библиотеки с разными интерфейсами, реализующие один и тот же алгоритм.

Поэтому, попробовав обучить модель, я столкнулся со сложностями, связанными с неподходящим форматом данных:

  • Catboost for Spark отказывается работать со строками. Любые строковые значения должны в обязательном порядке быть переведены в числовые, а еще лучше в целочисленные.

  • Catboost for Spark не работает со стандартными датафреймами, имеющими разные типы данных. В обязательном порядке нужен датафрейм, состоящий только из векторов.

  • Spark-operator требует перед запуском Spark Job, чтобы все зависимости были указаны. Если указать зависимость внутри исполняемого кода, как привыкли делать разработчики в обычном Spark, то зависимость не будет подгружена. При этом в клиентской библиотеке Cloud ML Platform есть метод, который позволяет это сделать.

  • Необходимо правильно задавать ресурсы для Spark Job, иначе она может упасть из-за недостатка памяти.

Приняв во внимание все вышеперечисленное, можно приступать к обучению модели на Spark. Для этого необходимо создать два файла:

  1. Один из будет хранить исполняемый код, в нем будет проходить обучение.

  2. Второй будет запускать это обучение на кластере Spark в K8s.Он будет использовать клиентскую библиотеку Cloud ML Platform, так как она значительно упрощает процесс создания Spark Job для Spark-operator.

Листинг файла, запускающего Spark Job:

from mlplatform_client import MLPlatform

ADMIN_REFRESH_TOKEN_2 = "TOKEN"

CLUSTER_ID = "CLUSTER_ID"

admin_client = MLPlatform(ADMIN_REFRESH_TOKEN_2)

job_name = "catboost-job-1"

admin_client.spark_delete_job(CLUSTER_ID, job_name)

manifest = admin_client.get_default_manifest(CLUSTER_ID, job_name)

manifest.set_spark_conf({"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp --add-exports=java.base/sun.net.util=ALL-UNNAMED"})

manifest.set_packages(["ai.catboost:catboost-spark_3.4_2.12:1.2.2"])

manifest.set_driver_settings({"memory": "2000m", "memoryOverhead": "500m"})

manifest.set_executor_settings({"memory": "2000m", "memoryOverhead": "500m"})

manifest.set_files(["s3a://k8s-boosting-2-k8s-3d827a1641f8c4-bucket/datasets/CAD.csv"])

print(manifest)

job = admin_client.spark_submit_job(CLUSTER_ID, manifest, pycode_file_path="catboost_classification_med.py")

print(job)

Теперь, когда есть способ запустить Catboost for Spark, можно переходить к основным элементам обучения модели. Стоит обратить внимание на настройки, которые применены к дефолтному манифесту. С помощью метода set_packages можно задать любой пакет из репозитория Maven, и Spark-operator подтянет его для выполнения задачи. В данном случае указан пакет Catboost для версии Spark, которая использовалась на кластере.

Первое, что необходимо сделать, — разделить выборку на две части: тестовую и тренировочную.

train_df, test_df = df.randomSplit([0.75, 0.25])

Далее необходимо преобразовать данные датафрейма в подходящий для Catboost for Spark формат. Напоминаю, что все строковые данные должны стать числовыми, а все отдельные столбцы должны стать векторами. В этом нам помогут StringIndexer и VectorAssembler. Наиболее полезным является VectorAssembler, который автоматически переводит все столбцы к одному вектору.

from pyspark.ml.feature import VectorAssembler, StringIndexer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

sex_indexer = StringIndexer(inputCol='Sex',

              outputCol="Sex_index")

lablel_indexer = StringIndexer(inputCol="Cath", outputCol="Cath_index")

numerical_feat = ['Age', 'Weight', 'Length', 'BMI', 'BP', 'PR', 'FBS', 'CR', 'TG', 'LDL', 'HDL', 'BUN', 'ESR', 'HB', 'K', 'Na', 'WBC', 'Lymph', 'Neut', 'PLT', 'EF-TTE']

categorical_feat = ['Sex_index', 'DM', 'HTN', 'Current Smoker', 'EX-Smoker', 'FH', 'Obesity', 'CRF', 'CVA', 'Airway disease', 'Thyroid Disease', 'CHF', 'DLP', 'Edema', 'Weak Peripheral Pulse', 'Lung rales', 'Systolic Murmur', 'Diastolic Murmur', 'Typical Chest Pain', 'Dyspnea', 'Atypical', 'Nonanginal', 'Exertional CP', 'LowTH Ang', 'Q Wave', 'St Elevation', 'St Depression', 'Tinversion', 'LVH', 'Poor R Progression']

ordinal_feat = ['Function Class', "Region RWMA", "VHD"]

features = numerical_feat + categorical_feat + ordinal_feat

assembler = VectorAssembler(inputCols=features, outputCol='features')


Далее реализуем функцию, которая позволит провести все необходимые преобразования над исходными данными.

def prepare_vector(df: DataFrame) → DataFrame:

  result_df = sex_indexer.fit(df).transform(df)

  result_df = lablel_indexer.fit(result_df).transform(result_df)

  result_df = result_df.withColumn("Cath_index", col("Cath_index").cast(IntegerType()))

  result_df = result_df.withColumn("Sex_index", col("Sex_index").cast(IntegerType()))

  result_df = assembler.transform(result_df)

  return result_df

train = prepare_vector(train_df)

test = prepare_vector(test_df)

Теперь очень важный шаг. Catboost for Spark лучше всего работает с данными, которые загружены в его собственную абстракцию над датасетов — Pool. Как можно убедиться, каждый датасет содержит всего 2 столбца. Первый — это вектор из 54 значений. Второй содержит итоговые лейблы.

train_pool = catboost_spark.Pool(train.select(['features', "Cath_index"]))

train_pool.setLabelCol("Cath_index")

train_pool.setFeaturesCol('features')

eval_pool = catboost_spark.Pool(test.select(['features', "Cath_index"]))

eval_pool.setLabelCol("Cath_index")

eval_pool.setFeaturesCol('features')

Далее создаем классификатор и обучаем его. Обратите внимание, что нельзя указать функцию потерь, bootstrap и количество данных в каждом ответвлении дерева решений. Кроме того, в целом классификатор для Spark предоставляет гораздо более скудный функционал по сравнению с классификатором для Python.

classifier = catboost_spark.CatBoostClassifier(

  randomSeed=seed, iterations=900,

  odWait=1885, learningRate=0.1, l2LeafReg=13.330290657887146,

  randomStrength=49., depth=4,

  leafEstimationIterations=6, subsample=0.15606746339296185,

  featuresCol="features", labelCol="Cath_index"

)

model = classifier.fit(train_pool, evalDatasets=[eval_pool])

predictions = model.transform(eval_pool.data)

predictions.show()

evaluator_f1 = MulticlassClassificationEvaluator(

  labelCol="Cath_index",

  predictionCol="prediction",

  metricName='f1')

evaluator_precision = MulticlassClassificationEvaluator(

  labelCol="Cath_index",

  predictionCol="prediction",

  metricName='weightedPrecision')

evaluator_f1_by_label = MulticlassClassificationEvaluator(

  labelCol="Cath_index",

  predictionCol="prediction",

  metricName='fMeasureByLabel')

print(f'Model F1 = {evaluator_f1.evaluate(predictions)}')

print(f'Model Precision = {evaluator_precision.evaluate(predictions)}')

print(f"Model F-measure by label = {evaluator_f1_by_label.evaluate(predictions)}")

Далее можем наблюдать на результаты выполнения Spark Job на кластере.

F-мера модели для одного лейбла:

9f3c7cc82c757b557d31f2ba8620e43e.png

Precision модели:

a43531f3e628a972bfcf9bdad449be5e.png

F-мера для нескольких лейблов:

4b76980e545660a2cae312e3bd591153.png

Как можно заметить, в результате была получена модель, которая с вероятностью в 90% может предсказать, поставят ли пациенту диагноз ИБС или нет. В качестве основной метрики модели была выбрана именно F-мера. Она наиболее полно выражает качество модели для медицинских данных, так как показывает соотношение правильно поставленных моделью диагнозов к общему количеству правильных диагнозов. Это увеличивает вероятность того, что модель поставит правильный диагноз, так как в случае с заболеванием, которое приводит к смертельному исходу, ложноотрицательное срабатывание модели крайне нежелательно. Гораздо лучше направить пациента на обследование в случае ложноположительного срабатывания, чем не принять меры в случае ложноотрицательного.

Выводы

На примере очевидно заметно, что даже простой алгоритм градиентного бустинга способен выдавать очень хороший результат. Более того, данная задача наиболее хорошо ложится именно на бустинг, так как датасет представляет собой табличные неоднородные данные, которых может не хватить для обучения нейросети (всего 303 значения).

Также, несмотря на небольшой размер датасета, его можно экстраполировать до любых размеров и вычислительных мощностей.  Cloud Spark позволяет это сделать, при этом Catboost for Spark позволяет распараллелить эти вычисления на настоящую бигдату.

Однако стоит упомянуть и отрицательные черты Catboost for Spark.

В первую очередь это низкая скорость работы. При обычной работе внутри оперативной памяти компьютера Catboost работает очень быстро. Он может использовать все доступные вычислительные ресурсы процессора и завершить выполнение за несколько минут. Этого нельзя сказать про Catboost for Spark. Из-за того что Catboost for Spark необходимо поднимать клиент-серверное приложение, настраивать порты и общение между экзекуторами и драйвером Spark, а также выполнять обучение небольшими порциями в виде Spark task, страдает скорость выполнения. Могу лишь предположить, что это связано с сериализацией и десериализацией данных.

Еще одна проблема — скудная функциональность. Так как Catboost for Spark — это совершенно другой интерфейс и способ выполнения алгоритма бустинга, его функционала будет не хватать разработчикам, привыкшим использовать Catboost для Python, — например, не будет возможности задать функцию потерь для бустинга.

Таким образом, использовать Catboost for Spark стоит в том случае, когда у вас есть возможность обучать модель на действительно больших данных. Тогда недостатки в скорости работы и удобстве отойдут на второй план, позволяя улучшить качество финальной модели за счет большого датасета.

Попробуйте Cloud ML Platform от VK Cloud — она помогает построить процесс работы с ML-моделями от дизайна до деплоя, контролировать качество экспериментов и моделей. Для тестирования мы начисляем новым пользователям 3000 бонусных рублей и будем рады вашей обратной связи.

Stay tuned

Присоединяйтесь к телеграм-каналу «Данные на стероидах». В нем вы найдете все об инструментах и подходах к извлечению максимальной пользы из работы с данными: регулярные дайджесты, полезные статьи, а также анонсы конференций и вебинаров.

© Habrahabr.ru