Feature engineering и кластерный анализ клиентов на PySpark

d47ef3022226fbb4cab227cb03d41ff5.png

Привет, Хабр!

Сегодня с вами Смолюк Анастасия и Путилова Елена, участники профессионального сообщества NTA.

Кластеризация клиентов является важным инструментом, так как позволяет лучше понимать клиентов и предлагать им более персонализированный сервис, а также она может быть полезна для компании в поиске решения при возникновении проблем с клиентами.

BigData плотно входит в нашу жизнь, датасеты растут и постоянно изменяются, что усложняет задачу кластеризации клиентов. Обычно для задач кластеризации используется библиотека Sklearn, но с большим объёмом данных её использовать не получиться. Spark позволяет реализовать параллельные вычисления на кластерах и имеет в составе своего фреймворка библиотеку машинного обучения MLlib. В случае больших данных, когда привычные инструменты отказываются работать с такими объёмами, PySpark приходит на выручку. При этом прежде чем запустить алгоритмы машинного обучения на датасете, необходимо подготовить данные и провести feature engineering, а это достаточно трудозатратная задача, но в то же время необходимая, так как от этого этапа во многом зависит качество конечного результата. Данный этап также необходимо делать на PySpark, опять‑таки из‑за объёма данных.

Перед нами стояла задача анализа массива данных заёмщиков физических лиц — злостных неплательщиков кредитов, дела по которым уже направлены в суд. Этот массив необходимо было разбить на блоки (кластеры). Цель кластерного анализа — понять, какие группы по общим признакам можно выделить, и в дальнейшем разработать для каждой группы индивидуальную тактику взыскания, и возможно, найти пути улучшения методологии скоринга.

Навигация по посту

Подготовка данных

Идеи

Подготовка данных — этап, предшествующий анализу и требующий хорошего понимания предметной области. Предобработка осуществляется если не руками самого эксперта в этой области, то в очень тесном с ним сотрудничестве. Останавливаться на предварительной подготовке данных долго не будем, поскольку общих рекомендаций здесь не выработать, только кратко отметим основные моменты, которые мы произвели с нашим датасетом и которые отличаются от классической борьбы с отсутствующими значениями.
Выбирали признаки, которые:

  • непосредственно характеризуют именно самого заёмщика, не кредитный продукт, не договор и прочее;

  • имеют значение до выхода на просрочку (то есть, например, данные по процедурам взыскания в анализ не берём, так как хотим разобраться в причинах, которые к этому привели).

Убрали признаки:

  • дублирующие друг друга по существу (например, остаток основного долга (ОД) в валюте и остаток ОД в рублях — достаточно оставить только один показатель);

  • по которым слишком много вариантов (например, 100–200 значений для признака «должность на месте работы»).

В результате предобработки датасета количество исходных данных существенно сократилось. В исходном датасете количество признаков достигало 191, после чистки на основе описанных выше идей их осталось 43. Среди них:

  • признаки, связанные с первым кредитным договором: вид кредитования, срок кредита, признак реструктуризации, дата выдачи кредита, ставка, валюта и т. д.;

  • числовые признаки (итого по всем договорам): сумма обеспечения, сумма общей задолженности в рублях, сумма погашений по основному долгу;

  • признаки — индивидуальные характеристики заёмщика: пол, возраст, резидентство, признак vip, наличие заграничного паспорта, категория надёжности, данные, связанные с рабочей деятельности, данные, связанные с собственностью и т. д.

Реализация на PySpark

