Python реализация парадигмы event-driven с помощью сопрограмм

Статья про то, как с помощью расширенных генераторов Python сделать собственную реализацию сопрограмм, переключающихся по получению событий. Простота кода получившегося модуля вас приятно удивит и прояснит новые и мало используемые возможности языка, которые можно получить, используя такие генераторы. Статья поможет разобраться и с тем, как это устроено в серьезных реализациях: asyncio, tornado, etc.Теоретические моменты и disclaimerПонятие сопрограмма имеет очень широкое толкование, поэтому следует определиться, какими характеристиками они будут обладать в нашей реализации: Выполняются совместно в одном потоке; Выполнение может прерываться для ожидания определенного события; Выполнение может возобновиться после получения ожидаемого события; Может вернуть результат по завершению. Как следствие получаем: событийно-ориентированное программирование без функций обратного вызова и кооперативную многозадачность. Эффект от использования такой парадигмы программирования будет существенным только для задач, реагирующих на неравномерно поступающие события. В первую очередь это задачи обработки I/O: сетевые сервера, пользовательские интерфейсы, и т. п. Другой возможный вариант применения — это задачи расчета состояния персонажей в игровом мире. Но категорически не подойдет для задач, которые производят долгие расчеты.Следует четко понимать, что пока выполняющаяся сопрограмма не прервалась на ожидание события, все остальные находятся в состоянии останова, даже если ожидаемое ими событие уже произошло.Основа всего В Python хорошей основой для всего этого являются генераторы, если их правильно приготовить в прямом и переносном смысле. Точнее расширенные генераторы, API которых окончательно сформировался в версии Python 3.3. В предыдущих версиях не было реализовано возвращение значения (результата) по завершению работы генератора и не было удобного механизма вызова одного генератора из другого. Тем не менее, реализации сопрограмм были и раньше, но из-за ограничений обычных генераторов они были не так «красивы» как то, что получится у нас. Ниже рассмотрены возможности расширенных генераторов, которые нам понадобятся.Передача сообщений в сопрограмму у нас будет построена на возможности задать генератору его состояние. Скопируйте код ниже в окно запущенного интерпретатора Python версии 3.3 и выше. def gen_factory (): state = None while True: print («state:», state) state = yield state

gen = gen_factory () Генератор создан, его надо запустить. >>> next (gen) state: None Получено исходное состояние. Изменим состояние: >>> gen.send («OK») state: OK 'OK' Видим что состояние изменилось и возвращено в результате. Следующие вызовы next будут возвращать уже его.Зачем нам все это? Представьте задачу: передавать привет Петрову раз в две секунды, Иванову раз в три секунды, а всему миру раз в пять секунд. В виде Python кода можно представить как-то так: def hello (name, timeout): while True: sleep (timeout) print («Привет, {}!».format (name))

hello («Петров», 2.0) hello («Иванов», 3.0) hello («Мир», 5.0) Смотрится хорошо, но приветы будет получать только Петров. Однако! Небольшая модификация не влияющая на ясность кода, а даже наоборот — уточняющая нашу мысль, и это уже может заработать как положено. @coroutine def hello (name, timeout): while True: yield from sleep (timeout) print («Привет, {}!».format (name))

hello («Петров», 2.0) hello («Иванов», 3.0) hello («Мир», 5.0) run () Код получился в стиле pythonic way — наглядно иллюстрирует задачу, линейный без калбэков, без лишних наворотов с объектами, любые комментарии в нем излишни. Осталось только реализовать декоратор coroutine, свою версию функции sleep и функцию run. В реализации, конечно, без наворотов не обойдется. Но это тоже pythonic way, прятать за фасадом библиотечных модулей всю магию.Самое интересное Назовем модуль с реализацией незатейливо — concurrency, со смыслом и отражает тот факт, что это фактически, будет реализация кооперативной многозадачности. Понятно, что декоратор должен будет сделать из обычной функции генератор и запустить его (сделать первый вызов next). Конструкция языка yield from пробрасывает вызов в следующий генератор. То есть функция sleep должна создать генератор, в котором можно спрятать всю магию. В генератор, ее вызвавший, вернется только код полученного события. Здесь возвращаемый результат не обрабатывается, код тут может получить по сути только один результат, означающий что тайм-аут истек. Ожидание же ввода-вывода может возвращать разные виды событий, например (чтение/запись/тайм аут). Более того, генераторы порождаемые функциями типа sleep могут вернуть по yield from любой тип данных и соответственно их функционал может быть не ограничен ожиданием событий. Функция run запустит диспетчер событий, его задача — получить событие извне и/или сгенерировать внутри, определить его получателя и собственно отправить.Начнем с декоратора: class coroutine (object): »«Делает из функции сопрограмму на базе расширенного генератора.»« _current = None

