Высокоуровневая репликация в СУБД Tarantool

Привет, я занимаюсь созданием приложений для СУБД Tarantool — это разработанная в Mail.ru Group платформа, совмещающая в себе высокопроизводительную СУБД и сервер приложений на языке Lua. Высокая скорость работы решений, основанных на Tarantool, достигается в частности за счет поддержки in-memory режима СУБД и возможности выполнения бизнес-логики приложения в едином адресном пространстве с данными. При этом обеспечивается персистентность данных с использованием ACID-транзакций (на диске ведется WAL-журнал). В Tarantool имеется встроенная поддержка репликации и шардирования. Начиная с версии 2.1, поддерживаются запросы на языке SQL. Tarantool имеет открытый исходный код и распространяется под лицензией Simplified BSD. Также имеется коммерческая Enterprise-версия.

4aa22fd046775dd3b44af444ddcde28d.jpg
Feel the power! (…aka enjoy the performance)

Все перечисленное делает Tarantool привлекательной платформой для создания высоконагруженных приложений, работающих с БД. В таких приложениях часто возникает необходимость репликации данных.
Как было сказано выше, в Tarantool есть встроенная репликация данных. Принцип ее работы заключается в последовательном выполнении на репликах всех транзакций, содержащихся в журнале мастера (WAL). Обычно такая репликация (будем далее называть ее низкоуровневой) используется для обеспечения отказоустойчивости приложения и/или для распределения нагрузки на чтение между нодами кластера.

d1c31b83b1c71220a514deebb7d49952.png
Рис. 1. Репликация внутри кластера

Примером альтернативного сценария может служить передача данных, созданных в одной БД, в другую БД для обработки/мониторинга. В последнем случае более удобным решением может оказаться использование высокоуровневой репликации — репликации данных на уровне бизнес-логики приложения. Т.е. мы не используем готовое решение, встроенное в СУБД, а своими силами реализуем репликацию внутри разрабатываемого нами приложения. У такого подхода есть как преимущества, так и недостатки. Перечислим плюсы.

1. Экономия трафика:

  • можно передавать не все данные, а только их часть (например, можно передавать только некоторые таблицы, некоторые их столбцы или записи, соответствующие определенному критерию);
  • в отличие от низкоуровневой репликации, которая выполняется непрерывно в асинхронном (реализовано в текущей версии Tarantool — 1.10) или синхронном (будет реализовано в последующих версиях Tarantool) режиме, высокоуровневую репликацию можно выполнять сеансами (т.е. приложение сначала выполняет синхронизацию данных — сеанс обмена данными, затем наступает пауза в репликации, после которой происходит следующий сеанс обмена и т.д.);
  • если запись изменилась несколько раз, можно передать только ее последнюю версию (в отличие от низкоуровневой репликации, при которой на репликах будут последовательно проиграны все изменения, сделанные на мастере).


2. Отсутствуют сложности с реализацией обмена по HTTP, что позволяет синхронизировать удаленные БД.

5515517aaa739f6260023c5e8ff30bf0.png
Рис. 2. Репликация по HTTP

3. Структуры БД, между которыми передаются данные, не обязаны быть одинаковыми (более того, в общем случае возможно даже использование разных СУБД, языков программирования, платформ и т.п.).

83085a64b69ff5dba2431db97213fa82.png
Рис. 3. Репликация в гетерогенных системах

Минус заключается в том, что в среднем программирование сложнее/затратнее, чем конфигурирование, и вместо настройки встроенного функционала придется реализовывать свой собственный.

Если в вашей ситуации приведенные плюсы играют решающее значение (или являются необходимым условием), то имеет смысл использовать высокоуровневую репликацию. Рассмотрим несколько способов реализации высокоуровневой репликации данных в СУБД Tarantool.

Минимизация трафика


Итак, одним из преимуществ высокоуровневой репликации является экономия трафика. Для того чтобы это преимущество проявилось в полной мере, необходимо минимизировать количество данных, передаваемых при каждом сеансе обмена. Разумеется, при этом не стоит забывать о том, что в конце сеанса приемник данных должен быть синхронизирован с источником (как минимум по той части данных, которая участвует в репликации).