После импорта необходимых библиотек и создания сессии Spark, входной точки каждого PySpark приложения, загружаем исходные данные и подготовленный совместно с экспертом список признаков в объекты Spark DataFrame. DataFrame — одна из двух абстракций массива данных в Spark (вторая абстракция RDD), которая предоставляет более высокоуровневое API (по сравнению с RDD). Метод загрузки данных зависит от формата файла (в рассматриваемом примере мы загружали CSV, но может быть JSON, ORC, Parquet и др.).
Далее на основе списка признаков из исходного датасета отбираем нужные колонки. Группируем полученный массив данных по идентификатору заёмщика, при этом для колонок с числовыми значениями данные суммируем, а для колонок с категориальными значениями оставляем только первое значение. Суммирование объясняется тем, что по заёмщику интересует полная сумма долга в случае нескольких займов. А выбор первого категориального значения из всех возможных обусловлен тем, что фокус интереса оставался на первичном обращении заёмщика (первом выданном кредите). Другие категориальные признаки (например, пол заёмщика) либо не меняются от одного займа к другому,  либо даже если подвержены изменениям (например, резидентство), то интерес представляет статус на момент первого обращения.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import first, sum, col

input_path = 'YOUR_PATH_TO_DATA' # путь до исходного датасета
feature_path = 'YOUR_PATH_TO_FEATURE_LIST' # путь до файла со списком отобранных экспертом признаков

# создание spark-сессии
spark = SparkSession.builder.appName("feature_engineering").getOrCreate()
# загрузка исходных данных и списка релевантных для задачи признаков в объект DataFrame
df = spark.read.csv(input_path, sep='~', inferSchema = True, header = True)
df_col = spark.read.csv(feature_path, sep='~', inferSchema = True, header = False)

# формируем датасет только с релевантными признаками
col_list = [row[0] for row in df_col.collect()]
df_features = df.select(col_list)

# группируем данные по id заёмщика, при этом агрегируем (суммируем) данные колонок 
# с числовыми значениями, а по категориальным признакам оставляем первое значение
id_col = ['CLIENT_ID'] # колонка с идентификатором заёмщика
numeric_col = ['BALANCE', 'AMOUNT'] # список колонок с числовыми значениями
# список колонок с категориальными значениями получаем вычитанием
categorial_col = list(set(df_features.columns) - set(numeric_col) - set(id_col))

grouped_df = df_features.groupBy(id_col).agg(
    *[first(col).alias(col) for col in categorial_col],
    *[sum(col).alias(col) for col in numeric_col]
)

Конструирование признаков (Feature Engineering)

Добавление дополнительных признаков + UDF

После первичной подготовки данных для нашего DataFrame решили заменить дату рождения на возраст, чтобы сократить число возможных вариантов признака; возраст заёмщика важный признак, но точность до дней существенной роли не играет. Для того, чтобы добавить в наш DataFrame новую колонку, в которой хранилось бы значение возраста в годах, можно воспользоваться методом withColumn объекта Spark DataFrame. Первым параметром передаётся название колонки, вторым — значение, в нашем случае оно будет вычисляться на основе разницы в месяцах между текущей датой и датой рождения заёмщика, и для получения значения возраста в годах полученную разницу надо разделить на количество месяцев в году — 12:

from pyspark.sql.functions import round, current_date, months_between
df = grouped_df.withColumn('age', round(months_between(current_date(), grouped_df['date_of_birth'])/12, 0).cast('int'))

Подобную логику можно применить и к другим колонкам с датами — с целью уменьшения количества значений признака оставить только год от даты (например, даты выдачи ссуды):

from pyspark.sql.functions import year
df = df.withColumn('year_start_loan', year(df['loan_start_date']))

Также есть возможность создавать свои пользовательские функции UDF (user‑defined function). UDF в целом широко используется в Spark — когда нет необходимого вам функционала, можно самостоятельно написать его, определив свою UDF функцию. Эта же возможность может быть применена к задачам Feature Engineering. Например, можно реализовать расчёт заработной платы с учётом районного коэффициента rate:

from pyspark.sql.functions import udf

@udf('integer')
def numWithRate(num):

    return rate*int(num)
	
df = df.withColumn('sum_with_rate', numWithRate(df.sum))

Бинаризация

