Пользовательские агрегатные и оконные функции в PostgreSQL и Oracle

habr.png


В этой статье мы посмотрим, как в двух системах создавать пользовательские агрегатные и оконные (в терминологии Oracle — аналитические) функции. Несмотря на различия в синтаксисе и в целом в подходе к расширяемости, механизм этих функций очень похож. Но и различия тоже имеются.

Надо признать, что собственные агрегатные и оконные функции встречается довольно редко. Оконные функции вообще по каким-то причинам традиционно относят к разряду «продвинутого» SQL и считают сложными для понимания и освоения. Тут бы разобраться с теми функциями, которые уже имеются в СУБД!

Зачем тогда вообще вникать в этот вопрос? Могу назвать несколько причин:

  • Хотя оконные функции объективно сложнее обычных агрегатных, но ничего запредельного в них нет; это абсолютно необходимый инструмент для SQL-разработчика. А создание собственной оконной функции, даже совсем простой, позволяет лучше разобраться с тем, как работают стандартные.
  • Оконные и агрегатные функции — прекрасный способ совместить процедурную обработку с декларативной логикой. В некоторых ситуациях получается выполнить сложные действия, оставаясь в рамках парадигмы решения задачи одним SQL-запросом.
  • Да и просто интересная тема, а уж тем более интересно сравнить две системы.


Пример, на котором будем тренироваться — подсчет среднего, аналог стандартной функции avg для типа numeric (number в Oracle). Мы напишем такую функцию и посмотрим, как она работает в агрегатном и оконном режимах и может ли она вычисляться несколькими параллельными процессами. А в заключение поглядим на пример из реальной жизни.

Агрегатные функции
Будем двигаться от простого к сложному, переключаясь между PostgreSQL и Oracle.

Вначале некоторые общие соображения. Любая агрегатная функция вызывается для каждой строки таблицы по очереди и в конечном итоге обрабатывает их все. Между вызовами ей требуется сохранять внутреннее состояние, определяющее контекст ее выполнения. В конце работы она должна вернуть итоговое значение.

Итак, нам потребуется четыре составляющие:

  • Состояние (контекст),
  • Функция обработки очередной строки,
  • Функция выдачи итогового результата,
  • Указание на то, что предыдущие три пункта составляют агрегатную функцию.


PostgreSQL


Для хранения состояния нужно выбрать подходящий тип данных. Можно взять стандартный, а можно определить свой. Для функции, вычисляющей среднее, нужно отдельно суммировать значения и отдельно подсчитывать их количество. Поэтому создадим свой составной тип с двумя полями:

CREATE TYPE average_state AS (
  accum numeric,
  qty numeric
);

Теперь определим функцию для обработки очередного значения. В PostgreSQL она называется функцией перехода:

CREATE OR REPLACE FUNCTION average_transition(
  state average_state,
  val numeric
) RETURNS average_state AS $$
BEGIN
  RAISE NOTICE '%(%) + %', state.accum, state.qty, val;
  RETURN ROW(state.accum+val, state.qty+1)::average_state;
END;
$$ LANGUAGE plpgsql;

Функция принимает текущее состояние и очередное значение, а возвращает новое состояние: значения суммируются, а к количеству добавляется единица.

Кроме этого, мы выводим (RAISE NOTICE) параметры функции — это позволит нам увидеть, как выполняется работа. Старый добрый отладочный PRINT, нет ничего тебя лучше.

Следующая функция — возвращение финального значения:

CREATE OR REPLACE FUNCTION average_final(
  state average_state
) RETURNS numeric AS $$
BEGIN
  RAISE NOTICE '= %(%)', state.accum, state.qty;
  RETURN CASE WHEN state.qty > 0 THEN
    trim(trailing '0' from (state.accum/state.qty)::text)::numeric
  END;
END;
$$ LANGUAGE plpgsql;

Функция принимает состояние и возвращает результирующее число. Для этого просто делим накопленную сумму на количество. Но при нулевом количестве возвращаем NULL (так поступает и avg).

«Финт ушами» с функцией trim нужен исключительно для аккуратности вывода: таким образом мы избавляемся от незначащих нулей, которые иначе будут загромождать экран и мешать восприятию. Примерно вот так:

SELECT 1::numeric / 2::numeric;
        ?column?
------------------------
 0.50000000000000000000
(1 row)

В реальной жизни эти фокусы, конечно, не нужны.

