Разработка пользовательских агрегатных функций для аналитики в MySQL

223ec9841ddb87f1b31a34f8b404faad.jpg

Привет, Хабр!

Вы когда‑нибудь писали аналитические запросы в MySQL и понимали, что встроенных функций вам не хватает? Хотите посчитать медиану зарплат? 99-й процентиль времени ответа запросов? Собрать JSON‑массив прямо в базе данных?

В MySQL нет MEDIAN(), PERCENTILE_CONT() и нормального способа объединить данные в JSON. Всё приходится делать через костыли.

Решение? Написать собственную агрегатную функцию на C++, которая будет работать так же, как SUM() и AVG(), но делать то, что вам реально нужно.

Почему UDF на C++?

В MySQL есть несколько способов расширять возможности:

  • Хранимые процедуры (SQL) → медленные, не работают с агрегатами.

  • Плагины (C++) → сложно писать, требует компиляции.

  • UDF (C++) → быстрее SQL, работает как встроенная функция, но требует компиляции.

Поэтому если нужна быстрая кастомная функция типа SUM() или AVG(), UDF — лучший вариант.

Почему стандартные агрегатные функции MySQL не всегда работают?

Допустим, есть таблица employees:

CREATE TABLE employees (
    id INT AUTO_INCREMENT PRIMARY KEY, 
    department VARCHAR(50), 
    salary DOUBLE
);

INSERT INTO employees (department, salary) VALUES
('IT', 100000), ('IT', 120000), ('IT', 110000), 
('HR', 50000), ('HR', 60000), ('HR', 55000);

Мы хотим посчитать медианную зарплату в каждом отделе.

Среднее (AVG()) не всегда даёт правильную картину. Например:

SELECT department, AVG(salary) FROM employees GROUP BY department;

Допустим, есть три зарплаты: 100k, 120k, 5M, то AVG (salary) = 1.74M, но реальная медиана = 120k.

Можно попробовать SQL‑хак с GROUP_CONCAT():

SELECT department, 
       SUBSTRING_INDEX(SUBSTRING_INDEX(GROUP_CONCAT(salary ORDER BY salary), ',', COUNT(*)/2), ',', -1) AS median
FROM employees GROUP BY department;

Но этот запрос сломается, если зарплат много. GROUP_CONCAT() имеет лимит (group_concat_max_len), а если данных миллионы строк, MySQL начнёт тормозить.

Что делать? Реализовать нормальную агрегатную функцию median().

Как MySQL выполняет агрегатные функции?

Любая агрегатная функция (SUM(), AVG(), COUNT()) в MySQL работает по трём этапам:

  1. Создаёт объект для хранения промежуточных данных (SUM_INIT()).

  2. Добавляет в него каждое значение (SUM_ADD()).

  3. Вычисляет и возвращает результат (SUM_FINAL()).

Для SUM (salary) MySQL держит переменную, в которую суммирует значения. Наша задача — сделать то же самое, но для медианы, JSON‑агрегации и квантилей.

Разработка UDF median () на C++

Создадим файл udf_median.cpp.

#include 
#include 
#include 

// Структура для хранения промежуточных значений
struct MedianCtx {
    std::vector values;
};

// Инициализация UDF
extern "C" my_bool median_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
    if (args->arg_count != 1 || args->arg_type[0] != REAL_RESULT) {
        strcpy(message, "median() принимает только один аргумент типа DOUBLE");
        return 1;
    }

    initid->ptr = (char*) new MedianCtx();
    return 0;
}

// Добавление нового значения в группу
extern "C" void median_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    MedianCtx *ctx = (MedianCtx*) initid->ptr;
    ctx->values.push_back(*(double*) args->args[0]);
}

// Вычисление медианы
extern "C" double median(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    MedianCtx *ctx = (MedianCtx*) initid->ptr;
    if (ctx->values.empty()) {
        *is_null = 1;
        return 0;
    }

    std::sort(ctx->values.begin(), ctx->values.end());
    size_t n = ctx->values.size();
    return (n % 2 == 0) ? (ctx->values[n/2 - 1] + ctx->values[n/2]) / 2 : ctx->values[n/2];
}

// Очистка памяти
extern "C" void median_deinit(UDF_INIT *initid) {
    delete (MedianCtx*) initid->ptr;
}

median_init() создаёт объект для хранения значений. median_add() добавляет в массив новые числа. median() сортирует массив и возвращает медиану. median_deinit() чистит память, чтобы не было утечек.

Компиляция и установка UDF в MySQL

g++ -shared -o udf_median.so -fPIC udf_median.cpp `mysql_config --include`
sudo cp udf_median.so /usr/lib/mysql/plugin/

Регистрируем функцию:

CREATE AGGREGATE FUNCTION median RETURNS REAL SONAME 'udf_median.so';

Теперь можно её использовать.

Ещё три примера использования UDF в MySQL

json_agg (): агрегируем данные в JSON

В MySQL нет аналога JSON_AGG(), который есть в PostgreSQL. Нжно собрать JSON‑массив значений, но GROUP_CONCAT() не поддерживает JSON‑кодирование.

Создаём файл udf_json_agg.cpp:

#include 
#include 
#include 
#include 

// Контекст для хранения JSON-данных
struct JsonAggCtx {
    std::vector values;
};

// Инициализация UDF
extern "C" my_bool json_agg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
    if (args->arg_count != 1 || args->arg_type[0] != STRING_RESULT) {
        strcpy(message, "json_agg() принимает только один аргумент типа STRING");
        return 1;
    }
    initid->ptr = (char*) new JsonAggCtx();
    return 0;
}

