Как работать со словарями данных и оптимизировать запросы в ClickHouse

527d34ec1527f043e8f93a3fe1ff9396.png

Приветствуем! На связи вновь Глеб Кононенко и Алексей Диков — разработчики из Лиги Цифровой Экономики. Ранее мы уже немного рассказывали про наш опыт работы с распределенными таблицами в ClickHouse в этой статье.

Сегодня хотим поделиться опытом оптимизации запросов и работы со словарями данных. Используемая версия ClickHouse: 23.8.7.24

Напомним характеристики нашего проекта:

  • Данные грузятся каждые 15 минут

  • Постоянно приходит дублирующая информация

  • Необходимо хранить данные в течение 5 лет

  • В среднем в сутки приходит 150 млн строк (пик — до 13 млрд/сут)

  • В базе 1266 млрд строк, в сжатом виде 61 Тб, в несжатом — 585 Тб

Оглавление

  • Создание тестовых таблиц

  • Джойн. Первые шаги, важность порядка

  • Настройки для запросов

  • Пример оптимизации в ClickHouse

  • Работа со словарями

Создание тестовых таблиц

Создадим две таблицы на движке MergeTree. Первая таблица — факты, в которой поля типа String будем сразу по умолчанию наполнять строками разной длины:

create table fct_contract(
    org_id UUID,
    contract_num String
   	DEFAULT randomPrintableASCII(randUniform(10, 15)),
    name String
   	DEFAULT randomPrintableASCII(randUniform(15, 30)),
    value UInt32,
    create_date Date
 )
 ENGINE = MergeTree
 ORDER BY (org_id,contract_num);

Вторая таблица — справочник, поле ИНН генерируется случайным образом в пределах 10 тысяч значений:

create table dim_name(
    org_id UUID,
    name String,
    inn UInt64 DEFAULT randUniform(100000000000, 100000010000)
 )
 ENGINE MergeTree
 ORDER BY (org_id, name);

Вставим в первую таблицу 15 млн записей:

insert into fct_contract (org_id, value, create_date)
 SELECT
	UUIDNumToString(murmurHash3_128(toString(rand()%10000))) as org_id,
 	value,
 	now()::date - randUniform(1, 10000)
 FROM generateRandom('value UInt32')
LIMIT 15000000;

Джойн. Первые шаги, важность порядка

Пока у нас есть одна наполненная и одна пустая таблицы. Даже в этом случае операция джойн может не выполниться из-за нехватки памяти, если мы неправильно выберем порядок соединения таблиц.

Попробуем. Для этого ограничим память для выполнения запросов в ~1 гигабайт:

select org_id, contract_num, name, value, create_date, inn
 from dim_name
 join fct_contract
    on fct_contract.org_id = dim_name.org_id
   and fct_contract.name = dim_name.name
 settings max_memory_usage = 1000000000;

Памяти не хватает, и мы получаем ошибку:

Received exception from server (version 23.8.7):
 Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 955.75 MiB (attempt to allocate chunk of 4244058 bytes), maximum: 953.67 MiB.: (while reading column name): (while reading from part /var/lib/clickhouse/store/d30/d30dbd5e-bad0-4bb1-859c-7f26f955c2cc/all_186_186_0/ in table default.fct_contract (d30dbd5e-bad0-4bb1-859c-7f26f955c2cc) located on disk default of type local, from mark 62 with max_rows_to_read = 8192): While executing MergeTreeThread. (MEMORY_LIMIT_EXCEEDED)

А теперь поменяем таблицы в запросе местами и запустим снова:

select org_id, contract_num, name, value, create_date, inn
 from fct_contract
 join dim_name
    on fct_contract.org_id = dim_name.org_id
   and fct_contract.name = dim_name.name
 settings max_memory_usage = 1000000000;
0 rows in set. Elapsed: 0.011 sec. Processed 262.14 thousand rows, 19.39 MB (23.49 million rows/s., 1.74 GB/s.)
 Peak memory usage: 24.86 MiB.

Запрос отрабатывает, ничего не возвращая, потому что справочник в настоящий момент не заполнен. Дело в том, что ClickHouse при операции соединения в первую очередь загружает в оперативную память правую таблицу из запроса.

Наполним наш справочник:

insert into dim_name (org_id, name)
 select org_id, name from fct_contract;

Попробуем опять выполнить запрос:

select org_id, contract_num, name, value, create_date, inn
 from fct_contract
 join dim_name
    on fct_contract.org_id = dim_name.org_id
   and fct_contract.name = dim_name.name
 settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
 Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 1.38 GiB (attempt to allocate chunk of 1342177280 bytes), maximum: 953.67 MiB.: While executing FillingRightJoinSide. (MEMORY_LIMIT_EXCEEDED)