И, наконец, определяем собственно агрегатную функцию. Для этого используется специальная команда CREATE AGGREGATE:

CREATE AGGREGATE average(numeric) (
  sfunc     = average_transition,
  stype     = average_state,
  finalfunc = average_final,
  initcond  = '(0,0)'
);

В этой команде указывается тип данных для состояния (stype), две наши функции (sfunc и finalfunc) и еще начальное значение состояния (initcond) в виде строковой константы.

Можно пробовать. Почти все примеры в этой статье будут использовать простую таблицу с пятью строками: раз, два, три, четыре, пять. Таблицу создаем на лету функцией generate_series, незаменимым помощником генерации тестовых данных:

SELECT average(g.x) FROM generate_series(1,5) AS g(x);
NOTICE:  0(0) + 1
NOTICE:  1(1) + 2
NOTICE:  3(2) + 3
NOTICE:  6(3) + 4
NOTICE:  10(4) + 5
NOTICE:  = 15(5)
 average
---------
       3
(1 row)

Результат верный, а вывод функций позволяет проследить ход выполнения:

  • Состояние было установлено в (0,0),
  • Функция average_transition последовательно вызывалась пять раз, постепенно изменяя состояние,
  • В конце была вызвана функция average_final, которая и получила 3 = 15/5.


Еще одна проверка — на пустом множестве:

SELECT average(g.x) FROM generate_series(1,0) AS g(x);
NOTICE:  = 0(0)
 average
---------
    
(1 row)

Oracle


В Oracle вся расширяемость обеспечивается механизмом Data Cartridge. Говоря по-простому, нам потребуется создать объектный тип, реализующий необходимый для агрегации интерфейс. Контекст естественным образом представляется атрибутами этого объекта.

CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
  accum number,
  qty   number,
  STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl)
    RETURN number,
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number
    RETURN number,
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl)
    RETURN number,
  MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number)
    RETURN number
);
/

Начальное значение контекста определяется здесь не константой, а отдельной (статической, то есть не привязанной к конкретному экземпляру объекта) функцией ODCIAggregateInitialize.

Функция, вызываемая для каждой строки — это ODCIAggregateIterate.

Результат возвращает функция ODCIAggregateTerminate, и ей, заметьте, передаются некие флаги, с которыми мы разберемся чуть позже.

Интерфейс включает еще одну обязательную функцию: ODCIAggregateMerge. Мы ее определим — куда ж деваться,  —, но разговор о ней пока отложим.

Теперь создадим тело объекта с реализацией перечисленных методов.

CREATE OR REPLACE TYPE BODY AverageImpl IS
  STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl)
  RETURN number IS
  BEGIN
    actx := AverageImpl(0,0);
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number)
  RETURN number IS
  BEGIN
    dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
    self.accum := self.accum + val;
    self.qty := self.qty + 1;
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl)
  RETURN number IS
  BEGIN
    dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
    self.accum := self.accum + ctx2.accum;
    self.qty := self.qty + ctx2.qty;
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number)
  RETURN number IS
  BEGIN
    dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
    returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
    RETURN ODCIConst.Success;
  END;
END;
/

Реализация, по большей части, повторяет все то, что мы делали для PostgreSQL, но в немного другом синтаксисе.

Trim-пляски вокруг возвращаемого значения не нужны: Oracle самостоятельно отрезает незначащие нули при выводе значения.

Обратите внимание, что все функции возвращают признак успешности выполнения (значение ODCIConst.Success), а смысловые значения передаются через параметры OUT и IN OUT (которые в PL/SQL никак не связаны с собственно возвращаемым значением, как в PL/pgSQL). В частности, любая функция, в том числе и ODCIAggregateTerminate, может изменять атрибуты своего объекта, ссылка на который передается ей в первом параметре (self).

Определение агрегатной функции выглядит следующим образом:

CREATE OR REPLACE FUNCTION average(val number) RETURN number
  AGGREGATE USING AverageImpl;
/

Проверяем. Для генерации значений используем идиоматическую конструкцию с рекурсивным запросом CONNECT BY level:

SELECT average(level) FROM dual CONNECT BY level <= 5;
AVERAGE(LEVEL)
--------------
             3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:0

Можно обратить внимание на то, что вывод сообщений в PostgreSQL появляется до результата, а в Oracle — после. Это из-за того, что RAISE NOTICE работает асинхронно, а пакет dbms_output буферизует вывод.

