Гетерогенная конкурентная обработка данных в реальном времени строго один раз

Конкурентная сосиска


Аннотация

Обработка данных в реальном времени ровно один раз (exactly-once) — задача крайне нетривиальная и требующая серьезного и вдумчивого подхода на всей цепочке вычислений. Некоторые даже считают, что такая задача невыполнима. В реальности хочется иметь подход, обеспечивающий отказоустойчивую обработку вообще без каких-либо задержек и использование различных хранилищ данных, что выдвигает новые еще более жесткие требования, предъявляемые к системе: concurrent exactly-once и гетерогенность персистентного слоя. На сегодняшний день такое требование не поддерживает ни одна из существующих систем.

Предложенный подход последовательно раскроет секретные ингредиенты и необходимые понятия, позволяющие относительно просто реализовать гетерогенную обработку concurrent exactly-once буквально из двух компонент.


Введение

Разработчик распределенных систем проходит несколько стадий:

Стадия 1: Алгоритмы. Здесь происходит изучение основных алгоритмов, структур данных, подходов к программированию типа ООП и т.д. Код исключительно однопоточный. Начальная фаза вхождения в профессию. Тем не менее, достаточно непростая и может длиться годами.

Стадия 2: Многопоточность. Далее возникают вопросы извлечения максимальной эффективности из железа, возникает многопоточность, асинхронность, гонки, дебагинг, strace, бессонные ночи… Многие застревают на этом этапе и даже начинают с какого-то момента ловить ничем не объяснимый кайф. Но лишь единицы доходят до понимания архитектуры виртуальной памяти и моделей памяти, lock-free/wait-free алгоритмах, различных асинхронных моделях. И почти никто и никогда — верификации многопоточного кода.

Стадия 3: Распределенность. Тут такой треш творится, что ни в сказке сказать, ни пером описать.

Казалось бы, чего сложного. Делаем трансформацию: много потоков → много процессов → много серверов. Но каждый шаг трансформации привносит качественные изменения, и все они ложатся на систему, раздавливая ее и превращая в прах.

И дело тут в изменении домена обработки ошибок и наличия разделяемой памяти. Если раньше всегда был кусочек памяти, который был доступен в каждом потоке, и при желании, в каждом процессе, то теперь такого кусочка нет и быть не может. Каждый сам за себя, независимый и гордый.

Если раньше сбой в потоке хоронил поток и процесс заодно, и это было хорошо, т.к. не приводило к частичным отказам, то теперь частичные отказы становятся нормой и каждый раз перед каждым действием ты думаешь: «а что, если?». Это настолько напрягает и отвлекает от написания, собственно, самих действий, что код из-за этого растет не в разы, а на порядки. Все превращается в лапшу обработки ошибок, состояний, переключений и сохранения контекста, восстановления из-за сбоев одного компонента, другого компонента, недоступности части сервисов и т.п. и т.д. Накрутив на все это добро мониторинг можно великолепно проводить ночи напролет за любимым ноутбуком.

То ли дело в многопоточности: взял мьютекс и пошел кромсать общую память в удовольствие. Красота!

В результате мы имеем, что ключевые и проверенные в боях паттерны забрали, а новые, на замену им, отчего-то не завезли, и получилось как в анекдоте про то, как фея взмахнула палочкой, и у танка отвалилась башня.

Тем не менее, в распределенных системах есть набор проверенных практик и доказанные алгоритмы. Однако, каждый уважающий себя программист считает своим долгом отринуть известные достижения и навелосипедить свое, родное добро, невзирая на накопленный опыт, немалое количество научных статей и академических исследований. Ведь если ты можешь в алгоритмы и многопоточность, как можно попасть впросак с распределенностью? Двух мнений тут быть не может!

В результате системы глючат, данные расходятся и портятся, сервисы периодически становятся недоступны на запись, а то и вовсе недоступны, потому что внезапно нода упала, сеть заглючила, Java скушала много памяти и GC затупил, и много еще других причин, позволяющие оттягивать свой конец перед начальством.