Теперь и ему не хватает памяти… Попробуем это исправить.

Настройки для запросов

У ClickHouse есть разные алгоритмы соединения таблиц. По умолчанию используется hash-алгоритм, который записывает хеш-значения для соединяемых полей правой таблицы в оперативную память. Затем он ищет среди хешей совпадения для данных из левой таблицы. Такой процесс достаточно эффективен по скорости выполнения, но не очень — в плане потребления ресурсов.

Медленный, но менее ресурсозатратный алгоритм — partial_merge. При его использовании сначала сортируются ключи правой таблицы и затем поблочно (partial_merge_join_rows_in_right_blocks) записываются в оперативную память вместе с индексами каждого блока, до тех пор пока корзина (выделенный на запрос объем оперативной памяти) не заполнится. Затем данные сбрасываются на диск. Далее такая же операция происходит с левой таблицей, и уже затем данные сопоставляются с использование min-max индексов.

Подробнее про алгоритмы вы можете прочитать в этой статье.

Включим этот алгоритм:

select org_id, contract_num, name, value, create_date, inn
 from fct_contract
 join dim_name
 	on fct_contract.org_id = dim_name.org_id
    and fct_contract.name = dim_name.name
 settings max_memory_usage = 1000000000,
 	join_algorithm = 'partial_merge',
 	default_max_bytes_in_join= 500000000;
15000000 rows in set. Elapsed: 22.969 sec. Processed 30.00 million rows, 1.94 GB (1.31 million rows/s., 84.24 MB/s.)
 Peak memory usage: 927.08 MiB.

Отлично! Наш запрос отработан за 23 секунды.

На какие еще настройки нужно обратить внимание?

Рассмотрим ситуацию, в которой нам надо упорядочить таблицу fct_contract не по заданной при создании таблицы сортировке, а по произвольному полю:

select org_id, contract_num, name, value, create_date
 from fct_contract
 order by name
 settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
 Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 954.38 MiB (attempt to allocate chunk of 4614592 bytes), maximum: 953.67 MiB.: (avg_value_size_hint = 31.0965576171875, avg_chars_size = 27.715869140625, limit = 16384): (while reading column name): (while reading from part /var/lib/clickhouse/store/d30/d30dbd5e-bad0-4bb1-859c-7f26f955c2cc/all_186_186_0/ in table default.fct_contract (d30dbd5e-bad0-4bb1-859c-7f26f955c2cc) located on disk default of type local, from mark 86 with max_rows_to_read = 16384): While executing MergeTreeThread. (MEMORY_LIMIT_EXCEEDED)

Скрипт не отрабатывает. Чтобы это исправить, добавим настройку max_bytes_before_external_sort. Она будет сбрасывать на диск отсортированные данные при достижении указанного значения. Выделим на сортировку 500 Мб. Вот только этого может не хватить, чтобы избежать ошибки с недостатком памяти. Дело в том, что ClickHouse умеет запускать несколько потоков для выполнения запроса, и для каждого будет работать наше ограничение в 500 Мб. 

Чтобы этого избежать, требуется ограничить количество потоков. Это можно сделать с помощью настройки max_threads:

select org_id, contract_num, name, value, create_date
 from fct_contract
 order by name
 settings max_memory_usage = 1000000000,
 max_bytes_before_external_sort = 500000000,
 max_threads = 1;
15000000 rows in set. Elapsed: 19.259 sec. Processed 15.28 million rows, 1.13 GB (793.45 thousand rows/s., 58.72 MB/s.)
 Peak memory usage: 539.32 MiB.

Готово!

А теперь мы захотели сгруппировать таблицу по полю, не входящему в значения «order by». По логике архитектуры в ClickHouse у нас не должно быть таких запросов, они явно сигнализируют о том, что мы неправильно задали ключ сортировки. Но будем исходить из того, что это разовая история.

select name, sum(value)
 from fct_contract
 group by name
 settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
 Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 958.75 MiB (attempt to allocate chunk of 0 bytes), maximum: 953.67 MiB.: While executing AggregatingTransform. (MEMORY_LIMIT_EXCEEDED)

Добавим настройку для группировки, которая будет также сбрасывать данные на диск после достижения выделенного объема. Теперь отсортируем результат и все сделаем в одном потоке.

select name, sum(value)
 from fct_contract
 group by name
 order by sum(value) desc
 settings max_memory_usage = 1000000000,
 max_bytes_before_external_sort = 500000000,
 max_bytes_before_external_group_by = 500000000,
 max_threads = 1;
