КЛАДРируем адреса произвольной формы (ч.1 — импорт)

Достаточно часто при работе с вводимыми пользователем адресными данными возникает необходимость сначала подсказать ему, как правильно свой адрес указать, а потом — полученную строку адреса привести в некоторый машинно-читаемый вид.

Таким видом у нас в стране можно считать код по справочникам КЛАДР или ФИАС.

Первый из них уже несколько лет считается устаревающим, но отличается более простой структурой и исторически продолжает использоваться во множестве систем, поскольку вполне подходит для большинства задач.

Давайте научимся разбирать строку адреса «туда и обратно», а заодно познакомимся с некоторыми алгоритмическими подходами и их реализацией на SQL.

image-loader.svg

Получение справочника КЛАДР

База КЛАДР в настоящее время администрируется ФНС и представлена на сайте ГНИВЦ в виде периодически (примерно раз в неделю) обновляемого архива. Для начала мы научимся его скачивать, исправлять некоторые ошибки и преобразовывать в более подходящую для наших задач структуру.

Для этого нам понадобятся wget, p7zip, dbview, recode и psql.

Исходный архив

База выкладывается на сервере ГНИВЦ всегда по одному и тому же адресу в виде 7z-архива Base.7z — версии отличаются друг от друга только заголовком Last-Modified в ответе сервера.

При этом сервер достаточно нервно реагирует на скорость загрузки и не поддерживает HEAD-запросы, увы. Поэтому нам придется скачивать архив ежесуточно полностью, но чтобы снизить нагрузку на сервер установим ограничение скорости, а заголовки ответа сохраним и разберем отдельно:

wget -S https://gnivc.ru/html/gnivcsoft/KLADR/Base.7z --limit-rate=8k 2>.hdr

В сгенерированном .hdr нас будет интересовать эта строка:

...
  Last-Modified: Thu, 05 Aug 2021 09:28:39 GMT
...

Чтобы не пытаться обрабатывать архив и обновлять данные в нашей базе повторно, будем сохранять в ней этот таймстамп для последующих сравнений.

Если же таймстамп файла не совпал с сохраненным, распакуем полученный архив:

p7zip -d Base.7z

Результатом будет 7 DBF-файлов в DOS-кодировке:

Extracting  ALTNAMES.DBF
Extracting  DOMA.DBF
Extracting  FLAT.DBF
Extracting  KLADR.DBF
Extracting  NAMEMAP.DBF
Extracting  SOCRBASE.DBF
Extracting  STREET.DBF

Переберем все эти файлы, формируя единый скрипт выгрузки данных через psql в COPY-формате:

# получаем структуру полей DBF
#   | DOS2WIN
#   | берем только описания полей (skip 2 строки)
#   | оставляем только их имена
dbview -b -t -e -o -r ALTNAMES.DBF \
  | recode CP866..CP1251 \
  | tail -n+2 \
  | xargs -l \
  | egrep -io "^[a-z0-9_]+"

# ... формируем SQL-заголовок временной таблицы

# получаем данные DBF, разделенные '~'
#   | склеиваем "висящие" строки ([\t\r\n] в теле поля данных)
#   | DOS2WIN
#   | убираем все '\t', убираем концевые '~', заменяем '~'->'\t'
dbview -d~ -b -t ALTNAMES.DBF \
  | sed -e :a -e '/[\r\t]$/N; s/[\r\t]\n//g; ta' \
  | recode CP866..CP1251 \
  | sed -e 's/\t//g; s/~\r//g; s/~,/,/g; s/~/\t/g' >>.sql

По итогу мы получаем большой-большой SQL-файл примерно такого вида:

CREATE TEMPORARY TABLE "STREET.DBF"(
  "NAME"
    varchar,
  "SOCR"
    varchar,
  "CODE"
    varchar,
  "INDEX"
    varchar,
  "GNINMB"
    varchar,
  "UNO"
    varchar,
  "OCATD"
    varchar
);
COPY "STREET.DBF"(
  "NAME",
  "SOCR",
  "CODE",
  "INDEX",
  "GNINMB",
  "UNO",
  "OCATD"
) FROM stdin;
Абадзехская     ул      01000001000000100       385013  0105            79401000000
Абрикосовая     ул      01000001000000200       385013  0105            79401000000
Авиационный     пер     01000001000000300       385006  0105            79401000000
Автодорога 7    ул      01000001000000400       385019  0105            79401000000
...