Как же минимизировать количество данных, передаваемых при высокоуровневой репликации? Решением «в лоб» может быть отбор данных по дате-времени. Для этого можно использовать уже имеющееся в таблице поле даты-времени (если оно есть). Например, у документа «заказ» может быть поле «требуемое время исполнения заказа» — delivery_time. Проблема такого решения заключается в том, что значения в этом поле не обязаны располагаться в последовательности, соответствующей созданию заказов. Таким образом, мы не можем запомнить максимальное значение поля delivery_time, переданное при предыдущем сеанса обмена, и при следующем сеансе обмена отобрать все записи с более высоким значением поля delivery_time. В промежутке между сеансами обмена могли добавиться записи с меньшим значением поля delivery_time. Также заказ мог претерпеть изменения, которые тем не менее не затронули поле delivery_time. В обоих случаях изменения не будут переданы с источника в приемник. Для решения этих проблем нам потребуется передавать данные «внахлест». Т.е. при каждом сеансе обмена мы будем передавать все данные со значением поля delivery_time, превышающим некоторый момент в прошлом (например, N часов от текущего момента). Однако очевидно, что для крупных систем такой подход является сильно избыточным и может свести экономию трафика, к которой мы стремимся, на нет. Кроме того, в передаваемой таблице может не быть поля, связанного с датой-временем.

Другое решение, более сложное с точки зрения реализации, заключается в подтверждении получения данных. В этом случае при каждом сеансе обмена передаются все данные, получение которых не подтверждено получателем. Для реализации потребуется добавить в таблицу-источник булевскую колонку (например, is_transferred). Если приемник подтверждает получение записи, соответствующее поле принимает значение true, после чего запись более не участвует в обменах. Такой вариант реализации имеет следующие минусы. Во-первых, для каждой переданной записи необходимо сгенерировать и отправить подтверждение. Грубо говоря, это может быть сопоставимо с удвоением количества передаваемых данных и привести к удвоению количества раундтрипов. Во-вторых, отсутствует возможность отправки одной и той же записи в несколько приемников (первый получивший приемник подтвердит получение за себя и за всех остальных).

Способ, лишенный недостатков, приведенных выше, состоит в добавлении в передаваемую таблицу колонки для отслеживания изменений ее строк. Такая колонка может иметь тип дата-время и должна задаваться/обновляться приложением на текущее время каждый раз при добавлении/изменении записей (атомарно с добавлением/изменением). В качестве примера назовем колонку update_time. Сохранив максимальное значение поля этой колонки для переданных записей, мы сможем начать следующий сеанс обмена с этого значения (отобрать записи со значением поля update_time, превышающим сохраненное ранее значение). Проблема, связанная с последним подходом, заключается в том, что изменения данных могут происходить в пакетном режиме. В результате значения полей в колонке update_time могут быть не уникальными. Таким образом, эта колонка не может быть использована для порционной (постраничной) выдачи данных. Для постраничной выдачи данных придется изобретать дополнительные механизмы, которые, скорее всего, будут иметь очень низкую эффективность (например, извлечение из БД всех записей со значением update_time выше заданного и выдача определенного количества записей, начиная с некоторого смещения от начала выборки).

Можно повысить эффективность передачи данных, немного усовершенствовав предыдущий подход. Для этого в качестве значений полей колонки для отслеживания изменений будем использовать целочисленный тип (длинное целое). Назовем колонку row_ver. Значение поля этой колонки по-прежнему должно задаваться/обновляться каждый раз при создании/изменении записи. Но в данном случае полю будет присваиваться не текущее дата-время, а значение некоторого счетчика, увеличенного на единицу. В результате колонка row_ver будет содержать уникальные значения и сможет быть использована не только для выдачи «дельты» данных (данных, добавившихся/изменившихся после завершения предыдущего сеанса обмена), но и для простой и эффективной разбивки их на страницы.

Последний предложенный способ минимизации количества данных, передаваемых в рамках высокоуровневой репликации, представляется мне наиболее оптимальным и универсальным. Остановимся на нем подробнее.

Передача данных с использованием счетчика версий строк


Реализация серверной/master части


