Введение в Clickhouse движок AggregatingMergeTree

9ba4dda767144a7c52ac4f139b20f6e1.png

В процессе разработки витрин данных часто возникает задача предоставления клиентам данных в агрегированном виде. Если данных в хранилище немного, то их можно агрегировать «на лету», но это плохая практика так как, чем больше будет копиться данных, тем дольше будут выполняться запросы, и тем больше Clickhouse будет съедать ресурсов. В этих случаях логично хранить данные в заранее агрегированном виде, вопрос лишь в том, как реализовать расчет данных агрегированных значений.

В интернете существуют много однотипных статей иллюстрирующих базовое использование материализованных представлений (далее — матвью) на движке AggregatingMergeTree, но если ваша задача выходит за рамки »1 нода, 1 метрика, 1 параметр агрегации» эти статьи вам мало чем помогут. Я посчитал, что моим коллегам может пригодиться своего рода гайд о том, как пользоваться данными представлениями для более сложных задач.

Гайд выполнен в виде шагов, иллюстрирующих мой путь в понимании данной концепции. Если я совершил какую-либо ошибку в процессе, и вы ее заметили, или у вас есть предложение по улучшению / дополнению данного гайда, прошу написать об этом в комментариях, уверен всем от этого будет только лучше.

В рамках моей задачи хранилище данных (далее — DWH) реализовано в виде реплицированного кластера состоящего из 3 нод, данные на ноды распределяются равномерно в соответствии с ключом сортировки таблиц. Существует исходная таблица source, которая содержит столбцы id, timecode_1, metric_data — данные представляют собой временной ряд утилизации ресурсов с гранулярностью 1 минута. Данные поступают блоками каждые 2 минуты.

Пользователи могут запрашивать данные с разной гранулярностью, поэтому наша задача — агрегировать и сохранить данные с разными периодами агрегации (2, 3, 5, 10 минут и т.д.).

Шаг 1. Матвью на 1 ноде с движком AggregatingMergeTree

Начнем с создания исходной таблицы source, в которой будут храниться данные нашего временного ряда. В таблице будут следующие столбцы:

  • id — идентификатор объекта;

  • timecode_1 — метка времени каждой точки данных с гранулярностью 1 минута;

  • metric_data — значение метрики для точки временного ряда

SQL запрос для создания source таблицы (вопросов оптимизации сейчас не касаемся):

CREATE TABLE source
( 
    id          UInt32, 
    timecode_1  DateTime, 
    metric_data Float32
) 
    ENGINE = MergeTree() 
        PARTITION BY toYYYYMM(timecode_1) 
        ORDER BY (id, timecode_1);

Теперь, когда у нас есть исходная таблица, мы можем создать матвью с движком AggregatingMergeTree, которая будет агрегировать данные по каждому объекту с 2-минутными интервалами:

CREATE MATERIALIZED VIEW source_aggregated_mv 
    ENGINE = AggregatingMergeTree() 
    PARTITION BY toYYYYMM(timecode) 
    ORDER BY (id, timecode) 
AS 
SELECT id, 
       toStartOfInterval(timecode_1, toIntervalMinute(2)) AS timecode, 
       avgState(metric_data) AS metric_data 
FROM source 
GROUP BY id, timecode;

SELECT вычисляет время начала 2-минутного интервала для каждой точки данных и агрегирует данные с помощью функции avgState (). GROUP BY группирует данные по идентификатору и 2-минутным интервалам.

Важно отметить, что поля в GROUP BY в запросе материализованного представления и в ORDER BY в определении материализованного представления должны совпадать. Это связано с тем, что механизм AggregatingMergeTree использует ORDER BY для задания первичного ключа, который используется для сортировки данных перед их агрегированием и для правильного объединения данных.

Теперь, когда мы создали исходную таблицу и материализованное представление, вставим некоторые данные в исходную таблицу, чтобы увидеть, корректно ли работает материализованное представление.

Вставим первую группу данных:

INSERT INTO source (id, timecode_1, metric_data) 
VALUES (1, '2023-03-05 00:00:00', 10), 
       (2, '2023-03-05 00:00:00', 15);

Сделаем запрос в матвью source_aggregated_mv, чтобы извлечь агрегированные данные. Помните, что мы должны использовать функцию avgMerge (), чтобы объединить ранее вычисленные состояния и сгруппировать их.

SELECT id, 
       timecode, 
       avgMerge(metric_data) AS metric_data 
FROM source_aggregated_mv 
GROUP BY id, timecode;

