Python: строим распределенную систему c PySyncObj
Представьте, что у вас есть класс:
class MyCounter(object):
def __init__(self):
self.__counter = 0
def incCounter(self):
self.__counter += 1
def getCounter(self):
return self.__counter
И вы хотите сделать его распределённым. Просто наследуете его от SyncObj (передав ему список серверов, с которыми нужно синхронизироваться) и отмечаете декоратором @replicated все методы, которые изменяют внутреннее состояние класса:
class MyCounter(SyncObj):
def __init__(self):
super(MyCounter, self).__init__('serverA:4321', ['serverB:4321', 'serverC:4321'])
self.__counter = 0
@replicated
def incCounter(self):
self.__counter += 1
def getCounter(self):
return self.__counter
PySyncObj автоматически обеспечит репликацию вашего класса между серверами, отказоустойчивость (всё будет работать до тех пор, пока живо больше половины серверов), а также (при необходимости) асинхронный дамп содержимого на диск.
На базе PySyncObj можно строить различные распределенные системы, например распределенный мьютекс, децентрализованные базы данных, биллинговые системы и другие подобные штуки. Все те, где на первом месте стоит надёжность и отказоустойчивость.
Общее описание
Для репликации PySyncObj использует алгоритм Raft. Raft — это простой алгоритм достижения консенсуса в распределённой системе. Raft разрабатывался в качестве более простой замены алгоритма Paxos. Вкратце алгоритм raft работает следующим образом. Среди всех узлов выбирается лидер, который пингует остальные узлы через определенный промежуток времени. Каждый узел выбирает случайный промежуток времени, который он будет ждать получение пинга от лидера. Когда время ожидания заканчивается, а пинг от лидера не пришел — узел считает, что лидер упал и посылает остальным узлам сообщение, в котором говорит, что он сам стал лидером. При удачном стечении обстоятельств на этом всё и заканчивается (остальные узлы соглашаются). А в случае, если два узла захотели стать лидерами одновременно, процедура выбора лидера повторяется (но уже с другими случайными значениями времени ожидания). Подробнее о выборе лидера вы можете узнать посмотрев визуализацию, либо почитав научную статью.
После того как определён лидер, он отвечает за поддержание распределённого журнала. В распределённый журнал пишутся все действия, изменяющие состояние системы. Действие применяется к системе только в том случае, если большинство узлов подтверждает получение записи — это обеспечивает консистетность. Для того чтобы количество записей в распределенном логе не росло до бесконечности, периодически происходит операция под названием log compaction. Текущий лог выкидывается, а вместо него начинает хранится сериализованное состояние системы на текущий момент.
Чтобы не потерять содержимое (например, при выключении вообще всех серверов), его нужно периодически сохранять на диск. Так как количество данных может быть очень большим, содержимое сохраняется асинхронно. Чтобы одновременно иметь возможность работать с данными и параллельно сохранять их же на диск, PySyncObj использует CopyOnWrite через fork процесса. После fork-а процесс родитель и дочерний процесс имеют общую память. Копирование данных осуществляется операционной системой лишь в случае попытки перезаписи этих данных.
PySyncObj реализован целиком на Python (поддерживается Python 2 и Python 3) и не использует каких-либо внешних библиотек. Работа с сетью происходит при помощи select или poll, в зависимости от платформы.
Примеры
А теперь несколько примеров.Key-value storage
class KVStorage(SyncObj):
def __init__(self, selfAddress, partnerAddrs, dumpFile):
conf = SyncObjConf(
fullDumpFile=dumpFile,
)
super(KVStorage, self).__init__(selfAddress, partnerAddrs, conf)
self.__data = {}
@replicated
def set(self, key, value):
self.__data[key] = value
@replicated
def pop(self, key):
self.__data.pop(key, None)
def get(self, key):
return self.__data.get(key, None)
Вообщем-то всё то же самое что и со счетчиком. Для того чтобы периодически сохранять данные на диск создаём SyncObjConf и передаём ему fullDumpFile.Callback
PySyncObj поддерживает callback-и — вы можете создавать методы, возвращающие какие-то значения, они автоматически будут прокинуты в callback:
class Counter(SyncObj):
def __init__(self):
super(Counter, self).__init__('localhost:1234', ['localhost:1235', 'localhost:1236'])
self.__counter = 0
@replicated
def incCounter(self):
self.__counter += 1
return self.__counter
def onAdd(res, err):
print 'OnAdd: counter = %d:' % res
counter = Counter()
counter.incCounter(callback=onAdd)
Distributed lock
Пример чуть сложнее — распределенный лок. Весь код можете посмотреть на github, а здесь я просто опишу основные аспекты его работы.
Начнём с интерфейса. Лок поддерживает следующие операции:
- tryAcquireLock — попытка взять лок
- isAcquired — проверка, взят ли лок или отпущен
- release — отпустить лок
Первый возможный вариант реализации лока — аналогичный key-value хранилищу. Если по ключу lockA что-то есть, значит лок взят, иначе он свободен, и мы можем сами его взять. Но не всё так просто.
Во-первых, если мы просто воспользуемся kv-хранилищем из примера выше без всяких модификаций, то операции проверки наличия элемента (проверки взят ли лок) и записи элемента (взятие лока) будут не атомарны (то есть мы можем перезаписать чей-то другой лок). Поэтому проверка и взятие лока должны быть одной операцией, реализуемой внутри реплицируемого класса (в данном случае в tryAcquireLock).
Во-вторых, в случае если какой-то из клиентов, взявших лок упадёт, лок останется висеть навсегда (ну или пока клиент не переподнимется и не отпустит его). В большинстве случаев это нежелательное поведение. Поэтому мы введём timeout, после которого lock будет считаться отпущенным. Также придется добавить операцию, подтверждающую взятие лока (назовём её ping), которая будет вызываться с интервалом timeout / 4, и которая будет продлевать жизнь взятым локам.
Третья особенность — реплицируемые классы должны обеспечивать идентичное поведение на всех серверах. Это значит, что они не должны использовать внутри себя никаких данных, которые могут отличаться. Например, список процессов на сервере, значение random-а или время. Поэтому если мы всё же хотим использовать время — придется передавать его в качестве параметра всем методам класса, в которых оно используется.
С учетом этого, получившаяся реализации состоит из двух классов — LockImpl, являющийся реплицируемым объектом, а так же Lock, обертка над ним. Внутри Lock мы автоматически добавляем текущее время ко всем операциям над LockImpl, а так же осуществляем периодический ping с целью подтвердить взятые локи. Получившийся лок — всего лишь минимальный пример, который можно дорабатывать с учетом требуемой функциональности. Например, добавить колбэки, информирующие нас о взятии и отпускании лока.
Заключение
Мы используем PySyncObj в проекте WOT Blitz для синхронизации данных между серверами в разных регионах. Например, для счетчика оставшихся танках во время ивента ИС-3 Защитник. PySyncObj является неплохой альтернативной существующим механизмам хранения данных в распределенных системах. Основные аналоги — различные распределённые БД, например Apache Zookeeper, etcd и прочие. В отличие от них PySyncObj не является БД. Он является инструментом более низкого уровня и позволяет реплицировать сложные конечные автоматы. Кроме того он не требует внешних серверов и легко интегрируется в python приложения. Из недостатков текущей версии — потенциально не самая высокая производительность (сейчас это полностью python код, есть планы попробовать переписать в виде c++ экстеншена), а так же отсутствие разделение на серверную / клиентскую часть — иногда может возникнуть необходимость иметь большое количество нод-клиентов (часто подключающихся / отключающихся) и лишь несколько постоянно работающих серверов.
Ссылки
- github.com/bakwc/PySyncObj — исходники проекта
- pip install pysyncobj — установка через pypi
- raft.github.io — сайт протокола raft (описание и визуализация)
- ramcloud.stanford.edu/raft.pdf — оригинальная публикация raft (с подробным описанием деталей реализации)
- habrahabr.ru/post/222825 — Консенсус в распределенных системах. Paxos