В MS SQL Server для реализации подобного подхода существует специальный тип колонки — rowversion. Каждая БД имеет счетчик, который увеличивается на единицу каждый раз при добавлении/изменении записи в таблице, имеющей колонку типа rowversion. Значение этого счетчика автоматически присваивается полю этой колонки в добавившейся/изменившейся записи. СУБД Tarantool не имеет аналогичного встроенного механизма. Однако в Tarantool его несложно реализовать вручную. Рассмотрим, как это делается.

Для начала немного терминологии: таблицы в Tarantool называются спейсами (space), а записи — кортежами (tuple). В Tarantool можно создавать последовательности (sequence). Последовательности представляют из себя не что иное, как именованные генераторы упорядоченных значений целых чисел. Т.е. это как раз то, что нужно для наших целей. Ниже мы создадим такую последовательность.

Прежде чем выполнить какую-либо операцию с базой данных в Tarantool, необходимо выполнить следующую команду:

box.cfg{}


В результате Tarantool начнет записывать в текущий каталог снимки БД (snapshot) и журнал транзакций.

Создадим последовательность row_version:

box.schema.sequence.create('row_version',
    { if_not_exists = true })


Опция if_not_exists позволяет выполнять скрипт создания многократно: если объект существует, Tarantool не будет пытаться создать его повторно. Эта опция будет использоваться во всех последующих DDL-командах.

Создадим спейс для примера.

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        },
        {
            name = 'row_ver',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})


Здесь мы задали имя спейса (goods), имена полей и их типы.

Автоинкрементные поля в Tarantool создаются тоже с помощью последовательностей. Создадим автоинкрементный первичный ключ по полю id:

box.schema.sequence.create('goods_id',
    { if_not_exists = true })
box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})


Tarantool поддерживает несколько типов индексов. Чаще всего используются индексы типов TREE и HASH, в основе которых лежат соответствующие наименованию структуры. TREE — наиболее универсальный тип индекса. Он позволяет извлекать данные в упорядоченном виде. Но для выбора по равенству больше подходит HASH. Соответственно, для первичного ключа целесообразно использовать HASH (что мы и сделали).

Чтобы использовать колонку row_ver для передачи изменившихся данных, необходимо привязать к полям этой колонки значения последовательности row_ver. Но в отличии от первичного ключа, значение поля колонки row_ver должно увеличиваться на единицу не только при добавлении новых записей, но и при изменении существующих. Для этого можно использовать триггеры. В Tarantool есть два типа триггеров для спейсов: before_replace и on_replace. Триггеры запускаются при каждом изменении данных в спейсе (для каждого кортежа, затронутого изменениями, запускается функция триггера). В отличие от on_replace, before_replace-триггеры позволяют модифицировать данные кортежа, для которого выполняется триггер. Соответственно, нам подходит последний тип триггеров.

box.space.goods:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)


Приведенный триггер заменяет значение поля row_ver сохраняемого кортежа на очередное значение последовательности row_version.

Для того чтобы можно было извлекать данные из спейса goods по колонке row_ver, создадим индекс:

box.space.goods:create_index('row_ver', {
    parts = { 'row_ver' },
    unique = true,
    type = 'TREE',
    if_not_exists = true
})


Тип индекса — дерево (TREE), т.к. данные нам потребуется извлекать в порядке возрастания значений в колонке row_ver.

Добавим в спейс некоторые данные:

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}


Т.к. первое поле — автоинкрементный счетчик, передаем вместо него nil. Tarantool автоматически подставит очередное значение. Аналогичным образом в качестве значения полей колонки row_ver можно передать nil — или не указывать значение вообще, т.к. эта колонка занимает последнюю позицию в спейсе.

Проверим результат вставки:

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1]
  - [2, 'pencil', 321, 2]
  - [3, 'brush', 100, 3]
  - [4, 'watercolour', 456, 4]
  - [5, 'album', 101, 5]
  - [6, 'notebook', 800, 6]
  - [7, 'rubber', 531, 7]
  - [8, 'ruler', 135, 8]
...


Как видим, первое и последнее поле заполнились автоматически. Теперь будет несложно написать функцию для постраничной выгрузки изменений спейса goods:

local page_size = 5
local function get_goods(row_ver)
    local index = box.space.goods.index.row_ver
    local goods = {}
    local counter = 0
    for _, tuple in index:pairs(row_ver, {
        iterator = 'GT' }) do
        local obj = tuple:tomap({ names_only = true })
        table.insert(goods, obj)
        counter = counter + 1
        if counter >= page_size then
            break
        end
    end
    return goods
