Как мы в ivi переписывали etl
Год назад мы решили переделать схему сбора данных в приложении и данных о действиях клиентов. Старая система работала исправно, но с каждым разом было всё сложнее и опаснее вносить туда изменения.
В этой статье я расскажу какие технологии мы стали использовать для сбора и агрегации данных в новом проекте.
Так выглядела наша старая схема движения данных.
Множество данных от наших микросервисов, переливались скриптами в Hive.
Flume грузил клиентские данные из Kafka в ещё несколько таблиц, плюсом Flume грузил информацию о просмотрах из файловой системы одного из сервисов. Кроме этого были десятки скриптов в cron и oozie.
В какой-то момент мы поняли, что так жить нельзя. Такую систему загрузки данных практически невозможно тестировать. Каждая выгрузка сопровождается молитвами. Каждый новый тикет на доработку — тихим скрежетом сердца и зубов. Сделать так, чтобы система была полностью толерантна к падениям какого-либо её компонента стало очень сложно.
Подумав о том каким мы хотим видеть новый ETL и примерив технологии и молитвы мы получили следующую схему:
- Все данные поступают по http. От всех сервисов. Данные в json.
- Храним сырые (не обработанные) данные в kafka 5 дней. Кроме ETL, данные из kafka также используют и другие backend-сервисы.
- Вся логика обработки данных находится в одном месте. Для нас это стал java-код для фреймворка Apache Flink. Про этот чудо-фреймворк чуть позже.
- Для хранения промежуточных рассчётов используем redis. У Flink есть своё state-хранилище, оно толератно к падениям и делает чекпоинты, но его проблема в том, что из него нельзя восстановится при изменении кода.
- Складируем всё в Clickhouse. Подключаем внешними словарями все таблицы, данные из которых микросервисы не отправляют нам событиями по http.
Если про самописный http-сервис складирующий данные в kafka и про сам сервис kafka писать нет смысла, то вот про то как мы используем Flink и ClickHouse я хочу остановится подробнее.
Apache Flink
Apache Flink — это платформа обработки потоков с открытым исходным кодом для распределенных приложений с высокой степенью отказоустойчивости и толерантностью к падениям.
Когда данные для анализа нужны быстрее и необходима быстрая агрегация большого потока данных для оперативной реакции на определенные события — стандартный, батчевый подход к ETL уже не работает. Тут-то нам и поможет streaming-processing.
Прелесть такого подхода не только в быстроте доставки данных, но и в том что вся обработка находится в одном месте. Можно обвесить всё тестами, вместо набора скриптов и sql-запросов это становится похоже на проект который можно поддерживать.
package ivi.ru.groot;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.Properties;
public class Test {
public static void main(String[] args) throws Exception {
// Инициализация окружения flink-приложения
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Настройки для консюмера kafka
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka.host.org.ru:9092");
kafkaProps.setProperty("zookeeper.connect", "zoo.host.org.ru:9092");
kafkaProps.setProperty("group.id", "group_id_for_reading");
FlinkKafkaConsumer010 eventsConsumer =
new FlinkKafkaConsumer010<>("topic_name",
new SimpleStringSchema(),
kafkaProps);
// Вот теперь у нас есть поток данных(DataStreamApi) над которым можно делать всё что угодно
DataStream eventStreamString = env.addSource(eventsConsumer).name("Event Consumer");
// Для начала решим очень простую задачу. Отфильтруем те записи в которых есть слово Hello
eventStreamString = eventStreamString.filter(x -> x.contains("Hello"));
// А теперь оставим те которые можно преобразовать в json и прокинем далее в поток JSONObject
DataStream jsonEventStream = eventStreamString.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
try{
//прокидываем в поток записи которые можно преобразовать в json
out.collect(new JSONObject(value));
}
catch (JSONException e){}
}
});
// выведем в stout эти json-объекты.
jsonEventStream.print();
// Запустим наш граф
env.execute();
}
}
Про то как быстро создать maven-проект c зависимостями flink и небольшими примерами.
Вот здесь подробное описание DataStream API, с помощью которого можно производить практически любые преобразования с потоком данных.
Кластер flink можно запустить в yarn, mesos или при помощи отдельных (встроенных в пакет flink) task- и job-manager’s.
Кроме очевидной задачи складирования данных в нужном формате, мы с помощью Flink написали код, решающий следующие задачи:
- Генерация сессий для событий. Сессия становится единой для всех событий одного user_id. Вне зависимости какой был источник сообщения.
- Проставляем гео-информацию для каждого события (город, область, страну, широту и долготу).
- Вычисляем «продуктовые воронки». Наши аналитики описывают определенную последовательность событий. Мы ищем для пользователя внутри одной клиентской сессии эту последовательность и маркируем попавшие в воронку события.
- Комбинация данных из разных источников. Чтобы не делать лишние join«ы — можно заранее понять, что столбец из таблицы A может понадобиться в будущем в таблице B. Можно сделать это на этапе процессинга.
Для быстрой работы всей этой машинерии пришлось сделать пару нехитрых приёмов:
- Все данные партиционируем по user_id на этапе заливки в kafka.
- Используем redis как state-хранилище. Redis — это просто, надёжно и супер быстро, когда мы говорим про key-value хранилище.
- Избавится от всех оконных функций. Нет всем задержкам!
ClickHouse
Clickhouse выглядел на момент проектирования просто идеальным вариантом для наших задач хранения и аналитических расчётов. Колоночное хранилище со сжатием (по структуре похожее на parquet), распределенная обработка запросов, шардирование, репликация, семплирование запросов, вложенные таблицы, внешние словари из mysql и любого ODBC подключения, дедупликация данных (хоть и отложенная) и многие другие плюшки…
Мы начали тестировать ClickHouse уже через неделю после релиза и сказать что всё сразу было радужно — это соврать.
Нет вменяемой схемы распределения прав доступа. Пользователи заводятся через xml файл. Нельзя настроить пользователю readOnly доступ на одну базу и полный доступ до другой базы. Либо полный, либо только чтение.
Нет нормального join. Если правая часть от join не помещается в память — извини. Нет оконных функций. Но мы решили это построив в Flink механизм «воронок», который позволяет отслеживать последовательности событий и помечать их. Минус наших «воронок», что мы не можем их смотреть задним числом до добавления аналитиком. Или нужно репроцессить данные.
Долгое время не было нормального ODBC-драйвера. Это огромный барьер для того чтобы внедрять базу, ибо многие BI (tableu в частности) имеют именно этот интерфейс. Сейчас с этим проблем нет.
Побывав на последней конференции по CH (12 декабря 2017 года), разработчики базы обнадежили меня. Большинство из тех проблем которые меня волнуют должны быть решены в первом квартале 2018 года.
Многие ругают ClickHouse за синтаксис, но мне он нравится. Как выразился один мой многоуважаемый коллега, это «база данных для программистов». И в этом есть немного правды. Можно сильно упрощать запросы если использовать крутейший и уникальный функционал. Например, функции высшего порядка. Lambda-вычисления на массивах прямо в sql. Не чудо ли это??? Или то, что мне очень понравилось — комбинаторы агрегатных функций.
Данный функционал позволяет к функциям приставлять набор суффиксов (-if, -merge, -array) модифицируя работу этой функции. Крайне интересные наработки.
Наше решение на Clickhouse основывается на табличном-движке ReplicatedReplacingMergeTree.
Схема распределения данных по кластеру выглядит примерно так:
Distributed таблицы — это обёртка над локальными таблицами (ReplicatedReplacingMergeTree), в которую идут все insert и select. Эти таблицы занимаются шардированием данных при вставке. Запросы к этим таблицам будут распределёнными. Данные, по возможности, распределённо обрабатываются на удалённых серверах.
ReplicatedReplacingMergeTree — это движок который реплицирует данные и при этом, при каждом мёрже схлопывает дубликаты по определённым ключам. Ключи для дедупликации указываются при создании таблицы.
Резюме
Такая схема ETL, позволила нам иметь хранилище толерантное к дубликатам. При ошибке в коде, мы всегда можем откатить consumer offset в kafka и обработать часть данных снова, не прилагая никаких особых усилий для движения данных.