Как написать Raft на чистом Python: основы

72c896471319680523a7721ac4f3f909.png

Привет, друзья! Сегодня рассмотрим, как реализовать алгоритм Raft на Python.

Raft — это алгоритм распределённого консенсуса, который делает три вещи:

  1. Выбирает лидера (тот, кто рулит кластером).

  2. Реплицирует данные по всем узлам (чтобы не потерять, если что‑то пойдет не так).

  3. Гарантирует согласованность данных (никакой битой записи в журнале).

Представьте себе группу людей, пытающихся решить, какую пиццу заказать. Если нет лидера — хаос. Если кто‑то заказал ананасовую, а другие решили, что это была шутка, — еще хуже. Вот Raft и помогает избежать этой катастрофы.

Структура проекта

Определимся, что мы собираемся написать:

  • Node — сердце системы, представляет один узел кластера.

  • Механизмы:

    • Выборы лидера.

    • Репликация журнала.

    • Управление состоянием.

  • Дополнительно:

    • Обработка отказов.

    • Оптимизация производительности.

    • Клиентская логика.

    • Тестирование и отказоустойчивость.

Узел кластера

Каждый узел в Raft знает:

  • Своё состояние (лидер, кандидат, или — чаще всего — просто унылый follower).

  • Текущий термин (порядковый номер цикла выборов).

  • Свой журнал (лог операций).

Начнём с базовой структуры:

import random
import threading
import time
from enum import Enum

class State(Enum):
    FOLLOWER = 1
    CANDIDATE = 2
    LEADER = 3

class Node:
    def __init__(self, node_id, peers):
        self.id = node_id
        self.peers = peers  # Список других узлов
        self.state = State.FOLLOWER
        self.current_term = 0
        self.voted_for = None
        self.log = []
        self.commit_index = -1
        self.last_applied = -1
        self.next_index = {}
        self.match_index = {}
        self.lock = threading.Lock()
        self.election_timeout = self.reset_election_timeout()
        self.disabled = False  # Инициализация флага отключения
        self.timer = threading.Thread(target=self.run_election_timer, daemon=True)
        self.timer.start()

    def reset_election_timeout(self):
        return time.time() + random.uniform(5, 10)

Здесь определяем класс Node с его основными атрибутами и инициализируем таймер выборов для каждого узла.

Таймер выборов

Узлы в Raft знают, что если лидер долго молчит, значит, пора искать нового. Здесь и поможет таймер выборов.

def run_election_timer(self):
    while True:
        time.sleep(0.1)
        with self.lock:
            if self.disabled:
                continue
            if time.time() >= self.election_timeout:
                print(f"Узел {self.id}: лидер потерян, начинаю выборы.")
                self.state = State.CANDIDATE
                self.current_term += 1
                self.voted_for = self.id
                self.election_timeout = self.reset_election_timeout()
                threading.Thread(target=self.start_election, daemon=True).start()

Этот метод постоянно проверяет, не истёк ли таймаут, и при необходимости инициирует процесс выборов нового лидера.

Выборы

Когда узел становится кандидатом, он рассылает всем остальным запросы на голосование:

def start_election(self):
    votes = 1  # Голосуем за себя
    for peer in self.peers:
        if self.disabled:
            continue
        if peer.request_vote(self.current_term, self.id):
            votes += 1
    if votes > len(self.peers) // 2:
        print(f"Узел {self.id}: выбран лидером!")
        self.become_leader()
    else:
        print(f"Узел {self.id}: выборы провалились.")

Здесь узел собирает голоса от своих собратьев. Если набирается большинство — он становится лидером.

Как голосуют узлы?

Каждый узел отвечает на запросы голосования:

def request_vote(self, term, candidate_id):
    with self.lock:
        if self.disabled:
            return False
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
            self.state = State.FOLLOWER
        if self.voted_for is None or self.voted_for == candidate_id:
            print(f"Узел {self.id}: голосую за {candidate_id}.")
            self.voted_for = candidate_id
            self.election_timeout = self.reset_election_timeout()
            return True
        else:
            print(f"Узел {self.id}: отказал в голосе {candidate_id}.")
            return False

Этот метод решает, дать ли свой голос кандидату, основываясь на текущем терминe и предыдущих голосах.

Лидерство

Если кандидат получает большинство голосов, он становится лидером:

def become_leader(self):
    self.state = State.LEADER
    print(f"Узел {self.id}: я лидер!")
    for peer in self.peers:
        self.next_index[peer.id] = len(self.log)
        self.match_index[peer.id] = -1
    threading.Thread(target=self.send_heartbeats, daemon=True).start()

При становлении лидером узел инициализирует индексы для репликации журнала и начинает рассылку сердцебиений.

Сердцебиения

Лидер периодически шлёт всем узлам «сигналы жизни»:

def send_heartbeats(self):
    while True:
        with self.lock:
            if self.state != State.LEADER or self.disabled:
                break
        for peer in self.peers:
            if self.disabled:
                continue
            print(f"Лидер {self.id}: отправляю heartbeat узлу {peer.id}.")
            threading.Thread(target=self.append_entries, args=(peer,), daemon=True).start()
        time.sleep(1)