Определённые колонки можно бинаризовать — трансформировать к виду, когда диапазон возможных значений сужается до двух: 0 и 1. Например, есть признак «дата смерти», но нас интересует не сама дата как таковая, а факт смерти. Такой признак можно преобразовать в двоичные числа 0 и 1:

from pyspark.ml.feature import Binarizer
from pyspark.sql.types import DoubleType

df = df.withColumn('year_of_death', (year(df['date_of_death'])/1).cast(DoubleType()))
binarizer = Binarizer(threshold=0, inputCol='year_of_death', outputCol='is_dead')
df = binarizer.transform(df)
df = df.na.fill(0)

Заполнение пустых значений

Для заполнения отсутствующих значений (пустые, null, NaN) в Spark реализован класс Imputer. Входные колонки/колонка должны быть числового типа (не категориального). Заполнение может быть средним значением, медианой либо модой — это устанавливается с помощью метода setStrategy («mean|median|mode»), или пользовательским значением с помощью метода setMissingValue (custom_value):

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = ['col1', 'col2'], outputCols = ['col1_out', 'col2_out'])
imputer.setStrategy('median')
model = imputer.fit(df)
df = model.transform(df)

One-Hot Encoding и Pipeline

Модели машинного обучения требуют, чтобы все поданные на вход признаки были числового типа. Поэтому все категориальные признаки должны быть закодированы в численное представление перед подачей в модель. С этой задачей справляется One‑Hot Encoder (OHE), реализованный на PySpark, этот тип кодирования основывается на создании вектора с бинарными значениями под каждое возможное значение признака. Если значения признаков представляют собой текст, то нужно их предварительно перевести из строк в индексы. Сделаем это с помощью Pipeline — конвейера, который последовательно запускает стадии (на вход Pipeline подаётся аргумент stages типа список) в определённом порядке. В качестве стадии могут быть объекты двух типов: Transformer и Estimator. Первый используется для преобразования DataFrame (реализует метод transform), а второй — для обучения ML моделей (реализует метод fit):

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when

pipe_category = df.withColumn('category',\
    when(df.category.isNull(), "нет категории").otherwise(df.category))
pipe_final = pipe_category.withColumn('description',\
    when(pipe_category.description.isNull(), "нет категории").otherwise(pipe_category.description))
stage_idx = StringIndexer(inputCols=['category', 'description'],
                        outputCols=['category_idx', 'description_index'])
stage_ohe = OneHotEncoder(inputCols=['category_idx', 'description_index'],
                        outputCols=['category_OHE', 'description_OHE'])
pipeline = Pipeline(stages=[stage_idx, stage_ohe])
df = pipeline.fit(pipe_final).transform(pipe_final)

Удаление ненужных признаков

После всех преобразований перед финальной сборкой не забываем удалять колонки с промежуточными вычислениями или ставшие ненужными:

col2del = ['date_of_birth', 'loan_start_date', 'date_of_death', 'year_of_death']
df = df.drop(*col2del)

Сборка в вектор

Когда все преобразования с датасетом завершены, нужно собрать множество признаков в один вектор, который затем будет подан на вход алгоритму машинного обучения. Эту сборку в вектор осуществляет класс VectorAssembler, в качестве аргумента inputCols необходимо передать список всех признаков:

from pyspark.ml.feature import VectorAssembler

assemble = VectorAssembler(inputCols = ['last_payment', 'age'], outputCol = 'features')
assembled_data = assemble.transform(df)

Дополнительные возможности Feature Engineering на PySpark

У PySpark есть дополнительный инструментарий для конструирования признаков, который мы не задействовали в рамках нашей задачи, но который стоит упомянуть. Функционал делится на четыре группы: извлечение признаков, преобразование признаков, отбор признаков и локальное хеширование (LSH — Locality Sensitive Hashing).

Извлечение признаков представлено FeatureHasher, классом, который превращает множество категориальных и числовых признаков в вектор признаков определённой размерности, и тремя классами для работы с текстовыми данными: CountVectorizer, Word2Vec и TF‑IDF.

