Какая асинхронность должна была бы быть в Python

?v=1

В последние несколько лет ключевое слово async и семантика асинхронного программирования проникла во многие популярные языки программирования: JavaScript, Rust, C#, и многие другие. Конечно, в Python тоже есть async/await, они появились в Python 3.5.

В этой статье хочу обсудить проблемы асинхронного кода, порассуждать об альтернативах и предложить новый подход поддерживать и синхронные, и асинхронные приложения одновременно.

Цвет функций


Когда в язык программирования включают асинхронные функции, он по сути раскалывается надвое. Появляются красные функции (или асинхронные), а некоторые функции остаются синими (синхронными).

Основная проблема в том, что синие функции не могут вызывать красные, но красные потенциально могут вызвать синие. В Python, например, это частично так: асинхронные функции могут вызывать только синхронные неблокирующие функции. Но определить по описанию, блокирующая функция или нет, невозможно. Python же скриптовый язык.

Этот раскол приводит к разделению языка на два подмножества: синхронное и асинхронное. Python 3.5 вышел больше пяти лет назад, но async все еще поддерживается далеко не так хорошо, как синхронные возможности Python.

Больше о цветах функции можно прочитать в этой замечательной статье.

Дублирование кода


Разные цвета функций на практике означают дублирование кода.

Представьте, вы разрабатываете CLI-инструмент для извлечения размера веб-страницы и хотите поддерживать и синхронный, и асинхронный способы его работы. Например, это нужно, если вы пишете библиотеку и не знаете, как будет использоваться ваш код. И речь не только о библиотеках PyPI, но и о собственных библиотеках с общей логикой для разных сервисов, написанных, например, на Django и aiohttp. Хотя, конечно, независимые приложения в основном пишутся или только синхронно, или только асинхронно.

Начнём с синхронного псевдокода:

def fetch_resource_size(url: str) -> int:
    response = client_get(url)
    return len(response.content)


Выглядит хорошо. Теперь посмотрим на асинхронный аналог:

async def fetch_resource_size(url: str) -> int:
    response = await client_get(url)
    return len(response.content)


В целом, это тот же самый код, но с добавлением слов async и await. И я это не выдумал — сравните примеры кода в туториале по httpx:
Там точно такая же картина.

Абстракция и композиция


Получается, нужно переписать весь синхронный код и расставить тут и там async и await, чтобы программа стала асинхронной.

В решении этой проблемы могут помочь два принципа. Во-первых, перепишем императивный псевдокод в функциональный. Это позволит увидеть картину более ясно.

def fetch_resource_size(url: str) -> Abstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


Вы спросите, что это за метод .map, что он делает. Так в функциональном стиле происходит композиция сложных абстракций и чистых функций. Это позволяет создать новую абстракцию с новым состоянием из существующей. Предположим, client_get(url) изначально возвращает Abstraction[Response], а вызов .map(lambda response: len(response.content)) преобразует ответ в требуемый экземпляр Abstraction[int].

Становится понятно, что делать дальше. Обратите внимание, как легко мы перешли от нескольких независимых шагов к последовательному вызову функций. К тому же мы изменили тип ответа: теперь функция возвращает некоторую абстракцию.

Перепишем код для работы с асинхронной версией:

def fetch_resource_size(url: str) -> AsyncAbstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


Единственное, что отличается, — это тип возвращаемого значения — AsyncAbstraction. В остальном код остался точно таким же. Больше не нужно использовать ключевые слова async и await. await не используется вообще (ради этого всё и затевалось), а без него нет смысла и в async.

Последнее, что требуется, это решить, какой клиент нам нужен: асинхронный или синхронный.