Как мы видим, в функцию ODCIAggregateTerminate был передан нулевой флаг. Это означает, что контекст больше не требуется и его — при желании — можно забыть.

И проверка на пустом множестве:

SELECT average(rownum) FROM dual WHERE 1 = 0;
AVERAGE(ROWNUM)
---------------

= 0(0) flags:0

Оконные функции: OVER ()
Хорошая новость: написанная нами агрегатная функция может без всяких изменений работать и как оконная (аналитическая).

Оконная функция отличается от агрегатной тем, что не сворачивает выборку в одну (агрегированную) строку, а вычисляется как бы отдельно для каждой строки. Синтаксически вызов оконной функции отличается наличием конструкции OVER с указанием рамки, которая определяет множество строк для обработки. В простейшем случае она так и записывается: OVER (), и это означает, что функция должна обработать все строки. Результат получается такой, как будто мы посчитали обычную агрегатную функцию и записали результат (один и тот же) напротив каждой строки выборки.

Иными словами, рамка статична и охватывает все строки:

1.      2.      3.      4.      5.
+---+   +---+   +---+   +---+   +---+
| 1 |   | 1 |   | 1 |   | 1 |   | 1 |
| 2 |   | 2 |   | 2 |   | 2 |   | 2 |
| 3 |   | 3 |   | 3 |   | 3 |   | 3 |
| 4 |   | 4 |   | 4 |   | 4 |   | 4 |
| 5 |   | 5 |   | 5 |   | 5 |   | 5 |
+---+   +---+   +---+   +---+   +---+


PostgreSQL


Попробуем:

SELECT g.x, average(g.x) OVER ()
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  1(1) + 2
NOTICE:  3(2) + 3
NOTICE:  6(3) + 4
NOTICE:  10(4) + 5
NOTICE:  = 15(5)
 x | average
---+---------
 1 |       3
 2 |       3
 3 |       3
 4 |       3
 5 |       3
(5 rows)

По выводу NOTICE видно, что все происходит точно так же, как и ранее при вычислении обычной агрегатной функции. Получив результат от функции average_final, PostgreSQL проставляет его в каждой строке.

Oracle


SELECT average(level) OVER() average
FROM dual CONNECT BY level <= 5;

     LEVEL     AVERAGE
---------- -----------
         1           3
         2           3
         3           3
         4           3
         5           3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:0

Неожиданно. Вместо того, чтобы вычислить результат один раз, Oracle вызывает функцию ODCIAggregateTerminate N+1 раз: сначала для каждой строки с флагом 1 (что означает, что контекст еще пригодится) и затем еще один раз в конце. Значение, полученное при последнем вызове, просто игнорируется.

Вывод такой: если в функции ODCIAggregateTerminate используется вычислительно сложная логика, надо подумать о том, чтобы не делать одну и ту же работу несколько раз.

Оконные функции: OVER (PARTITION BY)
Предложение PARTITION BY в определении рамки похоже на обычную агрегатную конструкцию GROUP BY. Оконная функция с указанием PARTITION BY вычисляется отдельно для каждой группы строк, и результат приписывается к каждой строке выборки.

В таком варианте рамка тоже статична, но для каждой группы она разная. Например, если определены две группы строк (с первой по вторую и с третьей по пятую), то рамку можно представить себе так:

1.      2.      3.      4.      5.
+---+   +---+
| 1 |   | 1 |
| 2 |   | 2 |   +---+   +---+   +---+
+---+   +---+   | 3 |   | 3 |   | 3 |
                | 4 |   | 4 |   | 4 |
                | 5 |   | 5 |   | 5 |
                +---+   +---+   +---+


PostgreSQL


SELECT g.x/3 part,
       g.x,
       average(g.x) OVER (PARTITION BY g.x/3)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  0(0) + 3
NOTICE:  3(1) + 4
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
 part | x | average
------+---+---------
    0 | 1 |     1.5
    0 | 2 |     1.5
    1 | 3 |       4
    1 | 4 |       4
    1 | 5 |       4
(5 rows)

Вычисление снова происходит последовательно, но теперь при переходе к другой группе строк состояние сбрасывается в начальное значение (initcond).

Oracle


SELECT trunc(level/3) part,
       level,
       average(level) OVER(PARTITION BY trunc(level/3)) average
