Разработка пользовательских агрегатных функций для аналитики в MySQL

Привет, Хабр!
Вы когда‑нибудь писали аналитические запросы в MySQL и понимали, что встроенных функций вам не хватает? Хотите посчитать медиану зарплат? 99-й процентиль времени ответа запросов? Собрать JSON‑массив прямо в базе данных?
В MySQL нет MEDIAN()
, PERCENTILE_CONT()
и нормального способа объединить данные в JSON. Всё приходится делать через костыли.
Решение? Написать собственную агрегатную функцию на C++, которая будет работать так же, как SUM()
и AVG()
, но делать то, что вам реально нужно.
Почему UDF на C++?
В MySQL есть несколько способов расширять возможности:
Хранимые процедуры (SQL) → медленные, не работают с агрегатами.
Плагины (C++) → сложно писать, требует компиляции.
UDF (C++) → быстрее SQL, работает как встроенная функция, но требует компиляции.
Поэтому если нужна быстрая кастомная функция типа SUM()
или AVG()
, UDF — лучший вариант.
Почему стандартные агрегатные функции MySQL не всегда работают?
Допустим, есть таблица employees:
CREATE TABLE employees (
id INT AUTO_INCREMENT PRIMARY KEY,
department VARCHAR(50),
salary DOUBLE
);
INSERT INTO employees (department, salary) VALUES
('IT', 100000), ('IT', 120000), ('IT', 110000),
('HR', 50000), ('HR', 60000), ('HR', 55000);
Мы хотим посчитать медианную зарплату в каждом отделе.
Среднее (AVG()
) не всегда даёт правильную картину. Например:
SELECT department, AVG(salary) FROM employees GROUP BY department;
Допустим, есть три зарплаты: 100k, 120k, 5M, то AVG (salary) = 1.74M, но реальная медиана = 120k.
Можно попробовать SQL‑хак с GROUP_CONCAT()
:
SELECT department,
SUBSTRING_INDEX(SUBSTRING_INDEX(GROUP_CONCAT(salary ORDER BY salary), ',', COUNT(*)/2), ',', -1) AS median
FROM employees GROUP BY department;
Но этот запрос сломается, если зарплат много. GROUP_CONCAT()
имеет лимит (group_concat_max_len
), а если данных миллионы строк, MySQL начнёт тормозить.
Что делать? Реализовать нормальную агрегатную функцию median()
.
Как MySQL выполняет агрегатные функции?
Любая агрегатная функция (SUM()
, AVG()
, COUNT()
) в MySQL работает по трём этапам:
Создаёт объект для хранения промежуточных данных (
SUM_INIT()
).Добавляет в него каждое значение (
SUM_ADD()
).Вычисляет и возвращает результат (
SUM_FINAL()
).
Для SUM (salary) MySQL держит переменную, в которую суммирует значения. Наша задача — сделать то же самое, но для медианы, JSON‑агрегации и квантилей.
Разработка UDF median () на C++
Создадим файл udf_median.cpp.
#include
#include
#include
// Структура для хранения промежуточных значений
struct MedianCtx {
std::vector values;
};
// Инициализация UDF
extern "C" my_bool median_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
if (args->arg_count != 1 || args->arg_type[0] != REAL_RESULT) {
strcpy(message, "median() принимает только один аргумент типа DOUBLE");
return 1;
}
initid->ptr = (char*) new MedianCtx();
return 0;
}
// Добавление нового значения в группу
extern "C" void median_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
MedianCtx *ctx = (MedianCtx*) initid->ptr;
ctx->values.push_back(*(double*) args->args[0]);
}
// Вычисление медианы
extern "C" double median(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
MedianCtx *ctx = (MedianCtx*) initid->ptr;
if (ctx->values.empty()) {
*is_null = 1;
return 0;
}
std::sort(ctx->values.begin(), ctx->values.end());
size_t n = ctx->values.size();
return (n % 2 == 0) ? (ctx->values[n/2 - 1] + ctx->values[n/2]) / 2 : ctx->values[n/2];
}
// Очистка памяти
extern "C" void median_deinit(UDF_INIT *initid) {
delete (MedianCtx*) initid->ptr;
}
median_init()
создаёт объект для хранения значений. median_add()
добавляет в массив новые числа. median()
сортирует массив и возвращает медиану. median_deinit()
чистит память, чтобы не было утечек.
Компиляция и установка UDF в MySQL
g++ -shared -o udf_median.so -fPIC udf_median.cpp `mysql_config --include`
sudo cp udf_median.so /usr/lib/mysql/plugin/
Регистрируем функцию:
CREATE AGGREGATE FUNCTION median RETURNS REAL SONAME 'udf_median.so';
Теперь можно её использовать.
Ещё три примера использования UDF в MySQL
json_agg (): агрегируем данные в JSON
В MySQL нет аналога JSON_AGG()
, который есть в PostgreSQL. Нжно собрать JSON‑массив значений, но GROUP_CONCAT()
не поддерживает JSON‑кодирование.
Создаём файл udf_json_agg.cpp:
#include
#include
#include
#include
// Контекст для хранения JSON-данных
struct JsonAggCtx {
std::vector values;
};
// Инициализация UDF
extern "C" my_bool json_agg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
if (args->arg_count != 1 || args->arg_type[0] != STRING_RESULT) {
strcpy(message, "json_agg() принимает только один аргумент типа STRING");
return 1;
}
initid->ptr = (char*) new JsonAggCtx();
return 0;
}
// Добавление нового значения
extern "C" void json_agg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
if (args->args[0] == NULL) return; // Пропускаем NULL-значения
JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr;
ctx->values.push_back(std::string(args->args[0]));
}
// Возвращаем JSON
extern "C" char* json_agg(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {
JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr;
std::ostringstream json;
json << "[";
for (size_t i = 0; i < ctx->values.size(); ++i) {
if (i > 0) json << ",";
json << "\"" << ctx->values[i] << "\"";
}
json << "]";
std::string json_str = json.str();
*length = json_str.size();
char* res = strdup(json_str.c_str());
initid->ptr = (char*) res; // Сохраняем для освобождения
return res;
}
// Очистка памяти
extern "C" void json_agg_deinit(UDF_INIT *initid) {
free(initid->ptr); // Очищаем память после использования
delete (JsonAggCtx*) initid->ptr;
}
Теперь можно делать вот так:
SELECT order_id, json_agg(product_name) FROM orders GROUP BY order_id;
И получить нормальный JSON:
[
{"order_id": 1, "products": ["Laptop", "Mouse", "Keyboard"]},
{"order_id": 2, "products": ["Phone", "Charger"]}
]
Теперь можно работать с JSON‑функциями MySQL (JSON_EXTRACT()
, JSON_CONTAINS()
).
percentile (): вычисляем 95-й и 99-й процентиль
В аналитике важно знать 90-й, 95-й, 99-й процентиль. Если средний запрос выполняется за 100 мс, но 99-й процентиль = 2 секунды → значит, у 1% пользователей сайт жутко тормозит. Для метрик SLA важно следить не за средним временем ответа, а за худшими 1–5%. В PostgreSQL есть PERCENTILE_CONT()
, в MySQL — нет.
Напишем UDF percentile()
, который работает с GROUP BY
:
#include
#include
#include
// Контекст для хранения значений
struct PercentileCtx {
std::vector values;
};
// Инициализация UDF
extern "C" my_bool percentile_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != REAL_RESULT) {
strcpy(message, "percentile() принимает два аргумента: (double, double)");
return 1;
}
initid->ptr = (char*) new PercentileCtx();
return 0;
}
// Добавляем новое значение в массив
extern "C" void percentile_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
if (args->args[1] == NULL) return; // Пропускаем NULL-значения
PercentileCtx *ctx = (PercentileCtx*) initid->ptr;
ctx->values.push_back(*(double*) args->args[1]);
}
// Вычисляем процентиль
extern "C" double percentile(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
PercentileCtx *ctx = (PercentileCtx*) initid->ptr;
if (ctx->values.empty()) {
*is_null = 1;
return 0;
}
double p = *(double*) args->args[0]; // Процентиль (например, 0.95)
if (ctx->values.size() < 2) return ctx->values[0]; // Не сортируем 1 элемент
std::sort(ctx->values.begin(), ctx->values.end());
size_t index = (size_t)(p * ctx->values.size());
return ctx->values[std::min(index, ctx->values.size() - 1)];
}
// Очистка памяти
extern "C" void percentile_deinit(UDF_INIT *initid) {
delete (PercentileCtx*) initid->ptr;
}
Теперь можно делать:
SELECT percentile(0.95, response_time) FROM api_logs;
И получить 95-й процентиль времени ответа API.
running_avg ()
Если у вас есть данные по временным рядам (например, средняя температура за день), нужно усреднять данные за N последних дней.
Напишем UDF running_avg()
, который считает среднее по скользящему окну:
#include
#include
// Контекст для хранения скользящего среднего
struct RunningAvgCtx {
std::deque window;
double sum = 0;
size_t window_size;
};
// Инициализация UDF
extern "C" my_bool running_avg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != INT_RESULT) {
strcpy(message, "running_avg() принимает два аргумента: (double, int)");
return 1;
}
initid->ptr = (char*) new RunningAvgCtx();
return 0;
}
// Добавляем значение
extern "C" void running_avg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr;
double value = *(double*) args->args[0];
size_t window_size = *(long long*) args->args[1];
if (window_size == 0) return; // Защита от деления на 0
ctx->window.push_back(value);
ctx->sum += value;
if (ctx->window.size() > window_size) {
ctx->sum -= ctx->window.front();
ctx->window.pop_front();
}
}
// Возвращаем среднее
extern "C" double running_avg(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr;
return ctx->window.empty() ? 0 : ctx->sum / ctx->window.size();
}
// Очистка памяти
extern "C" void running_avg_deinit(UDF_INIT *initid) {
delete (RunningAvgCtx*) initid->ptr;
}
Теперь можно делать:
SELECT date, running_avg(temperature, 7) OVER (ORDER BY date) FROM weather;
И получать усреднённые данные по 7 дням.
Всем, кому интересен системный анализ, рекомендуем обратить внимание на открытые уроки, которые пройдут в Otus в марте:
6 марта. Use Cases: Как улучшить требования к проекту.
Узнать подробнее17 марта. Переезд с монолита на микросервисы: когда, зачем и как.
Узнать подробнее