Он возвращает:

id

timecode

metric_data

1

2023–03–05 00:00:00

10

2

2023–03–05 00:00:00

15

Вставим вторую группу данных:

INSERT INTO source (id, timecode_1, metric_data) 
VALUES (1, '2023-03-05 00:01:00', 20), 
       (2, '2023-03-05 00:01:00', 25), 
       (1, '2023-03-05 00:02:00', 30);

Запрос возвращает:

id

timecode

metric_data

1

2023–03–05 00:00:00

15

1

2023–03–05 00:02:00

30

2

2023–03–05 00:00:00

20

Как видите, матвью успешно справилась с задачей, корректно рассчитав агрегированные значения метрики.

Шаг 2. Матвью с передачей данных в таблицу AggregatedMergeTree на 1 ноде

Итак, мы успешно создали материализованное представление с движком AggregatingMergeTree, которое агрегирует данные из нашей исходной таблицы source с 2-минутными интервалами. 

Забегу немного в будущее. Напомню, наша задача — агрегировать данные по нескольким временным интервалам. В обычной вьюхе эту задачу можно было бы решить с помощью оператора UNION ALL, но матвью в Clickhouse не поддерживает данный оператор. Будем решать задачу с помощью набора матвьюх, каждая из которых будут агрегировать данные со своим интервалом, а результат будем складывать в одну таблицу. Об этом подробно поговорим на шаге 3, а пока разберемся как сохранить агрегированные данные 1 матвью в таблицу.

Создадим целевую таблицу target с тем же движком. Мы также настроим процесс, который будет автоматически передавать данные из source в target с помощью матвью source_aggregated_mv.  Обратите внимание, что операторы PARTITION BY и ORDER BY, используемые для целевой таблицы, идентичны операторам в матвью source_aggregated_mv из 1 шага. 

Также будьте внимательны при задании типа данных, агрегационная функция и тип обрабатываемых данных должны быть заданы корректно, в обратном случае процесс работать не будет. Например, вы хотите получить последнее значение в столбце типа Decimal (9, 2), но в целевой таблице указали тип столбца AggregateFunction (argMax, UInt32, DateTime), тогда вы получите ошибку:

Code: 70. DB: Exception: Conversion from AggregateFunction (argMax, Decimal (9, 2), DateTime) to AggregateFunction (argMax, UInt32, DateTime) is not supported: while converting source column metric_data to destination column metric_data (CANNOT_CONVERT_TYPE)

Рекомендую перед созданием таблицы и матвью запустить SELECT на ограниченном объеме данных, чтобы выяснить типы. 

SQL для создания target таблицы и матвью:

CREATE TABLE target
( 
    id                 UInt32, 
    timecode     DateTime, 
    metric_data AggregateFunction(avg, Float32)
) 
    ENGINE = AggregatingMergeTree() 
        PARTITION BY toYYYYMM(timecode) 
        ORDER BY (id, timecode); 

CREATE MATERIALIZED VIEW source_aggregated_mv TO target
AS 
SELECT id, 
       toStartOfInterval(timecode_1, toIntervalMinute(2)) AS timecode, 
       avgState(metric_data) AS metric_data 
FROM source 
GROUP BY id, timecode;

Протестируем ETL процесс, вставив некоторые данные в source таблицу и проверив, правильно ли агрегируются и передаются данные в target таблицу. Вставим тестовые данные теми же блоками, как и в предыдущей задаче:

INSERT INTO source (id, timecode_1, metric_data)  
VALUES (1, '2023-03-05 00:00:00', 10),  
       (2, '2023-03-05 00:00:00', 15);

Запросим данные из целевой таблицы:

SELECT id, 
       timecode, 
       avgMerge(metric_data) AS metric_data 
FROM target
GROUP BY id, timecode;

Запрос вернет:

id

timecode

metric_data

1

2023–03–05 00:00:00

10

2

2023–03–05 00:00:00

15

Вставим в таблицу source вторую пачку данных:

INSERT INTO source (id, timecode_1, metric_data) 
VALUES (1, '2023-03-05 00:01:00', 20), 
       (2, '2023-03-05 00:01:00', 25), 
       (1, '2023-03-05 00:02:00', 30);

Запрос в target вернет:

id

timecode

metric_data

1

2023–03–05 00:00:00

15

1

2023–03–05 00:02:00

30

2

2023–03–05 00:00:00

20

Как мы видим, данные были правильно агрегированы и переданы в целевую таблицу с помощью материализованного представления.