Этот цикл обеспечивает поддержание лидерства и синхронизацию журнала с другими узлами.

Репликация журнала

Когда клиент отправляет команду,  лидер добавляет её в журнал и синхронизирует с другими узлами:

def client_command(self, command):
    with self.lock:
        if self.state != State.LEADER or self.disabled:
            print(f"Узел {self.id}: я не лидер, перенаправляю запрос.")
            return False
        entry = {'term': self.current_term, 'command': command}
        self.log.append(entry)
        print(f"Лидер {self.id}: добавляю команду {command} в журнал.")
        threading.Thread(target=self.replicate_log, daemon=True).start()
        return True

Здесь лидер обрабатывает команду клиента, добавляя её в свой журнал и инициируя процесс репликации на других узлах.

Репликация на другие узлы

def replicate_log(self):
    while True:
        with self.lock:
            if self.disabled:
                return
            replicated = 1  # Лидер уже имеет запись
            for peer in self.peers:
                if self.match_index.get(peer.id, -1) >= len(self.log) - 1:
                    replicated += 1
            if replicated > len(self.peers) // 2:
                self.commit_index = len(self.log) - 1
                self.apply_log()
                break
        time.sleep(0.1)

Этот метод гарантирует, что запись будет реплицирована на большинстве узлов перед её применением.

Применение журнала к состоянию

Метод apply_log применяет подтверждённые записи к состоянию узла:

def apply_log(self):
    with self.lock:
        while self.last_applied < self.commit_index:
            self.last_applied += 1
            entry = self.log[self.last_applied]
            # Здесь мы применяем команду к состоянию
            print(f"Узел {self.id} применил команду: {entry['command']}")

Последовательно применяем команды из журнала к состоянию узла.

Обработка AppendEntries

Узлы должны уметь принимать записи от лидера:

def append_entries(self, peer):
    with self.lock:
        if self.disabled:
            return
        prev_log_index = self.next_index.get(peer.id, 0) - 1
        prev_log_term = self.log[prev_log_index]['term'] if prev_log_index >= 0 and prev_log_index < len(self.log) else -1
        entries = self.log[self.next_index.get(peer.id, 0):]
        term = self.current_term
    success = peer.receive_append_entries(
        term=term,
        leader_id=self.id,
        prev_log_index=prev_log_index,
        prev_log_term=prev_log_term,
        entries=entries,
        leader_commit=self.commit_index
    )
    with self.lock:
        if self.disabled:
            return
        if success:
            self.match_index[peer.id] = self.next_index.get(peer.id, 0) + len(entries) - 1
            self.next_index[peer.id] = self.match_index[peer.id] + 1
        else:
            self.next_index[peer.id] = max(0, self.next_index.get(peer.id, 0) - 1)

Этот метод отправляет записи журнала другим узлам и обновляет индексы в зависимости от ответа.

А теперь реализуем обработку входящих AppendEntries

def receive_append_entries(self, term, leader_id, prev_log_index, prev_log_term, entries, leader_commit):
    with self.lock:
        if self.disabled:
            return False
        if term < self.current_term:
            return False
        self.state = State.FOLLOWER
        self.current_term = term
        self.election_timeout = self.reset_election_timeout()
        if prev_log_index >= 0:
            if len(self.log) <= prev_log_index or self.log[prev_log_index]['term'] != prev_log_term:
                return False
        # Добавляем новые записи, если их ещё нет
        for entry in entries:
            if len(self.log) > prev_log_index + 1:
                if self.log[prev_log_index + 1]['term'] != entry['term']:
                    self.log = self.log[:prev_log_index + 1]
                    self.log.append(entry)
            else:
                self.log.append(entry)
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, len(self.log) - 1)
            threading.Thread(target=self.apply_log, daemon=True).start()
        return True

Этот метод будет проверять согласованность журнала и обновлять его, если все в порядке.

Симуляция отказов

Чтобы протестировать отказоустойчивость, добавим возможность отключать узлы:

def disable(self):
    with self.lock:
        self.disabled = True
        print(f"Узел {self.id} отключен.")

def enable(self):
    with self.lock:
        self.disabled = False
        self.election_timeout = self.reset_election_timeout()
        print(f"Узел {self.id} включен.")

И изменим методы отправки сообщений, чтобы учитывать состояние узла:

def request_vote(self, term, candidate_id):
    if getattr(self, 'disabled', False):
        return False
    # Остальной код...

def receive_append_entries(self, *args, **kwargs):
    if getattr(self, 'disabled', False):
        return False
    # Остальной код...

Теперь можно симулировать отключение узлов и проверять, как кластер реагирует на это.

Протестируем кластер

Создадим несколько узлов и запустим их:

# main.py

from node import Node, State
import time

