Распределённые блокировки с помощью Tarantool 3

Распределенная блокировка — очень удобный инструмент в кластере, который помогает обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису или запросу в данный момент времени. Так предотвращается гонка за данными и их неконсистентность. Распределенная (или кластерная) блокировка называется так потому, что она обеспечивается несколькими узлами, и выход из строя одного из них не повлияет на приложение. В этой статье я расскажу, как реализовать этот инструмент с помощью Tarantool 3.

Tarantool 3

Tarantool 3 — это новая версия in-memory базы данных, которая лежит в основе промежуточного ПО для хранения и обработки данных Tarantool. Она полностью совместима с файлами и репликацией второй версии.

Перечислю основные характеристики Tarantool 3 в сравнении с Tarantool 2:

  • Cервер с конфигом.

  • Ориентация на кластерность.

  • Удобная система триггеров.

  • Переопределение сетевого API (IPROTO).

  • Упрощение настройки репликации.

  • Имена узлов вместо UUID.

  • Расширенная статистика по потреблению памяти.

  • Значения полей по умолчанию.

Подробнее можно почитать в другой статье на Хабре

Принцип работы

Блокировки будут храниться в таблице (спейсе) с такой структурой:

name

token

expire

Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.

Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable. Для автоматического переключения лидера настроим raft-фейловер.

Напишем хранимую процедуру на Lua для взятия блокировки. Процедура работает следующим образом:

  • Принимает параметры: имя и таймаут.

  • Проверяет, существует ли блокировка. Если нет — создает новую. 

  • Если блокировка существовала, то проверяет, отпущена ли она (expire == 0). 

  • Если отпущена, то наращивает token и устанавливает время expire. 

  • Если уже была взята, процедура возвращает nil.

function _G.acquireLock(name, timeout)

    box.begin({txn_isolation="linearizable", timeout=timeout})

    local lock = box.space.locks:get(name)

    if lock == nil then

        lock = {name, 0, clock.time64() + timeout * 1e9}

        box.space.locks:insert(lock)

        box.commit()

        return lock

    end

    if lock['expire'] == 0 then

        lock = {name, lock['token'] + 1, clock.time64() + timeout * 1e9}

        log.info(lock)

        box.space.locks:put(lock)

        box.commit()

        return lock

    end

    box.commit()

    return nil

end

Теперь напишем процедуру на Lua для отпускания блокировки. Процедура принимает параметры: имя и токен — и проверяет, что токен блокировки совпадает и expire не равен нулю. Тогда блокировка отпускается и процедура возвращает true, иначе — false.

function _G.releaseLock(name, token)

    box.begin({txn_isolation="linearizable"})

    local lock = box.space.locks:get(name)

    if lock == nil then

        box.commit()

        return false

    end

    if lock['token'] == token and lock['expire'] ~= 0 then

        local lock = {name, token, 0}

        box.space.locks:put(lock)

        box.commit()

        return true

    end

    box.commit()

    return false

end

Для создания таблицы (спейса) с блокировками используется фоновая процедура. Она:

  1. Ждет, что узел станет лидером. 

  2. Создает таблицу с необходимыми полями.

  3. Запускает цикл для обработки тайм-аутов блокировок.

fiber.create(function()

    fiber.name("expire-lock-fiber")

    box.ctl.wait_rw()

    box.schema.space.create("locks", {

        is_sync=true,

        if_not_exists=true})

    box.space.locks:format({{name="name", type="string"},

        {name='token', type='unsigned'},

        {name='expire', type='unsigned'}})

    box.space.locks:create_index('name', {

        parts={{field="name", type="string"}},

        if_not_exists=true})

    box.space.locks:create_index('expire', {

        parts={{field="expire", type="unsigned"}},

        unique=false,

        if_not_exists=true})

    box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true})

    while true do

        box.ctl.wait_rw()

        local now = clock.time64()

        for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do

            if t[3] < now then

                local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})

                if not rc then

                    log.info(err)

                    break

                end

            end

        end

        fiber.sleep(1)

    end

end)

Инициализация приложения

Сначала инициализируем рабочее окружение для разработки:

tt init

Создадим директорию будущего приложения. Приложения располагаются в instances.enabled:

mkdir instances.enabled/app

Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера:

touch instances.enabled/app/config.yml

В конфигурации укажем:

  • Что в кластере должно быть три узла.

  • Параметры фейловера для работы кластера.

  • Включение mvcc-режима работы.

  • Файл с хранимыми процедурами.

credentials:

  users:

    client:

      password: "secret"

replication:

  failover: election

database:

  use_mvcc_engine: true

app:

  file: init.lua

groups:

  group-001:

    replicasets:

      replicaset-001:

        replication:

          bootstrap_strategy: config

        bootstrap_leader: instance-001

        instances:

          instance-001:

            iproto:

              listen:

                - uri: 127.0.0.1:3301

          instance-002:

            iproto:

              listen:

                - uri: 127.0.0.1:3302

          instance-003:

            iproto:

              listen:

                - uri: 127.0.0.1:3303