Импорт данных

Поскольку сами данные в таблицах архива представлены в DOS-кодировке, но это как-то совсем уж немодно, развернем нашу базу в WIN1251 — она тоже однобайтовая, поэтому все поиски данных не станут тяжелее:

CREATE DATABASE kladr
  WITH ENCODING='WIN1251'
    OWNER=postgres
    TEMPLATE=template0
    LC_COLLATE='ru_RU.CP1251'
    LC_CTYPE='ru_RU.CP1251'
    CONNECTION LIMIT=-1;
-- лог проверок обновлений
CREATE TABLE kladr_chk(
  id
    serial
      PRIMARY KEY,
  ts
    timestamp
      DEFAULT now(),
  hostname
    varchar
);

-- лог проведенных обновлений
CREATE TABLE kladr_upd(
  id
    serial
      PRIMARY KEY,
  ts
    timestamp
      DEFAULT now(),
  lm
    varchar,
  hostname
    varchar
);

Защита от параллельной загрузки

Поскольку импорт данных из ГНИВЦ у нас может проходить длительное время и инициироваться с разных источников (мало ли, один из инстансов прикажет долго жить), неплохо бы позаботиться о защите данных, чтобы они не пострадали — дописываем все это в начале .sql:

-- защита от автоотключения по таймауту
SET statement_timeout = 0;
-- включаем WIN-кодировку
SET client_encoding = 'WIN1251';
-- включаем application_name для мониторинга активного процесса
SET application_name = 'kladr : import [`hostname`]';
-- включаем "последовательные" транзакции
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
  -- блокируем эксклюзивно upd-таблицу для "замораживания" параллельных операций
  LOCK TABLE kladr_upd IN EXCLUSIVE MODE NOWAIT;
  -- сбрасываем информацию в таблицу протокола проверок
  INSERT INTO kladr_chk(hostname) VALUES('`hostname`');
  -- сравниваем Last-Modified загруженного файла и состояния в базе
  SELECT lm FROM kladr_upd ORDER BY id DESC LIMIT 1;
  -- если lm не совпадает
    -- для каждого DBF
    CREATE TEMPORARY TABLE ...;
    COPY ... FROM stdin;
    ...
    \.
    --
    -- обновляем "версию" нашей базы
    INSERT INTO kladr_upd(lm, hostname) VALUES('$lm', '`hostname`');
COMMIT;

В результате такого подхода мы всегда знаем, кто и когда проверял наличие обновлений, и кому это удалось.

Соберем все вместе в единый скрипт:

Много bash-кода

#!/bin/sh

. `dirname "$0"`/../app.conf

echo "`date '+%F %T'` ==== Connecting to DB : $pghost:$pgport:$pgbase:$pguser"
# тестирование подключения к БД
psql -t -c 'SELECT 1' -h $pghost -p $pgport -U $pguser -w $pgbase 1>/dev/null 2>/dev/null
rv="$?"
if [ "$rv" != "0" ]; then
  echo "$pghost:$pgport:$pgbase:$pguser:$pgpass" >>~/.pgpass
  chmod 0600 ~/.pgpass
  psql -t -c 'SELECT 1' -h $pghost -p $pgport -U $pguser -w $pgbase 1>/dev/null 2>/dev/null
  rv="$?"
fi

if [ "$rv" != "0" ]; then
  echo "DB not connected : $pghost:$pgport:$pgbase:$pguser"
  exit 1
fi

# инициализация каталога _dbf
#_dbf=`mktemp -d`
mkdir ./dbf && chmod 777 ./dbf
_dbf=`readlink -f ./dbf`
rm -rf ${_dbf} 2>/dev/null
mkdir ${_dbf} 2>/dev/null
touch ${_dbf}/.sql
dir=`dirname "$0"`
dir=`readlink -f $dir`