Однако, даже с известными и проверенными подходами жизнь не становится проще, т.к. распределенные надежные примитивы являются тяжеловесными с серьезными требованиями, предъявляемые к логике исполняемого кода. Поэтому углы срезаются везде, где только можно. И, как это часто бывает, с отрезанными наспех углами появляется простота и относительная масштабируемость, но улетучивается надежность, доступность и консистентность распределенной системы.

В идеале хотелось бы вообще не думать о том, что система у нас распределенная и многопоточная, т.е. работать на 1-й стадии (алгоритмы), не задумываясь о 2-й (многопоточность+асинхронность) и 3-й (распределенность). Такой способ изоляции абстракций существенно повысил бы простоту, надежность и скорость написания кода. К сожалению, на текущий момент это возможно лишь в мечтах.

Тем не менее, отдельные абстракции позволяют добиться относительной изоляции. Один из характерных примеров — это использование сопрограмм, где вместо асинхронного кода мы получаем синхронный, т.е. переходим от 2-й стадии к 1-й, что и позволяет существенно упростить написание и сопровождение кода.

В статье последовательно раскрывается использование lock-free алгоритмов для построения надежной консистентной распределенной масштабируемой real-time системы, т.е. как lock-free достижения 2-й стадии помогают в реализации 3-й, сводя задачу к однопоточным алгоритмам 1-й стадии.


Постановка задачи

Данная задача лишь иллюстрирует некоторые важные подходы и представлена в качестве примера для введения в контекст проблематики. Ее легко обобщить на более сложные случаи, что и будет сделано в дальнейшем.

Задача: обработка потоковых данных в реальном времени.

Имеется два потока чисел. Обработчик читает данные этих входных потоков и отбирает последние числа за определенный промежуток. Эти числа усредняются в этом временном промежутке, т.е. в скользящем окне данных за некоторое заданное время. Полученное среднее значение необходимо записать в выходную очередь для последующей обработки. Помимо этого если количество чисел в окне превышает некоторое пороговое, то увеличить на единицу счетчик во внешней транзакционной базе данных.

Initial

Отметим некоторые особенности данной задачи.


  1. Недетерминированность. Источников недетерминированного поведения два: это считывание из двух потоков, а также временное окно. Понятно, что считывание можно проводить разными способами, и от того, в какой последовательности будут извлекаться данные, и будет зависеть конечный результат. Временное окно также изменяет результат от запуска к запуску, т.к. от скорости работы будет зависеть количество данных в окне.
  2. Состояние обработчика. Присутствует состояние обработчика в виде набора чисел в окне, от которого зависит текущий и последующие результаты работы. Т.е. мы имеем stateful обработчик.
  3. Взаимодействие с внешним хранилищем. Необходимо обновлять значение счетчика во внешней базе данных. Принципиальный момент заключается в том, что тип внешнего хранилища отличается от хранилища состояния обработчика и потоков.

Все это, как будет показано ниже, серьезно влияет на используемые инструменты и на возможные способы реализации.

Осталось добавить к задаче маленький штришок, который сразу переводит задачу из области запредельной сложности в область невозможную: необходима гарантия concurrent exactly-once.


Exactly-Once

Exactly-once часто трактуется слишком широко, что выхолащивает сам термин, и он перестает отвечать изначальным требованиям задачи. Если мы говорим про систему, которая работает локально на одном компьютере — то тут все просто: бери больше, кидай дальше. Но в данном случае речь идет про распределенную систему, в которой:


  1. Число обработчиков может быть большим: каждый обработчик работает со своим куском данных. При этом результаты могут складываться в различные места, например, внешняя база данных, возможно даже пошардированная.
  2. Каждый обработчик может внезапно прекратить свою обработку. Отказоустойчивая система подразумевает продолжение работы даже в случае отказа отдельных частей системы.

Таким образом, надо быть готовым к тому, что обработчик может упасть, и другой обработчик должен подхватить уже проделанную работу и продолжить обработку.