15000000 rows in set. Elapsed: 14.964 sec. Processed 35.51 million rows, 1.44 GB (2.37 million rows/s., 96.55 MB/s.)
 Peak memory usage: 523.50 MiB.

Следующая задача: нам надо соединить таблицы и просуммировать значения.

select inn, sum(value)
 from fct_contract
 join dim_name
 	on fct_contract.org_id = dim_name.org_id
    and fct_contract.name = dim_name.name
 group by inn
 settings max_memory_usage = 1000000000,
 	join_algorithm = 'partial_merge',
 	default_max_bytes_in_join= 500000000,
 	max_bytes_before_external_group_by= 500000000,
	max_threads = 1;
Received exception from server (version 23.8.7):
 Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 955.32 MiB (attempt to allocate chunk of 5077241 bytes), maximum: 953.67 MiB.: While executing SourceFromNativeStream. (MEMORY_LIMIT_EXCEEDED)

Мы снова получаем ошибку из-за ограничений памяти. На текущий момент при соединениях настройки сортировки и группировки не отрабатывают должным образом.

Пример оптимизации в ClickHouse

Мы решили проанализировать наш предыдущий скрипт и пришли к выводу, что нам достаточно суммировать значения по конкретному org_id. Давайте попробуем запустить скрипт без каких-либо дополнительных настроек, предварительно выбрав любой org_id из сгенерированных в нашей таблице:

select inn, sum(value)
 from fct_contract as contract
 join dim_name
 	on contract.org_id = dim_name.org_id
    and contract.name = dim_name.name
 where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
 group by inn
 settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
 Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 1.38 GiB (attempt to allocate chunk of 1342177280 bytes), maximum: 953.67 MiB.: While executing FillingRightJoinSide. (MEMORY_LIMIT_EXCEEDED)

Вполне ожидаемо: наш запрос не помещается в оперативную память. Добавим настройки:

select inn, sum(value)
 from fct_contract as contract
 join dim_name
 	on contract.org_id = dim_name.org_id
    and contract.name = dim_name.name
 where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
 group by inn
 settings max_memory_usage = 1000000000,
 join_algorithm = 'partial_merge',
 default_max_bytes_in_join= 500000000;
1376 rows in set. Elapsed: 5.245 sec. Processed 15.05 million rows, 827.51 MB (2.87 million rows/s., 157.77 MB/s.)
 Peak memory usage: 857.55 MiB.

Запрос выполнился, но насколько оптимально?

При соединении таблиц в ClickHouse крайне важна предварительная выборка данных. Давайте из обеих таблиц отфильтруем данные в подзапросах до джойна и посмотрим, что получится:

select inn, sum(value)
 from (
 	select org_id, name, value
 	from fct_contract
 	where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
 ) as contract
 join (
 	select org_id, name, inn
 	from dim_name
 	where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
 ) as dim
    on contract.org_id = dim.org_id
   and contract.name = dim.name
 group by inn
 settings max_memory_usage = 1000000000;
1376 rows in set. Elapsed: 0.010 sec. Processed 90.11 thousand rows, 1.55 MB (9.43 million rows/s., 162.36 MB/s.)
 Peak memory usage: 10.35 MiB.

Мы видим, что по итогам предварительного отбора данных вместо чтения 15 миллионов строк было прочитано 90 тысяч, пик потребления памяти уменьшился более чем в 80 раз. Скорость выполнения выросла в сотни раз.

Работа со словарями

Но если мы не можем отобрать данные из таблиц и нам целиком надо их соединить и агрегировать?

Есть в ClickHouse такая сущность, как словарь. Словари бывают внутренние и внешние (хранят данные из внешних источников). Они могут быть использованы для оптимизации запросов как альтернативы операциям соединения (join-ам). Словари хранят данные в парадигме ключ-значение частично или полностью в оперативной памяти. Ключ для значения указывается в секции Primary Key, перечисление атрибутов в теле запроса обязательно должно начинаться с ключа.

Неверно:

CREATE DICTIONARY dic_name (name String, id UInt64) PRIMARY KEY id..

Верно:

CREATE DICTIONARY dic_name (id UInt64, name String) PRIMARY KEY id...

Кроме того, при создании словаря нам необходимо указывать следующие секции:

  • Source — источник, которым может быть как таблица внутри самого ClickHouse, так и информация из внешних источников (другая СУБД, внешний файл или http-источник).

  • Lifetime — периодичность автоматического обновления словаря из источника.

  • Layout — тип размещения словаря в памяти.

Создадим на основе нашей таблицы-справочника словарь. В качестве типа размещения выберем complex_key_hashed_array, так как соединение словаря с таблицей будет происходить по нескольким полям. Такой тип более экономичен по потреблению памяти по сравнению с рекомендуемым в документации complex_key_hashed.