def fetch_resource_size(
    client_get: Callable[[str], AbstactionType[Response]],
    url: str,
) -> AbstactionType[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


client_get теперь является аргументом вызываемого типа, который получает на вход строку URL-адреса и возвращает некоторый тип AbstractionType над объектом Response. AbstractionType — либо Abstraction, либо AsyncAbstraction из предыдущих примеров.

Когда передаем Abstraction, код работает синхронно, когда AsyncAbstraction — тот же самый код автоматически начинает работать асинхронно.

IOResult и FutureResult


К счастью, в dry-python/returns уже есть правильные абстракции.

Позвольте представить вам типобезопасный, дружелюбный к mypy, не зависящий от фреймворка, полностью написанный на Python инструмент. В нём есть потрясающие, удобные, замечательные абстракции, которые можно использовать абсолютно в любом проекте.

Синхронный вариант


Сначала поставим зависимости, чтобы получить воспроизводимый пример.

pip install returns httpx anyio


Далее превратим псевдокод в рабочий код на Python. Начнем с синхронного варианта.

from typing import Callable
 
import httpx
 
from returns.io import IOResultE, impure_safe
 
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => >


Потребовалось изменить пару моментов, чтобы получился рабочий код:

  • Использовать IOResultE — функциональный способ обработки ошибок синхронного IO (исключения не всегда подходят). Типы, основанные на Result, позволяют имитировать исключения, но с раздельными значениями Failure(). Успешные выходы при этом оборачиваются в тип Success. Обычно никому нет дела до исключений, а нам есть.
  • Использовать httpx, который может работать с синхронными и асинхронными запросами.
  • Использовать функцию impure_safe, чтобы преобразовывать тип, который возвращает httpx.get, в абстракцию IOResultE.


Асинхронный вариант


Попробуем сделать всё то же самое в асинхронном коде.

from typing import Callable
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
 
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => >
# => >


Видите: результат точно такой же, но теперь код работает асинхронно. При этом его основная часть не изменилась. Однако нужно обратить внимание вот на что:

  • Синхронный IOResultE изменился на асинхронный FutureResultE, impure_safe — на future_safe. Работает так же, но возвращает другую абстракцию: FutureResultE.
  • Используется AsyncClient из httpx.
  • Результирующее значение FutureResult необходимо запустить, потому что красные функции не могут вызывать сами себя.
  • Утилита anyio используется, чтобы показать, что этот подход работает с любой асинхронной библиотекой: asyncio, trio, curio.


Два в одном


Покажу, как объединить синхронную и асинхронную версию в одном типобезопасном API.

Higher Kinded Types и type-class для работы с IO ещё не вышли в релиз (они появятся в 0.15.0), поэтому проиллюстрирую на обычном @overload:

from typing import Callable, Union, overload
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
from returns.io import IOResultE, impure_safe
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    """Sync case."""
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    """Async case."""
 
def fetch_resource_size(
    client_get: Union[
        Callable[[str], IOResultE[httpx.Response]],
        Callable[[str], FutureResultE[httpx.Response]],
    ],
    url: str,
) -> Union[IOResultE[int], FutureResultE[int]]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


С помощью декораторов @overload описываем, какие входные данные разрешены и какой при этом будет тип возвращаемого значения. Прочитать подробнее о декораторе @overload можно в другой моей статье.

Вызов функции с синхронным или асинхронным клиентом выглядит так:

# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => >
 
# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => >
# => >


Как видите, fetch_resource_size в синхронном варианте сразу возвращает IOResult и выполняет его. В то время как в асинхронном варианте требуется event-loop, как для обычной корутины. anyio используется для вывода результатов.

У mypy к этому коду никаких замечаний нет:

» mypy async_and_sync.py
Success: no issues found in 1 source file


Посмотрим, что будет, если что-нибудь испортить.

---lambda response: len(response.content),
+++lambda response: response.content,


mypy легко находит новые ошибки:

» mypy async_and_sync.py
async_and_sync.py:33: error: Argument 1 to "map" of "IOResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Argument 1 to "map" of "FutureResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Incompatible return value type (got "bytes", expected "int")


Ловкость рук и никакой магии: чтобы написать асинхронный код с правильными абстракциями, нужна только старая добрая композиция. А вот то, что у нас получается один и тот же API для разных типов, — по-настоящему здорово. Например, это позволяет абстрагироваться от того, как работают HTTP-запросы: синхронно или асинхронно.

Надеюсь, этот пример наглядно доказал, какими на самом деле классными могут быть асинхронные программы. А если попробуете dry-python/returns, то найдете еще много интересного. В новой версии мы уже сделали необходимые примитивы для работы с Higher Kinded Types и все необходимые интерфейсы. Код выше теперь можно переписать так:

from typing import Callable, TypeVar

import anyio
import httpx

from returns.future import future_safe
from returns.interfaces.specific.ioresult import IOResultLike2
from returns.io import impure_safe
from returns.primitives.hkt import Kind2, kinded

_IOKind = TypeVar('_IOKind', bound=IOResultLike2)

@kinded
def fetch_resource_size(
    client_get: Callable[[str], Kind2[_IOKind, httpx.Response, Exception]],
    url: str,
) -> Kind2[_IOKind, int, Exception]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => >

# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => >
# => >


Смотрите ветку `master`, там это уже работает.

Больше возможностей dry-python


Расскажу о нескольких других полезных фичах dry-python, которыми я больше всего горжусь.

from returns.curry import curry, partial
 
def example(a: int, b: str) -> float:
    ...
 
reveal_type(partial(example, 1))
# note: Revealed type is 'def (b: builtins.str) -> builtins.float'
 
reveal_type(curry(example))
# note: Revealed type is 'Overload(def (a: builtins.int) -> def (b: builtins.str) -> builtins.float, def (a: builtins.int, b: builtins.str) -> builtins.float)'


Это позволяет использовать @curry, например, вот так:

@curry
def example(a: int, b: str) -> float:
    return float(a + len(b))
 
assert example(1, 'abc') == 4.0
assert example(1)('abc') == 4.0


За счёт кастомного mypy-плагина можно строить функциональные пайплайны, возвращающие типы.

from returns.pipeline import flow
assert flow(
    [1, 2, 3],
    lambda collection: max(collection),
    lambda max_number: -max_number,
) == -3


Обычно в типизированном коде очень неудобно работать с лямбдами, из-за того что их аргументы всегда типа Any. Вывод mypy решает эту проблему.

С его помощью нам теперь известно, что lambda collection: max(collection) типа Callable[[List[int]], int], а lambda max_number: -max_number просто Callable[[int], int]. Во flow можно передать любое количество аргументов, и все они будут отлично работать. Всё благодаря плагину.


Абстракцию над FutureResult, о которой мы говорили ранее, можно использовать для того, чтобы явно передать зависимости в асинхронные программы в функциональном стиле.

Планы на будущее


Прежде чем наконец-то выпустить версию 1.0, нам предстоит решить несколько важных задач:

  • Реализовать Higher Kinded Types или их эмуляцию (issue).
  • Добавить надлежащие type-классы, чтобы реализовать необходимые абстракции (issue).
  • Возможно, попробовать компилятор mypyc, что потенциально позволит компилировать типизированные аннотированные Python-программы в двоичный файл. Тогда код с dry-python/returns будет работать в несколько раз быстрее (issue).
  • Исследовать новые способы написания функционального кода на Python, например, такие как «do-notation».


Выводы


С помощью композиции и абстракции можно решить любую проблему. В этой статье мы рассмотрели, как решить проблему цветов функций и писать простой, читаемый и гибкий код, который работает. И сделать проверку типов.

Пробуйте dry-python/returns и подключайтесь к Russian Python Week: на конференции core-разработчик dry-python Pablo Aguilar проведет воркшоп по использованию dry-python для написания бизнес-логики.

© Habrahabr.ru