## импорт базы КЛАДР'а в _dbf/.sql
# защита от автоотключения по таймауту
echo "SET statement_timeout = 0;" >>${_dbf}/.sql
# включаем WIN-кодировку
echo "SET client_encoding = 'WIN1251';" >>${_dbf}/.sql
# включаем application_name для мониторинга активного процесса
echo "SET application_name = 'kladr : import [`hostname`]';" >>${_dbf}/.sql
# включаем "последовательные" транзакции
echo "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;" >>${_dbf}/.sql
# блокируем эксклюзивно upd-таблицу для "замораживания" параллельных операций
echo "LOCK TABLE kladr_upd IN EXCLUSIVE MODE NOWAIT;" >>${_dbf}/.sql
# сбрасываем информацию в таблицу протокола проверок
echo "INSERT INTO kladr_chk(hostname) VALUES('`hostname`');" >>${_dbf}/.sql

# инициализация временного каталога импорта
#tmp=`mktemp -d`
mkdir ./tmp && chmod 777 ./tmp
tmp=`readlink -f ./tmp`
cd $tmp

echo "`date '+%F %T'` ==== Downloading : $source"
# загрузка базы КЛАДР'а с ограничением по скорости или без
wget -S $source --limit-rate=8k 2>.hdr
# wget -S $source 2>.hdr

echo "`date '+%F %T'` ==== Comparing 'Last-Modified'"
rc=`cat .hdr | egrep 'HTTP/[0-9]\.[0-9] [0-9]{3}' | sed -e 's/^[ ]*HTTP\/[0-9]\.[0-9][ ]*\([0-9]*\).*$/\1/i' | egrep -v '301' | head -1`
lm=`cat .hdr | egrep 'Last-Modified' | sed -e 's/^[ ]*Last-Modified:[ ]*//i' | head -1`
echo "  -- HTTP code : $rc"
echo "  -- HTTP 'Last-Modified' : $lm"
pglm=`psql -h $pghost -p $pgport -U $pguser -w -t -c 'SELECT lm FROM kladr_upd ORDER BY id DESC LIMIT 1' $pgbase | sed -e 's/^[ ]*//i'`
echo "  -- PGDB 'Last-Modified' : $pglm"

