Бутстрап в PySpark
Всем привет! Меня зовут Илья Черников, я аналитик больших данных в X5 Tech, сейчас занимаюсь аналитикой и оценкой активностей CVM маркетинга экспресс-доставки «Пятёрочки».
В статье я расскажу о том, как мы решали вопрос автоматизации оценки эффективности большого количества маркетинговых кампаний с помощью бутстрапа в PySpark. Я опишу различные подходы к реализации бутстрапа с их плюсами и минусами, а также расскажу об итоговом варианте, который мы выбрали для себя.
Небольшой сэмпл данных и тетрадки с примерами запусков описанных ниже вариантов реализации можно увидеть в репозитории.
Небольшая предыстория
В наши обязанности, как аналитиков больших данных, входит широкий спектр задач: от ад-хок запросов бизнеса, до разработки аналитических моделей и расчётов на кластере больших данных. Наши основные рабочие инструменты — это Hive и PySpark, они используются в 90% задач.
В направлении маркетинга, кроме различных исследований и предварительного анализа инициатив, есть важная часть нашей работы — оценка эффективности запускаемых маркетинговый коммуникаций.
Полтора года назад, когда я пришёл в этот продукт, маркетинг запускал максимум 5–10 коммуникаций в месяц, и аналитика по ним проводилась вручную — просто по точечным оценкам в результате пилота. Аналогично оценивались и результаты работы в течение месяца по глобальным целевым и контрольным группам.
Соответственно, первым делом надо было хотя бы на коленке наладить процесс оценки кампаний с учётом статистической значимости. Мы быстро накидали тетрадку, где с помощью PySpark собирали статистики по клиентам кампании, переводили их в Pandas и сравнивали метрики целевой и контрольной групп с помощью бутстрапа. Результаты по всем кампаниям собирали в Excel и отправляли бизнес-заказчикам.
В какой-то момент мы решили оценить влияние коммуникаций всей «Пятёрочки» на привлечение клиентов в экспресс-доставку, что потребовало сделать оценку по существенно большим группам. Собрав данные по всем необходимым статистикам и попытавшись их обработать, очевидно получили проблему: при скачивании данных в Pandas просто не хватало оперативки.
Тогда у меня в первый раз возникла мысль:»У нас же есть Spark, который отвечает за параллелизацию вычислений. А, может, можно сделать бутстрап сразу в нём? ». Я накидал разные варианты реализации, но времени на полноценные тесты и оптимизацию не было, так что это всё так и осталось лежать в тетрадке. Проблему тогда решил просто за счёт последовательного расчёта по каждой метрике (что снизило объём скачиваемых данных за один раз).
С течением времени росла клиентская база и количество кампаний, что привело к тому, что один из аналитиков сидел целыми днями и обсчитывал результаты кампаний. В этот момент стало очевидно, что надо это дело автоматизировать, а результаты представить в дашборде — тогда во второй раз посетила мысль:»А всё-таки было бы интересно рассчитывать всё сразу в Spark».
Таким образом проблема автоматизации и масштабирования существующего решения вылилась для нас в задачу — «Как реализовать расчёт оценки десятков маркетинговых кампаний на миллионы пользователей с помощью распределённых вычислений на PySpark?». Процесс поиска решения данной задачи описан ниже.
Почему бутстрап
Для начала стоит уточнить, почему именно был выбран бутстрап.
Конечно, можно было бы не изобретать велосипед и пойти в сторону использования T-test, который проще реализуется и работает быстрее. Однако, наше направление маркетинга только развивается, и часто появляются новые идеи, например, посмотреть на перцентили. В такой ситуации хотелось бы иметь наиболее универсальный инструмент, с которым количество вопросов «А какой метод расчёта применить к той или иной метрике?» стремилось бы к минимуму. Бутстрап, по сути, и является самым универсальным методом, поэтому решили сделать выбор в пользу него.
На всякий случай оставлю ссылку на одну из первых статей в поиске про бутстрап на Хабре, в которой довольно понятно описана его концепция: Бутстрап: швейцарский нож аналитика в A/B-тестах.
Ограничения
Важно отметить, что у нас есть несколько ограничений:
Наш продукт пока работает на PySpark 2.4.4. Часть команд уже переехала на связку Pyspark 3.x + Hadoop 3, но у нас пока это только в планах.
Наш продуктивный код мы пишем с использованием собственного фреймворка PySparkPipeline. Внутри команды мы придерживаемся подхода, в рамках которого желательно (по возможности), чтобы внутри одного процесса были только ленивые вычисления (то есть не было запуска самих расчётов). В результате возвращается Spark DataFrame, то есть именно схема вычислений, и единственное вызываемое действие — это запись в БД.
У такого подхода есть свои плюсы и минусы, но мы не будем углубляться в них в рамках текущей статьи. Скажем лишь, что этот подход хорошо прижился в нашем продукте.
Описание данных
Для лучшего понимания описанных ниже подходов стоит ненадолго остановиться на описании структуры данных, на которых производятся вычисления.
До расчёта эффекта по кампаниям необходимо предварительно собрать статистики по участникам в датафрейм, который поступает на вход каждого из описанных подходов ниже.
Ниже приведён урезанный и немного преобразованный пример данных, которые используются у нас.
Описание полей:
camp_id: идентификатор кампании;
control_group_flg: флаг принадлежности клиента к контрольной группе кампании;
revenue: выручка, которую нам принёс клиент во время кампании;
margin: маржа, которую нам принёс клиент во время кампании;
conversion: флаг того, что клиент откликнулся на предложение.
Какие же есть варианты
Итак, поискав решения в интернете, я понял, что при возникновении похожей проблемы, люди уходили в оптимизацию путём использования различных статистических критериев для разных метрик и выборок. Мне же очень хотелось иметь один универсальный инструмент, который можно было бы использовать автоматически на ежедневной основе и не думать больше ни о чём.
1. Бутстрап в UDF
Самый первый вариант, пришедший в голову — это написать UDF, который будет применяться при группировке. Таким образом, если у нас, например, выделено 100 воркеров, то мы можем параллельно обсчитать 100 кампаний.
Изначально было понятно, что этот вариант вряд ли подойдёт из-за того, что наши кампании могут включать как тысячу клиентов, так и десяток миллионов. При этом, ни мы, ни Spark, естественно, не знаем, какая кампания на какой воркер попадёт. Получается, что если хотя бы одна акция потребует большого объёма памяти, то все наши воркеры должны быть способны такой объём в себя вместить. А такое выделение ресурсов нельзя назвать оптимальным.
Эту проблему, в теории, можно было бы попробовать решить с помощью объединения небольших кампаний в батчи, чтобы на воркер попадало хотя бы относительно оптимальное количество данных, но это увеличило бы время выполнения и никак не решило бы проблему кампаний с большим количеством участников.
Несмотря на эти предположения, я ни разу не сталкивался с UDF, применяемыми при группировке, поэтому мне стало интересно всё-таки попробовать.
Код получился относительно простой, по сути я просто добавил определение схемы и немного переформатировал для работы с Pandas вместо NumPy:
# Список ключевых столбцов
KEY_COLS = ['camp_id']
# Список метрик, по которым считаем статистику
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
ALPHA = 0.05
BS_ITERS = 5000
# Определяем схему дф, который будет возвращаться из pandas_udf,
schema = T.StructType(
# Ключевые поля
[T.StructField(i, T.StringType()) for i in KEY_COLS]
+
# Схема для левой и правой границ доверительного интервала всех метрик
[T.StructField(metric, T.ArrayType(T.DoubleType()))
for metric in STATISTICS_COLS]
)
# Создаем udf для расчета статистик по нашей схеме
@F.pandas_udf(schema, functionType=F.PandasUDFType.GROUPED_MAP)
def bs_on_executor(pdf: pd.DataFrame) -> pd.DataFrame:
"""Функция расчета доверительного интервала для нескольких метрик"""
# На каждом воркере выставляем переменную ARROW_PRE_0_15_IPC_FORMAT
# чтобы на нашей версии PySpark нормально работал более новый PyArrow
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'
# получаем значение KEY_COLS для того, чтобы их вернуть в ответе
keys = pdf[KEY_COLS].iloc[0].tolist()
# контрольная группа
a = (pdf.loc[pdf['control_group_flg'] == 1]
.reset_index(drop=True)[STATISTICS_COLS])
# пилотная группа
b = (pdf.loc[pdf['control_group_flg'] == 0]
.reset_index(drop=True)[STATISTICS_COLS])
len_a = len(a)
len_b = len(b)
diff_list = []
# непосредственно само бутстрапирование
for _ in range(BS_ITERS):
a_boot = a.sample(len_a, replace=True).mean()
b_boot = b.sample(len_b, replace=True).mean()
diff_list.append(b_boot - a_boot)
bs_result = pd.concat(diff_list)
# расчет доверительного интервала
ci_res = bs_result.groupby(bs_result.index).quantile([ALPHA/2, 1-ALPHA/2])
return pd.DataFrame(
[keys + [ci_res[metric].tolist() for metric in STATISTICS_COLS]])
# Сама группировка и вызов бутстрапа
result = statistics_df.groupBy(*KEY_COLS).apply(bs_on_executor)
И на небольших кампаниях это работало довольно быстро и неплохо.
Но как только я попробовал протестировать это на кампании с двумя миллионами клиентов, сразу же получил ошибку »org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
»
Причина кроется не в том, что не хватает памяти на воркере, а в том, что в более старой версии PyArrow, которая работает с нашей версией Spark, в UDF можно передавать не более 2Гб несжатых данных Ссылка на баг в Jira Apache.
Соответственно, после проверки, этот вариант точно отметается.
2. Наивный подход (explode в Spark)
Следующий подход, который, в итоге, в несколько видоизменённом виде (см. раздел «Итоговый вариант» ниже) и используется сейчас — это «наивный подход». Основная концепция следующая:
Выберем из исходного датафрейма поля с идентификатором кампании, флагом контрольной группы (сохранив исходное количество строк).
Копируем его N раз, сохраняя номер копирования в колонку «Номер итерации». N — желаемое количество бутстрап итераций (у нас это 5000).
Для каждой строки получившегося датафрейма нам нужно случайным образом выбрать запись из исходного, сохраняя принадлежность групп (имеется в виду, что должны совпадать идентификатор кампании и флаг контрольной группы).
Группируем по идентификатору кампании, группе и номеру итерации и считаем значения метрик.
Сразу сталкиваемся с проблемой поиска подхода к сэмплированию данных. У датафреймов есть метод sample
, но с его помощью нельзя получить датафрейм больше исходного, поэтому придётся выполнять эту операцию N раз в цикле, что Spark-у очень не понравится…
Я решил пойти по следующему пути:
Пройтись оконной функцией по всем строкам внутри группы и проставить порядковый номер, применив сортировку, которая всегда будет возвращать одинаковый результат (по сути, отсортировал по внутреннему идентификатору пользователя).
Для каждой строки из нашего датафрейма со статистиками генерируем массив с последовательностью цифр от 1 до N и делаем
explode
. Из этих массивов мы получаем номер итерации, и для каждой итерации у нас сохраняется количество строк, соответствующее количеству строк в изначальном датафрейме.Далее для каждой строки проставляем случайное число от 1 до <количество клиентов в группе>, которое будет соответствовать порядковому номеру из п. 1.
Таким образом, мы получили датафрейм с N итераций и списком идентификаторов, которые попали в конкретную итерацию.
Далее остаётся только заджоинить его обратно на датафрейм из п. 1.
Таким образом, мы обращаемся к источнику с данными по клиентам в кампании 2 раза. Поэтому лучше его заранее либо закэшировать, либо записать в БД.
Можно было бы оптимизировать, создавая бутстрап-датафрейм, не задействуя изначальные данные, а просто создав датафрейм с нужными количеством строк, в зависимости от количества клиентов в каждой кампании. Однако в этом случае надо было бы сохранить ключи групп и количества строк в каждой группе. А это уже не соответствует концепции PySparkPipeline по отсутствию промежуточных вычислений. Да и большой оптимизации там не получится.
Соответственно, код можно представить следующим образом:
# Список ключевых столбцов групп
KEY_COLS = ['camp_id', 'control_group_flg']
# Список метрик, по которым считаем статистику
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Количество бутстрап операций
BS_ITERS = 5000
# Рассчитываем количество строк в группе и порядковый номер
window_for_bs_stats = Window.partitionBy(*KEY_COLS)
statistics_df = (
statistics_df
.withColumn('rn',
F.row_number().over(window_for_bs_stats.orderBy('user_id')))
.withColumn('group_cnt', F.count(F.lit(1)).over(window_for_bs_stats))
# repartition нужен для того, чтобы дальнейшие действия по разворачиванию
# происходили на всех воркерах, а не ограниченном количестве,
# соответствующем количеству партиций из window_for_bs_stats
.repartition('rn')
).cache()
iterations_df = (
statistics_df
# Для каждой записи в нашем датафрейме генерируем количество строк = bs_iter_cnt
# Порядковый номер строки будет являться номером итерации
.select(KEY_COLS + ['group_cnt'])
.withColumn('iter_num', F.explode(F.sequence(F.lit(1), F.lit(BS_ITERS))))
# Для каждой строки в итерации случайно выбираем номер записи
# из изначального датафрейма
.withColumn('rn',
(F.floor(F.rand() * (F.col('group_cnt'))) + 1).cast('int')
.alias('rn'))
.select(KEY_COLS + ['iter_num', 'rn'])
)
# Соединяем бутстрап датафрейм с изначальным по случайному идентификатору строки
result_df = (
bs_df
.join(
statistics_df.select(KEY_COLS + STATISTICS_COLS + ['rn']),
on=KEY_COLS + ['rn'],
how='inner'
)
# Группируем по ключевым полям и номеру итерации
.groupby(KEY_COLS + ['iter_num'])
.agg(*[F.avg(stat_col).alias(stat_col) for stat_col in STATISTICS_COLS])
)
Ну, а дальше уже группируем по идентификатору кампании и считаем разницу между пилотной и целевым группами и считаем нужные перцентили для получения доверительного интервала.
Этот подход работает неплохо и работает довольно быстро. 50 миллионов строк (то есть 20–30 кампаний) обсчитываются за 30–40 минут на свободном кластере.
Но есть один существенный недостаток: в определенный момент расчёта получается 50 млн * 5000 строк. Это довольно большой объём, соответственно, нужно закладывать достаточное количество памяти на воркерах, которое будет зависеть от количества метрик и дополнительных полей, которые у вас используются.
3. Пуассоновский или Биномиальный бутстрап
В тот же самый момент, когда я размышлял над тем, как сделать наивный бутстрап, на еженедельных встречах аналитиков коллеги из другого продукта рассказали про свой подход к бутстрапу, который они недавно разработали.
Они использовали Пуассоновский бутстрап. Что это такое — хорошо рассказано в этом видео: Как я перестал беспокоиться и полюбил Пуассон-Bootstrap | Вебинар Валерия Бабушкина | karpov.courses
Основная концепция состоит в том, что вместо того, чтобы сэмплировать клиентов теста, мы для каждого клиента «сэмплируем» его появление в каждой из итераций бутстрапа. Вероятность появления клиента в каждой бутстрап итерации описывается биномиальным распределением, где n = количеству клиентов в кампании, а .
Соответственно, для каждого клиента мы можем сгенерировать распределение количества появлений этой строки в каждой итерации бутстрапа. Таким образом мы получим матрицу, где столбцами выступают номера итераций, а в строках — количество появлений каждой записи каждой итерации.
При этом, при достаточно большом количестве объектов (более 100), Биномиальное распределение начинает сходиться к Пуассоновскому распределению с lambda = 1
.
Раз распределения начинают сходиться, то мы можем использовать Пуассоновское распределение, которое не требует от нас знаний о количестве клиентов в каждой кампании. Относительно наивного подхода, с точки зрения производительности, разница состоит в том, что убирается лишний джоин, и все расчёты происходят внутри одного датафрейма.
В итоговом виде реализацию можно представить следующим образом:
# Список ключевых столбцов групп
KEY_COLS = ['camp_id', 'control_group_flg']
# Список метрик, по которым считаем статистику
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Количество бутстрап операций
BS_ITERS = 5000
# udf для создания массива количества появлений строки в каждой итерации
pois_udf = F.udf(lambda size: np.random.poisson(1, size).tolist(),
T.ArrayType(T.IntegerType()))
result_df = (
statistics_df
.withColumn('iter_cnt', F.lit(BS_ITERS))
# создаем массив с количеством вхождений строки в каждую итерацию
.withColumn('poisson_array', pois_udf(F.col('iter_cnt')))
# делаем posexplode, сохраняя порядковый номер итерации и количество вхождений
.select(
KEY_COLS
+ [F.posexplode('poisson_array').alias('iter_num', 'poisson')]
+ STATISTICS_COLS
)
# Убираем лишние строки, которые не участвуют в итерации бутстрапа
.filter(F.col('poisson') != 0)
# Группируем по ключевым полям и номеру итерации
.groupBy(KEY_COLS + ['iter_num'])
# Считаем сумму метрик и количество клиентов в каждой итерации
.agg(*(
[F.sum(F.col('poisson') * F.col(stat_col)).alias(stat_col)
for stat_col in STATISTICS_COLS]
+
[F.sum(F.col('poisson')).alias('total_cnt')]
))
)
# Рассчитываем значение средних для каждой итерации
for stat_col in STATISTICS_COLS:
result_df = (result_df
.withColumn(stat_col, F.col(stat_col) / F.col('total_cnt')))
Минусы данного подхода:
Остаётся проблема большого объёма данных, так как в какой-то момент всё ещё происходит умножение строк на количество итераций бутстрапа.
Спорный минус — на малых количествах записей в связи с тем, что количество строк в каждой итерации бутстрапа разное, подход может давать большую дисперсию, если количество строк не превышает нескольких сотен.
Итоговый вариант
Так как Пуассоновский или Биномиальный бутстрап всё-таки имеют хоть небольшой, но минус в точности оценки на малых выборках, я решил использовать наивный бутстрап, но добавить к нему одну из идей коллег — итерационный подход.
Суть итерационного подхода состоит в том, что генерируется не сразу N итераций, а они бьются на несколько бакетов (например, 50), соответственно, в каждом бакете количество строк умножается не на 5000, а только на 100. На каждый бакет создаётся отдельная стадия расчёта, то есть их количество и количество тасок увеличивается, но при этом оптимизация Spark срабатывает таким образом, что часть выполняется параллельно, а часть последовательно, в зависимости от доступных ресурсов, которые выделены на текущую сессию.
Таким образом, можно закладывать меньшее количество ресурсов для работы процесса. Пример разницы планов выполнения приведён ниже. Так как после разворачивания датафрейма на итерации идёт джоин, то нагрузку на память вполне можно оценить по объёму Shuffle:
Бутстрап с одной итерацией:
Бутстрап с двумя итерациями — объём данных на каждом стейдже снизился, соответственно, в один момент времени на воркерах находится меньше данных:
Код для итогового подхода выглядит следующим образом:
# Список ключевых столбцов групп
KEY_COLS = ['camp_id', 'control_group_flg']
# Список метрик, по которым считаем статистику
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Количество бутстрап операций
BS_ITERS = 5000
# количество итераций внутри одного батча
BS_BATCH_ITERS = 100
def naive_bs_batch(sdf: DataFrame, batch_num: int, iter_cnt: int) -> DataFrame:
"""Функция расчета одного батча бутстрапа"""
iterations_df = (
sdf
# Для каждой записи в нашем датафрейме генерим количество строк = iter_cnt
# И проставляем реальный номер итерации
.select(KEY_COLS + ['group_cnt'])
.withColumn('iter_num',
F.explode(F.sequence(F.lit(batch_num * iter_cnt + 1),
F.lit((batch_num + 1) * iter_cnt))))
# Для каждой записи итерации случайно выбираем номер записи
# из изначального датафрейма
.withColumn('rn',
(F.floor(F.rand() * (F.col('group_cnt'))) + 1)
.cast('int').alias('rn'))
.select(KEY_COLS + ['iter_num', 'rn'])
)
# Соединяем бутстрап датафрейм с изначальным по случайному идентификатору строки
batch_result = (
iterations_df
.join(
sdf.select(KEY_COLS + STATISTICS_COLS + ['rn']),
on=KEY_COLS + ['rn'],
how='inner'
)
# Группируем по ключевым полям и номеру итерации бутстрапа
# и считаем статистики для всех итераций
.groupby(KEY_COLS + ['iter_num'])
.agg(*[F.avg(stat_col).alias(stat_col)
for stat_col in STATISTICS_COLS])
# Без coalesce у нас получается
# количество партиций = кол-во shuffle.partitions * batch_iter_amt
# Из-за этого очень долго может работать union
.coalesce(1)
)
return batch_result
# количество батчей
batches_cnt = BS_ITERS // BS_BATCH_ITERS
# Рассчитываем количество строк в группе и порядковый номер
window_for_bs_stats = Window.partitionBy(*KEY_COLS)
statistics_df = (
statistics_df
.withColumn('rn',
F.row_number().over(window_for_bs_stats.orderBy('user_id')))
.withColumn('group_cnt', F.count(F.lit(1)).over(window_for_bs_stats))
.repartition('rn')
).cache()
# Считаем каждый батч и делаем union результатов
result_df = reduce(
lambda x, y: x.union(y),
[naive_bs_batch(sdf=statistics_df, batch_num=i, iter_cnt=BS_BATCH_ITERS)
for i in range(batches_cnt)]
)
Заключение
Бутстрап в PySpark возможен и несложно реализуем. Все три описанных подхода в определённых условиях будут работать, и довольно стабильно.
Если у вас небольшие кампании и версия PySpark выше 3, то вполне можно использовать подход расчёта в UDF, тем более, что такие обработки просто привычнее писать на Python.
Если у вас стабильно большие кампании, то стоит использовать Пуассоновский/Биномиальный бутстрап.
Если размер очень сильно варьируется от кампании к кампании, то для себя я выбрал наивный подход.
При этом к последним двум подходам при достаточно большом количестве данных можно применять итерационную оптимизацию (как в итоговом варианте) для повышения стабильности и снижения объёма потребляемых данных в моменте.
У нас этот процесс запущен на ежедневной основе ночью, когда кластер свободен. Весь расчёт, включая сбор агрегатов, занимает от 30 минут до 1,5 часов. Результаты расчётов транслируются в дашборд со списком кампаний и рассчитанными метриками.