Автоматизация тестирования таблиц в Postgresql на SQL

Пример описываемой автоматизации

Привет.

Предположим, вам пришла задача на тестирование какой-либо функциональности, которая относится к бэкенду. Вы переходите к документу с требованиями и видите, помимо прочего, описание таблиц базы данных (БД). Примерно так это выглядит:

Таблица 1. Требования к таблице price в схеме public

Столбец

Ограничение

Тип

Not null

id

PK

int8

Да

price_value

UNIQUE price_unique

numeric (18, 6)

Да

model_id

UNIQUE price_unique, FOREIGN KEY price_models_fk со столбцом id таблицы models

int4

Да

comment

varchar (255)

Нет

Следовательно, один из тест-кейсов должен содержать проверки на соответствие метаданных этих таблиц ожидаемым значениям. К таким метаданным относятся состав и тип столбцов, ограничения для значений, наличие связей и т.д. В Postgresql есть неочевидная возможность проводить подобные проверки автоматически с помощью SQL-запроса.

2d0a2c90059b4658f8f00c0bac0822b4.png

К слову, эта статья будет интересна не только с точки зрения обзора SQL-запроса для автоматизации тестирования, но и с точки зрения того, что и как можно писать на SQL.

Возьмем для примера пару таблиц:

-- drop table public.models

create table public.models (
    id int8 generated by default as identity( increment by 1 minvalue 1 maxvalue 9223372036854775807 start 1 cache 1 no cycle) not null,
    "name" varchar(50) not null,
    manufacturer varchar(50) not null,
    constraint models_pk primary key (id),
    constraint models_unique unique (name, manufacturer)
);
-- drop table public.price

create table public.price (
    id int8 generated by default as identity( increment by 1 minvalue 1 maxvalue 9223372036854775807 start 1 cache 1 no cycle) not null,
    price_value numeric(18,6) not null,
    model_id int4 not null,
    comment varchar(255) null,
    constraint price_pk primary key (id),
    constraint price_models_fk foreign key (model_id) references public.models(id),
    constraint price_unique unique (price_value, model_id)
);

Сосредоточимся на price. Таблица models нужна исключительно для того, чтобы добавить в price ограничение (constraint) по зависимому ключу (foreign key). SQL-запрос для автоматической проверки метаданных этой таблицы будет выглядеть так:

