Перенос процесса с одной ноды на другую

Есть несколько стандартных вопросов для собеседований на позицию разработчик эрланг/эликсир. Речь не идёт про веб, там от людей обычно требуется умение писать код на Phoenix, который настолько хорошо абстрагирует OTP от ранимого разработчика, что можно годами клепать на нем сайты и не иметь представления о том, что такое акторная модель.
Но если речь идёт о понимании парадигмы, людей обычно спрашивают примерно о таких вещах:
▸ [junior] чем отличается GenServer.call/2
от GenServer.cast/2
и от Process.send/3
▸ [middle] как реализована модель синхронных вызовов (GenServer.call/2
) и зачем в колбэке нужен второй параметр
▸ [middle] зачем нужен колбэк init/1
и в каких случаях из него имеет смысл вернуть кортеж {:ok, state, {:continue, arg}}
▸ [senior] как перенести процесс на другую ноду
Поскольку на хабре все без исключения носят лычку Синьёр++, сегодня мы поговорим о переносе процесса.
Зачем это может быть нужно?
Мне слишком часто приходилось объяснять самым разным людям самой разной квалификации, почему «воткнуть еще один сервер за балансером» — это не «горизонтальное масштабирование». Если ваши сервера просто отвечают на запросы, разговаривают с клиентами, так сказать, tête à tête, — пусть даже с сохранением сессии, которое обеспечивается средствами всё того же балансера, — увеличение количества серверов можно сравнить с таким зумом в фотоаппарате, который бы не увеличивал изображение, а просто множил бы его по горизонтали и вертикали: при шестикратном зуме тогда вместо одной картинки получалось бы шесть, большой экран уложенный плиткой 2×3.
Настоящее горизонтальное масштабирование требуется, когда состояние каждой ноды в кластере потенциально зависит от состояния всех остальных нод. Представьте себе распределенный умный кэш, например, который может собрать ответ по запросу, даже если разные части результата находятся на разных узлах. В такой ситуации просто «воткнуть еще один сервер» — из коробки — не заработает, нужны всякие консенсусы и прочая сложная хрень. CAP-теорема касается именно такого кластера, в случае разрозненных серверов за балансером — она не имеет смысла.
И тут на смену старым добрым инстансам пришли контейнеры с их внезапным рестартом в любое время дня и ночи. Спасибо, они хоть не просто убивают мои ноды, а дают какое-то время на выгрузку накопленных данных. Но куда выгружать? За десять с лишним лет очень плотной работы с BEAM — я напрочь отвык от необходимости городить вокруг приложения велосипеды с хранилищами, брокерами сообщений и даже базами данных — просто для персистенса ради персистенса. Не, корзина с товарами и имя пользователя — всё еще лежит в базе, конечно, но сейчас мы говорим просто про микросервис, который отдает горячий кэш с гигантским количеством промежуточных счетных полей, которые не хочется потерять просто из-за рестарта тупого контейнера.
Самым изящным и правильным решением, которое приходит на ум — было бы просто переместить процессы с умирающих нод в воскресшие в процессе rolling update.
И?
Этот вопрос несколько раз задавали на Stack Overflow, на различных форумах, и просто в ирке. BEAM из коробки не предоставляет такую возможность, потому что она в теории нарушает гарантии (если интересно, какие и как именно, — я отвечаю на комментарии и входящую корреспонденцию, здесь рассказывать про это неуместно). Гарантии отказоустойчивости — это краеугольный камень технологии и они уж точно никогда не будут принесены в жертву сомнительному удобству.
Поэтому (как много раз до того) — я решил причинить добро сообществу и реализовать такой перенос с оговоренными условиями применимости (тут так принято: если условия применимости необходимо оговаривать, хоть на йоту, в корку языка такое не попадет никогда — поэтому только библиотека).
Это же должно быть просто
Несложно, пока не начнешь задумываться о деталях. Сам-то процесс перенести — пять с половиной строчек кода: остановил тут, стартовал там с текущим стейтом, опционально — подобрал сообщения, которые могли потеряться в доставке за миллисекунду, пока локальный процесс уже сдох, а удаленный — еще не стартовал.
К сожалению, стейтом дело не ограничивается. У процесса есть dictionary, доступ к которому можно получить только из самого процесса. А еще процесс мог насоздавать ETS, в том числе и приватных. Поэтому просто так взять и перенести процесс не удастся.
Peeper to the rescue
У меня уже была библиотека, которая сохраняла стейт между падениями процесса, поэтому мне оставалось её допилить так, чтобы она сохраняла и dictionary
с ets
, а там и перенос уже будет можно настроить.
Проблема в том, что мы не имеем права светить приватные данные процесса во внешний мир, поэтому просто сделать их публичными и запрашивать по необходимости — не вариант. Peeper
стартует три процесса для сохранения стейта между падениями — собственно worker
, state
(который воссоздаст стейт в воркере после падения) и их супервизор. Поэтому мы можем разрешить узнавать стейт воркера только из соседнего процесса.
@impl GenServer
def handle_call(:__state__, {from, _}, state) do
case Peeper.Supervisor.state(state.supervisor) do
^from -> {:reply, ‹RESPONSE›, state}
_ -> {:reply, :hidden, state}
end
end
Уже неплохо. Мы можем так же сдампить все ETS
, созданные процессом
defp dump_ets(%{supervisor: sup, keep_ets: keep_ets}) do
Enum.flat_map(:ets.all(), fn table ->
info = :ets.info(table)
name = Keyword.fetch!(info, :name)
if Keyword.fetch!(info, :owner) == self() and
(keep_ets == :all or keep_ets == true or (is_list(keep_ets) and name in keep_ets)),
do: [sup |> Peeper.Supervisor.state() |> dump(name, info)],
else: []
end)
end
Но гонять ETS
туда-сюда, пусть даже приватно, — идея так себе. Они могут быть довольно объёмными. По счастью, у нас есть опция : heir при создании таблицы и мы можем создавать таблицы с явным указанием нашего стейт-процесса в виде наследника, тогда при падении процесса его ETS
будут переданы стейт-процессу, а тот, в свою очередь, после перезапуска воркера — передаст их обратно через : ets.give_away/3.
Собственно перенос
Ну-с, мы готовы написать функцию, которая будет переносить процесс. Ей потребуется pid
текущего динамического супервизора, pid
супервизора, куда мы хотим перенести наш процесс, и, собственно, идентификатор самого процесса. Чтобы не светить приватные данные, этот перенос будет осуществляться отправкой сообщения стейт-процессу, поскольку у него есть все данные для переноса.
# хелпер в модуле `Peeper`
def transfer(name, source_pid, destination_pid, true) do
name
|> Peeper.Supervisor.state()
|> GenServer.call({:transfer, name, source_pid, destination_pid})
end
# колбэк в модуле `Peeper.GenServer`
def handle_call({:transfer, name, from_dynamic_supervisor, to_dynamic_supervisor}, _from, state) do
worker_child_spec =
state.supervisor
|> Peeper.call(:__state__)
|> Keyword.put_new(:name, name)
|> Peeper.child_spec()
task =
with peeper_pid when is_pid(peeper_pid) <- GenServer.whereis(name),
from_dynamic_supervisor_pid when is_pid(from_dynamic_supervisor_pid) <-
GenServer.whereis(from_dynamic_supervisor),
to_dynamic_supervisor_pid when is_pid(to_dynamic_supervisor_pid) <-
GenServer.whereis(to_dynamic_supervisor) do
fn ->
with :ok <- DynamicSupervisor.terminate_child(from_dynamic_supervisor_pid, peeper_pid) do
me = node()
case node(to_dynamic_supervisor_pid) do
^me ->
DynamicSupervisor.start_child(to_dynamic_supervisor_pid, worker_child_spec)
remote ->
:rpc.block_call(remote, DynamicSupervisor, :start_child, [worker_child_spec])
end
end
end
else
_ -> nil
end
{:reply, task, state}
end
Вот, собственно, и всё на сегодня. Вот тест с переносом, для дотошных.
Удачных трансферов!