Преобразование признаков — самая густонаселённая функционалом группа. Помимо рассмотренных на конкретных примерах возможностей есть классы для работы с текстовыми данными: Tokenizer (разбивает текст на составляющие — токены), StopWordsRemover (убирает стоп‑слова — слова, которые не несут смысловую нагрузку), NGram (для формирования из токенов N‑грамм); классы для нормализации — преобразования данных к неким безразмерным единицам, иногда в рамках определённого диапазона, иногда с заданным свойством: Normalizer (нормализация в соответствии с единичной нормой), StandardScaler (трансформация каждого объекта так, чтобы он имел стандартное отклонение), MinMaxScaler (приведение к диапазону [0,1]), MaxAbsScaler (приведение к диапазону [-1,1]), RobustScaler (удаление медианы и масштабирование в соответствии с определённым квантильным диапазоном).

Обзор библиотеки MLlib

MLlib — библиотека машинного обучения в фреймворке Spark. Основная особенность связана с предназначением Spark в целом — ориентированность на параллельную обработку, что целесообразно в случае большого объёма данных. MLlib верхнеуровнево предоставляет следующие возможности:

  • ML‑алгоритмы: классификация, регрессия, кластеризация и коллаборативная фильтрация;

  • инструменты ML процесса: преобразование признаков: стандартизация, нормализация, хеширование; конструирование ML конвейера (pipeline); оценка моделей и настройка гиперпараметров;

  • другие инструменты:  линейная алгебра, статистика, обработка данных.

На данный момент алгоритмов кластеризации, реализованных в MLlib, немного:

  • K‑means — метод k средних, разбивает множество элементов векторного пространства на заданное число кластеров k;

  • LDA (Latent Dirichlet allocation) — латентное размещение Дирихле;

  • Bisecting K‑means — модификация алгоритма k средних, при котором происходит итеративное разделение множества кластера;

  • GMM (Gaussian Mixture Model) — модель Гауссовой смеси — разложениесложного распределения на составляющие кривые нормального распределения;

  • PIC (Power Iteration Clustering) — алгоритм кластеризации вершин графа.

LDA используют для анализа текстовых данных (тематическое моделирование), GMM — для анализа визуальных данных, PIC используется для графов, поэтому для нашего кейса мы использовали универсальный метод K‑means и его модификацию Bisecting K‑means.

Подбор оптимальных гиперпараметров для кластеризации (метод силуэта)

Алгоритмы кластеризации K‑means и Bisecting K‑means принимают на вход число k, на которое впоследствии разбивает массив данных. Выбор того, каким числом задать количество кластеров, непростая задача, и существует несколько подходов, например: экспертное мнение, метод локтя, силуэт‑скор. Мы выбрали метод силуэта, он даёт наглядную картину для принятия решения (на эту тему есть статья на Хабр). Суть метода: для каждой точки рассчитывается коэффициент силуэта, который представляет собой меру качества кластеризации. Коэффициент находится в диапазоне от -1 до 1, чем ближе показатель к 1, тем больше точки похожи на другие точки того же кластера и отличаются от точек других кластеров, и соответственно, тем более плотными и хорошо разделёнными оказываются формируемые кластеры. Выбирается количество кластеров, которое даёт наивысший средний коэффициент силуэта для всех точек данных.

Вычисление силуэта на PySpark (код для разных алгоритмов кластеризации отличается только используемым классом PySpark: в одном случае KMeans, в другом BisectingKMeans — второй приведён в закомментированном виде):

from pyspark.ml.clustering import KMeans
#from pyspark.ml.clustering import BisectingKMeans # для модифицированного K-means
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt

silhouette_scores = []
evaluator = ClusteringEvaluator(featuresCol='features', \
metricName='silhouette', distanceMeasure='squaredEuclidean') # помимо выбранной Евклидовой меры расстояния доступна ещё косинусная близость distanceMeasure='cosine', но мы в дальнейшем будем запускать метод K-Means, а он использует Евклидово расстояние в качестве меры расстояния