def __init__(self, callable): self._callable = callable

def __call__(self, *args, **kwargs): corogen = self._callable (*args, **kwargs) cls = self.__class__ if cls._current is None: try: cls._current = corogen next (corogen) finally: cls._current = None return corogen Он выполнен в виде класса, типичный прием, как и обещал, он создает и запускает генератор. Конструкция с _current добавлена для того, чтобы избежать запуска генератора, если декорированная функция, его создающая вызывается внутри тела другого генератора. В этом случае первый вызов будет и так сделан. Так же это поможет разобраться, в какой генератор должно быть передано событие, чтобы оно попало по цепочке в генератор, созданный функцией sleep. def sleep (timeout): »«Приостанавливает выполнение до получения события «таймаут истек».»« corogen = coroutine._current dispatcher.setup_timeout (corogen, timeout) revent = yield return revent Здесь видим вызов dispatcher.setup_sleep, это сообщает диспетчеру событий, что генератор такой-то ожидает событие «тайм-аут» по истечению заданного параметром timeout количества секунд. from collections import deque from time import time, sleep as sys_sleep

class Dispatcher (object): »«Объект реализующий диспечер событий.»« def __init__(self): self._pending = deque () self._deadline = time () + 3600.0

def setup_timeout (self, corogen, timeout): deadline = time () + timeout self._deadline = min ([self._deadline, deadline]) self._pending.append ([corogen, deadline]) self._pending = deque (sorted (self._pending, key=lambda a: a[1]))

def run (self): »«Запускает цикл обработки событий.»« while len (self._pending) > 0: timeout = self._deadline — time () self._deadline = time () + 3600.0 if timeout > 0: sys_sleep (timeout) while len (self._pending) > 0: if self._pending[0][1] <= time(): corogen, _ = self._pending.popleft() try: coroutine._current = corogen corogen.send("timeout") except StopIteration: pass finally: coroutine._current = None else: break

dispatcher = Dispatcher () run = lambda: dispatcher.run () В коде диспетчера событий тоже нет ничего необычного. Куда передавать события определяется с помощью переменной класса coroutine._current. При загрузке модуля создается экземпляр класса, в рабочей реализации это конечно же должен быть синглетон. Класс collections.deque задействован вместо списка, так как побыстрее и полезен своим методом popleft. Ну вот собственно и все, и нет какой-то особой магии. Вся она на поверку спрятана еще глубже, в реализации расширенных генераторов Python. Их остается только правильно приготовить.Файл: concurrency.py # concurrency.py from collections import deque from time import time, sleep as sys_sleep

class coroutine (object): »«Делает из функции сопрограмму на базе расширенного генератора.»« _current = None

def __init__(self, callable): self._callable = callable

def __call__(self, *args, **kwargs): corogen = self._callable (*args, **kwargs) cls = self.__class__ if cls._current is None: try: cls._current = corogen next (corogen) finally: cls._current = None return corogen

def sleep (timeout): »«Приостанавливает выполнение до получения события «таймаут истек».»« corogen = coroutine._current dispatcher.setup_timeout (corogen, timeout) revent = yield return revent

class Dispatcher (object): »«Объект реализующий диспечер событий.»« def __init__(self): self._pending = deque () self._deadline = time () + 3600.0

def setup_timeout (self, corogen, timeout): deadline = time () + timeout self._deadline = min ([self._deadline, deadline]) self._pending.append ([corogen, deadline]) self._pending = deque (sorted (self._pending, key=lambda a: a[1]))

def run (self): »«Запускает цикл обработки событий.»« while len (self._pending) > 0: timeout = self._deadline — time () self._deadline = time () + 3600.0 if timeout > 0: sys_sleep (timeout) while len (self._pending) > 0: if self._pending[0][1] <= time(): corogen, _ = self._pending.popleft() try: coroutine._current = corogen corogen.send("timeout") except StopIteration: pass finally: coroutine._current = None else: break

dispatcher = Dispatcher () run = lambda: dispatcher.run () Файл: sample.py # sample.py from concurency import coroutine, sleep, run

@coroutine def hello (name, timeout): while True: yield from sleep (timeout) print («Привет, {}!».format (name))

hello («Петров», 2.0) hello («Иванов», 3.0) hello («Мир», 5.0) run () Outro Если тема интересная, можно продолжить в сторону реализации ожидания событий ввода/вывода с асинхронным TCP Echo сервером в качестве примера. С реальным диспетчером событий, реализованным в виде динамической библиотеки написанной на другом, более быстром, чем Python языке.

© Habrahabr.ru