Пишем небесных пчелок на Cloud Haskell
Привет, Хабр!
Прошло всего лишь каких-то 11577635 секунд с конца осенней школы GoTo в ИТМО. Неделя направления Распределённых систем началась с прототипирования распределённой системы на Cloud Haskell. Мы начали бодро и потому быстро выяснили, что существующую документацию без PhD понять сложновато — и решили написать методичку.
Под катом введение в p2p cloud haskell, немножко функционального стека прототипирования РС, мотивация и «но зачем».
Положим, вам захотелось сделать что-то такое распределённое (скажем, %sCoin), что не покрывается хорошо существующими системами (YARN таки не ответ на все вопросы). Если начать делать всё руками, можно быстро обнаружить огромное количество проблем — от мультиплексирования соединений и шифрования до пробива NAT и peer routing, которые совсем, очень не хочется решать (не в первый раз в истории человечества), особенно если цель — конечный продукт или красивый, работающий прототип.
Любому прикладному программисту от такой постановки задачи быстро придёт в голову слово «библиотека». И действительно. Можно взять discovery и кусочки routing из, например, Kademlia, стандартные механизмы пробива NAT — STUN, TURN, ICE — в общем, тоже известны, для шифрования — ну, прибьём TCP (зная специфику своей сети) и сделаем TLS 1.3 с захардкоженными шифрами, etc.
Но это всё ещё будет требовать много времени и экспертизы. Инвесторам терпения может и не хватить.
Здесь более опытным коллегам придёт мысль: «нужен фреймворк!». И правда. Для Прототипирования Распределённых Систем и Приложений.
А кто-то даже скажет: б-же, так это же libp2p! И будет прав. Частично.
libp2p решает проблему транспорта, его мультиплексирования и шифрования, discovery, peer routing, пробива NAT, connection upgrade и т.д. — в общем, многие сетевые и криптопотребности распределённых приложений. На Go и JS.
Это отличный фреймворк, но у него есть пара проблем. Это Go и JS. Кроме того, было бы приятно иметь во фреймворке что-нибудь для репликации.
the fragmented nature of the tutorials, some of which didn«t work at all, convinced me to not use Cloud Haskell
http://www.scs.stanford.edu/14sp-cs240h/projects/joshi.pdf, перефразировано
Наш проект начался с амбиции сделать блокчейн (простите, инновации) на Хаскелле — поэтому libp2p у нас не было — и за четыре дня. Мы начали искать нечто, что сделало бы сеть (транспорт, discovery, сериализацию) за нас. Нашли Cloud Haskell. Обнаружили, что с документацией сложновато. Решили написать своё введение. Итак:
Пишем небесных пчелок на Cloud Haskell
В примере мы напишем систему из пчелок: есть улей — кластер машин, и пчелки — ноды (машины). Пчелки отправляются на разведку искать цветочки и возвращаются с координатами вкусных цветочков в улей, а все другие пчелки должны об этих координатах узнать.
Вам вовсе не обязательно запускать программу на нескольких компьютерах — достаточно и ноутбука, на котором мы параллельно запустим нашу программу.
Полный код находится в репозитории.
Cloud Haskell работает по принципу обмена сообщений между нодами (такая модель называется message passing), потому что ноды не разделяют общее пространство ресурсов (RAM, …) — модель shared state легко использовать не получится. Actor Model — частный пример модели message passing, когда сообщения рассылают акторы другим акторам и принимают сообщения в свой mailbox — так message passing выглядит в Cloud Haskell.
1. Для начала определим типы данных, представителями которых будут обмениваться пчелы: src/Types.hs
type Flower = (Int, Int) -- координаты цветка
type Flowers = GSet Flower -- Grow-Only Set цветков
- Пчелы должны знать о каждом цветке, известном хотя бы одной пчелке в улье, следовательно им необходимо поддерживать единое состояние «базы данных» цветков в своем пчелином мозгу — решить проблему репликации данных, для чего нужно уметь достигать консенсус: разрешать конфликты между базами данных разных пчел. Для этого мы будем использовать структуру данных GSet (Grow-only Set) — множество, в которое элементы можно только добавлять, но не удалять. Это одна из структур данных CRDT.
Log A Log B | | logA.append("one") logA.append("two") | | v v +-----+ +-------+ |"one"| |"hello"| +-----+ +-------+ | | logA.append("two") logA.append("world") | | v v +-----------+ +---------------+ |"one","two"| |"hello","world"| +-----------+ +---------------+ | | | | logA.join(logB) <----------+ | v +---------------------------+ |"one","hello","two","world"| +---------------------------+
Схема достижения консенсуса с помощью CRDT (Из https://github.com/haadcode/ipfs-log) - Необходим интерфейс, который будет служить рецепторами пчелки: добавление элемента в GSet, а также просмотр вкусных цветочков, известных улью — реализуем это в виде REPL (интерактивной оболочки).
2. Приступим к реализации ноды, которую в дальнейшем будем запускать из командной строки: app/Main.hs
main = do -- точка входа
[port, bootstrapPort] <- getArgs -- (1) считываем порт ноды и bootstrap ноды из аргументов командной строки
let hostName = "127.0.0.1" -- IP ноды
P2P.bootstrap -- вызываем функцию инициализации ноды со следующими аргументами:
hostName
port -- порт ноды
(\port -> (hostName, port))
initRemoteTable -- (2) создаем remote table
[P2P.makeNodeId (hostName ++ ":" ++ bootstrapPort)] -- список из одной bootstrap ноды
spawnNode -- функция запуска логики ноды, ее код мы напишем потом
- Для начала ноды должны как-то друг о друге узнать, то есть совершить peer discovery. В Cloud Haskell есть решение «из коробки» — при инициализации ноды нам достаточно указать хотя бы одну другую bootstrap ноду: нода совершает с bootstrap нодой Peer Exchange — они обмениваются адресами знакомых им нод (aka пиров).
- Remote table — штука, которая позволяет пирам обмениваться типами haskell, если они поддерживают сериализацию, то есть их можно представить в формате, который можно переслать по сети и восстановить обратно в обьект Haskell. Тип поддерживает сериализацию, если он реализовывает тайпкласс
class (Binary a, Typeable) => Serializable a
. Вам не надо самому придумывать реализациюSerializable
,Binary
иTypeable
— haskell сделает это за вас (с помощью магического механизма automatic deriving):
Далее мы будем опускать{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} -- прагмы языка, позволяющие автоматически реализовать Binary data Example = Example deriving (Typeable, Generic) instance Binary Example
deriving ...
,instance Binary
и прагмы ради краткости кода.
3. Теперь напишем логику запуска ноды:
spawnNode :: Process () -- (1) функция запуска логики ноды
spawnNode = do
liftIO $ threadDelay 3000000 -- даем bootstrap ноде время чтобы запуститься
let flowers = S.initial :: Flowers -- инициализирум GSet для хранения координат цветков
self <- getSelfPid -- (3) получаем наш Pid чтобы REPL мог посылать нам сообщения
repl <- spawnLocal $ runRepl self -- (2) создаем REPL в отдельном потоке
register "bees" self -- теперь нода будет получать сообщения из канала "bees"
spawnLocal $ forever $ do -- (3) запускаем тикер:
send self Tick -- оповестить основной поток что надо передать пирам свое состояние
liftIO $ threadDelay $ 10^6 -- ждемс 0.1 секунды перед тем, как снова отослать состояние
runNode (NodeConfig repl) flowers -- (5) запускаем ноду
- В Cloud Haskell основная функциональная единица это
Process
(не путайте с процессом ОС). Они основаны на легковесных зеленых потоках и могут посылать другим процессам сообщения (функцияsend
чтобы послать определенному процессу илиP2P.nsendPeers
чтобы послать всем знакомым нодам), принимать сообщения в свой mailbox (функцияexpect
илиreceive*
), запускать другие процессы (например локально с помощьюspawnLocal
) и т.д. - Нам необходимо реализовать REPL в отдельном потоке, иначе будет происходить блокировка основного потока (ноды), следовательно надо сделать потоко-безопасный интерфейс для GSet, чтобы он был доступен для изменения как для REPL, так и для ноды. Так как система основана на акторах, мы будем посылать сообщения на изменение множества и обрабатывать их последовательно в бесконечном цикле обработки соообщений в главном потоке.
- Мы запускаем REPL как отдельный процесс cloud-haskell (т.е. как зеленый поток), и также передаем ему Pid основного процесса (уникальный идентификатор процесса) чтобы REPL знал, куда посылать команды, введенные пользователем, в виде сообщений. Далее мы получаем Pid REPL«а (его возвращает
spawnLocal
) чтобы посылать ему ответы на команды. Код REPL лежит тут. - Как будет работать репликация цветков?
- Каждая нода будет периодически рассылать свое состояние всем пирам (broadcast) — и это в совокупности с CRDT решает проблему репликации:
Пусть есть нодыA
иB
. Предположим уA
нет элемента x, а уB
x есть. После того, какB
совершит broadcast,A
добавит x — консенсус достигнут, ч.т.д.
Если бы мы имели обыкновенное множество, а не GSet, то ничего бы не получилось: Предположим уA
иB
есть элемент y. ПустьA
удалит y. После того, какB
совершит broadcast,A
получит y обратно. - Когда мы посылаем сообщение всем нодам, мы должны указать имя сервиса — на самом деле мы посылаем сообщение только тем нодам, которые зарегистрировали себя в регистре как поддерживающиe этот сервис. Здесь мы регистрируем нашу ноду как поддерживающую сервис «bees»:
register "bees" self
. - Нода должна знать, когда надо послать другим свое состояние. Самое простое решение — делать это по таймеру: ждать секунду, а потом действовать, но тогда бы мы блокировали основной поток обработки сообщений. Здесь мы запускаем процесс через
spawnLocal
, который сначала посылает сообщение Tick главному процессу (когда главный процесс видет Tick, он посылает нодам свое состояние), а потом ждет 1 секунду и повторяет.
- Каждая нода будет периодически рассылать свое состояние всем пирам (broadcast) — и это в совокупности с CRDT решает проблему репликации:
4. Ок, теперь (наконец-то!) мы можем приступить к логике работы основного процесса — код исполнения ноды:
runNode :: NodeConfig -> Flowers -> Process () -- (1) функция логики ноды
runNode config@(NodeConfig repl) flowers = do
let run = runNode config
receiveWait -- (2) ждем сообщений
[ match (\command -> -- (3) если нам пришло что-то типа Command от REPL, то
newFlowers <- handleReplCommand config flowers -- получаем новое состояние цветков
run newFlowers)
, match (\Tick -> do -- сигнал о том, что надо поделиться своим состоянием с другими
P2P.nsendPeers "bees" flowers -- отправить всем пирам цветки
run flowers)
, match (\newFlowers -> do -- кто-то отправил ноде цветочки
run $ newFlowers `union` flowers) -- добавляем новые в базу - по сути обьединение множеств
]
- Посмотрим на сигнатуру:
runNode
принимает конфигурацию ноды типаNodeConfig
— та информация, которая не будет меняться во время исполнения. В нашем случае это просто Pid REPL. Еще она принимает свое текущее состояние — GSet цветочков. Но как добавить цветок, ведь GSet — неизменяемый тип данных? Очень просто: сделаем нашу функцию рекурсивной, и при каждом изменении состояния будем запускать ее заново. receiveWait
принимает список функций с одним аргументом (входящим сообщением), вытаскивает сообщение и вызывает функцию, подходящую по типу сообщения.- Если нам пришло сообщение такого типа:
data Command = Add Flower | Show
, то это — команда от REPL.handleReplCommmand
— функция для обработки команды:handleReplCommand :: NodeConfig -> Flowers -> Command -> Process Flowers handleReplCommand (NodeConfig repl) flowers (Add flower) = do -- команда добавления элемента от пользователя send repl (Added flower) -- отправить REPLу сообщение, что цветок добавлен return $ S.add flower flowers -- запускаем ее уже с новым цветком handleReplCommand (NodeConfig repl) flowers Show = do -- запрос показать цветочки send repl (HereUR $ toList flowers) -- отправить цветочки в виде списка return flowers
- Если пришел Tick от тикера — значит надо отправить свое состояние:
P2P.nsendPeers "bees" flowers
. Здесь «bees» — имя сервиса, то есть мы пересылаем цветки только тем нодам, которые зарегистрировали себя как «bees». - Если же нам пришли цветочки от какой-то другой пчелки, нам надо все незнакомые цветки добавить себе, то есть попросту объединить множество новых с множеством существующих.
5. Вот и все! Загрузим полный исходный код и скомпилируем:
git clone https://github.com/SenchoPens/cloud-bees.git
cd cloud-bees
stack setup # Stack установит GHC
stack build # Компилируем
Теперь запустите в одном терминале эту строчку:
stack exec cloud-bees-exe 9000 9001 2>/dev/null
И в другом эту:
stack exec cloud-bees-exe 9001 9000 2>/dev/null
REPL выведет приглашение. Попробуйте в одном терминале ввести Add (1, 2)
, т.е. добавить цветок с координатами (1, 2), а в другом — Show
, и увидите, что и у второй ноды теперь есть такой цветок.
- Часть
2>/dev/null
нужна чтобы скрыть stderr, в который Cloud Haskell выводит лог. Если этого не сделать, то мы не сможем нормально пользоваться REPL. Можете заменить/dev/null
наlog.txt
и потом посмотреть, что же он вывел.
Надеюсь, мы убедили вас, что создание распределенных систем на Haskell — это не так уж и страшно :)
Можно придумать много реальных юз-кейсов для похожей системы: например, решение проблемы зайцев в общественном транспорте: человек, проходя в транспорт по карточке, маркируется как зашедший (добавляем его id в первый GSet), а на выходе — как вышедший (добавляем id во второй GSet). Ночью (когда транспорт не работает) происходит проверка — если человек вошел и вышел, то он не заяц.
Если вам интересно — можете посмотреть наш более обьемный проект с шифрованием, который мы сделали во время смены.
С любовью, Арсений и компания, 9 класс; под нежным руководством wldhx