Тут сразу возникает вопрос:, а что будет означать exactly-once в случае работы недетерминированного обработчика? Ведь каждый раз при перезапуске мы будем получать, вообще говоря, разные результирующие состояния. Ответ тут простой: при exactly-once существует такое исполнение системы, которые каждое входное значение обрабатывалось ровно один раз, давая соответствующий выходной результат. При этом это исполнение не обязательно должно быть физически на одной и той же ноде. Но результат должен быть таким, как если бы все обрабатывалось на некоторой одной логической ноде без падений.


Concurrent Exactly-Once

Для усугубления требований введем новое понятие: concurrent exactly-once. Принципиальное отличие от простого exactly-once состоит в отсутствие пауз при обработке, как если бы все обрабатывалось на одной ноде без падений и без пауз. В нашей задаче мы будем требовать именно concurrent exactly-once, для простоты изложения, чтобы не рассматривать сравнение с существующими системами, которых на сегодняшний день нет.

О последствиях наличия такого требования будет сказано ниже.


Транзакционность

Чтобы читатель еще глубже проникся возникшей сложностью, давайте рассмотрим различные нехорошие сценарии, которые необходимо учитывать при разработке подобной системы. Также попытаемся использовать общий подход, который позволит решить приведенную выше задачу с учетом наших требований.

Первое, что приходит на ум — это необходимость записывать состояние обработчика и входных и выходных потоков. Состояние выходных потоков описывается простой очередью чисел, а состояние входных потоков — позицией в них. По сути, поток представляет собой бесконечную очередь, и позиция в очереди однозначно задает местоположение.

Idea

Возникает следующая наивная реализация обработчика с использованием некоего хранилища данных. На данном этапе конкретные свойства хранилища нам не будут важны. Будем использовать язык Псеко для иллюстрации идеи (Псеко := псевдо код):

handle(input_queues, output_queues, state):
    # восстанавливаем позиции потоков
    input_indexes = storage.get_input_indexes()
    # в цикле обрабатываем входящие потоки
    while (true):
        # загружаем данные из очередей начиная с предыдущей позиции
        items, new_input_indexes = input_queues.get_from(input_indexes)
        # добавляем в очередь
        state.queue.push(items)
        # и обновляем окно согласно duration
        state.queue.trim_time_window(duration)
        avg = state.queue.avg()
        need_update_counter = state.queue.size() > size_boundary
        # (A) добавляем среднее в выходную очередь
        output_queues[0].push(avg)
        if need_update_counter:
            # (B) увеличиваем счетчик во внешней базе
            db.increment_counter()
        # (C) сохраняем состояние в хранилище
        storage.save_state(state)
        # (D) сохраняем значения индексов
        storage.save_queue_indexes(new_input_indexes)
        # (E) обновляем текущие индексы
        input_indexes = new_input_indexes

Здесь приведен простой однопоточный алгоритм, который вычитывает из входных потоков данные и записывает нужные значения согласно описанной выше задаче.

Давайте посмотрим, что будет происходить в случае падения ноды в произвольные моменты времени, а также после возобновления работы. Понятно, что в случае падения в точках (A) и (E) все будет отлично: либо данные никуда еще не успели записаться и мы просто восстановим состояние и продолжим на другой ноде, либо все необходимые данные уже записались и просто продолжим выполнение следующего шага.

Однако в случае падения во всех других точках нас ждут неприятные неожиданности. Если произойдет падение в точке (B), то при повторном запуске обработчика мы восстановим состояния и запишем повторно среднее значение на примерно том же интервале чисел. В случае падения в точке (C) помимо дубликата среднего возникнет дубликат в инкременте значения. А в случае падения в (D) мы получим неконсистентное состояние обработчика: состояние соответствует новому моменту времени, а зачитывать значения из входных потоков мы будет старые.

Неожиданности

При этом при перестановке операций записи принципиально ничего не поменяется: неконсистентность и дубликаты так и будут оставаться. Таким образом, мы приходим к выводу о том, что все действия по изменению состояния обработчика в хранилище, выходной очереди и базы данных должны выполняться транзакционно, т.е. все одновременно и атомарно.

