Data Vault: Мой опыт создания модели вручную17.07.2024 00:00
/*###########################
обработка - departure_airport
#############################*/
/*проверка на наличие изменение связей - если поменяют название аэропорта*/
DROP TABLE IF EXISTS data_vault.for_lfadp_update_time;
CREATE TABLE IF NOT EXISTS data_vault.for_lfadp_update_time AS
SELECT DISTINCT lfadd.hub_flights_hash_key,
lfadd.hub_airoport_data_hash_key,
MD5(hf.flight_id :: varchar) AS new_flight_id,
MD5(hf.departure_airport :: varchar) AS new_airport_name,
hf.flight_id new_flight,
hf.departure_airport new_airport,
to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time
FROM stage.flights hf
LEFT JOIN data_vault.Link_flights_airoport_data_departure lfadd ON md5(hf.flight_id::varchar) = lfadd.hub_flights_hash_key
WHERE (CASE WHEN upper(replace(lfadd.hub_airoport_data_hash_key, ' ', '')) = upper(replace(md5(hf.departure_airport::varchar), ' ', '')) THEN 1 ELSE 0 END) = 0;
-- обновляем время в линке на новое, если сменилось название аэропорта в исходнике
UPDATE data_vault.Link_flights_airoport_data_departure a
SET load_end_date = flut.new_fix_time
FROM data_vault.for_lfadp_update_time flut
WHERE a.hub_flights_hash_key =
flut.new_flight_id;
-- добавляем строку в таблицу Link_flights_airoport_data_departure новую
INSERT INTO data_vault.Link_flights_airoport_data_departure
(Link_flights_airoport_data_departure_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key)
SELECT
concat(new_flight, new_airport),
new_fix_time + INTERVAL '1 second',
'1111-11-11 11:11:11' :: timestamp,
'flights_airport_data',
new_flight_id,
new_airport_name
FROM data_vault.for_lfadp_update_time;
DROP TABLE IF EXISTS data_vault.for_lfadp_update_time;
/*обработка departure_airport завершена*/
/*###########################
обработка - arrival_airport
#############################*/
/*проверка на наличие изменение связей - если поменяют название аэропорта*/
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time AS
SELECT DISTINCT lfada.hub_flights_hash_key,
lfada.hub_airoport_data_hash_key,
MD5(hf.flight_id :: varchar) AS new_flight_id,
MD5(hf.arrival_airport :: varchar) AS new_airport_name,
hf.flight_id new_flight,
hf.arrival_airport new_airport,
to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time
FROM stage.flights hf
LEFT JOIN data_vault.Link_flights_airoport_data_arrival lfada ON md5(hf.flight_id::varchar) = lfada.hub_flights_hash_key
WHERE (CASE WHEN upper(replace(lfada.hub_airoport_data_hash_key, ' ', '')) = upper(replace(md5(hf.arrival_airport::varchar), ' ', '')) THEN 1 ELSE 0 END) = 0;
-- SELECT * FROM data_vault.for_lfada_update_time
-- обновляем время в линке на новое, если сменилось название аэропорта в исходнике
UPDATE data_vault.Link_flights_airoport_data_arrival a
SET load_end_date = flut.new_fix_time
FROM data_vault.for_lfada_update_time flut
WHERE a.hub_flights_hash_key =
flut.new_flight_id;
INSERT INTO data_vault.Link_flights_airoport_data_arrival
(Link_flights_airoport_data_arrival_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key)
SELECT
concat(new_flight_id, new_airport_name),
new_fix_time + INTERVAL '1 second',
'1111-11-11 11:11:11' :: timestamp,
'flights_airport_data',
new_flight_id,
new_airport_name
FROM data_vault.for_lfada_update_time;
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
/*обработка arrival_airport завершена*/
/*################################
обработка - flights_ticket_flights
##################################*/
/*проверка если отменили рейс flight_id и перенесли билет на новый рейс - такое же может быть , значит load end date будет актуален*/
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
WITH
new AS (SELECT distinct
replace(concat(tf.ticket_no, tf.flight_id), ' ', '') new_data
, tf.ticket_no
, tf.flight_id n_flight_id
FROM stage.ticket_flights tf
LEFT JOIN data_vault.Link_flights_ticket_flights lftf
ON replace(concat(md5(tf.ticket_no), md5(tf.flight_id :: varchar)), ' ', '') = replace(concat(lftf.ticket_no , lftf.Hub_flights_Hash_key), ' ', '')
WHERE LENGTH(replace(concat(lftf.ticket_no, lftf.Hub_flights_Hash_key), ' ', '')) = 0),
old AS (SELECT distinct
lftf.ticket_no
, Hub_flights_Hash_key o_flight_id
FROM data_vault.Link_flights_ticket_flights lftf
LEFT JOIN stage.ticket_flights tf
ON replace(concat(md5(tf.ticket_no), md5(tf.flight_id :: varchar)), ' ', '') = replace(concat(lftf.ticket_no , lftf.Hub_flights_Hash_key), ' ', '')
WHERE LENGTH(replace(concat(tf.ticket_no, tf.flight_id), ' ', '')) = 0)
SELECT md5(NEW.ticket_no) ticket_no,
concat(OLD.ticket_no, o_flight_id) AS OLD_ticket_no_flight_id,
o_flight_id AS OLD_flight_id,
concat(NEW.ticket_no, '&', n_flight_id) AS NEW_ticket_no_flight_id,
md5(n_flight_id ::varchar) AS NEW_flight_id,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time
FROM old JOIN NEW ON OLD.ticket_no = NEW.ticket_no;
-- SELECT * FROM data_vault.for_lfada_update_time;
-- обновляем время в линке на новое, если сменилось название аэропорта в исходнике
UPDATE data_vault.Link_flights_ticket_flights a
SET load_end_date = flut.new_fix_time
FROM data_vault.for_lfada_update_time flut
WHERE concat(a.ticket_no, a.hub_flights_hash_key) =
flut.OLD_ticket_no_flight_id;
-- добавляем строку в таблицу Link_flights_ticket_flights новую, которую добавили в ticket_flights
INSERT INTO data_vault.Link_flights_ticket_flights (Link_flights_ticket_flights_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, ticket_no, Hub_ticket_flights_Hash_key)
SELECT flut.NEW_ticket_no_flight_id,
to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'flights_ticket_flights',
md5(NEW_flight_id),
md5(ticket_no),
md5(flut.NEW_ticket_no_flight_id)
FROM data_vault.for_lfada_update_time flut;
/*обработка flights_ticket_flights завершена*/
/*########################################
обработка - ticket_flights_boarding_passes
##########################################*/
-- так как данные в таблице связаны с хабами , а обработка линка идет с не зашифрованными данными т.е. без md5, то удалим привязку
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes DROP CONSTRAINT IF EXISTS Hub_bp_fk;
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes DROP CONSTRAINT IF EXISTS Hub_tf_fk;
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
-- проверка на новые данные , если их нет то добавляем
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
WITH FIRST as(SELECT concat(bp.ticket_no, '&', bp.flight_id) AS concat_bp,
concat(tf.ticket_no, '&', tf.flight_id) AS concat_tf,
bp.ticket_no
FROM stage.boarding_passes bp
LEFT JOIN stage.ticket_flights tf USING(ticket_no, flight_id))
SELECT concat_bp,
concat_tf,
hub_boarding_passes_hash_key,
Hub_ticket_flights_Hash_key,
f.ticket_no
FROM FIRST f
LEFT JOIN data_vault.Link_ticket_flights_boarding_passes flut
ON f.concat_bp = Hub_ticket_flights_Hash_key AND f.concat_tf = Hub_boarding_passes_Hash_key
WHERE load_date IS null;
-- вставляем в таблицу обновленные элементы с источника, вставятся все поля если
INSERT INTO data_vault.Link_ticket_flights_boarding_passes (Link_ticket_flights_boarding_passes,
load_date, load_end_date, record_sourse,
ticket_no, Hub_boarding_passes_Hash_key,
Hub_ticket_flights_Hash_key)
SELECT concat(flut.concat_tf , flut.concat_bp),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'ticket_flights_boarding_passes',
md5(ticket_no),
flut.concat_bp,
flut.concat_tf
FROM data_vault.for_lfada_update_time flut;
-- если один из хешей не подгрузился, догружаем его, вытащив из уже известного, так как хеши таблиц boarding_passes и ticket_flights одинаковы
-- обновляем данные столбца ticket_flights_boarding_passes с хешем , если он будет не заполнен
UPDATE data_vault.Link_ticket_flights_boarding_passes a
SET (hub_boarding_passes_hash_key,
Hub_ticket_flights_Hash_key,
link_ticket_flights_boarding_passes) =
(concat(ticket_nom, '&', flight_id),
concat(ticket_nom, '&', flight_id),
concat(concat(ticket_nom, '&', flight_id), concat(ticket_nom, '&', flight_id)))
FROM
(SELECT CASE WHEN
length(CASE WHEN length(Hub_ticket_flights_Hash_key) < 2 OR length(hub_boarding_passes_hash_key) < 2
THEN substring(Hub_ticket_flights_Hash_key FROM 1 FOR position('&' IN Hub_ticket_flights_Hash_key) - 1)
ELSE '0'
END) < 1
THEN substring(hub_boarding_passes_hash_key FROM 1 FOR position('&' IN hub_boarding_passes_hash_key) - 1)
ELSE 'ошибка: необходима проверка данных' END ticket_nom,
CASE WHEN
length(CASE WHEN length(Hub_ticket_flights_Hash_key) < 2 OR length(hub_boarding_passes_hash_key) < 2
THEN substring(Hub_ticket_flights_Hash_key FROM position('&' IN Hub_ticket_flights_Hash_key) + 1)
ELSE '0'
END) < 1
THEN substring(hub_boarding_passes_hash_key FROM position('&' IN hub_boarding_passes_hash_key) + 1 )
ELSE 'ошибка: необходима проверка данных' END flight_id,
link_ticket_flights_boarding_passes,load_date,load_end_date,record_sourse,ticket_no,hub_boarding_passes_hash_key,Hub_ticket_flights_Hash_key
FROM data_vault.Link_ticket_flights_boarding_passes
WHERE length(hub_boarding_passes_hash_key) < 2
OR length(Hub_ticket_flights_Hash_key) < 2) flut
WHERE a.hub_boarding_passes_hash_key =
(concat(flut.ticket_nom, '&', flut.flight_id))
OR a.Hub_ticket_flights_Hash_key = (concat(flut.ticket_nom, '&', flut.flight_id));
-- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные
UPDATE data_vault.Link_ticket_flights_boarding_passes a
SET (hub_boarding_passes_hash_key,
hub_ticket_flights_hash_key) = (md5(b.hub_boarding_passes_hash_key), md5(b.hub_ticket_flights_hash_key))
FROM data_vault.Link_ticket_flights_boarding_passes b
WHERE (a.link_ticket_flights_boarding_passes = b.link_ticket_flights_boarding_passes)
AND (length(Hub_boarding_passes_Hash_key) < 30 OR (length(Hub_ticket_flights_Hash_key) < 30));
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes ADD CONSTRAINT Hub_bp_fk FOREIGN KEY (Hub_boarding_passes_Hash_key) REFERENCES data_vault.Hub_boarding_passes(hash_key);
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes ADD CONSTRAINT Hub_tf_fk FOREIGN KEY (Hub_ticket_flights_Hash_key) REFERENCES data_vault.Hub_ticket_flights(hash_key);
/*обработка ticket_flights_boarding_passes завершена*/
/*################################
обработка - ticket_flights_tickets
##################################*/
/*обработка - обновляем данные в таблице, если в одном из источников данные не сгенирились, то вставляем их из другого источника*/
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
-- проверка на новые данные , если их нет то добавляем
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
SELECT *
FROM data_vault.Link_ticket_flights_tickets ltft
FULL JOIN (SELECT concat(concat(tf.ticket_no, '&', tf.flight_id), t.ticket_no) link,
md5(concat(tf.ticket_no, '&', tf.flight_id)) new_hub_ticket_flights_hash_key,
md5(t.ticket_no) T_ticket_no,
t.ticket_no tticket_no,
md5(tf.ticket_no) TF_ticket_no,
tf.ticket_no tfticket_no,
md5(tf.flight_id :: varchar) flight_id,
tf.flight_id tflight_id
FROM stage.ticket_flights tf
full JOIN stage.tickets t
ON t.ticket_no = tf.ticket_no) we
ON we.link = ltft.Link_ticket_flights_tickets
WHERE link_ticket_flights_tickets IS NULL;
-- SELECT * FROM data_vault.for_lfada_update_time;
-- вставка данных в таблицу линк из таблиц источников, если их там нет
INSERT INTO data_vault.Link_ticket_flights_tickets (Link_ticket_flights_tickets, load_date, record_sourse, Hub_ticket_flights_Hash_key, Hub_tickets_Hash_key)
SELECT concat(COALESCE(tticket_no, tfticket_no), '&', CASE WHEN tflight_id :: varchar IS NULL THEN 'нет flight_id в ticket_flights' ELSE tflight_id :: varchar END, COALESCE(tticket_no, tfticket_no)),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'ticket_flights_tickets',
CASE WHEN length(new_hub_ticket_flights_hash_key) > 2
THEN new_hub_ticket_flights_hash_key
ELSE concat(COALESCE(t_ticket_no, tf_ticket_no), '&', CASE WHEN flight_id :: varchar IS NULL THEN 'нет flight_id в ticket_flights' ELSE flight_id :: varchar END)
END,
COALESCE(t_ticket_no, tf_ticket_no)
FROM data_vault.for_lfada_update_time;
/*обработка ticket_flights_tickets завершена*/
/*#########################
обработка - ticket_bookings
##########################*/
-- если клиент сдал билет, следовательно запись в таблице удалится, а значит необходимо сделать пометку времени
UPDATE data_vault.Link_tickets_bookings a
SET load_end_date = to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp
FROM (WITH
new_data AS (
SELECT concat(ticket_no, book_ref) Link_tickets_bookings ,
ticket_no,
book_ref
FROM stage.tickets t
FULL JOIN stage.bookings b USING(book_ref))
SELECT ltb.Link_tickets_bookings
FROM data_vault.Link_tickets_bookings ltb
left JOIN new_data nd USING (Link_tickets_bookings)
WHERE nd.Link_tickets_bookings IS NULL) b
WHERE a.link_tickets_bookings = b.Link_tickets_bookings;
-- добавляем новые данные из источника в таблицу линк
INSERT INTO data_vault.Link_tickets_bookings (Link_tickets_bookings, load_date, load_end_date, record_sourse, Hub_ticket_Hash_key, Hub_bookings_Hash_key)
WITH new_data AS (
SELECT concat(ticket_no, book_ref) Link_tickets_bookings ,
ticket_no,
book_ref
FROM stage.tickets t
FULL JOIN stage.bookings b USING(book_ref))
SELECT nd.Link_tickets_bookings,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'ticket_bookings',
md5(nd.ticket_no),
md5(nd.book_ref)
FROM data_vault.Link_tickets_bookings ltb
RIGHT JOIN new_data nd USING (Link_tickets_bookings)
WHERE load_date IS NULL;
/*обработка ticket_bookings завершена*/
/*################################
обработка - flights_aircrafts_data
#################################*/
-- при добавлении новых данных удаляем привязку к хабам, так как обработка идет без хеширования, после обработки привязка будет восстановлена
ALTER TABLE data_vault.Link_flights_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_f_fk;
ALTER TABLE data_vault.Link_flights_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_ad_fk;
-- Насыщение таблицы новыми данными из источников, предусмотрен момент если данные в одном источнике есть, а в другом не подгрузились
INSERT INTO data_vault.Link_flights_aircrafts_data (Link_flights_aircrafts_data_Hashkey, load_date, record_sourse, Hub_flights_Hash_key, Hub_aircrafts_data_Hash_key)
WITH
new_data AS (
SELECT DISTINCT
concat(f.aircraft_code, ad.aircraft_code) link_flights_aircrafts_data_hashkey,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'flights_aircrafts_data',
f.aircraft_code AS f_aircraft_code,
ad.aircraft_code AS ad_aircraft_code
FROM stage.flights f
FULL JOIN stage.aircrafts_data ad
ON f.aircraft_code = ad.aircraft_code)
SELECT CASE WHEN length(lfad.link_flights_aircrafts_data_hashkey) < 5 OR -- тк коды должны быть идентичны, если нет данных в одной таблице , то подтягиваем их из другой
length(nd.link_flights_aircrafts_data_hashkey) < 5 THEN concat(COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code))
ELSE COALESCE(nd.link_flights_aircrafts_data_hashkey, lfad.link_flights_aircrafts_data_hashkey)
END,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'flights_aircrafts_data',
COALESCE(f_aircraft_code, ad_aircraft_code),
COALESCE(f_aircraft_code, ad_aircraft_code)
FROM data_vault.Link_flights_aircrafts_data lfad
RIGHT JOIN new_data nd ON lfad.link_flights_aircrafts_data_hashkey = nd.link_flights_aircrafts_data_hashkey
WHERE lfad.link_flights_aircrafts_data_hashkey IS NULL;
-- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные
UPDATE data_vault.Link_flights_aircrafts_data a
SET (Hub_flights_Hash_key,
Hub_aircrafts_data_Hash_key) = (md5(b.Hub_flights_Hash_key), md5(b.Hub_aircrafts_data_Hash_key))
FROM data_vault.Link_flights_aircrafts_data b
WHERE (a.Link_flights_aircrafts_data_Hashkey = b.Link_flights_aircrafts_data_Hashkey)
AND (length(Hub_flights_Hash_key) < 30 OR (length(Hub_aircrafts_data_Hash_key) < 30));
-- восстанавливаем привязку таблицы к новым данным
ALTER TABLE data_vault.Link_flights_aircrafts_data ADD CONSTRAINT Hub_f_fk FOREIGN KEY (Hub_flights_Hash_key) REFERENCES data_vault.Hub_flights(hash_key);
ALTER TABLE data_vault.Link_flights_aircrafts_data ADD CONSTRAINT Hub_ad_fk FOREIGN KEY (Hub_aircrafts_data_Hash_key) REFERENCES data_vault.Hub_aircrafts_data(hash_key);
/*обработка flights_aircrafts_data завершена*/
/*##############################
обработка - seats_aircrafts_data
###############################*/
-- при добавлении новых данных удаляем привязку к хабам, так как обработка идет без хеширования, после обработки привязка будет восстановлена
ALTER TABLE data_vault.Link_seats_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_ad_fk;
ALTER TABLE data_vault.Link_seats_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_s_fk;
-- Насыщение таблицы новыми данными из источников, предусмотрен момент если данные в одном источнике есть, а в другом не подгрузились
INSERT INTO data_vault.Link_seats_aircrafts_data(Link_seats_aircrafts_data_Hashkey, load_date, record_sourse, Hub_aircrafts_data_Hash_key, Hub_seats_Hash_key)
WITH
new_data AS (
SELECT DISTINCT
concat(ad.aircraft_code, concat(s.aircraft_code, s.seat_no)) Link_seats_aircrafts_data_Hashkey,
ad.aircraft_code AS f_aircraft_code,
concat(s.aircraft_code, s.seat_no) AS ad_aircraft_code
FROM stage.aircrafts_data ad
FULL JOIN stage.seats s USING(aircraft_code))
SELECT
CASE
WHEN length(nd.link_seats_aircrafts_data_hashkey) < 6
THEN concat(COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code))
ELSE COALESCE(nd.Link_seats_aircrafts_data_Hashkey, lsad.Link_seats_aircrafts_data_Hashkey) END ,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'seats_aircrafts_data',
COALESCE(f_aircraft_code, ad_aircraft_code),
COALESCE(f_aircraft_code, ad_aircraft_code)
FROM data_vault.Link_seats_aircrafts_data lsad
RIGHT JOIN new_data nd ON lsad.Link_seats_aircrafts_data_Hashkey = nd.Link_seats_aircrafts_data_Hashkey
WHERE load_date IS null;
-- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные
UPDATE data_vault.Link_seats_aircrafts_data a
SET (Hub_aircrafts_data_Hash_key,
Hub_seats_Hash_key) = (md5(b.Hub_aircrafts_data_Hash_key), md5(b.Hub_seats_Hash_key))
FROM data_vault.Link_seats_aircrafts_data b
WHERE (a.Link_seats_aircrafts_data_Hashkey = b.Link_seats_aircrafts_data_Hashkey)
AND (length(a.Hub_seats_Hash_key) < 30 OR (length(a.Hub_aircrafts_data_Hash_key) < 30));
-- восстанавливаем привязку таблицы к новым данным
ALTER TABLE data_vault.Link_seats_aircrafts_data ADD CONSTRAINT Hub_ad_fk FOREIGN KEY (Hub_aircrafts_data_Hash_key) REFERENCES data_vault.Hub_aircrafts_data(hash_key);
ALTER TABLE data_vault.Link_seats_aircrafts_data ADD CONSTRAINT Hub_s_fk FOREIGN KEY (Hub_seats_Hash_key) REFERENCES data_vault.Hub_seats(hash_key);
© Habrahabr.ru