for K in range(2,11):
    KMeans_ = KMeans(featuresCol='features', k=K)
    #KMeans_ = BisectingKMeans (featuresCol='features', k=K) # для модифицированного K-means
    KMeans_fit = KMeans_.fit(assembled_data)
    KMeans_transform = KMeans_fit.transform(assembled_data)
    evaluation_score = evaluator.evaluate(KMeans_transform)
    silhouette_scores.append(evaluation_score)    

fig, ax = plt.subplots(1,1, figsize =(10,8))
ax.plot(range(2,11),silhouette_scores)
ax.set_xlabel('Number of Clusters')
ax.set_ylabel('Silhouette Score')

Для алгоритма K‑means для нашего датасета получили наилучший коэффициент силуэта при k=3 (левый график), а для Bisecting K‑means оптимальным получилось значение k=4 (правый график, значение k=2 не приняли в расчёт из‑за грубого обобщения):

Подбор гиперпараметров (метод силуэта) для алгоритма K-means

Подбор гиперпараметров (метод силуэта) для алгоритма K-means

Подбор гиперпараметров (метод силуэта) для алгоритма Bisecting K-means

Подбор гиперпараметров (метод силуэта) для алгоритма Bisecting K-means

Кластеризация

Запускаем кластеризацию K‑means с подобранным для этого алгоритма количеством кластеров k=3:

from pyspark.ml.clustering import KMeans

n_clusters = 3
KMeans_ = KMeans(featuresCol='features', k=n_clusters)
KMeans_Model = KMeans_.fit(assembled_data)
KMeans_Assignments = KMeans_Model.transform(assembled_data)

	Bisecting K-means запускаем с вычисленным для алгоритма оптимальным k=4:

from pyspark.ml.clustering import BisectingKMeans

n_clusters = 4
KMeans_ = BisectingKMeans (featuresCol='features', k=n_clusters)
KMeans_Model = KMeans_.fit(assembled_data)
KMeans_Assignments = KMeans_Model.transform(assembled_data)

В результате выполнения появится колонка «prediction» с номером кластера, к которому алгоритм отнёс каждый кейс. Для обоих алгоритмов кластеризации получили похожие результаты с оговоркой, что Bisecting K‑means дополнительно первый кластер разделил по суммам основного долга — на очень крупные и крупные.

При анализе полученных результатов мы пришли к выводам:

  • первый кластер в основном на покупку строящегося жилья с крупными суммами основного долга (в среднем 2 миллиона рублей), возраст заёмщиков 35–45 лет, в основном есть обеспечение;

  • второй кластер военная ипотека со средними суммами основного долга (в среднем 600 тысяч рублей), заёмщики — в основном мужчины, возраст 25–35 лет, без обеспечения;

  • третий кластер — потребительские кредиты и кредиты по карте, самые небольшие суммы (около 100 тысяч рублей в среднем), возрастной диапазон широкий 35–60 лет, в основном без обеспечения.

Заключение

Опыт кластеризации на PySpark оказался успешным. Результаты кластеризации, хоть и немного отличающиеся для каждого алгоритма, в целом задали вектор, по каким признакам можно разделить монолит кредитных неплательщиков на группы, и предоставила инсайты для дальнейшей тактики взыскания, индивидуальной для каждой группы. И мы сочли разумным выделение именно таких кластеров.

Проделанная работа на PySpark дала ожидаемые плоды: для большого датасета мы быстро получили разумное разделение на группы. И такой анализ обычно требуется на регулярной основе, поэтому не стоит убирать данную работу в стол — в скором будущем мы его оперативно повторим на новых данных. Кроме того, приведенный в статье код для feature engineering и кластерного анализа на PySpark легко адаптировать для любого датасета структурированных данных.

© Habrahabr.ru