Соответственно, необходимо разработать механизм, чтобы различные хранилища могли транзакционно изменять свое состояние, причем не внутри каждого независимо, а транзакционно между всеми хранилищами одновременно. Конечно, можно поместить наше хранилище внутрь внешней базы данных, однако в задаче предполагалось, что движок базы данных и движок фреймворка обработки потоковых данных разведены и работают независимо друг от друга. Здесь я хочу рассматривать наиболее тяжелый случай, т.к. простые случаи рассматривать неинтересно.


Конкурентная отзывчивость

Рассмотрим конкурентное выполнение ровно один раз более подробно. В случае отказоустойчивой системы мы требуем продолжение работы с некоторой точки. Понятно, что эта точка будет некоторым моментом в прошлом, т.к. для сохранения производительности невозможно сохранять в хранилище все моменты изменения состояния в настоящем и будущем: сохраняется либо последний результат операций, либо группа значений для увеличения пропускной способности. Такое поведение сразу приводит нас к тому, что после восстановления состояния обработчика будет происходить некоторая задержка результатов, она будет расти с увеличением размера группы значений и размера состояния.

Помимо этой задержки в системе присутствуют также задержки, связанные с загрузкой состояния на другую ноду. Дополнительно к этому, детектирование проблемной ноды также занимает какое-то время, причем зачастую немалое. Связано это, прежде всего, с тем, что если мы поставим малое время детектирования, то возможны частые ложные срабатывания, что будет приводить к разного рода неприятным спецэффектам.

Кроме того, с ростом количества параллельных обработчиков внезапно оказывается, что не все они одинаково хорошо работают даже в условиях отсутствия отказов. Иногда случаются затупы, которые также приводят к задержкам в обработке. Причина таких затупов может быть разнообразная:

Программная: паузы GC, фрагментация памяти, паузы в аллокаторе, прерывание ядра и планирование задач, проблемы с драйверами устройств, вызывающие замедление работы.
Аппаратная: высокая загруженность диска или сети, CPU throttling из-за проблем охлаждения, перегрузки и т.д., замедление работы диска из-за технических проблем.

И это далеко не исчерпывающий список проблем, которые могут приводить к замедлению обработчиков.

Соответственно, замедление работы — это данность, с которой приходится жить. Иногда это не является серьезной проблемой, а иногда крайне важно сохранять высокую скорость обработки несмотря на сбои или замедления.

Сразу возникает идея дублирования систем: запустим для одного и того же потока данных не один, а сразу два обработчика, или даже три. Проблема тут в том, что в этом случае легко могут возникать дубликаты и неконсистентное поведение системы. Обычно фреймворки не рассчитаны на такое поведение и предполагают, что количество обработчиков в каждый момент времени не превышает одного. Системы, допускающие описанное дублирование исполнения, называются concurrent exactly-once.

Такая архитектура позволяет решать сразу несколько проблем:


  1. Отказоустойчивое поведение: если одна из нод падает, то другая просто продолжает работу как будто ничего не произошло. Здесь нет необходимости в дополнительной координации действий, т.к. второй обработчик выполняется безотносительно состояния первого.
  2. Удаление затупов: кто первый предоставил результат, тот и молодец. Другому лишь останется подцепить новое состояние и продолжить с этого момента.

Такой подход, в частности, позволяет завершить сложный тяжелый долгий расчет за более предсказуемое время, т.к. вероятность того, что оба будут тупить и падать существенно меньше.


Вероятностная оценка

Попытаемся оценить преимущества дублирования исполнения. Предположим, что с обработчиком в среднем каждый день что-то происходит: либо затупил GC, либо нода лежит, либо контейнеры стали раком. Предположим также, что мы подготавливаем пачки данных за 10 секунд.

Тогда вероятность того, что что-то произойдет за время создания пачки равна 10 / (24 · 3600) ≃ 1e-4.

Если запустить параллельно два обработчика, то вероятность того, что обоим поплохеет ≃ 1e-8. А значит это событие наступит через 23 года! Да системы столько не живут, а значит этого не произойдет никогда!

При этом если время подготовки пачки будет еще меньше и/или затупы будут происходить еще реже, то эта цифра будет лишь увеличиваться.

