Акторная модель на Python: Ray, Thespian, Pykka

Привет, Хабр!
В этой статье мы коротко пройдемся по основным вариантам реализации акторной модели на Python.
Акторная модель — это парадигма, которая меняет представление о многопоточности. Вместо того, чтобы бороться с разделяемой памятью и мутексами, создаём независимых «актеров», каждый из которых умеет:
Принимать и обрабатывать сообщения.
Изолировать своё состояние от других акторов.
Спонтанно порождать новых акторов и пересылать сообщения.
Смысл в том, чтобы свести к минимуму всякие гонки данных и блокировки. В этом и вся суть — акторы общаются через сообщения, и если один из них падает, система остаётся живой и может даже перезапустить проблемного актёра.
Различные библиотеки для акторной модели в Python
Рассмотрим такие библиотеки как Ray, Thespian и Pykka.
Ray: это база
Ray — это полноценная платформа для распределённых вычислений, в которой акторы — лишь одна из концепций. Ray позволяет масштабировать задачи, распределять нагрузку по кластеру и даже интегрироваться с библиотеками для машинного обучения.
Начнём с простейшего примера, где актор принимает сообщения и возвращает ответ:
import ray
ray.init(ignore_reinit_error=True)
@ray.remote
class EchoActor:
def __init__(self):
self.state = "Ready"
def echo(self, message):
print(f"[EchoActor] Получено сообщение: {message}")
return f"Echo: {message}"
def get_state(self):
return self.state
# Создаём экземпляр актора
echo_actor = EchoActor.remote()
result = ray.get(echo_actor.echo.remote("Привет, мир!"))
print(result)
print(f"Состояние актора: {ray.get(echo_actor.get_state.remote())}")
ray.shutdown()
Всё происходит асинхронно, а ray.get
позволяет получить результат без лишних блокировок.
Ошибки — это норма. Предположим, актор выполняет рискованную операцию, и нужно автоматически перезапускать его при сбоях. Вот как можно это сделать с Ray:
@ray.remote(max_restarts=3, max_task_retries=1)
class RobustActor:
def __init__(self):
self.counter = 0
def risky_operation(self):
self.counter += 1
# Имитация ошибки при достижении порогового значения
if self.counter % 5 == 0:
raise ValueError("Что-то пошло не так!")
return f"Операция выполнена успешно, счётчик: {self.counter}"
robust_actor = RobustActor.remote()
for i in range(10):
try:
result = ray.get(robust_actor.risky_operation.remote())
print(result)
except Exception as e:
print(f"Ошибка в операции: {e}")
Ray позволяет задать параметры max_restart
s и max_task_retries
, что даёт контроль над поведением в случае сбоев.
Представим сценарий, когда нужно обработать сотни задач одновременно. Вместо того чтобы мучиться с пулом потоков, создаем пул акторов:
import time
@ray.remote
class Worker:
def __init__(self, worker_id):
self.worker_id = worker_id
def process(self, task):
print(f"[Worker {self.worker_id}] Обработка задачи: {task}")
time.sleep(1) # имитируем долгую операцию
return f"Результат {task} от Worker {self.worker_id}"
workers = [Worker.remote(i) for i in range(5)]
tasks = [f"task-{i}" for i in range(20)]
results = [workers[i % len(workers)].process.remote(task) for i, task in enumerate(tasks)]
final_results = ray.get(results)
for res in final_results:
print(res)
ray.shutdown()
Эта схема позволяет легко масштабировать обработку задач, просто добавляя или убирая акторы.
Thespian: классика акторной модели
Thespian — библиотека, которая живёт и дышит акторной моделью. Если вы хотите почувствовать всю мощь классической реализации, где каждый актор реализует метод receiveMessage
, то Thespian для вас.
Простой актор на Thespian:
from thespian.actors import Actor, ActorSystem, ActorExitRequest
class EchoActor(Actor):
def receiveMessage(self, message, sender):
if message == 'shutdown':
self.send(sender, 'shutting down')
self.exit() # завершаем работу актора
else:
response = f"Echo: {message}"
print(f"[EchoActor] Обработка сообщения: {message}")
self.send(sender, response)
if __name__ == '__main__':
actor_system = ActorSystem('multiprocTCPBase')
echo_actor = actor_system.createActor(EchoActor)
response = actor_system.ask(echo_actor, "Привет, Thespian!", timeout=3)
print(f"Ответ актора: {response}")
shutdown_response = actor_system.ask(echo_actor, "shutdown", timeout=3)
print(f"Shutdown response: {shutdown_response}")
actor_system.shutdown()
Усложним задачу и создадим систему с координатором, который распределяет задачи между воркерами.
from thespian.actors import Actor, ActorSystem, ActorExitRequest
class Worker(Actor):
def receiveMessage(self, message, sender):
if isinstance(message, dict) and 'task' in message:
task = message['task']
print(f"[Worker] Получена задача: {task}")
result = f"Результат обработки {task}"
self.send(sender, result)
class Coordinator(Actor):
def __init__(self):
self.workers = []
def receiveMessage(self, message, sender):
if message == 'init':
for _ in range(4): # создаём 4 воркера
worker = self.createActor(Worker)
self.workers.append(worker)
self.send(sender, "Workers initialized")
elif isinstance(message, dict) and 'task' in message:
worker = self.workers[hash(message['task']) % len(self.workers)]
self.send(worker, message)
elif message == 'shutdown':
for worker in self.workers:
self.send(worker, ActorExitRequest())
self.send(sender, "Coordinator shutting down")
if __name__ == '__main__':
actor_system = ActorSystem('multiprocTCPBase')
coordinator = actor_system.createActor(Coordinator)
init_response = actor_system.ask(coordinator, 'init', timeout=3)
print(init_response)
tasks = ['task-A', 'task-B', 'task-C', 'task-D', 'task-E']
for task in tasks:
response = actor_system.ask(coordinator, {'task': task}, timeout=3)
print(f"Ответ на {task}: {response}")
shutdown_response = actor_system.ask(coordinator, 'shutdown', timeout=3)
print(shutdown_response)
actor_system.shutdown()
Здесь координатор распределяет задачи по круговой схеме, что дает балансировку нагрузки
Pykka: акторная модель для любителей простоты
Если хочется чего‑то между классическим подходом и современными библиотеками, загляните в Pykka. Эта библиотека предоставляет реализацию акторов на основе потоков или gevent. Она проста в освоении и даёт возможность экспериментировать с акторной моделью без лишней суеты.
Простой актор на Pykka:
import pykka
class EchoActor(pykka.ThreadingActor):
def __init__(self):
super().__init__()
self.state = "Ready"
def echo(self, message):
print(f"[EchoActor] Получено сообщение: {message}")
return f"Echo: {message}"
def get_state(self):
return self.state
if __name__ == '__main__':
actor_ref = EchoActor.start()
result = actor_ref.proxy().echo("Привет, Pykka!").get()
print(result)
state = actor_ref.proxy().get_state().get()
print(f"Состояние актора: {state}")
actor_ref.stop()
А теперь представим сценарий, где один актор должен делегировать задачи другому. В Pykka это делается очень просто:
import time
import pykka
class WorkerActor(pykka.ThreadingActor):
def process(self, task):
print(f"[WorkerActor] Обработка задачи: {task}")
time.sleep(1) # имитация работы
return f"Результат задачи: {task}"
class ManagerActor(pykka.ThreadingActor):
def __init__(self):
super().__init__()
self.worker = WorkerActor.start()
def handle_task(self, task):
# делегируем задачу воркеру
result = self.worker.proxy().process(task).get()
return f"Manager получил: {result}"
def on_stop(self):
self.worker.stop()
if __name__ == '__main__':
manager = ManagerActor.start()
tasks = ['task-1', 'task-2', 'task-3']
for task in tasks:
res = manager.proxy().handle_task(task).get()
print(res)
manager.stop()
Pykka позволяет легко комбинировать акторы, и если что‑то пойдёт не так, всё можно отлавливать через стандартные механизмы обработки исключений Python.
Кейсы применения на Ray
Динамически масштабируемый пул воркеров для обработки данных
Представьте, чтос есть большой поток данных (например, веб‑запросы, сообщения из Kafka, телеметрия от IoT‑устройств). Нужно обработать эти данные максимально быстро, при этом:
Балансировать нагрузку: если поток данных увеличивается, мы автоматически создаём больше воркеров.
Оптимизировать вычисления: не держим воркеры в памяти, если работы мало.
Обрабатывать задачи асинхронно: без блокировок и перегрузки.
Решаем с помощью диспетчера, который принимает входящие задачи и решает, сколько воркеров запустить. Пула акторов‑воркеров, которыйобрабатывает задачи параллельно.
Если воркеров слишком мало — создаём больше, если слишком много — убираем лишних.
Код:
import ray
import time
import random
ray.init(ignore_reinit_error=True)
# Воркеры, которые обрабатывают задачи
@ray.remote
class Worker:
def __init__(self, worker_id):
self.worker_id = worker_id
def process(self, task):
execution_time = random.uniform(0.5, 2.0) # Имитация нагрузки
time.sleep(execution_time)
print(f"[Worker-{self.worker_id}] Обработал задачу: {task} за {execution_time:.2f} сек.")
return f"Результат {task} от Worker-{self.worker_id}"
# Актор-диспетчер, который управляет воркерами
@ray.remote
class Dispatcher:
def __init__(self):
self.workers = [] # Пул воркеров
self.task_queue = [] # Очередь задач
def add_task(self, task):
self.task_queue.append(task)
print(f"[Dispatcher] Добавлена задача: {task}")
self._scale_workers()
def _scale_workers(self):
active_tasks = len(self.task_queue)
required_workers = min(10, max(1, active_tasks // 2)) # Держим баланс воркеров
# Создаём новых воркеров, если их не хватает
while len(self.workers) < required_workers:
worker = Worker.remote(len(self.workers))
self.workers.append(worker)
print(f"[Dispatcher] Добавлен новый воркер {len(self.workers)}")
# Убираем лишних воркеров, если работы мало
while len(self.workers) > required_workers:
self.workers.pop()
print(f"[Dispatcher] Удалён воркер, активных: {len(self.workers)}")
def process_tasks(self):
results = []
for task in self.task_queue:
worker = random.choice(self.workers)
results.append(worker.process.remote(task))
self.task_queue = [] # Очищаем очередь
return results
dispatcher = Dispatcher.remote()
# Добавляем 15 задач в диспетчер
for i in range(15):
ray.get(dispatcher.add_task.remote(f"Task-{i}"))
# Запускаем обработку задач
results = ray.get(dispatcher.process_tasks.remote())
print(ray.get(results))
ray.shutdown()
Диспетчер следит за нагрузкой: если задач больше — увеличивает пул воркеров, если меньше — уменьшает. Все задачи обрабатываются асинхронно, а балансировка нагрузки автоматическая, без явных очередей и блокировок.
Акторный пайплайн для машинного обучения
Допустим, есть пайплайн обработки данных для ML. Мы должны:
Загрузить и предобработать данные.
Прогнать их через модель.
Проанализировать результаты и сохранить их.
И всё это асинхронно и распределённо. Если данные летят пачками, нужна очередь задач, и чтобы каждый этап не тормозил последующий.
Как решаем:
Создаём три типа акторов, которые работают в цепочке:
DataLoader — загружает данные.
MLModel — прогоняет данные через модель.
ResultProcessor — анализирует и сохраняет.
Каждый актор работает независимо, а между ними — сообщения.
Код:
import ray
import time
import random
ray.init(ignore_reinit_error=True)
# 1. Актор загрузки данных
@ray.remote
class DataLoader:
def load_data(self, batch_size):
time.sleep(1) # Имитация загрузки
data = [random.random() for _ in range(batch_size)]
print(f"[DataLoader] Загружены данные: {data[:3]}...")
return data
# 2. Актор, который обрабатывает данные с помощью ML-модели
@ray.remote
class MLModel:
def process_data(self, data):
time.sleep(2) # Имитация вычислений
results = [d * 0.5 + random.uniform(-0.1, 0.1) for d in data]
print(f"[MLModel] Обработаны данные, первые 3 результата: {results[:3]}")
return results
# 3. Актор, который анализирует и сохраняет результаты
@ray.remote
class ResultProcessor:
def save_results(self, results):
time.sleep(1) # Имитация сохранения
print(f"[ResultProcessor] Сохранены результаты: {results[:3]}")
return "Результаты сохранены"
# Запуск пайплайна
data_loader = DataLoader.remote()
ml_model = MLModel.remote()
result_processor = ResultProcessor.remote()
# Последовательное выполнение пайплайна через акторов
batch = ray.get(data_loader.load_data.remote(10))
processed = ray.get(ml_model.process_data.remote(batch))
final_result = ray.get(result_processor.save_results.remote(processed))
print(final_result)
ray.shutdown()
Три независимых этапа пайплайна работают через акторов. Никаких блокировок — каждый узел пайплайна работает независимо. Можно легко масштабировать: если данных больше, просто создаём больше DataLoader или MLModel.
Если у вас есть свои примеры или вы хотите поделиться опытом использования акторной модели, смело оставляйте комментарии.
Больше актуальных навыков по архитектуре приложений вы можете получить в рамках практических онлайн-курсов от экспертов отрасли.