with
    -- Настраиваем запрос: задаем название таблицы и название схемы
    params(table_name, table_schema) as (
        values (
            'price',
            'public'
        )
    ),
    -- Описываем ожидаемые столбцы таблицы
    required_columns(column_name, data_type, is_nullable, character_maximum_length) as (
        values            
            ('id',          'bigint',               'NO', 	null),
            ('price_value', 'numeric',              'NO', 	18.6),
            ('model_id',    'integer',  		    'NO',	null),
            ('comment',     'character varying',    'YES',  255)
    ),
    -- Описываем ожидаемые ограничения
    required_constraints(column_name, constraint_type) as (
        values            
            ('id',          'PRIMARY KEY'),
            ('id',    		'FOREIGN KEY'),
            ('model_id',    'FOREIGN KEY'),
            ('price_value', 'UNIQUE'),
            ('model_id',    'UNIQUE')
    ),
    -- Находим информацию о столбцах тестируемой таблицы и добавляем обработку null'ов
    columns_info as (
        select
            column_name, data_type, is_nullable,
            coalesce(numeric_precision, 0)          as numeric_precision,
            coalesce(numeric_scale, 0)              as numeric_scale,
            coalesce(character_maximum_length, 0)   as character_maximum_length
        from information_schema.columns
        where
            table_name       = (select table_name   from params)
            and table_schema = (select table_schema from params)
    ),
    -- Проверяем существование таблицы и подсчитываем количество столбцов в ней
    check_table_exist as (
        select
            case when count_all_fields < 1 then false else true end table_exists,
            case when count_all_fields < 1 then 1 else 0 end table_exists_error,
            count_all_fields
        from (
            select count (*) as count_all_fields
            from columns_info
        ) sq
    ),
    -- Сравниваем ожидаемый и текущий наборы атрибутов таблицы
    fields_comparison as (
        select t.*
        from columns_info t
        inner join required_columns r
            on  t.column_name   = r.column_name
            and t.data_type     = r.data_type
            and t.is_nullable   = r.is_nullable
            and (
                -- Сравниваем целую часть десятичных значений
                case
                    when t.data_type = 'numeric'
                    then t.numeric_precision = trunc(r.character_maximum_length::numeric)
                end
                and
                -- Сравниваем дробную часть десятичных значений
                case
                    when t.data_type = 'numeric'
                    then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))
                    * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
                end
                or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
           )
    ),
    -- Ищем лишние столбцы и считаем их количество
    check_unexpected_fields as (
        select
            count (column_name) as count_unexpected_fields,
            string_agg(column_name, ', ') as unexpected_fields
        from (
            select column_name
            from columns_info
            except
            select column_name
            from required_columns
        ) sq
    ),
    -- Ищем недостающие столбцы и считаем их количество
    check_missing_fields as (
        select
            count (column_name) as count_missing_fields,
            string_agg(column_name, ', ') as missing_fields
        from (
            select column_name
            from required_columns
            except
            select column_name
            from columns_info
        ) sq
    ),
    -- Ищем невалидные столбцы и считаем их количество
    check_invalid_fields as (
        select
            count (column_name) as count_invalid_fields,
            string_agg(column_name, ', ') as invalid_fields
        from (
            select column_name
            from required_columns
            except
            select column_name
            from fields_comparison
            except
            select string_to_table(missing_fields, ', ')
            from check_missing_fields
        ) sq
    ),
    -- Ищем все ограничения для таблицы
    constraints_query as(
        select
            t1.constraint_type,
            t2.column_name,
            t3.column_name as foreign_column
        from information_schema.table_constraints t1
        left join information_schema.constraint_column_usage t2
            on t1.constraint_name = t2.constraint_name
        left join information_schema.key_column_usage as t3
            on t1.constraint_name = t3.constraint_name
            and t1.table_schema = t3.table_schema
        where
            t1.table_name            = (select table_name   from params)
            and t1.constraint_schema = (select table_schema from params)
            and t2.column_name is not null
    ),
    -- Включаем значения зависимых ключей (foreign_column) в список ограничений (column_name)
    union_foreign_ref_columns as (
        select column_name, constraint_type
        from constraints_query
        union all
        select foreign_column as column_name, constraint_type
        from constraints_query
    ),
    -- Ищем лишние ограничения и считаем их количество
    check_unexpected_constraints as (
        select
            count (column_name) as count_unexpected_constraints,
            string_agg(column_name || ' (' || constraint_type || ')', ', ') as unexpected_constraints
        from (
            select *
            from union_foreign_ref_columns
            except
            select column_name, constraint_type
            from required_constraints
        ) sq
    ),
    -- Ищем недостающие ограничения и считаем их количество
    check_missing_constraints as (
        select
            count (column_name) as count_missing_constraints,
            string_agg(column_name || ' (' || constraint_type || ')', ', ') as missing_constraints
        from (
            select column_name, constraint_type
            from required_constraints
            except
            select *
            from union_foreign_ref_columns
        ) sq
    ),
    -- Собираем полученные данные
    checks as (
        select
            -- Выводим всю информацию об ошибках и суммируем их количество
            table_exists_error + count_unexpected_fields + count_missing_fields + count_invalid_fields +
            count_unexpected_constraints + count_missing_constraints as errors,
            *
        from        check_table_exist
        cross join  check_unexpected_fields
        cross join  check_missing_fields
        cross join  check_invalid_fields
        cross join  check_unexpected_constraints
        cross join  check_missing_constraints
)
select *
from checks

Важное примечание: чтобы использовать этот запрос для какой-либо вашей таблицы, нужно внести изменения только в CTE params (названия вашей таблицы и схемы, где она хранится),  required_columns (описание ожидаемых значений метаданных столбцов вашей таблицы) и required_constraints (описание ожидаемых ограничений для вашей таблицы). Остальной код не нужно менять.

img.2024-11-20 21.46.34.png

Возможность такой автоматической проверки обеспечивается наличием во всех БД Postgresql информационной схемы (information_schema). В Oracle, если не ошибаюсь, тоже такая есть, поэтому описываемый запрос может быть актуален и для других СУБД (полагаю, с корректировками).