Таким образом приходим к выводу, что рассматриваемый подход существенно повышает надежность всей нашей системы. Осталось лишь решить маленький такой вопросик:, а где прочитать про то, как сделать concurrent exactly-once систему. А ответ простой: здесь и надо читать.


Полутранзакции

Для дальнейшего изложения нам понадобится понятие полутранзакция. Проще всего его объяснить на примере.

Рассмотрим перевод средств с одного банковского счета на другой. Традиционный подход с использованием транзакций на языке Псеко можно описать следующим образом:

transfer(from, to, amount):
    tx = db.begin_transaction()
    amount_from = tx.get(from)
    if amount_from < amount:
        return error.insufficient_funds
    tx.set(from, amount_from - amount)
    tx.set(to, tx.get(to) + amount)
    tx.commit()
    return ok

Однако что делать, если такие транзакции нам недоступны? Применяя блокировки, это можно сделать следующим образом:

transfer(from, to, amount):
    # автоматически отпускает блокировку при выходе из области видимости
    lock_from = db.lock(from)
    lock_to = db.lock(to)
    amount_from = db.get(from)
    if amount_from < amount:
        return error.insufficient_funds
    db.set(from, amount_from - amount)
    db.set(to, db.get(to) + amount)
    return ok

Такой подход может приводить к дедлокам, т.к. блокировки могут браться в разной последовательности параллельно. Чтобы исправить такое поведение, достаточно ввести функцию, которая одновременно берет несколько блокировок в детерминированной последовательности (например, сортирует по ключам), что полностью избавляет от возможных дедлоков.

Тем не менее, реализацию можно несколько упростить:

transfer(from, to, amount):
    lock_from = db.lock(from)
    amount_from = db.get(from)
    if amount_from < amount:
        return error.insufficient_funds
    db.set(from, amount_from - amount)
    lock_from.release()
    # такая блокировка необходима,
    # т.к. db.set(db.get...) паттерн не является атомарным
    lock_to = db.lock(to)
    db.set(to, db.get(to) + amount)
    return ok

Такой подход также делает конечно состояние консистентным, сохраняя инварианты по типу предотвращения излишнего расхода средств. Главное отличие от предыдущего подхода в том, что в такой реализации у нас есть некоторый промежуток времени, в котором счета находятся в неконсистентном состоянии. А именно, такая операция подразумевает, что суммарное состояние средств на счетах не изменяется. В данном случае между lock_from.release и lock(to) существует временной зазор, в течении которого база данных может выдавать неконсистентное значение: итоговая сумма может отличаться от корректной в меньшую сторону.

По сути, мы разбили одну транзакцию по переводу денег на две полутранзакции:


  1. Первая полутранзакция делает проверку и снимает со счета необходимую сумму.
  2. Вторая полутранзакция записывает снятую сумму на другой счет.

Понятно, что раздробление транзакции на более мелкие, вообще говоря, нарушает транзакционное поведение. И приведенный выше пример — не исключение. Однако, если все полутранзакции в цепочки полностью исполнились, то результат будет консистентным с сохранением всех инвариантов. Именно это и является важным свойством цепочки полутранзакций.

Временно теряя некоторую консистентность, мы, тем не менее, приобретаем другое полезное свойство: независимость операций, и, как следствие, лучшую масштабируемость. Независимость проявляется в том, что полутранзакция каждый раз работает лишь с одной строкой, читая, проверяя и изменяя ее данные, не общаясь при этом к другим данным. Таким образом можно пошардировать базу данных, транзакции которой работают лишь с одним шардом. Более того, такой подход можно использовать в случае наличия гетерогенных хранилищ, т.е. полутранзакции могут начинаться на одном типе хранилища, а заканчиваться на другом. Именно такие полезные свойства и будут использованы в дальнейшем.

Возникает закономерный вопрос:, а как реализовать полутранзации в распределенных системах и не огрести? Для решения этого вопроса необходимо рассмотреть lock-free подход.


Lock-free

Как известно, lock-free подходы порой улучшают производительность многопоточных систем, особенно в случае конкурентного доступа к ресурсу. Тем не менее, совершенно неочевидно, что такой подход можно использовать в распределенных системах. Давайте копнем вглубь и рассмотрим, что же такое lock-free и почему это свойство будет полезно при решении нашей задачи.

