Что лучше: Spark Structured Streaming или полное прекращение работы прода?
Правильное построение ETL-процессов (преобразования данных) — сложная задача, а при большом объёме обрабатываемых данных неизбежно возникают проблемы с ресурсами. Поэтому нам требуется выискивать новые архитектурные решения, способные обеспечить стабильность расчётов и доступность данных, а при необходимости и масштабируемость — с минимальными усилиями.
Когда я пришел в Ozon, мне пришлось столкнуться с огромным количеством ETL-джоб. Прежде чем применить модель машинного обучения, сырые данные проходят множество этапов обработки. А само применение модели (то, ради чего существует команда) занимает всего 5% времени.
Всем привет! Меня зовут Алексей, и в Ozon я занимаюсь матчингом. Что такое матчинг и зачем он нужен, мой коллега @alex_golubev13 объяснил в статье «Векторное представление товаров Prod2Vec».
Ежедневно у нас добавляются сотни тысяч новых товаров, а также меняются те, которые уже есть на сайте. Это могут быть изменения картинок, описаний, названий или цен. Процесс ETL в данном случае заключается в извлечении признаков из товаров, которые появились или обновились в течение заданного промежутка времени (на данный момент за день). Данные мы забираем из HDFS и Hive, а для работы с ними используем PySpark.
Сразу скажу, что большую часть ресурсов и времени в ETL занимает обработка изображений и текстовых данных. Так, каждое изображение проходит через несколько свёрточных нейронных сетей, которые возвращают векторное представление для картинки (эмбеддинг). Для текста — та же схема.
Сначала ETL-процесс состоял из batch-джоб, которые брали партиции данных за конкретную дату и целиком её обрабатывали. Понятно, что с ростом числа товаров они будут работать всё дольше и дольше, а объём потребляемых ресурсов будет только расти. Особенно заметно это во время действия акций и сезонных распродаж — тогда часто меняется цена и добавляется много новых товаров. В такие моменты приходилось значительно поднимать память для приложения. К тому же процесс стал занимать слишком много времени — и весь остальной пайплайн был вынужден ждать завершения ETL. Всё закончилось тем, что на количество товаров, проходящих через ETL, выставлялся лимит, и максимально туда шла треть всех обновившихся товаров. Понятно, что при таком подходе очередь товаров, которые не проходят через пайплайн, будет стремительно расти.
Для того чтобы избежать большой очереди, мы решили никогда не останавливать наш пайплайн ETL — он теперь работает постоянно. Так мы пришли к Spark Structured Streaming.
Как у нас всё работает
Spark Structured Streaming позволяет работать с потоковыми данными, при этом можно использовать все преимущества Spark SQL. Теперь все обновления едут в Kafka-топик, а Streaming Session читает данные из него, обрабатывает и складывает в HDFS. Затем раз в день мы забираем эти данные и обновляем таблицы, которые являются результатом ETL. Таким образом, можем не выставлять лимит на количество обрабатываемых товаров в день, получая обновления равномерно в течение суток. Эмпирическим путём выяснили, что за день стриминг способен обрабатывать около 20 млн изображений и 50 млн текстовых объектов.
В качестве нейросетей для получения эмбеддингов используем BERT, ResNet50, fastText, NFNet, а с недавнего времени также считаем эмбеддинги для модели Prod2Vec.
Если вы работаете с PySpark (или просто со Spark), то наверняка знаете о возможности создания пользовательских функций (user-defined functions, UDF). На данный момент PySpark позволяет использовать три вида таких функций: Python UDF, Pandas UDF, Scala UDF. Об основных отличиях и бенчмарках можно прочитатьв этой статье, а я лишь скажу, что для инференса моделей используем Pandas UDF.
Давайте на примере рассмотрим, как можно выполнять инференс ML-моделей с использованием PySpark Structured Streaming и Pandas UDF, а в качестве источника сообщения используем Kafka. Весь код ниже актуален для PySpark 3.X.
Для начала — небольшой ликбез по основной терминологии Kafka.
Kafka — это инструмент, который позволяет работать с потоками событий. Например, есть приложение, которое пишет много логов. Хочется иметь к ним быстрый доступ, чтобы анализировать и делать какие-то выводы или просто сохранять в базу данных. Kafka в этом случае — посредник между приложением, которое пишет логи, и приложением или человеком, читающим эти логи.
Чтобы ориентироваться в Kafka-терминологии из данной статьи, необходимо знать про три вещи. На примере приложения с логами определим Producer, Consumer и Topic:
Topic показывает, где будут храниться логи в Kafka. Можно представить, что Topic — это папка, а каждый лог в нём —файл из этой папки. У каждого объекта (лога, сообщения) есть свой индекс (offset). Kafka также партиционирует топик, разбивая его на несколько частей и раскидывая по Kafka-кластеру.
Producer в данном случае будет записывать логи в Topic. Он создаётся в логируемом приложении и пишет всё, что ему скажет пользователь.
log = get_log ()
producer.produce (log, topic)
Consumer будет читать сообщения, которые находятся в топике. Он создаётся в приложении для чтения и обработки логов. При этом один топик могут читать сразу несколько консьюмеров.
Начинаем
Представим, что есть Kafka-топик, куда поступает событие изменения описания товара на сайте или добавления нового. Необходимо обрабатывать все такие события и извлекать необходимую информацию из текстов. Скажем, что известна схема сообщений (protobuf-схема), которые находятся в топике (ID и описание товара), а к текстам мы хотим применять какую-то ML-модель, которая возвращает эмбеддинг из текста.
syntax = "proto3";
message ItemText{
int64 item_id = 1;
string item_text = 2;
}
Чтобы десериализовывать proto-сообщения в Python, необходимо создать .py-файл из .proto-файла. Я это делаю командой protoc --python_out=.
.
Если вы не знакомы с Protobuf, то можно перейти по ссылке — и буквально за 30 секунд понять, что это :)
Объявим функцию, которая будет создавать и возвращать текстовую модель:
class TextModel:
...
def predict(self, x):
# your code
return model_prediction
...
def get_text_model(**kwargs) -> TextModel:
# your code
return text_model
Определим функции process_text
, которая будет добавлять колонку «embedding» к входным данным, и get_dataframe_from_messages
, которая десериализует сообщения.
# types_pb2 как раз создается из .proto
from types_pb2 import ItemText
def get_dataframe_from_messages(messages: pd.Series) -> pd.DataFrame:
proto_buffer = ItemText()
schema = [
"item_id",
"item_text",
]
columns = {col: [] for col in schema}
for msg in data:
data = proto_buffer.FromString(msg)
for col in columns:
columns[col].append(getattr(data, col))
return pd.DataFrame(columns)
@F.pandas_udf("item_id int, item_text string, embedding array")
def process_text(data: pd.Series) -> pd.DataFrame:
model = get_text_model(**kwargs)
data = get_dataframe_from_messages(data)
data["embedding"] = data["item_text"].apply(lambda x: model.predict(x))
return data
Функция обработки сообщения моделью готова. Теперь нужно научиться работать с топиком и поднять Spark Session для стриминга.
Создадим сессию и подпишемся на конкретный топик:
spark = SparkSession.builder.getOrCreate()
df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("subscribe", topic)
)
Для контроля сессии также рекомендуется выставлять дополнительные параметры. Список таких параметров доступен в документации. Наиболее важными, на мой взгляд, являются:
maxOffsetsPerTrigger — отвечает за количество сообщений, которые попадают в батч;
minPartitions — указывает число партиций, на которое разбивается этот самый батч. Неправильный выбор этого параметра может значительно замедлять стриминг в micro-batch режиме.
Рассмотрим пример, когда у топика есть три партиции. Тогда по умолчанию в параметрах контекста будет minPartitions = 3
. Это значит, что на каждый экзекутор прилетит по одной партиции, и текущий батч будет состоять из трёх задач.
Так, а что делать, если хочется повысить производительность стриминг-джобы? Какой параметр нужно изменить?
Окей, вы добавляете больше экзекуторов в Spark-приложение в надежде, что оно ускорится. К сожалению, оно не ускоряется ☹️ Что же происходит в действительности?
Добавляется один экзекутор, для которого просто нет данных, так как в параметрах контекста значение minPartitions
выставлено по умолчанию. В итоге такой экзекутор простаивает. Можно ли тогда изменить число партиций в Kafka, чтобы оно совпадало с количеством экзекуторов? Да, так тоже можно сделать, но только если у вас есть доступ к настройкам топика