Создадим файл, управляющий запуском локальных узлов кластера:

touch instances.enabled/app/instances.yaml

Укажем три узла для запуска:

instance-001:

instance-002:

instance-003:

Приложение на Lua

Создадим файл init.lua:

touch instances.enabled/app/init.lua

Полный листинг приложения:

local log = require('log')

local fiber = require('fiber')

local clock = require('clock')

log.info("starting application")

function _G.acquireLock(name, timeout)

    box.begin({txn_isolation="linearizable", timeout=timeout})

    local lock = box.space.locks:get(name)

    if lock == nil then

        lock = {name, 0, clock.time64() + timeout * 1e9}

        box.space.locks:insert(lock)

        box.commit()

        return lock

    end

    if lock['expire'] == 0 then

        lock = {name, lock['token'] + 1, clock.time64() + timeout * 1e9}

        box.space.locks:put(lock)

        box.commit()

        return lock

    end

    box.commit()

    return nil

end

function _G.releaseLock(name, token)

    box.begin({txn_isolation="linearizable"})

    local lock = box.space.locks:get(name)

    if lock == nil then

        box.commit()

        return false

    end

    if lock['token'] == token and lock['expire'] ~= 0 then

        local lock = {name, token, 0}

        box.space.locks:put(lock)

        box.commit()

        return true

    end

    box.commit()

    return false

end

fiber.create(function()

    fiber.name("expire-lock-fiber")

    box.ctl.wait_rw()

    box.schema.space.create("locks", {

        is_sync=true,

        if_not_exists=true})

    box.space.locks:format({{name="name", type="string"},

        {name='token', type='unsigned'},

        {name='expire', type='unsigned'}})

    box.space.locks:create_index('name', {

        parts={{field="name", type="string"}},

        if_not_exists=true})

    box.space.locks:create_index('expire', {

        parts={{field="expire", type="unsigned"}},

        unique=false,

        if_not_exists=true})

    box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true})

    while true do

        box.ctl.wait_rw()

        local now = clock.time64()

        for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do

            if t[3] < now then

                local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})

                if not rc then

                    log.info(err)

                    break

                end

            end

        end

        fiber.sleep(1)

    end

end)

Запуск локального кластера

Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt:

tt start

Для проверки статуса узлов выполним команду:

tt status

Для проверки того, что кластер собрался, необходимо подключиться по очереди на узлы и проверить состояние репликации:

tt connect app:instance-001

> box.info.replication

Пример хорошего вывода статуса репликации

---

- 1:

    id: 1

    uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8

    lsn: 16

    upstream:

      status: follow

      idle: 0.92459100019187

      peer: 127.0.0.1:3301

      lag: 7.9154968261719e-05

    name: instance-001

    downstream:

      status: follow

      idle: 0.91285500023514

      vclock: {1: 16, 2: 5186, 3: 5013}

      lag: 0

  2:

    id: 2

    uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8

    lsn: 5186

    name: instance-002

  3:

    id: 3

    uuid: 7f40d300-e9e6-4916-adaa-b669b672256b

    lsn: 5013

    upstream:

      status: follow

      idle: 0.91279700025916

      peer: 127.0.0.1:3303

      lag: 5.7220458984375e-05

    name: instance-003

    downstream:

      status: follow

      idle: 0.91363300010562

      vclock: {1: 16, 2: 5186, 3: 5013}

      lag: 0
...

Пример использования кластерных блокировок на Golang

Tarantool общается с приложениями с помощью msgpack-формата, который иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64, сделаем утилитарную функцию.

func toUint64(v any) uint64 {

	switch t := v.(type) {

	case int, int8, int16, int32, int64:

		return uint64(reflect.ValueOf(t).Int())

	case uint, uint8, uint16, uint32, uint64:

		return reflect.ValueOf(t).Uint()

	default:

		panic("type error")

	}

}

Для общения с кластером Tarantool воспользуемся готовым пулом подключений, который умеет автоматически вычислять, какой узел является лидером:

instances := []pool.Instance{

	{

		Name: "instance-001",

		Dialer: tarantool.NetDialer{

			Address: "127.0.0.1:3301",

		},

	},

	{

		Name: "instance-002",

		Dialer: tarantool.NetDialer{

			Address: "127.0.0.1:3302",

		},

	},

	{

		Name: "instance-003",

		Dialer: tarantool.NetDialer{

			Address: "127.0.0.1:3303",

		},

	},

}

p, err := pool.Connect(context.Background(), instances)

if err != nil {

	panic(err)

}

Создадим структуру для хранения блокировки:

type Lock struct {

	name   string

	token  uint64

	expire uint64

}

Создадим функция для взятия блокировки. Эта функция:

  1. Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.

  2. Принимает в качестве параметров контекст для выполнения запроса, пул соединений с кластером, имя блокировки и тайм-аут блокировки.

  3. В случае успеха вернет объект блокировки, иначе вернет ошибку.

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {

	resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()

	if err != nil {

		return nil, err

	}

	if len(resp) == 0 {

		return nil, fmt.Errorf("no response")

	}

	if resp[0] == nil {

		return nil, fmt.Errorf("failed")

	}

	data := resp[0].([]any)

	result := Lock{

		name:   name,

		token:  toUint64(data[1]),

		expire: toUint64(data[2]),

	}

	return &result, nil

}