Подробнее про типы хранения словарей в памяти можно почитать в официальной документации.

CREATE DICTIONARY dic_dim_name
 (
   org_id UUID,
   name String,
   inn UInt64
 )
 PRIMARY KEY org_id, name
 SOURCE(CLICKHOUSE(DB 'default' TABLE 'dim_name'))
LIFETIME(0)
 LAYOUT(complex_key_hashed_array());

Запустим запрос словарю, чтобы он прогрузился, и проверим объем занимаемой оперативной памяти:

select *
 from dic_dim_name;

 select name, formatReadableSize(bytes_allocated)
 from system.dictionaries
 where name = 'dic_dim_name';
┌─name─────────┬─formatReadableSize(bytes_allocated)─┐
│ dic_dim_name │ 910.30 MiB                          │
└──────────────┴─────────────────────────────────────┘

В нашем случае словарь занял 910.30 MiB. Теперь попробуем сделать нашу агрегацию, но вместо операции соединения мы извлекаем значения из словаря с помощью функции dictGet. В ней указываем наименование словаря, нужные поля и ключ соединения. Если ключ соединения составной, как в нашем случае, его необходимо собрать в кортеж, обернув в функцию tuple ().

select
    dictGet('dic_dim_name', 'inn', tuple(org_id, name)) as inn,
    sum(value)
 from fct_contract
 group by inn;
10000 rows in set. Elapsed: 0.888 sec. Processed 15.00 million rows, 765.01 MB (16.89 million rows/s., 861.45 MB/s.)
 Peak memory usage: 20.56 MiB.

Мы не только получили верный результат, наш запрос отработал за 888 миллисекунд! В то время как предыдущий запрос без агрегации, но с джойном двух таблиц по алгоритму partial_merge отрабатывал за 23 секунды.

У функции dictGet есть альтернативы. Для добавления значений по умолчанию при отсутствии совпадения можно использовать dictGetOrDefault, это аналог Left Join. С помощью dictHas в секции where мы может отрезать часть данных таблицы, для которых нет соответствующего ключа в словаре. В документации описаны и другие функции.

Словари значительно эффективнее джойнов: с их помощью можно увеличить скорость расчетов, а также извлекать данные из разных источников (другие базы, файлы и даже URL). ClickHouse прекрасно работает со словарями и позволяет размещать в них десятки миллионов значений.

Но есть и минусы:

1)     Данные могут устаревать, и словарям надо обновляться. Мы можем настроить период жизни данных, через который словарь сделает это автоматически, но для больших словарей это может занять довольно длительное время.

 2)     Словарь загружается в память и хранится там на постоянной основе:

  •  при обращении к нему командой select * from dict_name;

  • командой system reload dictionary dict_name;

  • при старте сервера, если параметр dictionaries_lazy_load выставлен в false.

Словари могут забирать много оперативной памяти и сами ее не освобождают. При dictionaries_lazy_load=false не поможет даже перезагрузка сервера. Если словарь постоянно используется и есть выгода в скорости обработки данных, то все хорошо. Но если он применяется однократно в каком-то большом расчете, стоит вытолкнуть его из оперативной памяти. Для этого можно открепить словарь (detach), при этом СУБД «забудет» о его существовании, и он будет вытеснен из памяти, но все метаданные будут сохранены. После этого подсоединяем словарь обратно.

Открепляем словарь:

detach dictionary dic_dim_name;

Возвращаем на место:

attach dictionary dic_dim_name;

Проверим занимаемое место:

select name, formatReadableSize(bytes_allocated)
 from system.dictionaries
 where name = 'dic_dim_name';
┌─name─────────┬─formatReadableSize(bytes_allocated)─┐
│ dic_dim_name │ 0.00 B                              │
└──────────────┴─────────────────────────────────────┘

Как видим, теперь словарь не занимает оперативной памяти.

Итак, в этом материале мы рассказали про важные настройки в ClickHouse, которые позволяют успешно работать с базой данных:

  • join_algorithm — если мы используем джойны в запросах, мы должны контролировать алгоритм выполнения;

  • max_bytes_before_external_sort — для нетипичной сортировки таблицы;

  • max_bytes_before_external_group_by — для объемных агрегаций;

  • max_memory_usage — позволяет контролировать память, доступную запросу.

Еще мы наглядно продемонстрировали, как важно заниматься предварительным отбором данных перед соединением таблиц, сократив время выполнения скрипта в сотню раз! А также показали, как успешно применять словари в запросах для избегания операций соединения, и научились освобождать их из памяти после использования.

В следующей статье планируем рассказать, как мы запилили бэкап распределенной СУБД.

© Habrahabr.ru