Автоматизация тестирования таблиц в Postgresql на SQL
Пример описываемой автоматизации
Привет.
Предположим, вам пришла задача на тестирование какой-либо функциональности, которая относится к бэкенду. Вы переходите к документу с требованиями и видите, помимо прочего, описание таблиц базы данных (БД). Примерно так это выглядит:
Таблица 1. Требования к таблице price в схеме public
Столбец | Ограничение | Тип | Not null |
---|---|---|---|
id | PK | int8 | Да |
price_value | UNIQUE price_unique | numeric (18, 6) | Да |
model_id | UNIQUE price_unique, FOREIGN KEY price_models_fk со столбцом id таблицы models | int4 | Да |
comment | | varchar (255) | Нет |
Следовательно, один из тест-кейсов должен содержать проверки на соответствие метаданных этих таблиц ожидаемым значениям. К таким метаданным относятся состав и тип столбцов, ограничения для значений, наличие связей и т.д. В Postgresql есть неочевидная возможность проводить подобные проверки автоматически с помощью SQL-запроса.
К слову, эта статья будет интересна не только с точки зрения обзора SQL-запроса для автоматизации тестирования, но и с точки зрения того, что и как можно писать на SQL.
Возьмем для примера пару таблиц:
-- drop table public.models
create table public.models (
id int8 generated by default as identity( increment by 1 minvalue 1 maxvalue 9223372036854775807 start 1 cache 1 no cycle) not null,
"name" varchar(50) not null,
manufacturer varchar(50) not null,
constraint models_pk primary key (id),
constraint models_unique unique (name, manufacturer)
);
-- drop table public.price
create table public.price (
id int8 generated by default as identity( increment by 1 minvalue 1 maxvalue 9223372036854775807 start 1 cache 1 no cycle) not null,
price_value numeric(18,6) not null,
model_id int4 not null,
comment varchar(255) null,
constraint price_pk primary key (id),
constraint price_models_fk foreign key (model_id) references public.models(id),
constraint price_unique unique (price_value, model_id)
);
Сосредоточимся на price. Таблица models нужна исключительно для того, чтобы добавить в price ограничение (constraint) по зависимому ключу (foreign key). SQL-запрос для автоматической проверки метаданных этой таблицы будет выглядеть так:
with
-- Настраиваем запрос: задаем название таблицы и название схемы
params(table_name, table_schema) as (
values (
'price',
'public'
)
),
-- Описываем ожидаемые столбцы таблицы
required_columns(column_name, data_type, is_nullable, character_maximum_length) as (
values
('id', 'bigint', 'NO', null),
('price_value', 'numeric', 'NO', 18.6),
('model_id', 'integer', 'NO', null),
('comment', 'character varying', 'YES', 255)
),
-- Описываем ожидаемые ограничения
required_constraints(column_name, constraint_type) as (
values
('id', 'PRIMARY KEY'),
('id', 'FOREIGN KEY'),
('model_id', 'FOREIGN KEY'),
('price_value', 'UNIQUE'),
('model_id', 'UNIQUE')
),
-- Находим информацию о столбцах тестируемой таблицы и добавляем обработку null'ов
columns_info as (
select
column_name, data_type, is_nullable,
coalesce(numeric_precision, 0) as numeric_precision,
coalesce(numeric_scale, 0) as numeric_scale,
coalesce(character_maximum_length, 0) as character_maximum_length
from information_schema.columns
where
table_name = (select table_name from params)
and table_schema = (select table_schema from params)
),
-- Проверяем существование таблицы и подсчитываем количество столбцов в ней
check_table_exist as (
select
case when count_all_fields < 1 then false else true end table_exists,
case when count_all_fields < 1 then 1 else 0 end table_exists_error,
count_all_fields
from (
select count (*) as count_all_fields
from columns_info
) sq
),
-- Сравниваем ожидаемый и текущий наборы атрибутов таблицы
fields_comparison as (
select t.*
from columns_info t
inner join required_columns r
on t.column_name = r.column_name
and t.data_type = r.data_type
and t.is_nullable = r.is_nullable
and (
-- Сравниваем целую часть десятичных значений
case
when t.data_type = 'numeric'
then t.numeric_precision = trunc(r.character_maximum_length::numeric)
end
and
-- Сравниваем дробную часть десятичных значений
case
when t.data_type = 'numeric'
then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))
* power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
end
or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
)
),
-- Ищем лишние столбцы и считаем их количество
check_unexpected_fields as (
select
count (column_name) as count_unexpected_fields,
string_agg(column_name, ', ') as unexpected_fields
from (
select column_name
from columns_info
except
select column_name
from required_columns
) sq
),
-- Ищем недостающие столбцы и считаем их количество
check_missing_fields as (
select
count (column_name) as count_missing_fields,
string_agg(column_name, ', ') as missing_fields
from (
select column_name
from required_columns
except
select column_name
from columns_info
) sq
),
-- Ищем невалидные столбцы и считаем их количество
check_invalid_fields as (
select
count (column_name) as count_invalid_fields,
string_agg(column_name, ', ') as invalid_fields
from (
select column_name
from required_columns
except
select column_name
from fields_comparison
except
select string_to_table(missing_fields, ', ')
from check_missing_fields
) sq
),
-- Ищем все ограничения для таблицы
constraints_query as(
select
t1.constraint_type,
t2.column_name,
t3.column_name as foreign_column
from information_schema.table_constraints t1
left join information_schema.constraint_column_usage t2
on t1.constraint_name = t2.constraint_name
left join information_schema.key_column_usage as t3
on t1.constraint_name = t3.constraint_name
and t1.table_schema = t3.table_schema
where
t1.table_name = (select table_name from params)
and t1.constraint_schema = (select table_schema from params)
and t2.column_name is not null
),
-- Включаем значения зависимых ключей (foreign_column) в список ограничений (column_name)
union_foreign_ref_columns as (
select column_name, constraint_type
from constraints_query
union all
select foreign_column as column_name, constraint_type
from constraints_query
),
-- Ищем лишние ограничения и считаем их количество
check_unexpected_constraints as (
select
count (column_name) as count_unexpected_constraints,
string_agg(column_name || ' (' || constraint_type || ')', ', ') as unexpected_constraints
from (
select *
from union_foreign_ref_columns
except
select column_name, constraint_type
from required_constraints
) sq
),
-- Ищем недостающие ограничения и считаем их количество
check_missing_constraints as (
select
count (column_name) as count_missing_constraints,
string_agg(column_name || ' (' || constraint_type || ')', ', ') as missing_constraints
from (
select column_name, constraint_type
from required_constraints
except
select *
from union_foreign_ref_columns
) sq
),
-- Собираем полученные данные
checks as (
select
-- Выводим всю информацию об ошибках и суммируем их количество
table_exists_error + count_unexpected_fields + count_missing_fields + count_invalid_fields +
count_unexpected_constraints + count_missing_constraints as errors,
*
from check_table_exist
cross join check_unexpected_fields
cross join check_missing_fields
cross join check_invalid_fields
cross join check_unexpected_constraints
cross join check_missing_constraints
)
select *
from checks
Важное примечание: чтобы использовать этот запрос для какой-либо вашей таблицы, нужно внести изменения только в CTE params (названия вашей таблицы и схемы, где она хранится), required_columns (описание ожидаемых значений метаданных столбцов вашей таблицы) и required_constraints (описание ожидаемых ограничений для вашей таблицы). Остальной код не нужно менять.
Возможность такой автоматической проверки обеспечивается наличием во всех БД Postgresql информационной схемы (information_schema). В Oracle, если не ошибаюсь, тоже такая есть, поэтому описываемый запрос может быть актуален и для других СУБД (полагаю, с корректировками).
Что показывает этот запрос?
Столбец | Расшифровка |
---|---|
errors | Сумма количества ошибок |
table_exists | Существует ли таблица |
table_exists_error | Счетчик для учета ошибки в случае отсутствия таблицы |
count_all_fields | Счетчик всех столбцов таблицы |
count_unexpected_fields | Счетчик лишних столбцов |
unexpected_fields | Названия лишних столбцов через запятую |
count_missing_fields | Счетчик недостающих столбцов |
missing_fields | Названия недостающих столбцов через запятую |
count_invalid_fields | Счетчик ожидаемых столбцов, в которых есть какие-либо ошибки |
invalid_fields | Названия ожидаемых столбцов через запятую, в которых есть какие-либо ошибки |
count_unexpected_constraints | Счетчик лишних ограничений |
unexpected_constraints | Названия лишних ограничений с указанием типа ограничения в скобках через запятую |
count_missing_constraints | Счетчик недостающих ограничений |
missing_constraints | Названия недостающих ограничений с указанием типа ограничения в скобках через запятую |
Анализ запроса
В CTE params, required_columns и required_constraints описываются входные данные. Это — ожидаемые датасеты значений. С params, думаю, все понятно — указываем название тестируемой таблицы и название схемы, где она хранится:
with
-- Настраиваем запрос: задаем название таблицы и название схемы
params(table_name, table_schema) as (
values (
'price',
'smartphones'
)
)
select *
from params
Каждый CTE, о котором я пишу, буду оформлять с select’ом, который обращается к этому CTE. Весомой причины для этого нет — мне просто так удобнее:)
Этот код можно выполнить у себя локально. Он просто покажет значения внутри CTE:
required_column рассмотрим подробнее:
with
-- Описываем ожидаемые столбцы таблицы
required_columns(column_name, data_type, is_nullable, character_maximum_length) as (
values
('id', 'bigint', 'NO', null),
('price_value', 'numeric', 'NO', 18.6),
('model_id', 'integer', 'NO', null),
('comment', 'character varying', 'YES', 255)
)
select *
from required_columns
Здесь описываются значения в 4-х столбцах:
column_name: название столбца в тестируемой таблице;
data_type: тип столбца. В текущей версии запроса указывать нужно изначальное название типа (character varying), а не синоним (varchar). Эти названия можно просмотреть здесь:
https://www.postgresql.org/docs/8.1/datatype.htmlis_nullable: допускается ли в качестве значения для столбца null;
character_maximum_length: длина строки. Иначе говоря, максимально допустимое количество символов (цифр) значения. Если ограничения нет — указывается null.
В CTE required_constraints содержатся ожидаемые ограничения по 2-м столбцам:
column_name: название столбца, к которому применяется ограничение;
constraint_type: тип ограничения.
Типы ограничений (constraint_type) бывают следующие:
CHECK: значения в столбце должны удовлетворять какому-либо заданному условию;
NOT NULL: название говорящее — значение в столбце не должно быть NULL;
UNIQUE: в столбце не допускается наличие
PRIMARY KEY: комбинация ограничений UNIQUE и NOT NULL;
FOREIGN KEY: значение в столбце у отдельно взятой записи должно быть таким же, как значение в столбце другой таблицы.
Ограничение NOT NULL проверяется отдельно в рамках изучения соответствия фактических метаданных значениям в CTE required_columns. Как для этого там есть столбец is_nullable.
Теперь рассмотрим код, который описан после required_constraints. До этого CTE включительно, как упоминал выше, указываются входные данные. В CTE columns_info происходит получение метаданных о столбцах проверяемой таблицы с помощью запроса к columns:
select *
from information_schema.columns
Ограничим выборку только по тем метаданным, которые нужны для проверок. null’ы преобразуем в 0, чтобы значения можно было сравнивать. Также добавим фильтрацию (в where) по тестируемой таблице и по схеме, где она хранится:
select
column_name, data_type, is_nullable,
coalesce(numeric_precision, 0) as numeric_precision,
coalesce(numeric_scale, 0) as numeric_scale,
coalesce(character_maximum_length, 0) as character_maximum_length
from information_schema.columns
where
table_name = 'price'
and table_schema = 'public'
В select’е запроса:
Название столбца | Описание |
---|---|
column_name | Название столбца в тестируемой таблице |
data_type | Тип столбца |
is_nullable | Обязателен ли столбец для заполнения |
numeric_precision | Используется только для столбцов числовых типов (numeric, integer и т.д.). Ограничение по количеству цифр целой части числа |
numeric_scale | Используется только для для столбцов числовых типов. Ограничение по количеству цифр дробной части числа |
character_maximum_length | ограничение по количеству символов для столбцов типа varchar (character varying) |
Если у столбца тестируемой таблицы нет ограничений по количеству символов или цифр (например, при наличии типа boolean или jsonb), то в соответствующем столбце метаданных будет null.
Перенесем запрос в CTE и обратимся к записям, которые по нему возвращаются, через select:
with columns_info as (
select
column_name, data_type, is_nullable,
coalesce(numeric_precision, 0) as numeric_precision,
coalesce(numeric_scale, 0) as numeric_scale,
coalesce(character_maximum_length, 0) as character_maximum_length
from information_schema.columns
where
table_name = 'price'
and table_schema = 'public'
)
select *
from columns_info
Добавим код для проверки существования таблицы. Сделаем это через подсчет количества значений в запросе, которые мы описали ранее (меняем select *
на select count (*)
):
with columns_info as (
select
column_name, data_type, is_nullable,
coalesce(numeric_precision, 0) as numeric_precision,
coalesce(numeric_scale, 0) as numeric_scale,
coalesce(character_maximum_length, 0) as character_maximum_length
from information_schema.columns
where
table_name = 'price'
and table_schema = 'public'
)
select count (*)
from columns_info
Логика проста: нет таблицы — нет и записей, которые возвращаются по значениям, указанным в фильтрах. Добавим сюда case для вывода boolean’а, который будет отображать статус проверки. А также счетчик, который будет показывать 1, если boolean — true и 0, если boolean — false. Он пригодится для подсчета суммы ошибок. Еще выведем фактическое количество столбцов в тестируемой таблице — это просто полезная информация. Все эти данные возьмем из select’а, описанного выше под CTE. Выполнять его будем через подзапрос:
with columns_info as (
select
column_name, data_type, is_nullable,
coalesce(numeric_precision, 0) as numeric_precision,
coalesce(numeric_scale, 0) as numeric_scale,
coalesce(character_maximum_length, 0) as character_maximum_length
from information_schema.columns
where
table_name = 'price'
and table_schema = 'public'
)
select
case when count_all_fields < 1 then false else true end table_exists,
case when count_all_fields < 1 then 1 else 0 end table_exists_error,
count_all_fields
from (
select count (*) as count_all_fields
from columns_info
) sq
В следующем блоке кода будем проводить сравнение — ожидаемых значений, описанных в CTE required_columns, и фактических значений, описанных в CTE columns_info. Уберем код для проверки существования таблицы в отдельный CTE check_table_exists. Также добавим 3 CTE с ожидаемыми данными, чтобы можно было проводить сравнение. Название таблицы (table_name) и название схемы (schema_name) для columns_info теперь будем получать через подзапросы. Пишем код дальше:
with
-- Настраиваем запрос: задаем название таблицы и название схемы
params(table_name, table_schema) as (
values (
'price',
'public'
)
),
-- Описываем ожидаемые столбцы таблицы
required_columns(column_name, data_type, is_nullable, character_maximum_length) as (
values
('id', 'bigint', 'NO', null),
('price_value', 'numeric', 'NO', 18.6),
('model_id', 'integer', 'NO', null),
('comment', 'character varying', 'YES', 255)
),
-- Описываем ожидаемые ограничения
required_constraints(column_name, constraint_type) as (
values
('id', 'PRIMARY KEY'),
('id', 'FOREIGN KEY'),
('model_id', 'FOREIGN KEY'),
('price_value', 'UNIQUE'),
('model_id', 'UNIQUE')
),
-- Находим информацию о столбцах тестируемой таблицы и добавляем обработку null'ов
columns_info as (
select
column_name, data_type, is_nullable,
coalesce(numeric_precision, 0) as numeric_precision,
coalesce(numeric_scale, 0) as numeric_scale,
coalesce(character_maximum_length, 0) as character_maximum_length
from information_schema.columns
where
table_name = (select table_name from params)
and table_schema = (select table_schema from params)
),
-- Проверяем существование таблицы и подсчитываем количество столбцов в ней
check_table_exist as (
select
case when count_all_fields < 1 then false else true end table_exists,
case when count_all_fields < 1 then 1 else 0 end table_exists_error,
count_all_fields
from (
select count (*) as count_all_fields
from columns_info
) sq
)
select *
from columns_info
Сравнить датасеты в SQL можно разными способами. Мне известны три — через:
В данном случае я воспользуюсь join’ом. Соединение будем делать по столбцам, описанным в CTE columns_info. Начнем с первых 3-х. Таблицам добавим алиасы t и r. Возвращать записи будем только из columns_info. Иначе говоря, только те, которые фактически есть в БД. Код в with идентичен блоку, описанному выше, поэтому его далее буду сокращать для удобства:
with (...)
select t.*
from columns_info t
inner join required_columns r
on t.column_name = r.column_name
and t.data_type = r.data_type
and t.is_nullable = r.is_nullable
Остаются еще 3 столбца. С ними сложнее. В CTE required_columns есть только 4 столбца, тогда в columns_info возвращается 6. Это связано с тем, что ожидаемое ограничение по количеству символов и цифр мы задаем в 1-м столбце для всех типов. При этом фактически в метаданных есть разделение такого ограничения на 3 столбца.
Начнем с простого: сравнение для столбцов varchar. null’ы в датасете с ожидаемыми значениями заменяем на 0. На всякий случай повторю, что это нужно для сравнения: null’ы через равенство проверить не получится:
with (...)
select t.*
from columns_info t
inner join required_columns r
on t.column_name = r.column_name
and t.data_type = r.data_type
and t.is_nullable = r.is_nullable
and t.character_maximum_length = coalesce(r.character_maximum_length, 0)
Теперь нужно сравнить целую и дробную части числовых значений ограничений. Но только для типа numeric и decimal, т.к. остальные числовые типы имеют фиксированный диапазон значений. Начнем с целой части чисел. Здесь нужно использовать case для отбора значений с типами numeric и decimal. С учетом того, что сравнивается целая часть, у числа из столбца character_maximum_length (из CTE required_columns) нужно отсекать дробную часть с помощью функции trunc (). Проверяемый столбец может быть единовременно только одного типа, поэтому в ранее написанную обработку для ограничений varchar новый код добавляем через or. Всю обработку при этом оборачиваем в скобки:
with (...)
select t.*
from columns_info t
inner join required_columns r
on t.column_name = r.column_name
and t.data_type = r.data_type
and t.is_nullable = r.is_nullable
and t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
and (
case
when t.data_type in ('numeric', 'decimal')
then t.numeric_precision = trunc(r.character_maximum_length::numeric)
end
or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
)
И теперь самое сложное: сравнение дробной части. Вычислить её просто — нужно лишь из числа вычесть его целую часть. Пример для наглядности:
18,6 — 18 = 0,6
Но описать кодом это будет сложнее. В таблице price, которая используется в этой статье как объект тестирования, есть столбец price_value. Он имеет тип numeric и ограничение 18.6, то есть 18 цифр в целой части и 6 в дробной. То, что нужно для примера. Попробуем в select добавить разность, чтобы посмотреть, что получится:
Это ожидаемое значение. А что в фактическом?
То есть чтобы корректно провести сравнение, нужно дробную часть ожидаемого значения перевести в целую. В данном случае достаточно выполнить умножение на 10. Но что если бы ограничение было не 18.6, а, скажем, 18.15? Тогда умножение на 10 нужно проводить во 2-й степени. И значение этой степени — переменное, равное количеству знаков после запятой. Иначе говоря, для столбца price_value, у которого есть ограничение 18.6, степень будет равна 1 (после запятой только 1 цифра — 6). А если ограничение было 18.15, то степень должна была быть 2.
Итак, разность числа и его целой части, которые берутся из character_maximum_length (CTE required_columns), показанная в виде кода на скриншоте выше, выглядит так:
r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)
Теперь её нужно умножить на 10 в степени:
(r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)) * power(10, 1)
Но единица (1) в power () покрывает не все случаи, поэтому добавим код для динамического расчета:
(r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)) * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
Прокомментирую. r.character_maximum_length: text — выполняем приведение типов, т.к. этот столбец имеет тип numeric, но его значения будут использоваться в функциях, которые работают только с символьными типами. Функция split_part () используется для отбора цифр после запятой (но в качестве разделителя указывается именно точка, т.к. дробное значение в метаданных хранится именно в таком виде). А length подсчитывает количество этих цифр.
Добавляем этот код в выражения для соединения:
with(...)
select t.*
from columns_info t
inner join required_columns r
on t.column_name = r.column_name
and t.data_type = r.data_type
and t.is_nullable = r.is_nullable
and (
-- Сравниваем целую часть десятичных значений
case
when t.data_type in ('numeric', 'decimal')
then t.numeric_precision = trunc(r.character_maximum_length::numeric)
end
and
-- Сравниваем дробную часть десятичных значений
case
when t.data_type in ('numeric', 'decimal')
then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))
* power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
end
or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
)
Результат сравнения покажет, какие столбцы тестируемой таблицы имеют ожидаемые метаданные. В дальнейшем отобранные таким образом записи будем сравнивать с датасетом из CTE required_columns. Так мы найдем столбцы, у которых какие-либо метаданные не соответствуют ожидаемым. Добавим этот запрос в CTE с названием fields_comparison:
-- Сравниваем ожидаемый и текущий наборы атрибутов таблицы
with
(...),
fields_comparison as (
select t.*
from columns_info t
inner join required_columns r
on t.column_name = r.column_name
and t.data_type = r.data_type
and t.is_nullable = r.is_nullable
and (
-- Сравниваем целую часть десятичных значений
case
when t.data_type = 'numeric'
then t.numeric_precision = trunc(r.character_maximum_length::numeric)
end
and
-- Сравниваем дробную часть десятичных значений
case
when t.data_type = 'numeric'
then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))
* power(10, length(split_part(r.character_maximum_length::text, '.', 2)))
end
or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)
)
)
select *
from fields_comparison
Едем дальше. В следующем CTE, check_unexpected_fields, выполняется поиск столбцов таблицы, которые есть в фактическом датасете, но отсутствуют в ожидаемом (их условно можно назвать «лишними»). Это реализовано через сравнение записей в CTE columns_info и CTE required_columns. Запрос довольно простой:
select column_name
from columns_info
except
select column_name
from required_columns
По такому запросу каждое название «лишнего» столбца будет выведено в отдельной строке. Однако с учетом того, что это не единственный CTE, из которого будет формироваться результирующая выборка, вернувшиеся значения нужно собрать в одной ячейке. Я это делаю через перечисление в строке, с запятой в качестве разделителя, с помощью функции string_agg (). И дополнительно подсчитываю количество значений, вернувшихся по запросу:
with
(...),
-- Ищем лишние столбцы и считаем их количество
check_unexpected_fields as (
select
count (column_name) as count_unexpected_fields,
string_agg(column_name, ', ') as unexpected_fields
from (
select column_name
from columns_info
except
select column_name
from required_columns
) sq
)
select *
from check_unexpected_fields
Далее в CTE check_missing_fields содержится почти такой же запрос. Разница лишь в том, что запросы в except меняются друг с другом местами. Этот CTE показывает недостающие столбцы тестируемой таблицы. Иначе говоря, те, что есть в ожидаемом датасете, но отсутствуют в фактическом:
with
(...),
-- Ищем недостающие столбцы и считаем их количество
check_missing_fields as (
select
count (column_name) as count_missing_fields,
string_agg(column_name, ', ') as missing_fields
from (
select column_name
from required_columns
except
select column_name
from columns_info
) sq
)
select *
from check_missing_fields
Итак, на текущей стадии нам известны столбцы:
Ожидаемые (CTE required_columns);
Полностью совпавшие с ожидаемыми (CTE fields_comparison);
Лишние (CTE check_unexpected_fields);
Недостающие (CTE check_missing_fields).
С помощью этих данных можно найти еще одно множество. В нем будут содержаться столбцы, которые совпадают с ожидаемым датасетом лишь частично. Соответственно, CTE с таким запросом будет возвращать невалидные значения. Следовательно, для него выглядит логичным название check_invalid_fields.
Запрос для поиска невалидных столбцов также содержит в себе оператор except, но в данном случае он используется дважды. Из множества записей ожидаемых столбцов (CTE required_columns) вычитается множество полностью совпавших столбцов (CTE fields_comparison) и недостающих столбцов (CTE check_missing_fields). Для CTE check_missing_fields в рамках такого запроса нужно «развернуть» строку со значениями обратно в набор записей. Это можно сделать с помощью функции string_to_table (). Получаем такой код:
with
(...),
-- Ищем невалидные столбцы и считаем их количество
check_invalid_fields as (
select
count (column_name) as count_invalid_fields,
string_agg(column_name, ', ') as invalid_fields
from (
select column_name
from required_columns
except
select column_name
from fields_comparison
except
select string_to_table(missing_fields, ', ')
from check_missing_fields
) sq
select *
from check_invalid_fields
Вся необходимая информация для тестирования столбцов — собрана. Перейдем к ограничениям (constraints). К слову, к ним в контексте метаданных, в частности, относятся зависимые ключи (foreign keys). Для получения необходимых сведений будем запрашивать из информационной схемы столбцы таблиц:
constraint_type из table_constraints;
column_name из constraint_column_usage;
column_name из key_column_usage;
Если какое-либо ограничение не удовлетворяется при изменении данных таблицы, то по запросу возвращается ошибка. В её формулировке будет указано название нарушаемого в таком случае ограничения (violates constraint):
Ограничения помогают поддерживать целостность данных. По этой причине важно проверять их наличие и логику. Информацию об ограничениях тестируемой таблицы будем получать по отдельному запросу. Его опишем в CTE constraints_query:
with
(...),
-- Ищем все ограничения для таблицы
constraints_query as(
select
t1.constraint_type,
t2.column_name,
t3.column_name as foreign_column
from information_schema.table_constraints t1
left join information_schema.constraint_column_usage t2
on t1.constraint_name = t2.constraint_name
left join information_schema.key_column_usage as t3
on t1.constraint_name = t3.constraint_name
and t1.table_schema = t3.table_schema
where
t1.table_name = (select table_name from params)
and t1.constraint_schema = (select table_schema from params)
and t2.column_name is not null
)
select *
from constraints_query
По этому запросу будут возвращаться 3 столбца:
При этом в ожидаемом датасете, в CTE required_constraints, у нас всего 2 столбца:
with
(...)
-- Описываем ожидаемые ограничения
required_constraints(column_name, constraint_type) as (
values
('id', 'PRIMARY KEY'),
('id', 'FOREIGN KEY'),
('model_id', 'FOREIGN KEY'),
('price_value', 'UNIQUE'),
('model_id', 'UNIQUE')
)
select *
from required_constraints
Поэтому в следующем CTE (назовем его union_foreign_ref_columns) объединим значения из столбцов column_name и foreign_column в 1 столбец:
with
(...),
-- Включаем значения зависимых ключей (foreign_column) в список ограничений (column_name)
union_foreign_ref_columns as (
select column_name, constraint_type
from constraints_query
union all
select foreign_column as column_name, constraint_type
from constraints_query
)
select *
from union_foreign_ref_columns
Фрагмент as column_name
необязателен, но я его оставил его для улучшения читаемости кода.
Фактический набор ограничений получили. Теперь можно проводить сравнение. Я решил не делать отдельный CTE под поиск невалидных ограничений, т.к. здесь рассматриваются всего 2 столбца (column_name и constraints_type). В конечном запросе рассматриваются только «лишние» и недостающие ограничения в 2-х соответствующих CTE. И это почти конец запроса:
with
(...),
-- Ищем лишние ограничения и считаем их количество
check_unexpected_constraints as (
select
count (column_name) as count_unexpected_constraints,
string_agg(column_name || ' (' || constraint_type || ')', ', ') as unexpected_constraints
from (
select *
from union_foreign_ref_columns
except
select column_name, constraint_type
from required_constraints
) sq
)
select *
from check_unexpected_constraints
и
with
(...),
-- Ищем недостающие ограничения и считаем их количество
check_missing_constraints as (
select
count (column_name) as count_missing_constraints,
string_agg(column_name || ' (' || constraint_type || ')', ', ') as missing_constraints
from (
select column_name, constraint_type
from required_constraints
except
select *
from union_foreign_ref_columns
) sq
)
select *
from check_missing_constraints
Логика сравнения здесь такая же, как при сравнении столбцов. Отличие между этими 2-мя CTE — только в порядке расположения относительно except: в check_unexpected_constraints запрос к union_foreign_ref_columns идет до except, а в check_missing_constraints — после. Из интересного здесь разве что данная конструкция:
string_agg(column_name || ' (' || constraint_type || ')', ', ')
string_agg () здесь по-прежнему нужен для вывода нескольких записей в 1-й строке. Но здесь 1-й аргумент функции логически немного отличается от этого же аргумента функции в коде сравнения столбцов. Это нужно исключительно для того, чтобы в 1-й ячейке уместить не только несколько строк, но и информацию сразу по 2-м столбцам. В итоговой выборке значение в этой ячейке будет выглядеть так:
По этому скриншоту проще понять, почему конструкция с string_agg () именно такая, какая она есть. В нее включается информацию из 2-х столбцов, при этом значения 2-го столбца берется в скобки с пробелом перед 1-м значением.
Ну и в последнем CTE собираем информацию, полученную в других CTE, и суммируем количество найденных ошибок, которые мы получали с помощью count ():
with
(...),
-- Собираем полученные данные
checks as (
select
-- Выводим всю информацию об ошибках и суммируем их количество
table_exists_error + count_unexpected_fields + count_missing_fields + count_invalid_fields +
count_unexpected_constraints + count_missing_constraints as errors,
*
from check_table_exist
cross join check_unexpected_fields
cross join check_missing_fields
cross join check_invalid_fields
cross join check_unexpected_constraints
cross join check_missing_constraints
)
select *
from checks
Значения, выводимые этим запросом в строках, видны в самих строках, в тултипе (если в строке большое значение), и их также можно скопировать и вставить блокнот:
Почему этот запрос полезен?
По моему опыту, почему-то новые таблицы БД, которые создают разработчики, часто не соответствуют тому, что описано в требованиях. Исходя из этого, проще один раз описать ожидаемые значения в запросе и запускать его после правок. Это сэкономит время при сравнении и исключит человеческий фактор. Особенно если итераций правок будет несколько;
Польза от таких проверочных запросов тем выше, чем больше таблиц добавляется в рамках доработки;
Пригодится для быстрого тестирования метаданных таблиц после миграции баз данных на другой стенд.