Шаг 3. Агрегируем данные с разными периодами

Как я сказал ранее, пользователи хотят запрашивать данные, агрегированные на разных временных интервалах. Нам надо эти агрегированные данные заранее подготовить, и эту задачу мы будем решать с помощью нескольких матвьюх, передающих результаты в одну таблицу.

Таким образом, следующий пункт — разработать процесс, который агрегирует данные с различными временными интервалами и сохраняет результат в целевой таблице.

Первый шаг — добавить столбец timespan в запрос CREATE для target таблицы:

CREATE TABLE target
( 
    id                 UInt32, 
    timespan     UInt32, 
    timecode     DateTime, 
    metric_data AggregateFunction(avg, Float32)
) 
    ENGINE = AggregatingMergeTree() 
    PARTITION BY toYYYYMM(timecode) 
    ORDER BY (id, timespan, timecode); 

Следующим шагом является создание материализованных представлений с функциями AggregatingMergeTree для агрегирования данных. Матвью source_aggregated_2_mv агрегирует данные с интервалом в 2 минуты, source_aggregated_3_mv делает то же самое, но с интервалом в 3 минуты:

CREATE MATERIALIZED VIEW source_aggregated_2_mv TO target
AS
SELECT id,
       2 AS timespan,
       toStartOfInterval(timecode, INTERVAL 2 MINUTE) AS timecode,
       avgState(metric_data) AS metric_data
FROM source
GROUP BY id,
         timespan,
         toStartOfInterval(timecode, INTERVAL 2 MINUTE);

CREATE MATERIALIZED VIEW source_aggregated_3_mv TO target
AS
SELECT id,
       3 AS timespan,
       toStartOfInterval(timecode, INTERVAL 3 MINUTE) AS timecode,
       avgState(metric_data) AS metric_data
FROM source
GROUP BY id,
         timespan,
         toStartOfInterval(timecode, INTERVAL 3 MINUTE);

Как видите, материализованные представления в основном такие же, как и раньше, но с некоторыми изменениями:

  • Мы добавили столбец timespan после столбца id. Значение этого столбца равно 2 для первой матвьюхи и 3 для второй, и это постоянное значение, которое не рассчитывается на основе данных.

  • Мы также добавили временной интервал в качестве группирующего столбца в предложении GROUP BY, чтобы он был включен в группировку вместе с идентификатором и временным кодом.

Проверим работоспособность. Вставим данные:

INSERT INTO source (id, timecode_1, metric_data)  
VALUES (1, '2023-03-05 00:00:00', 10),  
       (2, '2023-03-05 00:00:00', 15);

Затем мы обратимся к target таблице, используя следующий запрос:

SELECT id, 
       timespan,
       timecode, 
       avgMerge(metric_data) AS metric_data 
FROM target
GROUP BY id, timespan, timecode;

Получили следующее:

id

timespan

timecode

metric_data

1

2

2023–03–05 00:00:00

10

1

3

2023–03–05 00:00:00

10

2

2

2023–03–05 00:00:00

15

2

3

2023–03–05 00:00:00

15

Вставим второй набор данных в source таблицу:

INSERT INTO source (id, timecode_1, metric_data) 
VALUES (1, '2023-03-05 00:01:00', 20), 
       (2, '2023-03-05 00:01:00', 25), 
       (1, '2023-03-05 00:02:00', 30);

Запустим тот же запрос и получим:

id

timespan

timecode

metric_data

1

2

2023–03–05 00:00:00

15

1

2

2023–03–05 00:02:00

30

1

3

2023–03–05 00:00:00

20

2

2

2023–03–05 00:00:00

20

2

3

2023–03–05 00:00:00

20

Как видите, данные были точно агрегированы с различными интервалами и перемещены в target таблицу.

Шаг 4. Масштабируем решение на кластер

Мы разобрались как создавать инфраструктуру, которая агрегирует данные на разных периодах и складывает их в одну таблицу. Теперь пользователь может выбирать гранулярность данных, запрашивать их и очень быстро получать ответ. Никакой агрегации в запросе, всё «летает». Но все действия мы производили в рамках одной ноды. Теперь же наша задача масштабировать наше решение на реплицированный кластер, состоящий из 3 нод.

Не буду показывать как создавать источник данных, лишь уточню, что теперь он состоит из таблицы source с движком Distributed и таблицы source_shard с движком ReplicatedMergeTree. 

Целевая таблица теперь также состоит из двух, покажем создание этих таблиц:

CREATE TABLE target ON CLUSTER ‘cluster’
( 
    id                 UInt32, 
    timespan     UInt32, 
    timecode     DateTime, 
    metric_data AggregateFunction(avg, Float32)
) 
    ENGINE = Distributed ('cluster', ‘target_shard’, id); 

CREATE TABLE target_shard ON CLUSTER ‘cluster’
( 
    id                 UInt32, 
    timespan     UInt32, 
    timecode     DateTime, 
    metric_data AggregateFunction(avg, Float32)
) 
    ENGINE = ReplicatedAggregatingMergeTree(/clickhouse/tables‘/{shard}/target ’,  ‘{replica}’) 
        PARTITION BY toYYYYMM(timecode) 
        ORDER BY (id, timespan, timecode); 

Последний шаг — создать матвьюхи, которые будут агрегировать данные с разными интервалами. Важно заметить, первая таблица, к которой обращается матвью должна быть локальной. В документации написано, что матвью может обращаться и к локальной, и к распределенной таблице, но моя практика показала, что если обратиться к Distributed таблице, матвью не обработает данные. Плюс разработчики clickhouse рекомендуют при работе с матвью пользоваться схемой «from local to local», так как это эффективнее, снижает нагрузку на сеть, а значит быстрее. Вставлять данные из матвью также желательно в локальную таблицу на той же ноде, причину описал выше.

CREATE MATERIALIZED VIEW source_aggregated_2_mv ON CLUSTER ‘cluster’ TO target_shard
AS
SELECT id,
       2 AS timespan,
       toStartOfInterval(timecode, INTERVAL 2 MINUTE) AS timecode,
       avgState(metric_data) AS metric_data
FROM source_shard
GROUP BY id,
         timespan,
         toStartOfInterval(timecode, INTERVAL 2 MINUTE);

CREATE MATERIALIZED VIEW source_aggregated_3_mv ON CLUSTER ‘cluster’ TO target_shard
AS
SELECT id,
       3 AS timespan,
       toStartOfInterval(timecode, INTERVAL 3 MINUTE) AS timecode,
       avgState(metric_data) AS metric_data
FROM source_shard
GROUP BY id,
         timespan,
         toStartOfInterval(timecode, INTERVAL 3 MINUTE);

Проверим работоспособность. Вставим данные в распределенную таблицу:

INSERT INTO source (id, timecode_1, metric_data) 
VALUES (1, '2023-03-05 00:00:00', 10),
       (2, '2023-03-05 00:00:00', 15);

Затем мы обратимся к распределенной target таблице, используя следующий запрос:

SELECT id, 
       timespan,
       timecode, 
       avgMerge(metric_data) AS metric_data 
FROM target
GROUP BY id, timespan, timecode;

Получили следующее:

id

timespan

timecode

metric_data

1

2

2023–03–05 00:00:00

10

1

3

2023–03–05 00:00:00

10

2

2

2023–03–05 00:00:00

15

2

3

2023–03–05 00:00:00

15

Вставим второй набор данных в source таблицу:

INSERT INTO source (id, timecode_1, metric_data) 
VALUES (1, '2023-03-05 00:01:00', 20), 
       (2, '2023-03-05 00:01:00', 25), 
       (1, '2023-03-05 00:02:00', 30);

Запустим тот же запрос и получим:

id

timespan

timecode

metric_data

1

2

2023–03–05 00:00:00

15

1

2

2023–03–05 00:02:00

30

1

3

2023–03–05 00:00:00

20

2

2

2023–03–05 00:00:00

20

2

3

2023–03–05 00:00:00

20

Как видите, данные были точно агрегированы с различными интервалами и перемещены в target таблицу.

Подведем итог:

Теперь наши данные хранятся в заранее агрегированном виде, распределены между 3 нодами, реплицированы; при запросе пользователем данных с определенной гранулярностью Clickhouse не придется ничего агрегировать с нуля, а значит данные вернутся быстрее. Также данный подход позволит «размазать» нагрузку на DWH, которая бы возникла при обычной агрегации, во времени. Как видите, выигрывает и клиент, и DWH. 

PS:

Помните, AggregatingMergeTree — это не серебряная пуля в решении задачи агрегации и хранения агрегированных данных. Данные хранятся в предагрегированном виде, и при запросе их все равно придется преобразовывать. Поэтому, если вы можете организовать агрегацию и хранение данных вне таблиц с AggregatingMergeTree, делайте это.

© Habrahabr.ru