Как мы в ivi переписывали etl

Год назад мы решили переделать схему сбора данных в приложении и данных о действиях клиентов. Старая система работала исправно, но с каждым разом было всё сложнее и опаснее вносить туда изменения.

pwne-zjsv77sp-sb6w5e-7q9xws.jpeg

В этой статье я расскажу какие технологии мы стали использовать для сбора и агрегации данных в новом проекте.

И вот почему
image
Так выглядела наша старая схема движения данных.

Множество данных от наших микросервисов, переливались скриптами в Hive.

Flume грузил клиентские данные из Kafka в ещё несколько таблиц, плюсом Flume грузил информацию о просмотрах из файловой системы одного из сервисов. Кроме этого были десятки скриптов в cron и oozie.

В какой-то момент мы поняли, что так жить нельзя. Такую систему загрузки данных практически невозможно тестировать. Каждая выгрузка сопровождается молитвами. Каждый новый тикет на доработку — тихим скрежетом сердца и зубов. Сделать так, чтобы система была полностью толерантна к падениям какого-либо её компонента стало очень сложно.


Подумав о том каким мы хотим видеть новый ETL и примерив технологии и молитвы мы получили следующую схему:

image

  • Все данные поступают по http. От всех сервисов. Данные в json.
  • Храним сырые (не обработанные) данные в kafka 5 дней. Кроме ETL, данные из kafka также используют и другие backend-сервисы.
  • Вся логика обработки данных находится в одном месте. Для нас это стал java-код для фреймворка Apache Flink. Про этот чудо-фреймворк чуть позже.
  • Для хранения промежуточных рассчётов используем redis. У Flink есть своё state-хранилище, оно толератно к падениям и делает чекпоинты, но его проблема в том, что из него нельзя восстановится при изменении кода.
  • Складируем всё в Clickhouse. Подключаем внешними словарями все таблицы, данные из которых микросервисы не отправляют нам событиями по http.


Если про самописный http-сервис складирующий данные в kafka и про сам сервис kafka писать нет смысла, то вот про то как мы используем Flink и ClickHouse я хочу остановится подробнее.

Apache Flink


Apache Flink — это платформа обработки потоков с открытым исходным кодом для распределенных приложений с высокой степенью отказоустойчивости и толерантностью к падениям.

Когда данные для анализа нужны быстрее и необходима быстрая агрегация большого потока данных для оперативной реакции на определенные события — стандартный, батчевый подход к ETL уже не работает. Тут-то нам и поможет streaming-processing.

Прелесть такого подхода не только в быстроте доставки данных, но и в том что вся обработка находится в одном месте. Можно обвесить всё тестами, вместо набора скриптов и sql-запросов это становится похоже на проект который можно поддерживать.

Рассмотрим простейший пример обработки потока из kafka на базе Apache Flink
package ivi.ru.groot;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.Properties;
public class Test {
    public static void main(String[] args) throws Exception {
        // Инициализация окружения flink-приложения
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Настройки для консюмера kafka
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka.host.org.ru:9092");
        kafkaProps.setProperty("zookeeper.connect", "zoo.host.org.ru:9092");
        kafkaProps.setProperty("group.id", "group_id_for_reading");
        FlinkKafkaConsumer010 eventsConsumer =
                new FlinkKafkaConsumer010<>("topic_name",
                        new SimpleStringSchema(),
                        kafkaProps);
        // Вот теперь у нас есть поток данных(DataStreamApi) над которым можно делать всё что угодно
        DataStream eventStreamString = env.addSource(eventsConsumer).name("Event Consumer");
        // Для начала решим очень простую задачу. Отфильтруем те записи в которых есть слово Hello
        eventStreamString = eventStreamString.filter(x -> x.contains("Hello"));
        // А теперь оставим те которые можно преобразовать в json и прокинем далее в поток JSONObject
        DataStream jsonEventStream = eventStreamString.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                try{
                    //прокидываем в поток записи которые можно преобразовать в json
                    out.collect(new JSONObject(value));
                }
                catch (JSONException e){}
            }
        });
        // выведем в stout эти json-объекты.
        jsonEventStream.print();
        // Запустим наш граф
        env.execute();
    }
}

Про то как быстро создать maven-проект c зависимостями flink и небольшими примерами.

Вот здесь подробное описание DataStream API, с помощью которого можно производить практически любые преобразования с потоком данных.


Кластер flink можно запустить в yarn, mesos или при помощи отдельных (встроенных в пакет flink) task- и job-manager’s.