Что показывает этот запрос?

Столбец

Расшифровка

errors

Сумма количества ошибок

table_exists

Существует ли таблица

table_exists_error

Счетчик для учета ошибки в случае отсутствия таблицы

count_all_fields

Счетчик всех столбцов таблицы

count_unexpected_fields

Счетчик лишних столбцов

unexpected_fields

Названия лишних столбцов через запятую

count_missing_fields

Счетчик недостающих столбцов

missing_fields

Названия недостающих столбцов через запятую

count_invalid_fields

Счетчик ожидаемых столбцов, в которых есть какие-либо ошибки

invalid_fields

Названия ожидаемых столбцов через запятую, в которых есть какие-либо ошибки

count_unexpected_constraints

Счетчик лишних ограничений

unexpected_constraints

Названия лишних ограничений с указанием типа ограничения в скобках через запятую

count_missing_constraints

Счетчик недостающих ограничений

missing_constraints

Названия недостающих ограничений с указанием типа ограничения в скобках через запятую

img.2024-11-27 19.40.40.png

Анализ запроса

В CTE params,  required_columns и required_constraints описываются входные данные. Это — ожидаемые датасеты значений. С params, думаю, все понятно — указываем название тестируемой таблицы и название схемы, где она хранится:

with
    -- Настраиваем запрос: задаем название таблицы и название схемы
    params(table_name, table_schema) as (
        values (
            'price',
            'smartphones'
        )
    )
select *
from params

Каждый CTE, о котором я пишу, буду оформлять с select’ом, который обращается к этому CTE. Весомой причины для этого нет — мне просто так удобнее:)
Этот код можно выполнить у себя локально. Он просто покажет значения внутри CTE:

img.2024-11-27 20.01.42.png

required_column рассмотрим подробнее:

with
    -- Описываем ожидаемые столбцы таблицы
    required_columns(column_name, data_type, is_nullable, character_maximum_length) as (
        values            
            ('id',          'bigint',               'NO',   null),
            ('price_value', 'numeric',              'NO',   18.6),
            ('model_id',    'integer',  		    'NO',   null),
            ('comment',     'character varying',    'YES',  255)
    )
select *
from required_columns

Здесь описываются значения в 4-х столбцах:

  • column_name: название столбца в тестируемой таблице;

  • data_type: тип столбца. В текущей версии запроса указывать нужно изначальное название типа (character varying), а не синоним (varchar). Эти названия можно просмотреть здесь:
    https://www.postgresql.org/docs/8.1/datatype.html

  • is_nullable: допускается ли в качестве значения для столбца null;

  • character_maximum_length: длина строки. Иначе говоря, максимально допустимое количество символов (цифр) значения. Если ограничения нет — указывается null.

В CTE required_constraints содержатся ожидаемые ограничения по 2-м столбцам:

  • column_name: название столбца, к которому применяется ограничение;

  • constraint_type: тип ограничения.

Типы ограничений (constraint_type) бывают следующие:

  • CHECK: значения в столбце должны удовлетворять какому-либо заданному условию;

  • NOT NULL: название говорящее — значение в столбце не должно быть NULL;

  • UNIQUE: в столбце не допускается наличие

  • PRIMARY KEY: комбинация ограничений UNIQUE и NOT NULL;

  • FOREIGN KEY: значение в столбце у отдельно взятой записи должно быть таким же, как значение в столбце другой таблицы.

Ограничение NOT NULL проверяется отдельно в рамках изучения соответствия фактических метаданных значениям в CTE required_columns. Как для этого там есть столбец is_nullable.

Теперь рассмотрим код, который описан после required_constraints. До этого CTE включительно, как упоминал выше, указываются входные данные. В CTE columns_info происходит получение метаданных о столбцах проверяемой таблицы с помощью запроса к columns:

select *
from information_schema.columns

Ограничим выборку только по тем метаданным, которые нужны для проверок. null’ы преобразуем в 0, чтобы значения можно было сравнивать. Также добавим фильтрацию (в where) по тестируемой таблице и по схеме, где она хранится:

select
    column_name, data_type, is_nullable,
    coalesce(numeric_precision, 0)          as numeric_precision,
    coalesce(numeric_scale, 0)              as numeric_scale,
    coalesce(character_maximum_length, 0)   as character_maximum_length
from information_schema.columns
where
    table_name       = 'price'
    and table_schema = 'public'

В select’е запроса:

Название столбца

Описание

column_name

Название столбца в тестируемой таблице

data_type

Тип столбца

is_nullable

Обязателен ли столбец для заполнения

numeric_precision

Используется только для столбцов числовых типов (numeric, integer и т.д.). Ограничение по количеству цифр целой части числа

numeric_scale

Используется только для для столбцов числовых типов. Ограничение по количеству цифр дробной части числа

character_maximum_length

ограничение по количеству символов для столбцов типа varchar (character varying)

Если у столбца тестируемой таблицы нет ограничений по количеству символов или цифр (например, при наличии типа boolean или jsonb), то в соответствующем столбце метаданных будет null.

Перенесем запрос в CTE и обратимся к записям, которые по нему возвращаются, через select:

with columns_info as (
    select
        column_name, data_type, is_nullable,
        coalesce(numeric_precision, 0)          as numeric_precision,
        coalesce(numeric_scale, 0)              as numeric_scale,
        coalesce(character_maximum_length, 0)   as character_maximum_length
    from information_schema.columns
    where
        table_name       = 'price'
        and table_schema = 'public'
)
select *
from columns_info

Добавим код для проверки существования таблицы. Сделаем это через подсчет количества значений в запросе, которые мы описали ранее (меняем select * на select count (*) ):

with columns_info as (
    select
        column_name, data_type, is_nullable,
        coalesce(numeric_precision, 0)          as numeric_precision,
        coalesce(numeric_scale, 0)              as numeric_scale,
        coalesce(character_maximum_length, 0)   as character_maximum_length
    from information_schema.columns
    where
        table_name       = 'price'
        and table_schema = 'public'
)
select count (*)
from columns_info

Логика проста: нет таблицы — нет и записей, которые возвращаются по значениям, указанным в фильтрах. Добавим сюда case для вывода boolean’а, который будет отображать статус проверки. А также счетчик, который будет показывать 1, если boolean — true и 0, если boolean — false. Он пригодится для подсчета суммы ошибок. Еще выведем фактическое количество столбцов в тестируемой таблице — это просто полезная информация. Все эти данные возьмем из select’а, описанного выше под CTE. Выполнять его будем через подзапрос:

with columns_info as (
    select
        column_name, data_type, is_nullable,
        coalesce(numeric_precision, 0)          as numeric_precision,
        coalesce(numeric_scale, 0)              as numeric_scale,
        coalesce(character_maximum_length, 0)   as character_maximum_length
    from information_schema.columns
    where
        table_name       = 'price'
        and table_schema = 'public'
)
select
    case when count_all_fields < 1 then false   else true   end table_exists,
    case when count_all_fields < 1 then 1       else 0      end table_exists_error,
    count_all_fields
from (
    select count (*) as count_all_fields
    from columns_info
) sq

В следующем блоке кода будем проводить сравнение — ожидаемых значений, описанных в CTE required_columns, и фактических значений, описанных в CTE columns_info. Уберем код для проверки существования таблицы в отдельный CTE check_table_exists. Также добавим 3 CTE с ожидаемыми данными, чтобы можно было проводить сравнение. Название таблицы (table_name) и название схемы (schema_name) для columns_info теперь будем получать через подзапросы. Пишем код дальше:

with
    -- Настраиваем запрос: задаем название таблицы и название схемы
    params(table_name, table_schema) as (
        values (
            'price',
            'public'
        )
    ),
    -- Описываем ожидаемые столбцы таблицы
    required_columns(column_name, data_type, is_nullable, character_maximum_length) as (
        values            
            ('id',          'bigint',                       'NO', 	null),
            ('price_value', 'numeric',            			'NO', 	18.6),
            ('model_id',    'integer',  					'NO',	null),
            ('comment',     'character varying',            'YES',	255)
    ),
    -- Описываем ожидаемые ограничения
    required_constraints(column_name, constraint_type) as (
        values            
            ('id',          'PRIMARY KEY'),
            ('id',    		'FOREIGN KEY'),
            ('model_id',    'FOREIGN KEY'),
            ('price_value', 'UNIQUE'),
            ('model_id',    'UNIQUE')
    ),
    -- Находим информацию о столбцах тестируемой таблицы и добавляем обработку null'ов
    columns_info as (
        select
            column_name, data_type, is_nullable,
            coalesce(numeric_precision, 0)          as numeric_precision,
            coalesce(numeric_scale, 0)              as numeric_scale,
            coalesce(character_maximum_length, 0)   as character_maximum_length
        from information_schema.columns
        where
            table_name       = (select table_name   from params)
            and table_schema = (select table_schema from params)
    ),
    -- Проверяем существование таблицы и подсчитываем количество столбцов в ней
    check_table_exist as (
        select
            case when count_all_fields < 1 then false else true end table_exists,
            case when count_all_fields < 1 then 1 else 0 end table_exists_error,
            count_all_fields
        from (
            select count (*) as count_all_fields
            from columns_info
        ) sq
    )
select *
from columns_info

Сравнить датасеты в SQL можно разными способами. Мне известны три — через:

В данном случае я воспользуюсь join’ом. Соединение будем делать по столбцам, описанным в CTE columns_info. Начнем с первых 3-х. Таблицам добавим алиасы t и r. Возвращать записи будем только из columns_info. Иначе говоря, только те, которые фактически есть в БД. Код в with идентичен блоку, описанному выше, поэтому его далее буду сокращать для удобства:

with (...)
select t.*
from columns_info t
inner join required_columns r
    on  t.column_name               = r.column_name
    and t.data_type                 = r.data_type
    and t.is_nullable               = r.is_nullable

Остаются еще 3 столбца. С ними сложнее. В CTE required_columns есть только 4 столбца, тогда в columns_info возвращается 6. Это связано с тем, что ожидаемое ограничение по количеству символов и цифр мы задаем в 1-м столбце для всех типов. При этом фактически в метаданных есть разделение такого ограничения на 3 столбца.

Начнем с простого: сравнение для столбцов varchar. null’ы в датасете с ожидаемыми значениями заменяем на 0. На всякий случай повторю, что это нужно для сравнения: null’ы через равенство проверить не получится:

with (...)
select t.*
from columns_info t
inner join required_columns r
    on  t.column_name               = r.column_name
    and t.data_type                 = r.data_type
    and t.is_nullable               = r.is_nullable
    and t.character_maximum_length  = coalesce(r.character_maximum_length, 0)

Теперь нужно сравнить целую и дробную части числовых значений ограничений. Но только для типа numeric и decimal, т.к. остальные числовые типы имеют фиксированный диапазон значений. Начнем с целой части чисел. Здесь нужно использовать case для отбора значений с типами numeric и decimal. С учетом того, что сравнивается целая часть, у числа из столбца character_maximum_length (из CTE required_columns) нужно отсекать дробную часть с помощью функции trunc (). Проверяемый столбец может быть единовременно только одного типа, поэтому в ранее написанную обработку для ограничений varchar новый код добавляем через or. Всю обработку при этом оборачиваем в скобки:

with (...)
select t.*
from columns_info t
inner join required_columns r
    on  t.column_name               = r.column_name
    and t.data_type                 = r.data_type
    and t.is_nullable               = r.is_nullable
    and t.character_maximum_length  = coalesce(r.character_maximum_length::numeric, 0)
    and (
    	case
            when t.data_type in ('numeric', 'decimal')
            then t.numeric_precision = trunc(r.character_maximum_length::numeric)
        end
        or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
    )

И теперь самое сложное: сравнение дробной части. Вычислить её просто — нужно лишь из числа вычесть его целую часть. Пример для наглядности:

18,6 — 18 = 0,6

Но описать кодом это будет сложнее. В таблице price, которая используется в этой статье как объект тестирования, есть столбец price_value. Он имеет тип numeric и ограничение 18.6, то есть 18 цифр в целой части и 6 в дробной. То, что нужно для примера. Попробуем в select добавить разность, чтобы посмотреть, что получится:

img.2024-11-21 22.11.48.png