FROM dual CONNECT BY level <= 5;

      PART      LEVEL    AVERAGE
---------- ---------- ----------
         0          2        1.5
         0          1        1.5
         1          4          4
         1          5          4
         1          3          4
0(0) + 2
2(1) + 1
= 3(2) flags:1
= 3(2) flags:1
0(0) + 4
4(1) + 5
9(2) + 3
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:0

Занятно, что Oracle решил переставить строки местами. Это может что-то сказать о деталях реализации, но в любом случае — имеет право.

Оконные функции: OVER (ORDER BY)
Если в определение рамки добавить предложение ORDER BY, указывающее порядок сортировки, функция начнет работать в режиме нарастания (для функции sum мы бы так и сказали — нарастающим итогом).

Для первой строки рамка будет состоять из одной этой строки; для второй — из первой и второй; для третьей — из первой, второй и третьей и так далее. Иными словами, в рамку будут входить строки с первой до текущей.

На самом деле, это можно ровно так и записать: OVER (ORDER BY… ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), но, поскольку это многословие подразумеваются по умолчанию, его обычно опускают.

Итак, рамка перестает быть статичной: ее голова движется вниз, а хвост остается на месте:

1.      2.      3.      4.      5.
+---+   +---+   +---+   +---+   +---+
| 1 |   | 1 |   | 1 |   | 1 |   | 1 |
+---+   | 2 |   | 2 |   | 2 |   | 2 |
        +---+   | 3 |   | 3 |   | 3 |
                +---+   | 4 |   | 4 |
                        +---+   | 5 |
                                +---+


PostgreSQL


SELECT g.x, average(g.x) OVER (ORDER BY g.x)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  3(2) + 3
NOTICE:  = 6(3)
NOTICE:  6(3) + 4
NOTICE:  = 10(4)
NOTICE:  10(4) + 5
NOTICE:  = 15(5)
 x | average
---+---------
 1 |       1
 2 |     1.5
 3 |       2
 4 |     2.5
 5 |       3
(5 rows)

Как видим, строки все так же добавляются к контексту по одной, но теперь функция average_final вызывается после каждого добавления, выдавая промежуточный итог.

Oracle


SELECT level, average(level) OVER(ORDER BY level) average
FROM dual CONNECT BY level <= 5;

     LEVEL    AVERAGE
---------- ----------
         1          1
         2        1.5
         3          2
         4        2.5
         5          3
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) + 4
= 10(4) flags:1
10(4) + 5
= 15(5) flags:1
= 15(5) flags:0

На этот раз обе системы работают одинаково.

Оконные функции: OVER (PARTITION BY ORDER BY)
Предложения PARTITION BY и ORDER BY можно комбинировать. Тогда внутри каждой группы строк функция будет работать в режиме нарастания, а при переходе от группы к группе состояние будет сбрасываться в начальное.

1.      2.      3.      4.      5.
+---+   +---+
| 1 |   | 1 |
+---+   | 2 |   +---+   +---+   +---+
        +---+   | 3 |   | 3 |   | 3 |
                +---+   | 4 |   | 4 |
                        +---+   | 5 |
                                +---+


PostgreSQL


SELECT g.x/3 part,
       g.x,
       average(g.x) OVER (PARTITION BY g.x/3 ORDER BY g.x)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  0(0) + 3
NOTICE:  = 3(1)
NOTICE:  3(1) + 4
NOTICE:  = 7(2)
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
 part | x | average
------+---+---------
    0 | 1 |       1
    0 | 2 |     1.5
    1 | 3 |       3
    1 | 4 |     3.5
    1 | 5 |       4
(5 rows)

Oracle


SELECT trunc(level/3) part,
       level,
       average(level) OVER(PARTITION BY trunc(level/3) ORDER BY level) average
FROM dual CONNECT BY level <= 5;

      PART    LEVEL     AVERAGE
---------- ---------- ----------
     0        1           1
     0        2         1.5
     1        3           3
     1        4         3.5
     1        5           4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
0(0) + 3
= 3(1) flags:1
3(1) + 4
= 7(2) flags:1
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Оконные функции со скользящей рамкой
Во всех примерах, которые мы посмотрели, рамка либо была статической, либо двигалась только ее голова (при использовании предложения ORDER BY). Это давало нам возможность вычислять состояние последовательно, добавляя к контексту строку за строкой.