У Flink потрясающий web-интерфейс
Web-интерфейс позволяет посмотреть сколько данных на какой части графа обработано, сколько обработал конкретный worker и сколько обработал отдельный subtask отдельного worker«а. В web-интерфейс можно вывести метрики, можно определить какой участок кода тормозит при помощи механизма back-pressure. Back-pressure определяет какой процент данных не успел просочится через участок графа. Пример графа для нашего ETL:

image


Кроме очевидной задачи складирования данных в нужном формате, мы с помощью Flink написали код, решающий следующие задачи:

  • Генерация сессий для событий. Сессия становится единой для всех событий одного user_id. Вне зависимости какой был источник сообщения.
  • Проставляем гео-информацию для каждого события (город, область, страну, широту и долготу).
  • Вычисляем «продуктовые воронки». Наши аналитики описывают определенную последовательность событий. Мы ищем для пользователя внутри одной клиентской сессии эту последовательность и маркируем попавшие в воронку события.
  • Комбинация данных из разных источников. Чтобы не делать лишние join«ы — можно заранее понять, что столбец из таблицы A может понадобиться в будущем в таблице B. Можно сделать это на этапе процессинга.


Для быстрой работы всей этой машинерии пришлось сделать пару нехитрых приёмов:

  • Все данные партиционируем по user_id на этапе заливки в kafka.
  • Используем redis как state-хранилище. Redis — это просто, надёжно и супер быстро, когда мы говорим про key-value хранилище.
  • Избавится от всех оконных функций. Нет всем задержкам!


ClickHouse


Clickhouse выглядел на момент проектирования просто идеальным вариантом для наших задач хранения и аналитических расчётов. Колоночное хранилище со сжатием (по структуре похожее на parquet), распределенная обработка запросов, шардирование, репликация, семплирование запросов, вложенные таблицы, внешние словари из mysql и любого ODBC подключения, дедупликация данных (хоть и отложенная) и многие другие плюшки…

Мы начали тестировать ClickHouse уже через неделю после релиза и сказать что всё сразу было радужно — это соврать.

Нет вменяемой схемы распределения прав доступа. Пользователи заводятся через xml файл. Нельзя настроить пользователю readOnly доступ на одну базу и полный доступ до другой базы. Либо полный, либо только чтение.

Нет нормального join. Если правая часть от join не помещается в память — извини. Нет оконных функций. Но мы решили это построив в Flink механизм «воронок», который позволяет отслеживать последовательности событий и помечать их. Минус наших «воронок», что мы не можем их смотреть задним числом до добавления аналитиком. Или нужно репроцессить данные.

Долгое время не было нормального ODBC-драйвера. Это огромный барьер для того чтобы внедрять базу, ибо многие BI (tableu в частности) имеют именно этот интерфейс. Сейчас с этим проблем нет.

Побывав на последней конференции по CH (12 декабря 2017 года), разработчики базы обнадежили меня. Большинство из тех проблем которые меня волнуют должны быть решены в первом квартале 2018 года.

Многие ругают ClickHouse за синтаксис, но мне он нравится. Как выразился один мой многоуважаемый коллега, это «база данных для программистов». И в этом есть немного правды. Можно сильно упрощать запросы если использовать крутейший и уникальный функционал. Например, функции высшего порядка. Lambda-вычисления на массивах прямо в sql. Не чудо ли это??? Или то, что мне очень понравилось — комбинаторы агрегатных функций.

Данный функционал позволяет к функциям приставлять набор суффиксов (-if, -merge, -array) модифицируя работу этой функции. Крайне интересные наработки.

Наше решение на Clickhouse основывается на табличном-движке ReplicatedReplacingMergeTree.
Схема распределения данных по кластеру выглядит примерно так:

image


Distributed таблицы — это обёртка над локальными таблицами (ReplicatedReplacingMergeTree), в которую идут все insert и select. Эти таблицы занимаются шардированием данных при вставке. Запросы к этим таблицам будут распределёнными. Данные, по возможности, распределённо обрабатываются на удалённых серверах.

ReplicatedReplacingMergeTree — это движок который реплицирует данные и при этом, при каждом мёрже схлопывает дубликаты по определённым ключам. Ключи для дедупликации указываются при создании таблицы.

Резюме


Такая схема ETL, позволила нам иметь хранилище толерантное к дубликатам. При ошибке в коде, мы всегда можем откатить consumer offset в kafka и обработать часть данных снова, не прилагая никаких особых усилий для движения данных.

© Habrahabr.ru