Это ожидаемое значение. А что в фактическом?

img.2024-11-21 22.10.37.png

То есть чтобы корректно провести сравнение, нужно дробную часть ожидаемого значения перевести в целую. В данном случае достаточно выполнить умножение на 10. Но что если бы ограничение было не 18.6, а, скажем, 18.15? Тогда умножение на 10 нужно проводить во 2-й степени. И значение этой степени — переменное, равное количеству знаков после запятой. Иначе говоря, для столбца price_value, у которого есть ограничение 18.6, степень будет равна 1 (после запятой только 1 цифра — 6). А если ограничение было 18.15, то степень должна была быть 2.

Итак, разность числа и его целой части, которые берутся из character_maximum_length (CTE required_columns), показанная в виде кода на скриншоте выше, выглядит так:

r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)

Теперь её нужно умножить на 10 в степени:

(r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)) * power(10, 1)

Но единица (1) в power () покрывает не все случаи, поэтому добавим код для динамического расчета:

(r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)) * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))

Прокомментирую. r.character_maximum_length: text — выполняем приведение типов, т.к. этот столбец имеет тип numeric, но его значения будут использоваться в функциях, которые работают только с символьными типами. Функция split_part () используется для отбора цифр после запятой (но в качестве разделителя указывается именно точка, т.к. дробное значение в метаданных хранится именно в таком виде). А length подсчитывает количество этих цифр.

Добавляем этот код в выражения для соединения:

with(...)
select t.*
from columns_info t
inner join required_columns r
    on  t.column_name   = r.column_name
    and t.data_type     = r.data_type
    and t.is_nullable   = r.is_nullable
    and (
        -- Сравниваем целую часть десятичных значений
        case
            when t.data_type in ('numeric', 'decimal')
            then t.numeric_precision = trunc(r.character_maximum_length::numeric)
        end
        and
        -- Сравниваем дробную часть десятичных значений
        case
            when t.data_type in ('numeric', 'decimal')
            then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))
            * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
        end
        or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
   )

Результат сравнения покажет, какие столбцы тестируемой таблицы имеют ожидаемые метаданные. В дальнейшем отобранные таким образом записи будем сравнивать с датасетом из CTE required_columns. Так мы найдем столбцы, у которых какие-либо метаданные не соответствуют ожидаемым. Добавим этот запрос в CTE с названием fields_comparison:

-- Сравниваем ожидаемый и текущий наборы атрибутов таблицы
with 
	(...),
    fields_comparison as (
        select t.*
        from columns_info t
        inner join required_columns r
            on  t.column_name   = r.column_name
            and t.data_type     = r.data_type
            and t.is_nullable   = r.is_nullable
            and (
                -- Сравниваем целую часть десятичных значений
                case
                    when t.data_type = 'numeric'
                    then t.numeric_precision = trunc(r.character_maximum_length::numeric)
                end
                and
                -- Сравниваем дробную часть десятичных значений
                case
                    when t.data_type = 'numeric'
                    then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))
                    * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
                end
                or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
           )
	)
select *
from fields_comparison

Едем дальше. В следующем CTE,  check_unexpected_fields, выполняется поиск столбцов таблицы, которые есть в фактическом датасете, но отсутствуют в ожидаемом (их условно можно назвать «лишними»). Это реализовано через сравнение записей в CTE columns_info и CTE required_columns. Запрос довольно простой:

select column_name
from columns_info
except
select column_name
from required_columns

По такому запросу каждое название «лишнего» столбца будет выведено в отдельной строке. Однако с учетом того, что это не единственный CTE, из которого будет формироваться результирующая выборка, вернувшиеся значения нужно собрать в одной ячейке. Я это делаю через перечисление в строке, с запятой в качестве разделителя, с помощью функции string_agg (). И дополнительно подсчитываю количество значений, вернувшихся по запросу:

with
	(...),
    -- Ищем лишние столбцы и считаем их количество
    check_unexpected_fields as (
        select
            count (column_name) as count_unexpected_fields,
            string_agg(column_name, ', ') as unexpected_fields
        from (
            select column_name
            from columns_info
            except
            select column_name
            from required_columns
        ) sq
    )
select *
from check_unexpected_fields