Некоторые разработчики иногда не совсем четко представляют себе, что же такое lock-free. Обывательский взгляд подсказывает, что это что-то, связанное с атомарными процессорными инструкциями. Тут важно понимать при этом, что lock-free означает использование «атомиков», обратное же неверно, т.е. не всякие «атомики» дают lock-free поведение.

Важное свойство lock-free алгоритма заключается в том, что хотя бы один поток делает прогресс в системе. Но почему-то многие это свойство выдают за определение (именно такое тупорылое определение и можно найти, например, в википедии). Тут необходимо добавить один важный нюанс: прогресс совершается даже в случае затупов одного или нескольких потоков. Это очень критический момент, который часто упускается из виду, имеющий серьезные последствия для распределенной системы.

Почему отсутствие условие прогресса хотя бы одного потока сводит на нет понятие lock-free алгоритма? Дело в том, что в этом случае обычный spinlock также будет являться lock-free. Действительно, тот, кто взял блокировку, тот и будет совершать прогресс. Есть поток с прогрессом => lock-free?

Очевидно, что lock-free обозначает без блокировок, в то время как spinlock своим названием говорит о том, что это есть самая настоящая блокировка. Именно поэтому важно добавить условие о прогрессе даже в случае затупов. Ведь эти задержки могут длиться неограниченно долгое время, т.к. определение ничего не говорит о верхней временной границе. А раз так, то такие задержки будут эквивалентны в каком-то смысле выключениям потоков. При этом lock-free алгоритмы будут производить прогресс и в этом случае.

Но кто сказал, что lock-free подходы применимы исключительно для многопоточных систем? Заменив потоки в одном процессе на одной ноде на процессы на разных нодах, а общую память потоков на общее распределенное хранилище, мы получим lock-free распределенный алгоритм.

Падение ноды в такой системе эквивалентно задержке выполнения потока на какое-то время, т.к. для восстановление работы необходимо это самое время. При этом lock-free подход позволяет продолжать работу другим участникам распределенной системы. Более того, специальные lock-free алгоритмы можно запускать параллельно друг с другом, детектируя конкурентное изменение и вырезая дубликаты.

Exactly-once подход подразумевает наличие консистентного распределенного хранилища. Такие хранилища как правило представляют собой огромную персистентную key-value таблицу. Возможные операции: set, get, del. Однако, для lock-free подхода необходима более сложная операция: CAS или compare-and-swap. Рассмотрим более детально эту операцию, возможности ее использования, а также то, какие результаты это дает.


CAS

CAS или compare-and-swap — основной и важный примитив синхронизации для lock-free и wait-free алгоритмов. Суть его можно проиллюстрировать следующим Псеко:

CAS(var, expected, new):
    # все, что внутри atomic, выполняется атомарно
    atomic:
        if var.get() != expected:
            return false
        var.set(new)
        return true

Иногда для оптимизации возвращают не true или false, а предыдущее значение, т.к. очень часто такие операции производят в цикле, а чтобы получить expected значение, необходимо его для начала прочитать:

CAS_optimized(var, expected, new):
    # все, что внутри atomic, выполняется атомарно
    atomic:
        current = var.get()
            if current == expected:
        var.set(new)
        return current

# тогда CAS выражается через CAS_optimized
CAS(var, expected, new):
    return var.CAS_optimized(expected, new) == expected

Такой подход может сэкономить одно чтение. В рамках нашего рассмотрения мы будем использовать простую форму CAS, т.к. при желании такую оптимизацию можно проделать самостоятельно.

В случае распределенных систем каждое изменение версионируется. Т.е. сначала мы читаем значение из хранилища, получая текущую версию данных. А затем пытаемся записать, ожидая, что версия данных не изменилась. При этом версия инкрементируется каждый раз при обновлении данных:

CAS_versioned(var, expected_version, new):
    atomic:
        if var.get_version() != expected_version:
            return false
        var.set(new, expected_version + 1)
        return true