end


Функция принимает в качестве параметра значение row_ver, начиная с которого необходимо осуществить выгрузку изменений, и возвращает порцию изменившихся данных.

Выборка данных в Tarantool производится через индексы. Функция get_goods использует итератор по индексу row_ver для получения изменившихся данных. Тип итератора — GT (Greater Than, больше чем). Это означает, что итератор будет осуществлять последовательный обход значений индекса начиная с переданного ключа (значения поля row_ver).

Итератор возвращает кортежи. Чтобы впоследствии иметь возможность передать данные по HTTP, необходимо выполнить преобразование кортежей к структуре, удобной для последующей сериализации. В примере для этого используется стандартная функция tomap. Вместо использования tomap можно написать собственную функцию. Например, мы можем захотеть переименовать поле name, не передавать поле code и добавить поле comment:

local function unflatten_goods(tuple)
    local obj = {}
    obj.id = tuple.id
    obj.goods_name = tuple.name
    obj.comment = 'some comment'
    obj.row_ver = tuple.row_ver
    return obj
end


Размер страницы выдаваемых данных (количество записей в одной порции) определяется переменной page_size. В примере значение page_size равно 5. В реальной программе размер страницы обычно имеет большее значение. Он зависит от среднего размера кортежа спейса. Оптимальный размер страницы можно подобрать опытным путем, замеряя время передачи данных. Чем больше размер страницы, тем меньше количество раундтрипов между передающей и принимающей стороной. Так можно уменьшить общее время выгрузки изменений. Однако при слишком большом размере страницы мы будем слишком долго занимать сервер сериализацией выборки. В результате могут возникнуть задержки при обработке других запросов, пришедших на сервер. Параметр page_size можно загрузить из конфигурационного файла. Для каждого передаваемого спейса можно задать свое значение. При этом для большинства спейсов может подойти значение по умолчанию (например, 100).

Выполним функцию get_goods:

tarantool> get_goods(0)

---
- - row_ver: 1
    code: 123
    name: pen
    id: 1
  - row_ver: 2
    code: 321
    name: pencil
    id: 2
  - row_ver: 3
    code: 100
    name: brush
    id: 3
  - row_ver: 4
    code: 456
    name: watercolour
    id: 4
  - row_ver: 5
    code: 101
    name: album
    id: 5
...


Возьмем значение поля row_ver из последней строки и вновь вызовем функцию:

tarantool> get_goods(5)

---
- - row_ver: 6
    code: 800
    name: notebook
    id: 6
  - row_ver: 7
    code: 531
    name: rubber
    id: 7
  - row_ver: 8
    code: 135
    name: ruler
    id: 8
...


И еще раз:

tarantool> get_goods(8)
---
- []
...


Как видим, при таком использовании функция постранично возвращает все записи спейса goods. За последней страницей следует пустая выборка.

Внесем изменения в спейс:

box.space.goods:update(4, {{'=', 6, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}


Мы изменили значение поля name для одной записи и добавили две новых записи.

Повторим последний вызов функции:

tarantool> get_goods(8)
---



- - row_ver: 9
    code: 800
    name: copybook
    id: 6
  - row_ver: 10
    code: 234
    name: clip
    id: 9
  - row_ver: 11
    code: 432
    name: folder
    id: 10
...


Функция вернула изменившуюся и добавившиеся записи. Таким образом, функция get_goods позволяет получать данные, изменившиеся с момента ее последнего вызова, что и является основой рассматриваемого способа репликации.

Оставим выдачу результатов по HTTP в виде JSON за рамками настоящей статьи. Об этом можно прочитать здесь: https://habr.com/ru/company/mailru/blog/272141/

Реализация клиентской/slave части


Рассмотрим, как выглядит реализация принимающей стороны. Создадим на принимающей стороне спейс для хранения загруженных данных:

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})


Структура спейса напоминает структуру спейса в источнике. Но поскольку мы не собираемся передавать полученные данные куда-либо еще, колонка row_ver в спейсе получателя отсутствует. В поле id будут записываться идентификаторы источника. Поэтому на стороне приемника нет необходимости делать его автоинкрементным.