Но рамку оконной функции можно задать и таким образом, что ее хвост тоже будет смещаться. В нашем примере это будет соответствовать понятию скользящего среднего. Например, указание OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) говорит о том, что для каждой строки результата будут усредняться текущее и два предыдущих значений.

1.      2.      3.      4.      5.
+---+
|   |   +---+
|   |   |   |   +---+
| 1 |   | 1 |   | 1 |   +---+
+---+   | 2 |   | 2 |   | 2 |   +---+
        +---+   | 3 |   | 3 |   | 3 |
                +---+   | 4 |   | 4 |
                        +---+   | 5 |
                                +---+


Сможет ли вычисляться оконная функция в таком случае? Оказывается, сможет, правда неэффективно. Но, написав еще немного кода, можно улучшить ситуацию.

PostgreSQL


Посмотрим:

SELECT g.x,
       average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  3(2) + 3
NOTICE:  = 6(3)
NOTICE:  0(0) + 2
NOTICE:  2(1) + 3
NOTICE:  5(2) + 4
NOTICE:  = 9(3)
NOTICE:  0(0) + 3
NOTICE:  3(1) + 4
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
 x | average
---+---------
 1 |       1
 2 |     1.5
 3 |       2
 4 |       3
 5 |       4
(5 rows)

Вплоть до третьей строки все идет хорошо, потому что хвост фактически не двигается: мы просто добавляем к уже имеющемуся контексту очередное значение. Но, поскольку мы не умеем убирать значение из контекста, для четвертой и пятой строк все приходится пересчитывать полностью, каждый раз возвращаясь к начальному состоянию.

Итак, было бы здорово иметь не только функцию добавления очередного значения, но и функцию удаления значения из состояния. И действительно, такую функцию можно создать:

CREATE OR REPLACE FUNCTION average_inverse(state average_state, val numeric)
RETURNS average_state AS $$
BEGIN
  RAISE NOTICE '%(%) - %', state.accum, state.qty, val;
  RETURN ROW(state.accum-val, state.qty-1)::average_state;
END;
$$ LANGUAGE plpgsql;

Чтобы оконная функция смогла ей воспользоваться, нужно пересоздать агрегат следующим образом:

DROP AGGREGATE average(numeric);
CREATE AGGREGATE average(numeric) (
  -- обычный вариант
  sfunc      = average_transition,
  stype      = average_state,
  finalfunc  = average_final,
  initcond   = '(0,0)',
  -- вариант с "обратной” функцией
  msfunc     = average_transition,
  minvfunc   = average_inverse,
  mstype     = average_state,
  mfinalfunc = average_final,
  minitcond  = '(0,0)'
);

Проверим:

SELECT g.x,
       average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  3(2) + 3
NOTICE:  = 6(3)
NOTICE:  6(3) - 1
NOTICE:  5(2) + 4
NOTICE:  = 9(3)
NOTICE:  9(3) - 2
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
 x | average
---+---------
 1 |       1
 2 |     1.5
 3 |       2
 4 |       3
 5 |       4
(5 rows)

Вот теперь все в порядке: для четвертой и пятой строк мы удаляем из состояния хвостовое значение и добавляем новое.

Oracle


Тут ситуация аналогична. Созданный вариант аналитической функции работает, но неэффективно:

SELECT level,
       average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;

     LEVEL    AVERAGE
---------- ----------
         1          1
         2        1.5
         3          2
         4          3
         5          4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
0(0) + 2
2(1) + 3
5(2) + 4
= 9(3) flags:1
0(0) + 3
3(1) + 4
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Функция удаления значения из контекста определяется следующим образом:

MEMBER FUNCTION ODCIAggregateDelete(self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
  dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
  self.accum := self.accum - val;
  self.qty := self.qty - 1;
  RETURN ODCIConst.Success;
END;

Полный код для copy-paste
CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
  accum number,
  qty   number,
  STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl) RETURN number,
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number) RETURN number,
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl) RETURN number,
  MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number) RETURN number,
  MEMBER FUNCTION ODCIAggregateDelete(self IN OUT AverageImpl, val IN number) RETURN number
);
/
CREATE OR REPLACE TYPE BODY AverageImpl IS
  STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl)
  RETURN number IS
  BEGIN
    actx := AverageImpl(0,0);
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number)
  RETURN number IS
  BEGIN
    dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
    self.accum := self.accum + val;
    self.qty := self.qty + 1;
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl)
  RETURN number IS
  BEGIN
    dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
    self.accum := self.accum + ctx2.accum;
    self.qty := self.qty + ctx2.qty;
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number)
  RETURN number IS
  BEGIN
    dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
    returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
    RETURN ODCIConst.Success;
  END;
  MEMBER FUNCTION ODCIAggregateDelete(self IN OUT AverageImpl, val IN number)
  RETURN number IS
  BEGIN
    dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
    self.accum := self.accum - val;
    self.qty := self.qty - 1;
    RETURN ODCIConst.Success;
  END;