Такой подход позволяет более точно контролировать обновление значений, избегая проблемы ABA. В частности, версионирование поддерживают Etcd и Zookeeper.

Отметим важное свойство, которое дает использование CAS_versioned операций. Дело в том, что такую операцию можно повторять без ущерба вышестоящей логики. В многопоточном программировании данное свойство не имеет особой ценности, т.к. там если операция завершилась неудачно, то мы точно знаем, что она не применилась. В случае же распределенных систем данный инвариант нарушается, т.к. запрос может дойти до получателя, а успешный ответ уже нет. Поэтому важно иметь возможность перепосылать запросы без боязни нарушения инвариантов высокоуровневой логики.

Именно такое свойство и предоставляет операция CAS_versioned. По факту, можно бесконечно повторять эту операцию до тех пор, пока не вернется реальный ответ от получателя. Что, в свою очередь, выкидывает целый класс ошибок, связанный с сетевым взаимодействием.


Пример

Давайте рассмотрим, как на основе CAS_versioned и полутранзакций выполнить перевод с одного аккаунта на другой, которые принадлежат, например, разным экземплярам Etcd. Здесь я предполагаю, что функция CAS_versioned уже реализована соответствующим образом на основе предоставляемого API.

withdraw(from, amount):
    # атомарный цикл
    while true:
        # получение версии и данных
        version_from, amount_from = from.get_versioned()
        if amount_from < amount:
            return error.insufficient_funds
        if from.CAS_versioned(version_from, amount_from - amount):
            break
    return ok

deposit(to, amount):
    # атомарный цикл
    while true:
        version_to, amount_to = to.get_versioned()
        if to.CAS_versioned(version_to, amount_to + amount):
            break
    return ok

transfer(from, to, amount):
    # первая полутранзакция
    if withdraw(from, amount) is ok:
        # если первая полутранзакция произошла успешно,
        # то выполняем последующую
        deposit(to, amount)

Здесь мы разбили нашу операцию на полутранзакции, и каждую полутранзакцию выполняем через операцию CAS_versioned. Такой подход позволяет независимо работать с каждым аккаунтом, допуская использовать разнородные хранилища, никак не связанные друг с другом. Единственная проблема, которая нас тут поджидает — это потеря денег в случае падения текущего процесса в промежутке между полутранзакциями.


Очередь

Для того, чтобы идти дальше, необходимо реализовать очередь событий. Идея в том, что для общения обработчиков друг с другом необходимо иметь упорядоченную очередь сообщений, в которой данные не теряются и не дублируются. Соответственно, все взаимодействие в цепочке обработчиков будет построено на этом примитиве. Также это полезный инструмент для анализа и аудита входящих и исходящих потоков данных. Помимо этого мутации состояний обработчиков также можно производить через очередь.

Очередь будет состоять из пары операций:


  1. Добавление сообщения в конец очереди.
  2. Получение сообщения из очереди по заданному индексу.

В данном контексте я не рассматриваю удаление сообщений из очереди по нескольким причинам:


  1. Из одной и той же очереди могут читать несколько обработчиков. Синхронизация удаления будет представлять из себя нетривиальную задачу, хотя не невозможную.
  2. Полезно сохранять очередь на относительно длительный интервал (день или неделю) для возможности дебагинга и аудита. Полезность такого свойства сложно переоценить.
  3. Удалять старые элементы можно либо по расписанию, либо выставив TTL на элементы очереди. Важно при этом следить, чтобы обработчики успевали обработать данные до того, как придет метла и все подчистит. Если время обработки порядка секунд, а TTL порядка дней, то ничего такого не должно произойти.

Для хранения элементов и эффективной реализации добавления нам понадобятся:


  1. Значение с текущим индексом. Этот индекс указывает на конец очереди для добавления элементов.
  2. Элементы очереди, начиная с нулевого индекса.


Как бы lock-free очередь

Для вставки элемента в очередь нам необходимо обновить два ключа: текущий индекс и вставляемый элемент по текущему индексу. Сразу возникает идея сделать это в следующей последовательности:


  1. Сначала атомарно через CAS увеличиваем текущий индекс на единицу.
  2. Затем по возвращаемому значению записываем вставляемый элемент.