Создадим функцию для отпускания блокировки. Эта функция:

  1. Принимает контекст, пул соединений к кластеру, объект с блокировкой. 

  2. Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.

  3. Вернет true, если блокировка успешно отпущена.

  4. Вернет false в случае тех или иных ошибок.

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {

	resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()

	if err != nil {

		return false, err

	}

	if len(resp) == 0 {

		return false, fmt.Errorf("no response")

	}

	return resp[0].(bool), nil

}

А вот как таким объектом блокировки пользоваться

instances := []pool.Instance{

		{

			Name: "instance-001",

			Dialer: tarantool.NetDialer{

				Address: "127.0.0.1:3301",

			},

		},

		{

			Name: "instance-002",

			Dialer: tarantool.NetDialer{

				Address: "127.0.0.1:3302",

			},

		},

		{

			Name: "instance-003",

			Dialer: tarantool.NetDialer{

				Address: "127.0.0.1:3303",

			},

		},

	}

p, err := pool.Connect(context.Background(), instances)

if err != nil {

	panic(err)

}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

defer cancel()

var l *Lock

for i := 0; i < 3; i++ {

	l, err = acquireLock(ctx, p, name, 10)

	if err != nil {

		time.Sleep(100 * time.Millisecond)

		continue

	}

	break

}

if l == nil {

	fmt.Println(name, "already locked")

	return

}

defer func() {

	ok, _ := releaseLock(ctx, p, l)

	if ok {

		fmt.Println(name, "success unlock")

	} else {

		fmt.Println(name, "lock expired")

	}

}()

fmt.Println(name, "success lock")

Весь код main.go

package main

import (

	"context"

	"fmt"

	"reflect"

	"sync"

	"time"

	"github.com/tarantool/go-tarantool/v2"

	_ "github.com/tarantool/go-tarantool/v2/datetime"

	_ "github.com/tarantool/go-tarantool/v2/decimal"

	"github.com/tarantool/go-tarantool/v2/pool"

	_ "github.com/tarantool/go-tarantool/v2/uuid"

	"github.com/tjarratt/babble"

)

func toUint64(v any) uint64 {

	switch t := v.(type) {

	case int, int8, int16, int32, int64:

		return uint64(reflect.ValueOf(t).Int()) // a has type int64

	case uint, uint8, uint16, uint32, uint64:

		return reflect.ValueOf(t).Uint() // a has type uint64

	default:

		panic("type error")

	}

}

type Lock struct {

	name   string

	token  uint64

	expire uint64

}

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {

	resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()

	if err != nil {

		return nil, err

	}

	if len(resp) == 0 {

		return nil, fmt.Errorf("no response")

	}

	if resp[0] == nil {

		return nil, fmt.Errorf("failed")

	}

	data := resp[0].([]any)

	result := Lock{

		name:   name,

		token:  toUint64(data[1]),

		expire: toUint64(data[2]),

	}

	return &result, nil

}

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {

	resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()

	if err != nil {

		return false, err

	}

	if len(resp) == 0 {

		return false, fmt.Errorf("no response")

	}

	return resp[0].(bool), nil

}

func main() {

	instances := []pool.Instance{

		{

			Name: "instance-001",

			Dialer: tarantool.NetDialer{

				Address: "127.0.0.1:3301",

			},

		},

		{

			Name: "instance-002",

			Dialer: tarantool.NetDialer{

				Address: "127.0.0.1:3302",

			},

		},

		{

			Name: "instance-003",

			Dialer: tarantool.NetDialer{

				Address: "127.0.0.1:3303",

			},

		},

	}

	p, err := pool.Connect(context.Background(), instances)

	if err != nil {

		panic(err)

	}

	babbler := babble.NewBabbler()

	babbler.Count = 1

	wg := sync.WaitGroup{}

	for i := 0; i < 1000; i++ {

		name := babbler.Babble()

		wg.Add(1)

		go func(name string) {

			defer wg.Done()

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

			defer cancel()

			var l *Lock

			for i := 0; i < 3; i++ {

				l, err = acquireLock(ctx, p, name, 10)

				if err != nil {

					time.Sleep(100 * time.Millisecond)

					continue

				}

				break

			}

			if l == nil {

				fmt.Println(name, "already locked")

				return

			}

			defer func() {

				ok, _ := releaseLock(ctx, p, l)

				if ok {

					fmt.Println(name, "success unlock")

				} else {

					fmt.Println(name, "lock expired")

				}

			}()

			fmt.Println(name, "success lock")

			time.Sleep(50 * time.Millisecond)

		}(name)

	}

	wg.Wait()

}

Итоги

Примерно за 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml-файла. Если один из узлов Tarantool упадет, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.

Полезные ссылки

© Habrahabr.ru