Кроме этого, нам потребуется спейс для сохранения значений row_ver:

box.schema.space.create('row_ver', {
    format = {
        {
            name = 'space_name',
            type = 'string'

        },
        {
            name = 'value',
            type = 'string'

        }
    },
    if_not_exists = true
})

box.space.row_ver:create_index('primary', {
    parts = { 'space_name' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})


Для каждого загружаемого спейса (поле space_name) будем сохранять здесь последнее загруженное значение row_ver (поле value). В качестве первичного ключа выступает колонка space_name.

Создадим функцию для загрузки данных спейса goods по HTTP. Для этого нам потребуется библиотека, реализующая HTTP-клиент. Следующая строка загружает библиотеку и создает экземпляр HTTP-клиента:

local http_client = require('http.client').new()


Также нам потребуется библиотека для десериализации json:

local json = require('json')


Этого достаточно для создания функции загрузки данных:

local function load_data(url, row_ver)
    local url = ('%s?rowVer=%s'):format(url,
        tostring(row_ver))
    local body = nil
    local data = http_client:request('GET', url, body, {
        keepalive_idle =  1,
        keepalive_interval = 1
    })
    return json.decode(data.body)
end


Функция выполняет HTTP-запрос по адресу url, передает в него row_ver в качестве параметра и возвращает десериализованный результат запроса.

Функция сохранения полученных данных выглядит следующим образом:

local function save_goods(goods)
    local n = #goods
    box.atomic(function()
        for i = 1, n do
            local obj = goods[i]
            box.space.goods:put(
                obj.id, obj.name, obj.code)
        end
    end)
end


Цикл сохранения данных в спейс goods помещен в транзакцию (для этого используется функция box.atomic) для уменьшения количества операций с диском.

Наконец, функцию синхронизации локального спейса goods с источником можно реализовать так:

local function sync_goods()
    local tuple = box.space.row_ver:get('goods')
    local row_ver = tuple and tuple.value or 0

    —— set your url here:
    local url = 'http://127.0.0.1:81/test/goods/list'

    while true do
        local goods = load_goods(url, row_ver)

        local count = #goods
        if count == 0 then
            return
        end

        save_goods(goods)

        row_ver = goods[count].rowVer
        box.space.row_ver:put({'goods', row_ver})
    end
end


Сначала считываем сохраненное ранее значение row_ver для спейса goods. Если оно отсутствует (первый сеанс обмена), то берем в качестве row_ver ноль. Далее в цикле производим постраничную загрузку измененных данных из источника по указанному url. На каждой итерации сохраняем полученные данные в соответствующий локальный спейс и обновляем значение row_ver (в спейсе row_ver и в переменной row_ver) — берем значение row_ver из последней строки загруженных данных.

Для защиты от случайного зацикливания (в случае ошибки в программе) цикл while можно заменить на for:

for _ = 1, max_req do ...


В результате выполнения функции sync_goods спейс goods в приемнике будет содержать последние версии всех записей спейса goods в источнике.

Очевидно, что таким способом нельзя транслировать удаление данных. Если такая необходимость существует, можно использовать пометку на удаление. Добавляем в спейс goods булевское поле is_deleted и вместо физического удаления записи используем логическое удаление — выставляем значение поля is_deleted в значение true. Иногда вместо булевского поля is_deleted удобнее использовать поле deleted, в котором хранится дата-время логического удаления записи. После выполнения логического удаления помеченная на удаление запись будет передана из источника в приемник (согласно рассмотренной выше логике).

Последовательность row_ver можно использовать для передачи данных других спейсов: нет необходимости в создании отдельной последовательности для каждого передаваемого спейса.

Мы рассмотрели эффективный способ высокоуровневой репликации данных в приложениях, использующих СУБД Tarantool.

Выводы


  1. СУБД Tarantool — привлекательный, перспективный продукт для создания высоконагруженных приложений.
  2. Высокоуровневая репликация данных имеет ряд преимуществ по сравнению с низкоуровневой репликацией.
  3. Рассмотренный в статье способ высокоуровневой репликации позволяет минимизировать количество передаваемых данных путем передачи только тех записей, которые изменились после последнего сеанса обмена.

© Habrahabr.ru