if __name__ == "__main__":
    nodes = []
    # Создаем все узлы сначала с пустыми peers
    for i in range(5):
        node = Node(node_id=i, peers=[])
        nodes.append(node)
    # Теперь устанавливаем peers для каждого узла
    for node in nodes:
        node.peers = [peer for peer in nodes if peer.id != node.id]
    leader = None
    # Ждем, пока лидер не будет выбран
    while not leader:
        for node in nodes:
            if node.state == State.LEADER:
                leader = node
                break
        time.sleep(0.5)
    print(f"Лидер выбран: Узел {leader.id}")
    leader.client_command("Сохранить данные")
    # Отключаем лидера
    leader.disable()
    print(f"Узел {leader.id} отключен")
    time.sleep(15)
    # Проверяем, выбран ли новый лидер
    new_leader = None
    for node in nodes:
        if node.state == State.LEADER and not getattr(node, 'disabled', False):
            new_leader = node
            break
    if new_leader:
        print(f"Новый лидер: Узел {new_leader.id}")
        new_leader.client_command("Новая команда")
    else:
        print("Не удалось выбрать нового лидера.")

Скрипт создаст пять узлов, инициирует выборы лидера, отправит команды и симулирует отказ лидера. Вот что можно ожидать в консоли:

# main.py

from node import Node, State
import time

if __name__ == "__main__":
    nodes = []
    # Создаем все узлы сначала с пустыми peers
    for i in range(5):
        node = Node(node_id=i, peers=[])
        nodes.append(node)
    # Теперь устанавливаем peers для каждого узла
    for node in nodes:
        node.peers = [peer for peer in nodes if peer.id != node.id]
    leader = None
    # Ждем, пока лидер не будет выбран
    while not leader:
        for node in nodes:
            if node.state == State.LEADER:
                leader = node
                break
        time.sleep(0.5)
    print(f"Лидер выбран: Узел {leader.id}")
    leader.client_command("Сохранить данные")
    # Отключаем лидера
    leader.disable()
    print(f"Узел {leader.id} отключен")
    time.sleep(15)
    # Проверяем, выбран ли новый лидер
    new_leader = None
    for node in nodes:
        if node.state == State.LEADER and not getattr(node, 'disabled', False):
            new_leader = node
            break
    if new_leader:
        print(f"Новый лидер: Узел {new_leader.id}")
        new_leader.client_command("Новая команда")
    else:
        print("Не удалось выбрать нового лидера.")

Вывод получится такой:

Узел 0: лидер потерян, начинаю выборы.
Узел 0: голосую за 0.
Узел 1: голосую за 0.
Узел 2: голосую за 0.
Узел 3: голосую за 0.
Узел 4: голосую за 0.
Узел 0: выбран лидером!
Узел 0: я лидер!
Лидер 0: отправляю heartbeat узлу 1.
Лидер 0: отправляю heartbeat узлу 2.
Лидер 0: отправляю heartbeat узлу 3.
Лидер 0: отправляю heartbeat узлу 4.
Узел 0: добавляю команду Сохранить данные в журнал.
Лидер 0: отправляю heartbeat узлу 1.
Лидер 0: отправляю heartbeat узлу 2.
Лидер 0: отправляю heartbeat узлу 3.
Лидер 0: отправляю heartbeat узлу 4.
Узел 0 отключен
Узел 1: лидер потерян, начинаю выборы.
Узел 1: голосую за 1.
Узел 2: голосую за 1.
Узел 3: голосую за 1.
Узел 4: голосую за 1.
Узел 1: выбран лидером!
Узел 1: я лидер!
Лидер 1: отправляю heartbeat узлу 0.
Лидер 1: отправляю heartbeat узлу 2.
Лидер 1: отправляю heartbeat узлу 3.
Лидер 1: отправляю heartbeat узлу 4.
Новый лидер: Узел 1
Узел 1: добавляю команду Новая команда в журнал.
Лидер 1: отправляю heartbeat узлу 0.
Лидер 1: отправляю heartbeat узлу 2.
Лидер 1: отправляю heartbeat узлу 3.
Лидер 1: отправляю heartbeat узлу 4.
Узел 1: применил команду: Новая команда

В начале узлы начинают выборы лидера и узел 0 становится первым лидером после получения большинства голосов. Затем лидер 0 начинает отправлять сердцебиения и реплицировать команду «Сохранить данные». Когда мы отключаем лидера 0, узел 1 обнаруживает его отсутствие, инициирует новые выборы и становится новым лидером, после чего успешно обрабатывает и реплицирует команду «Новая команда».

Но тут сделаю замечание, что из‑за асинхронности и случайных таймаутов порядок событий может меняться при каждом запуске.

Если захотите прикрутить Raft в реальный проект, то нужно будет добавить обработку сетевых сбоев, механизм «догонки» отставших узлов, и проверки корректности журналов. Улучшите производительность, перейдя на асинхронное программирование, минимизируйте блокировки и оптимизируйте передачу данных. Реализуйте обратную связь для клиентов, чтобы они получали подтверждения, и протестируйте систему в сложных сценариях отказов.

Всех желающих приглашаю на открытые уроки по архитектуре высоких нагрузок:

9 декабря: «Обеспечение отказоустойчивости хранилищ» — Вы узнаете, как правильно проектировать и настраивать хранилища, чтобы минимизировать простои и предотвратить потерю данных при сбоях. Записаться

16 декабря: «Распределённые транзакции — как добиться согласования данных в распределённой сети». Записаться

© Habrahabr.ru