// Добавление нового значения
extern "C" void json_agg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    if (args->args[0] == NULL) return;  // Пропускаем NULL-значения
    JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr;
    ctx->values.push_back(std::string(args->args[0]));
}

// Возвращаем JSON
extern "C" char* json_agg(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {
    JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr;
    std::ostringstream json;
    json << "[";
    for (size_t i = 0; i < ctx->values.size(); ++i) {
        if (i > 0) json << ",";
        json << "\"" << ctx->values[i] << "\"";
    }
    json << "]";
    std::string json_str = json.str();
    *length = json_str.size();

    char* res = strdup(json_str.c_str());
    initid->ptr = (char*) res;  // Сохраняем для освобождения
    return res;
}

// Очистка памяти
extern "C" void json_agg_deinit(UDF_INIT *initid) {
    free(initid->ptr);  // Очищаем память после использования
    delete (JsonAggCtx*) initid->ptr;
}

Теперь можно делать вот так:

SELECT order_id, json_agg(product_name) FROM orders GROUP BY order_id;

И получить нормальный JSON:

[
    {"order_id": 1, "products": ["Laptop", "Mouse", "Keyboard"]},
    {"order_id": 2, "products": ["Phone", "Charger"]}
]

Теперь можно работать с JSON‑функциями MySQL (JSON_EXTRACT(), JSON_CONTAINS()).

percentile (): вычисляем 95-й и 99-й процентиль

В аналитике важно знать 90-й, 95-й, 99-й процентиль. Если средний запрос выполняется за 100 мс, но 99-й процентиль = 2 секунды → значит, у 1% пользователей сайт жутко тормозит. Для метрик SLA важно следить не за средним временем ответа, а за худшими 1–5%. В PostgreSQL есть PERCENTILE_CONT(), в MySQL — нет.

Напишем UDF percentile(), который работает с GROUP BY:

#include 
#include 
#include 

// Контекст для хранения значений
struct PercentileCtx {
    std::vector values;
};

// Инициализация UDF
extern "C" my_bool percentile_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
    if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != REAL_RESULT) {
        strcpy(message, "percentile() принимает два аргумента: (double, double)");
        return 1;
    }
    initid->ptr = (char*) new PercentileCtx();
    return 0;
}

// Добавляем новое значение в массив
extern "C" void percentile_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    if (args->args[1] == NULL) return;  // Пропускаем NULL-значения
    PercentileCtx *ctx = (PercentileCtx*) initid->ptr;
    ctx->values.push_back(*(double*) args->args[1]);
}

// Вычисляем процентиль
extern "C" double percentile(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    PercentileCtx *ctx = (PercentileCtx*) initid->ptr;
    if (ctx->values.empty()) {
        *is_null = 1;
        return 0;
    }

    double p = *(double*) args->args[0]; // Процентиль (например, 0.95)
    if (ctx->values.size() < 2) return ctx->values[0]; // Не сортируем 1 элемент

    std::sort(ctx->values.begin(), ctx->values.end());
    size_t index = (size_t)(p * ctx->values.size());
    return ctx->values[std::min(index, ctx->values.size() - 1)];
}

// Очистка памяти
extern "C" void percentile_deinit(UDF_INIT *initid) {
    delete (PercentileCtx*) initid->ptr;
}

Теперь можно делать:

SELECT percentile(0.95, response_time) FROM api_logs;

И получить 95-й процентиль времени ответа API.

running_avg ()

Если у вас есть данные по временным рядам (например, средняя температура за день), нужно усреднять данные за N последних дней.

Напишем UDF running_avg(), который считает среднее по скользящему окну:

#include 
#include 

// Контекст для хранения скользящего среднего
struct RunningAvgCtx {
    std::deque window;
    double sum = 0;
    size_t window_size;
};

// Инициализация UDF
extern "C" my_bool running_avg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
    if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != INT_RESULT) {
        strcpy(message, "running_avg() принимает два аргумента: (double, int)");
        return 1;
    }
    initid->ptr = (char*) new RunningAvgCtx();
    return 0;
}

// Добавляем значение
extern "C" void running_avg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr;
    double value = *(double*) args->args[0];
    size_t window_size = *(long long*) args->args[1];

    if (window_size == 0) return;  // Защита от деления на 0

    ctx->window.push_back(value);
    ctx->sum += value;

    if (ctx->window.size() > window_size) {
        ctx->sum -= ctx->window.front();
        ctx->window.pop_front();
    }
}

// Возвращаем среднее
extern "C" double running_avg(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {
    RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr;
    return ctx->window.empty() ? 0 : ctx->sum / ctx->window.size();
}

// Очистка памяти
extern "C" void running_avg_deinit(UDF_INIT *initid) {
    delete (RunningAvgCtx*) initid->ptr;
}

Теперь можно делать:

SELECT date, running_avg(temperature, 7) OVER (ORDER BY date) FROM weather;

И получать усреднённые данные по 7 дням.

Всем, кому интересен системный анализ, рекомендуем обратить внимание на открытые уроки, которые пройдут в Otus в марте:

  • 6 марта. Use Cases: Как улучшить требования к проекту.
    Узнать подробнее

  • 17 марта. Переезд с монолита на микросервисы: когда, зачем и как.
    Узнать подробнее

© Habrahabr.ru