if [ "$rc" = "200" ] && [ "$lm" != "" ] && [ "$lm" != "$pglm" ]; then
  # распаковка базы
  echo "`date '+%F %T'` ==== Unpacking 7z"
  p7zip -d Base.7z 1>/dev/null 2>/dev/null
  cp $tmp/* ${_dbf}
  cd $dir

  echo "`date '+%F %T'` ==== Processing DBF"
  # обработка всех .DBF
  for dbf in `find ${_dbf} -maxdepth 1 -iname '*.DBF'`; do
    dbfn=`basename $dbf | tr '[:lower:]' '[:upper:]'`
    # преобразование заголовков
    echo "  -- DBF : $dbfn"
    echo "    -- header"
    # получаем структуру полей DBF | DOS2WIN | берем только описания полей (skip 2 строки) | оставляем только их имена
    fld=`dbview -b -t -e -o -r $dbf | recode CP866..CP1251 | tail -n+2 | xargs -l | egrep -io "^[a-z0-9_]+"`
    echo "CREATE TEMPORARY TABLE \"$dbfn\"(" >>${_dbf}/.sql
    fl="0"
    for i in ${fld}; do
      [ "$fl" = "1" ] && echo ',' >>${_dbf}/.sql
      echo -n "  \"$i\"\n    varchar" >>${_dbf}/.sql
      fl="1"
    done
    echo ");" >>${_dbf}/.sql
    # преобразование данных
    echo "    -- data"
    echo "COPY \"$dbfn\"(" >>${_dbf}/.sql
    fl="0"
    for i in ${fld}; do
      [ "$fl" = "1" ] && echo ',' >>${_dbf}/.sql
      echo -n "  \"$i\"" >>${_dbf}/.sql
      fl="1"
    done
    echo ") FROM stdin;" >>${_dbf}/.sql
    # получаем данные DBF, разделенные '~' | склеиваем "висящие" строки ([\t\r\n] в теле поля данных) | DOS2WIN | убираем все '\t' | убираем концевые ';' | заменяем ';'->'\t'
    dbview -d~ -b -t $dbf | sed -e :a -e '/[\r\t]$/N; s/[\r\t]\n//g; ta' | recode CP866..CP1251 | sed -e 's/\t//g; s/~\r//g; s/~,/,/g; s/~/\t/g' >>${_dbf}/.sql
    echo "\\." >>${_dbf}/.sql
  done

  # интеграция процедуры обновления базы - последовательное подключение всех sql-файлов импорта
  ls ${dir}/import/*.sql | xargs -l readlink -f | xargs -l -I{} cat {} >>${_dbf}/.sql
  # вставка метки обновления
  echo "INSERT INTO kladr_upd(lm, hostname) VALUES('$lm', '`hostname`');" >>${_dbf}/.sql
fi
echo "COMMIT;" >>${_dbf}/.sql

cd $dir
rm -rf $tmp

echo "`date '+%F %T'` ==== Processing SQL"
psql -h $pghost -p $pgport -U $pguser -w -f ${_dbf}/.sql $pgbase
rv="$?"

if [ "$rv" = "0" ]; then
  rm -rf ${_dbf}/ 2>/dev/null
fi
echo "`date '+%F %T'` ==== Exit : $rv"

exit "$rv"

Тут можно использовать для импорта и временные каталоги, создаваемые через mktemp, но хватит ли у вас места на tmp-разделе?…

Реквизиты доступа к базе и КЛАДР-источник в нашем случае будут храниться в app.conf:

pghost="kladr.tensor.ru"
pgport="5432"
pguser="postgres"
pgpass="postgres"
pgbase="kladr"
source="https://gnivc.ru/html/gnivcsoft/KLADR/Base.7z"

Поисковая база

А зачем нам вообще нужна какая-то другая структура? Чем нас не устраивают таблицы в оригинальном КЛАДР-архиве?

  • хранение адресных объектов (улиц и населенных пунктов) в разных структурах

  • хранение статуса/версии объекта (00, 51, …) вместе с его кодом

  • невозможность наложить эффективные для поиска индексы

Напомню, что оригинальный код КЛАДР, согласно документации имеет вид СС РРР ГГГ ППП УУУУ АА, где:

  • СС — код субъекта РФ (региона)

  • РРР — код района

  • ГГГ — код города

  • ППП — код населенного пункта

  • УУУУ — код улицы (отсутствует у населенных пунктов)

  • АА — признак актуальности

То есть если вынести признак актуальности в отдельное поле, то у кодов многих объектов (например, городов) в конце окажутся нули, которые стоит безболезненно отсечь. И тогда коды будут иметь строго ограниченный набор длин в соответствии с «уровнем» объекта:

  • 2 — регионы и города федерального подчинения (77 — г. Москва, 76 — Ярославская обл.)

  • 5 — районы (76 001 — Ярославская обл., Ярославский р-н)

  • 8 — города (76 000 001 — Ярославская обл., г. Ярославль)

  • 11 — населенные пункты (76 001 000 008 — Ярославская обл., Ярославский р-н, д. Алешково)

  • 15 — улицы (76 001 000 008 0001 — Ярославская обл., Ярославский р-н, д. Алешково, ул. Малиновая)

При этом, как видим, коды «вышестоящих» объектов становятся префиксами кодов объектов вложенных.

Что же получилось по структуре?

image-loader.svg

-- основная таблица хранения адресных объектов
CREATE TABLE kladr(
  code
    varchar,
  status
    varchar,
  name
    varchar,
  abbr
    varchar,
  idx
    varchar,
  ifns
    varchar,
  ocato
    varchar,
  lvl
    smallint,
  norm
    varchar,
  PRIMARY KEY(code, status)
);
-- индекс по почтовому индексу
CREATE INDEX "kladr-idx"
  ON kladr(idx);
-- префиксный индекс по названию объекта
CREATE INDEX "kladr-nm"
  ON kladr(length(code), code varchar_pattern_ops, lower(name));

-- дома, привязанные к адресным объектам
CREATE TABLE kladr_house(
  code
    varchar,
  codeExt
    varchar,
  name
    varchar,
  idx
    varchar,
  ifns
    varchar,
  ocato
    varchar,
  PRIMARY KEY(code, codeExt)
);

-- замены и объединения объектов
CREATE TABLE kladr_repl(
  oldCode
    varchar,
  newCode
    varchar,
  PRIMARY KEY(oldCode, newCode)
);
CREATE INDEX "kladr-repl-new"
  ON kladr_repl(newCode);
CREATE INDEX "kladr-repl-old"
  ON kladr_repl(oldCode);

-- аббревиатуры административно-территориальных единиц ("ул", "пер", "г")
CREATE TABLE kladr_abbr(
  code
    varchar,
  lvl
    smallint,
  name
    varchar,
  PRIMARY KEY(code, lvl)
);
-- индекс по возможному "уровню" объекта
CREATE INDEX "kladr-abbr-lvl"
  ON kladr_abbr(lvl);

Это основные таблицы, данные в которые импортируются непосредственно из соответствующих DBF по модели наложения «диффов», описанной в статье «DBA: грамотно организовываем синхронизации и импорты»:

kladr-abbr

--// КЛАДР : сокращения

-- создаем временную таблицу с импортируемыми данными КЛАДР'а
CREATE TEMPORARY TABLE _kladr_abbr(
  LIKE kladr_abbr INCLUDING INDEXES
);

-- заполняем преобразованными из DBF данными
INSERT INTO _kladr_abbr(code, lvl, name)
  SELECT
    "SCNAME",
    "KOD_T_ST"::smallint,
    "SOCRNAME"
  FROM
    "SOCRBASE.DBF";

-- удаляем отсутствующие
DELETE FROM
  kladr_abbr T
USING
  kladr_abbr X LEFT JOIN
  _kladr_abbr Y
    USING(code, lvl)
WHERE
  (T.code, T.lvl) = (X.code, X.lvl) AND
  Y IS NULL;

-- обновляем оставшиеся
UPDATE
  kladr_abbr kl
SET
  name = kli.name
FROM
  _kladr_abbr kli
WHERE
  (kl.code, kl.lvl) = (kli.code, kli.lvl) AND
  (
    kl.name
  ) IS DISTINCT FROM
  (
    kli.name
  );

-- очищаем совпадающие
DELETE FROM
  _kladr_abbr kli
USING
  kladr_abbr kl
WHERE
  (kli.code, kli.lvl) = (kl.code, kl.lvl);

-- вставляем оставшиеся
INSERT INTO kladr_abbr
  SELECT
    *
  FROM
    _kladr_abbr;

kladr

--// КЛАДР : от регионов до улиц

-- создаем временную таблицу с импортируемыми данными КЛАДР'а
CREATE TEMPORARY TABLE _kladr(
  LIKE kladr INCLUDING INDEXES
);

-- заполняем преобразованными из DBF данными
INSERT INTO _kladr(code, status, name, abbr, idx, ifns, ocato, lvl)
  SELECT DISTINCT ON(code, status)
    *
  FROM
    (
      SELECT
        regexp_replace(rpad(substr("CODE", 1, length("CODE") - 2), 15, '0'), '^(.{0,}?)((((0{3})?0{3})?0{3})?0{4})?$', E'\\1', 'ig') code,
        substr("CODE", length("CODE") - 1, 2) status,
        "NAME",
        "SOCR",
        nullif("INDEX", ''),
        nullif("GNINMB", ''),
        nullif("OCATD", ''),
        "STATUS"
      FROM
        (
          SELECT
            "CODE",
            "NAME",
            "SOCR",
            "INDEX",
            "GNINMB",
            "OCATD",
            "STATUS"::smallint
          FROM
            "KLADR.DBF"
        UNION ALL
          SELECT
            "CODE",
            "NAME",
            "SOCR",
            "INDEX",
            "GNINMB",
            "OCATD",
            NULL::smallint "STATUS"
          FROM
            "STREET.DBF"
        ) T
    ) T;

-- удаляем отсутствующие
DELETE FROM
  kladr T
USING
  kladr X LEFT JOIN
  _kladr Y
    USING(code, status)
WHERE
  (T.code, T.status) = (X.code, X.status) AND
  Y IS NULL;

-- обновляем оставшиеся
UPDATE
  kladr kl
SET
  (
    name,
    abbr,
    idx,
    ifns,
    ocato,
    lvl
  ) =
  (
    kli.name,
    kli.abbr,
    kli.idx,
    kli.ifns,
    kli.ocato,
    kli.lvl
  )
FROM
  _kladr kli
WHERE
  (kl.code, kl.status) = (kli.code, kli.status) AND
  (
    kl.name,
    kl.abbr,
    kl.idx,
    kl.ifns,
    kl.ocato,
    kl.lvl
  ) IS DISTINCT FROM
  (
    kli.name,
    kli.abbr,
    kli.idx,
    kli.ifns,
    kli.ocato,
    kli.lvl
  );

-- очищаем совпадающие
DELETE FROM
  _kladr kli
USING
  kladr kl
WHERE
  (kli.code, kli.status) = (kl.code, kl.status);

-- вставляем оставшиеся
INSERT INTO kladr
  SELECT
    *
  FROM
    _kladr;

-- обновляем поисковый кэш
DELETE FROM
  kladr_kw
WHERE
  (code, status) IN (
    SELECT
      (ro).code,
      (ro).status
    FROM
      kladr$log
    WHERE
      ro IS DISTINCT FROM NULL
  );

INSERT INTO
  kladr_kw(code, status, keyword)
SELECT DISTINCT
  code,
  status,
  kw
FROM
  (
    SELECT
      (rn).code,
      (rn).status,
      regexp_split_to_table(lower((rn).name), E'[^\\-a-zа-яё0-9]+', 'i') kw
    FROM
      kladr$log
    WHERE
      rn IS DISTINCT FROM NULL
  ) T
WHERE
  kw <> '';

DELETE FROM kladr$log;

Здесь регулярное выражение используется для отсечения «хвостовых» нулей по маске до необходимой нам длины. То есть нельзя просто так взять 76 000 010 000 и убрать все 4 последних ноля, поскольку 010 тут является значимым кодом города.

kladr-house

--// КЛАДР : дома

-- создаем временную таблицу с импортируемыми данными КЛАДР'а
CREATE TEMPORARY TABLE _kladr_house(
  LIKE kladr_house INCLUDING INDEXES
);

-- заполняем преобразованными из DBF данными
INSERT INTO _kladr_house(code, codeExt, name, idx, ifns, ocato)
  SELECT
    regexp_replace(substr("CODE", 1, 15), '^(.{0,}?)((((0{3})?0{3})?0{3})?0{4})?$', E'\\1', 'ig'),
    substr("CODE", 16, 4),
    "NAME",
    nullif("INDEX", ''),
    nullif("GNINMB", ''),
    nullif("OCATD", '')
  FROM
    "DOMA.DBF";

-- удаляем отсутствующие
DELETE FROM
  kladr_house T
USING
  kladr_house X LEFT JOIN
  _kladr_house Y
    USING(code, codeExt)
WHERE
  (T.code, T.codeExt) = (X.code, X.codeExt) AND
  Y IS NULL;

-- обновляем оставшиеся
UPDATE
  kladr_house kl
SET
  (
    name,
    idx,
    ifns,
    ocato
  ) =
  (
    kli.name,
    kli.idx,
    kli.ifns,
    kli.ocato
  )
FROM
  _kladr_house kli
WHERE
  (kl.code, kl.codeExt) = (kli.code, kli.codeExt) AND
  (
    kl.name,
    kl.idx,
    kl.ifns,
    kl.ocato
  ) IS DISTINCT FROM
  (
    kli.name,
    kli.idx,
    kli.ifns,
    kli.ocato
  );

-- очищаем совпадающие
DELETE FROM
  _kladr_house kli
USING
  kladr_house kl
WHERE
  (kli.code, kli.codeExt) = (kl.code, kl.codeExt);

-- вставляем оставшиеся
INSERT INTO kladr_house
  SELECT
    *
  FROM
    _kladr_house;

-- обновляем поисковый кэш
DELETE FROM
  kladr_hs
WHERE
  (code) IN (
      SELECT
        (ro).code
      FROM
        kladr_house$log
      WHERE
        ro IS DISTINCT FROM NULL
    UNION ALL
      SELECT
        (rn).code
      FROM
        kladr_house$log
      WHERE
        rn IS DISTINCT FROM NULL
  );

-- заполняем преобразованными данными
CREATE TEMPORARY TABLE _kladr_hs0 AS
  SELECT DISTINCT ON(code, house)
    code,
    idx,
    ifns,
    ocato,
    unnest(houses) house
  FROM
    (
      SELECT
        *,
        CASE
          WHEN _range IS NULL AND name ~ E'_' THEN ARRAY[regexp_replace(name, '_', '-')]
          WHEN _range IS NULL THEN ARRAY[name]
          WHEN _range IS NOT NULL THEN ARRAY(
            SELECT
              i::text
            FROM
              generate_series(_range[1]::integer + CASE WHEN _range[4] IS NOT NULL THEN (_range[1]::integer + _range[4]::integer) % 2 ELSE 0 END, _range[2]::integer, _range[3]::integer) i
          )
          ELSE NULL
        END houses
      FROM
        (
          SELECT
            code,
            idx,
            ifns,
            ocato,
            name,
            CASE
              WHEN name ~ E'^Н\\(\\d+-\\d+\\)$' THEN regexp_split_to_array(substr(name, 3, length(name) - 3), '-') || '2'::text || '1'::text
              WHEN name ~ E'^Ч\\(\\d+-\\d+\\)$' THEN regexp_split_to_array(substr(name, 3, length(name) - 3), '-') || '2'::text || '0'::text
              WHEN name = 'Н' THEN '{1,999,2}'::text[]
              WHEN name = 'Ч' THEN '{2,998,2}'::text[]
              WHEN name ~ E'^\\d+-\\d+$' THEN regexp_split_to_array(name, '-') || '1'::text
              ELSE NULL
            END _range
          FROM
            (
              SELECT
                code,
                idx,
                ifns,
                ocato,
                unnest(regexp_split_to_array(upper(name), ',')) "name"
              FROM
                kladr_house
              WHERE
                (code) IN (
                    SELECT
                      (ro).code
                    FROM
                      kladr_house$log
                    WHERE
                      ro IS DISTINCT FROM NULL
                  UNION ALL
                    SELECT
                      (rn).code
                    FROM
                      kladr_house$log
                    WHERE
                      rn IS DISTINCT FROM NULL
                )
            ) T
        ) T
    ) T
ORDER BY
  code, house, (_range IS NULL) DESC;

CREATE INDEX ON _kladr_hs0(code, house, idx DESC NULLS LAST);

CREATE TEMPORARY TABLE _kladr_hs1 AS
  SELECT DISTINCT ON (code, house)
    code,
    idx,
    ifns,
    ocato,
    house
  FROM
    _kladr_hs0
  ORDER BY
    code, house, idx DESC NULLS LAST;

CREATE INDEX ON _kladr_hs1(code, house);

CREATE TEMPORARY TABLE _kladr_hs2 AS
  SELECT
    code,
    coalesce(
      idx, 
      coalesce(
        (
          SELECT
            idx
          FROM
            _kladr_hs1
          WHERE
            (code, house) = (T.code, regexp_replace(T.house, E'^(\\d+)(\\D)?.*$', E'\\1', 'ig'))
          LIMIT 1
        ), 
        coalesce(
          (
            SELECT
              idx
            FROM
              kladr
            WHERE
              code IN (
                substr(T.code, 1, 15),
                substr(T.code, 1, 11),
                substr(T.code, 1,  8),
                substr(T.code, 1,  5),
                substr(T.code, 1,  2)
              ) AND
--              status = '00' AND
              idx IS NOT NULL
            ORDER BY
              length(code) DESC
            LIMIT 1
          ),
          ''
        )
      )
    ) idx,
    ifns,
    ocato,
    house
  FROM
    _kladr_hs1 T;

CREATE INDEX ON _kladr_hs2(code, idx, ifns, ocato, house);

INSERT INTO kladr_hs(code, idx, ifns, ocato, houses)
  SELECT
    code,
    idx,
    ifns,
    ocato,
    array_agg(house ORDER BY house) houses
  FROM
    _kladr_hs2
  GROUP BY
    1, 2, 3, 4;

DELETE FROM kladr_house$log;

Здесь регулярными выражениями мы приводим форматы исходной базы в списки конкретных номеров домов:

  • 5-11 -> {5, 6, 7, 8, 9, 10, 11}

  • Н(5-11) -> {5, 7, 9, 11}

  • Ч(6-10) -> {6, 8, 10}

  • Н -> {1, 3, ..., 999}

  • Ч -> {2, 4, ..., 998}

kladr-repl

--// КЛАДР : замены

-- создаем временную таблицу с импортируемыми данными КЛАДР'а
CREATE TEMPORARY TABLE _kladr_repl(
  LIKE kladr_repl INCLUDING INDEXES
);

-- заполняем преобразованными из DBF данными
INSERT INTO _kladr_repl(oldCode, newCode)
  SELECT DISTINCT
    co,
    cn
  FROM
    (
      SELECT
        regexp_replace(rpad(co, 15, '0'), '^(.{0,}?)((((0{3})?0{3})?0{3})?0{4})?$', E'\\1', 'ig') co,
        so,
        regexp_replace(rpad(cn, 15, '0'), '^(.{0,}?)((((0{3})?0{3})?0{3})?0{4})?$', E'\\1', 'ig') cn,
        sn
      FROM
        (
          SELECT
            *,
            substr("OLDCODE", 1, length("OLDCODE") - 2) co,
            substr("OLDCODE", length("OLDCODE") - 1, 2) so,
            substr("NEWCODE", 1, length("NEWCODE") - 2) cn,
            substr("NEWCODE", length("NEWCODE") - 1, 2) sn
          FROM
            "ALTNAMES.DBF"
        ) T
    ) T;

-- удаляем отсутствующие
DELETE FROM
  kladr_repl T
USING
  kladr_repl X LEFT JOIN
  _kladr_repl Y
    USING(oldCode, newCode)
WHERE
  (T.oldCode, T.newCode) = (X.oldCode, X.newCode) AND
  Y IS NULL;

-- очищаем совпадающие
DELETE FROM
  _kladr_repl kli
USING
  kladr_repl kl
WHERE
  (kli.oldCode, kli.newCode) = (kl.oldCode, kl.newCode);

-- вставляем оставшиеся
INSERT INTO kladr_repl
  SELECT
    *
  FROM
    _kladr_repl;

Тут мы использовали еще две «производные» таблицы, которые выступают в качестве поисковых индексов — kladr_kw и kladr_hs:

-- ключевые слова адресных объектов
CREATE TABLE kladr_kw(
  code
    varchar,
  status
    varchar,
  keyword
    varchar
);
CREATE INDEX "kladr-kw-cd"
  ON kladr_kw(code, status);
CREATE INDEX "kladr-kw-kwcd"
  ON kladr_kw(keyword varchar_pattern_ops, code varchar_pattern_ops);
CREATE INDEX "kladr-kw-cdkw"
  ON kladr_kw(code varchar_pattern_ops, keyword varchar_pattern_ops);

-- группы номеров домов с одинаковыми признаками
CREATE TABLE kladr_hs(
  code
    varchar,
  idx
    varchar,
  ifns
    varchar,
  ocato
    varchar,
  houses
    varchar[]
);
CREATE INDEX "kladr-hs-code"
  ON kladr_hs(code);
CREATE INDEX "kladr-hs-idx"
  ON kladr_hs(idx);

-- специальные "уровневые" индексы
DO $$
DECLARE
  ln integer[] = '{2,5,8,11,15}'::integer[];
BEGIN
  FOR i IN 1..array_length(ln, 1) LOOP
    EXECUTE '
CREATE INDEX "kladr-' || lpad(ln[i]::text, 2, '0') || '"
  ON kladr(code varchar_pattern_ops, status)
WHERE
  length(code) = ' || ln[i] || ';
CREATE INDEX "kladr-kw-' || lpad(ln[i]::text, 2, '0') || '"
  ON kladr_kw(keyword varchar_pattern_ops)
WHERE
  length(code) = ' || ln[i] || ';
CREATE INDEX "kladr-kw-kwcd-' || lpad(ln[i]::text, 2, '0') || '"
  ON kladr_kw(keyword varchar_pattern_ops, code varchar_pattern_ops)
WHERE
  length(code) = ' || ln[i] || ';
    ';
  END LOOP;
END$$ LANGUAGE plpgsql;

Зачем нам понадобятся такие дополнительные структуры, рассмотрим в следующей части статьи.

© Habrahabr.ru