Однако у этого подхода, как это ни странно, есть целых два фатальных недостатка.


  1. Такая реализация не является lock-free. Казалось бы, если мы параллельно вставляем несколько элементов, то хотя бы одна вставка в таком случае завершается успешно. Lock-free? Нет! Дело в том, что у нас 2 операции: вставка и чтение. И хотя вставка сама по себе и является lock-free, однако вставка и чтение — нет! В этом легко убедиться, если предположить, что сразу после атомарного обновления индекса возникла пауза, размером с вечность. Тогда мы никогда не сможем зачитать этот и последующий элементы и будем заблокированы навсегда. Это будет представлять серьезную проблему для доступности нашей очереди, т.к. в случае отказа обработчика в этом месте мы получаем залипание других обработчиков, зачитывающих значение с этой позиции.
  2. Проблемы при взаимодействии нескольких очередей. При падении обработчика после обновления индекса мы не знаем, по какому индексу нам необходимо будет записать значение в случае продолжения работы после восстановления состояния. Этот индекс потеряется навсегда.

Таким образом, крайне важно сохранять lock-free относительно всех операций для сохранения высокой доступности работы обработчиков.


Lock-free реализация добавления

Соответственно, рассуждая логично, остается только другой вариант реализации: в обратном порядке, т.е. сначала добавляем элемент в конец очереди, а потом обновляем индекс:

push(queue, value):
    # получение текущего индекса из очереди
    index = queue.get_current_index()
    while true:
        # получение переменной, указывающей на ячейку
        # для добавления элемента
        var = queue.at(index)
        # версия = 0 соответствует новому значению, т.е. такая проверка
        # означает, что ячейка должна быть пустой в момент записи
        if var.CAS_versioned(0, value):
            # запись произведена успешно, теперь обновляем индекс
            queue.update_index(index + 1)
            break
        # здесь хитрый момент, см. описание ниже
        index = max(queue.get_current_index(), index + 1)

update_index(queue, index):
    while true:
        # получение текущего версионированного значения
        cur_index, version = queue.get_current_index_versioned()
        # текущий индекс может внезапно оказаться больше,
        # чем записываемый, см. описание ниже
        if cur_index >= index:
            # кто-то проактивно обновил на более свежий,
            # а значит работа сделана и можно отдыхать
            break
        if queue.current_index_var().CAS_versioned(version, index):
            # удалось обновить индекс на более свежий, работа закончена
            break
        # кто-то обновил значение.
        # возможно, что индекс все еще недостаточно свежий, попробуем еще

Стоит подробнее остановиться на хитром моменте. Дело в том, что после успешного выполнения первой полутранзакции обработчик может упасть или затупить (падение или отказ в работе — это, вообще говоря, частный случай затупа на вечность). При этом мы хотим сохранить свойство lock-free для нашей системы. Что при этом произойдет?

А произойдет то, что следующий push будет крутиться в цикле бесконечно, ведь текущий индекс теперь некому обновить! Следовательно, это теперь наша задача по обновлению индекса и мы должны проактивно это сделать, самостоятельно подсматривая за следующим элементом очереди.

После записи значения теперь необходимо обновить текущий индекс. Однако тут тоже поджидает нас подводная грабелька: мы не можем просто так взять и перезаписать значение. Дело в том, что если мы затупили по какой-то причине между полутранзакциями, то кто-то другой за это время мог успеть добавить элемент в очередь и обновить текущее значение индекса. А значит, простое обновление индекса не будет работать, т.к. мы просто перетрем более новое значение. Помимо этого нам необходимо обновить именно на самое свежее значение. Какое самое свежее значение будет в этом случае? Оно будет соответствовать самому большому значению индекса, т.к. индекс соответствует позиции в очереди, и чем выше позиция, тем более свежие данные мы записали в очередь.

Стоит отметить, что мы записывали индекс только ради того, чтобы иметь возможность быстро найти конец очереди для дальнейшего добавления. Т.е. этого всего лишь оптимизация. Без индекса эта очередь также бу

© Habrahabr.ru