Далее в CTE check_missing_fields содержится почти такой же запрос. Разница лишь в том, что запросы в except меняются друг с другом местами. Этот CTE показывает недостающие столбцы тестируемой таблицы. Иначе говоря, те, что есть в ожидаемом датасете, но отсутствуют в фактическом:

with 
	(...),
    -- Ищем недостающие столбцы и считаем их количество
    check_missing_fields as (
        select
            count (column_name) as count_missing_fields,
            string_agg(column_name, ', ') as missing_fields
        from (
            select column_name
            from required_columns
            except
            select column_name
            from columns_info
        ) sq
    )
select *
from check_missing_fields

Итак, на текущей стадии нам известны столбцы:

  • Ожидаемые (CTE required_columns);

  • Полностью совпавшие с ожидаемыми (CTE fields_comparison);

  • Лишние (CTE check_unexpected_fields);

  • Недостающие (CTE check_missing_fields).

С помощью этих данных можно найти еще одно множество. В нем будут содержаться столбцы, которые совпадают с ожидаемым датасетом лишь частично. Соответственно, CTE с таким запросом будет возвращать невалидные значения. Следовательно, для него выглядит логичным название check_invalid_fields.

Запрос для поиска невалидных столбцов также содержит в себе оператор except, но в данном случае он используется дважды. Из множества записей ожидаемых столбцов (CTE required_columns) вычитается множество полностью совпавших столбцов (CTE fields_comparison) и недостающих столбцов (CTE check_missing_fields). Для CTE check_missing_fields в рамках такого запроса нужно «развернуть» строку со значениями обратно в набор записей. Это можно сделать с помощью функции string_to_table (). Получаем такой код:

with 
	(...),
    -- Ищем невалидные столбцы и считаем их количество
    check_invalid_fields as (
        select
            count (column_name) as count_invalid_fields,
            string_agg(column_name, ', ') as invalid_fields
        from (
            select column_name
            from required_columns
            except
            select column_name
            from fields_comparison
            except
            select string_to_table(missing_fields, ', ')
            from check_missing_fields
        ) sq
select *
from check_invalid_fields

Вся необходимая информация для тестирования столбцов — собрана. Перейдем к ограничениям (constraints). К слову, к ним в контексте метаданных, в частности, относятся зависимые ключи (foreign keys). Для получения необходимых сведений будем запрашивать из информационной схемы столбцы таблиц:

  • constraint_type из table_constraints;

  • column_name из constraint_column_usage;

  • column_name из key_column_usage;

Если какое-либо ограничение не удовлетворяется при изменении данных таблицы, то по запросу возвращается ошибка. В её формулировке будет указано название нарушаемого в таком случае ограничения (violates constraint):

img.2024-11-27 18.34.53.png

Ограничения помогают поддерживать целостность данных. По этой причине важно проверять их наличие и логику. Информацию об ограничениях тестируемой таблицы будем получать по отдельному запросу. Его опишем в CTE constraints_query:

with
	(...),
    -- Ищем все ограничения для таблицы
    constraints_query as(
        select
            t1.constraint_type,
            t2.column_name,
            t3.column_name as foreign_column
        from information_schema.table_constraints t1
        left join information_schema.constraint_column_usage t2
            on t1.constraint_name = t2.constraint_name
        left join information_schema.key_column_usage as t3
            on t1.constraint_name = t3.constraint_name
            and t1.table_schema = t3.table_schema
        where
            t1.table_name            = (select table_name   from params)
            and t1.constraint_schema = (select table_schema from params)
            and t2.column_name is not null
    )
select *
from constraints_query

По этому запросу будут возвращаться 3 столбца:

img.2024-11-27 19.06.34.png

При этом в ожидаемом датасете, в CTE required_constraints, у нас всего 2 столбца:

with
    (...)
    -- Описываем ожидаемые ограничения
    required_constraints(column_name, constraint_type) as (
        values            
            ('id',          'PRIMARY KEY'),
            ('id',    		'FOREIGN KEY'),
            ('model_id',    'FOREIGN KEY'),
            ('price_value', 'UNIQUE'),
            ('model_id',    'UNIQUE')
    )
select *
from required_constraints

Поэтому в следующем CTE (назовем его union_foreign_ref_columns) объединим значения из столбцов column_name и foreign_column в 1 столбец:

with
	(...),
    -- Включаем значения зависимых ключей (foreign_column) в список ограничений (column_name)
    union_foreign_ref_columns as (
        select column_name, constraint_type
        from constraints_query
        union all
        select foreign_column as column_name, constraint_type
        from constraints_query
    )
select *
from union_foreign_ref_columns

Фрагмент as column_name необязателен, но я его оставил его для улучшения читаемости кода.

Фактический набор ограничений получили. Теперь можно проводить сравнение. Я решил не делать отдельный CTE под поиск невалидных ограничений, т.к. здесь рассматриваются всего 2 столбца (column_name и constraints_type). В конечном запросе рассматриваются только «лишние» и недостающие ограничения в 2-х соответствующих CTE. И это почти конец запроса:

with
	(...),
    -- Ищем лишние ограничения и считаем их количество
    check_unexpected_constraints as (
        select
            count (column_name) as count_unexpected_constraints,
            string_agg(column_name || ' (' || constraint_type || ')', ', ') as unexpected_constraints
        from (
            select *
            from union_foreign_ref_columns
            except
            select column_name, constraint_type
            from required_constraints
        ) sq
    )
select *
from check_unexpected_constraints

и

with
	(...),
    -- Ищем недостающие ограничения и считаем их количество
    check_missing_constraints as (
        select
            count (column_name) as count_missing_constraints,
            string_agg(column_name || ' (' || constraint_type || ')', ', ') as missing_constraints
        from (
            select column_name, constraint_type
            from required_constraints
            except
            select *
            from union_foreign_ref_columns
        ) sq
    )
select *
from check_missing_constraints

Логика сравнения здесь такая же, как при сравнении столбцов. Отличие между этими 2-мя CTE — только в порядке расположения относительно except: в check_unexpected_constraints запрос к union_foreign_ref_columns идет до except, а в check_missing_constraints — после. Из интересного здесь разве что данная конструкция:

string_agg(column_name || ' (' || constraint_type || ')', ', ')

string_agg () здесь по-прежнему нужен для вывода нескольких записей в 1-й строке. Но здесь 1-й аргумент функции логически немного отличается от этого же аргумента функции в коде сравнения столбцов. Это нужно исключительно для того, чтобы в 1-й ячейке уместить не только несколько строк, но и информацию сразу по 2-м столбцам. В итоговой выборке значение в этой ячейке будет выглядеть так:

img.2024-11-27 19.27.36.png

По этому скриншоту проще понять, почему конструкция с string_agg () именно такая, какая она есть. В нее включается информацию из 2-х столбцов, при этом значения 2-го столбца берется в скобки с пробелом перед 1-м значением.

Ну и в последнем CTE собираем информацию, полученную в других CTE, и суммируем количество найденных ошибок, которые мы получали с помощью count ():

with
	(...),
    -- Собираем полученные данные
    checks as (
        select
            -- Выводим всю информацию об ошибках и суммируем их количество
            table_exists_error + count_unexpected_fields + count_missing_fields + count_invalid_fields +
            count_unexpected_constraints + count_missing_constraints as errors,
            *
        from        check_table_exist
        cross join  check_unexpected_fields
        cross join  check_missing_fields
        cross join  check_invalid_fields
        cross join  check_unexpected_constraints
        cross join  check_missing_constraints
)
select *
from checks

Значения, выводимые этим запросом в строках, видны в самих строках, в тултипе (если в строке большое значение), и их также можно скопировать и вставить блокнот:

acd503fc20796a453df631ed19dda565.gif

Почему этот запрос полезен?

  • По моему опыту, почему-то новые таблицы БД, которые создают разработчики, часто не соответствуют тому, что описано в требованиях. Исходя из этого, проще один раз описать ожидаемые значения в запросе и запускать его после правок. Это сэкономит время при сравнении и исключит человеческий фактор. Особенно если итераций правок будет несколько;

  • Польза от таких проверочных запросов тем выше, чем больше таблиц добавляется в рамках доработки;

  • Пригодится для быстрого тестирования метаданных таблиц после миграции баз данных на другой стенд.

© Habrahabr.ru