END;
/


Пересоздавать саму функцию не нужно. Проверим:

SELECT level,
       average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;

     LEVEL    AVERAGE
---------- ----------
         1          1
         2        1.5
         3          2
         4          3
         5          4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) - 1
5(2) + 4
= 9(3) flags:1
9(3) - 2
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Параллельность
И PostgreSQL, и Oracle (Enterprise Edition) умеют вычислять агрегатные функции в параллельном режиме. При этом каждый из параллельных процессов выполняет свою часть работы, формируя промежуточное состояние. Затем основной процесс-координатор получает эти несколько состояний и должен объединить их в одно итоговое.

Для этого нужна еще одна функция объединения, которую мы сейчас и напишем. В нашем случае она просто складывает и суммы, и количество значений.

PostgreSQL


Функция выглядит следующим образом:

CREATE OR REPLACE FUNCTION average_combine(state1 average_state, state2 average_state)
RETURNS average_state AS $$
BEGIN
  RAISE NOTICE '%(%) & %(%)', state1.accum, state1.qty, state2.accum, state2.qty;
  RETURN ROW(state1.accum+state2.accum, state1.qty+state2.qty)::average_state;
END;
$$ LANGUAGE plpgsql;

Еще мы уберем наш отладочный вывод из функции average_transition. При параллельном выполнении мы будем суммировать не пять значений, а больше, так что если этого не сделать, мы получим слишком много бесполезной информации.

Поскольку мы убираем вывод, то отпадает и необходимость использовать процедурный язык — напишем функцию на чистом SQL:

CREATE OR REPLACE FUNCTION average_transition(state average_state, val numeric)
RETURNS average_state AS $$
  SELECT ROW(state.accum+val, state.qty+1)::average_state;
$$ LANGUAGE sql;

Осталось пересоздать агрегат с учетом новой функции и указать, что его можно безопасно использовать в параллельном режиме:

DROP AGGREGATE average(numeric);
CREATE AGGREGATE average(numeric) (
  -- обычный вариант
  sfunc       = average_transition,
  stype       = average_state,
  finalfunc   = average_final,
  combinefunc = average_combine,
  initcond    = '(0,0)',
  -- вариант с "обратной” функцией
  msfunc      = average_transition,
  minvfunc    = average_inverse,
  mstype      = average_state,
  mfinalfunc  = average_final,
  minitcond   = '(0,0)',
  -- параллельность
  parallel = safe
);

Теперь создадим таблицу и заполним ее данными. Тысячи строк будет достаточно.

CREATE TABLE t(n) AS SELECT generate_series(1,1000)::numeric;

С настройками по умолчанию PostgreSQL не построит параллельный план для такой таблицы — слишком она мала,  —, но его несложно уговорить:

SET parallel_setup_cost=0;
SET min_parallel_table_scan_size=0;

EXPLAIN(costs off) SELECT average(n) FROM t;
                QUERY PLAN                
------------------------------------------
 Finalize Aggregate
   ->  Gather
         Workers Planned: 2
         ->  Partial Aggregate
               ->  Parallel Seq Scan on t

В плане запроса видим:

  • два запланированных рабочих процесса, выполняющих частичное агрегирование (Partial Aggregate),
  • узел Gather, собирающий информацию,
  • и итоговое объединение состояний (Finalize Aggregate).


Проверим:

SELECT average(n) FROM t;
NOTICE:  0(0) & 281257(678)
NOTICE:  281257(678) & 127803(226)
NOTICE:  409060(904) & 91440(96)
NOTICE:  = 500500(1000)
 average
---------
   500.5
(1 row)

Почему функция average_combine вызывается три раза, а не два? Дело в том, что в PostgreSQL координирующий процесс тоже выполняет часть работы. Поэтому, хотя было запущено два рабочих процесса, реально работа выполнялась в трех. Один из них успел обработать 678 строк, другой 226 и третий — 96 (хотя эти цифры ничего не значат и при другом запуске могут отличаться).

Oracle


Если помните, функцию ODCIAggregateMerge мы уже написали в самом начале, поскольку в Oracle она является обязательной. Документация настаивает, что эта функция необходима не только для параллельной работы, но и для последовательной — хотя мне трудно понять, зачем (и на практике не приходилось сталкиваться с ее выполнением при последовательной обработке).

Все, что остается сделать — объявить функцию безопасной для параллельной работы:

CREATE OR REPLACE FUNCTION average(val number) RETURN number
  PARALLEL_ENABLE
  AGGREGATE USING AverageImpl;
/

Создаем таблицу:

CREATE TABLE t(n) AS SELECT to_number(level) FROM dual CONNECT BY level <= 1000;

Уговорить Oracle еще проще, чем PostgreSQL — достаточно написать хинт. Вот какой получается план (вывод сильно урезан для простоты):

EXPLAIN PLAN FOR SELECT /*+ PARALLEL(2) */ average(n) FROM t;
SELECT * FROM TABLE(dbms_xplan.display);

---------------------------------
| Id  | Operation               |
---------------------------------
|   0 | SELECT STATEMENT        |
|   1 |  SORT AGGREGATE         |
|   2 |   PX COORDINATOR        |
|   3 |    PX SEND QC (RANDOM)  |
|   4 |     SORT AGGREGATE      |
|   5 |      PX BLOCK ITERATOR  |
|   6 |       TABLE ACCESS FULL |
---------------------------------

План также содержит:

  • частичную агрегацию (4),
  • координатора, получающего частичные контексты (2),
  • и итоговое объединение контекстов (1).


SELECT /*+ PARALLEL(2) */ average(n) FROM t;
AVERAGE(N)
----------
     500.5
0(0) & 216153(657)
216153(657) & 284347(343)
= 500500(1000) flags:0

В Oracle координатор не участвует в частичной агрегации. Поэтому объединяются только два контекста и по этой же причине мы видим только вывод функции ODCIAggregateMerge.

Документация
Самое время привести ссылки на документацию, в том числе и на агрегатные и оконные функции, уже включенным в СУБД. Там можно найти много интересного.

PostgreSQL:

  • Пользовательские агрегатные функции
  • CREATE AGGREGATE
  • Стандартные агрегатные и оконные функции


Oracle:

  • Пользовательские агрегатные функции
  • Интерфейс агрегатных функций
  • Стандартные агрегатные и аналитические функции


Пример про округление копеек
И обещанный пример из жизни. Эту функцию я придумал, когда приходилось писать отчеты для бухгалтерии, работающей по РСБУ (правилам российского бухучета).

Самая простая задача, в которой возникает необходимость округления — распределение общих расходов (скажем, 100 рублей) на отделы (скажем, 3 штуки) по какому-то принципу (скажем, поровну):

WITH depts(name) AS (
  VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
  SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round(amount,2) FROM report;

 dept | round
------+-------
 A    | 33.33
 B    | 33.33
 C    | 33.33
(3 rows)

Этот запрос показывает проблему: суммы надо округлять, но при этом теряется копейка. А РСБУ этого не прощает.

Задачу можно решать по-разному, но на мой вкус наиболее элегантный способ — оконная функция, которая работает в нарастающем режиме и берет всю борьбу с копейками на себя:

WITH depts(name) AS (
  VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
  SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round2(amount) OVER (ORDER BY dept) FROM report;

 dept | round2
------+--------
 A    |  33.33
 B    |  33.34
 C    |  33.33
(3 rows)

Состояние такой функции включает ошибку округления (r_error) и текущее округленное значение (amount). Функция обработки очередного значения увеличивает ошибку округления, и, если она уже превышает полкопейки, добавляет к округленной сумме копеечку:

state.r_error := state.r_error + val - round(val,2);
state.amount := round(val,2) + round(state.r_error,2);
state.r_error := state.r_error - round(state.r_error,2);

А функция, выдающая результат, просто возвращает уже готовый state.amount.

Полный код функции приводить не буду: используя уже приведенные примеры написать ее не представляет сложности.

Если вам встречались интересные примеры использования собственных агрегатных или оконных функций — поделитесь ими в комментариях.

© Habrahabr.ru