[Из песочницы] Паттерны корутин asyncio: за пределами await

habr.png

Предисловие переводчика:
В очередной раз наступив на грабли при работе с python asyncio я отправился на просторы интернета, чтобы найти что-то более приятное, чем сухая документация. Мне попалась статья Yeray Diaz «Asyncio Coroutine Patterns: Beyond await», в которой автор весьма увлекательно рассматривает применение asyncio и делится некоторыми приемами. Поскольку я не нашел ничего такого же цельного на русском языке, то решился её перевести.

Asyncio — конкурентная мечта python программиста: пишешь код, граничащий с синхронным, и позволяешь Python сделать все остальное. Это очередной импорт библиотеки антигравитации: import antigravity

На самом деле все совсем не так, конкурентное программирование — тяжелое занятие и, пока корутины позволяют нам избегать ада обратных вызовов, что может увести вас достаточно далеко, вам все еще нужно думать о создании задач, получении результатов и элегантном перехвате исключений. Печально.

Хорошие новости в том, что все из этого возможно в asyncio. Плохие новости в том, что не всегда сразу очевидно что неправильно и как это исправить. Ниже несколько паттернов, которые я обнаружил во время работы с asyncio.

Прежде чем мы начнем:

Я использовал любимую библиотеку aiohttp для выполнения асинхронных HTTP запросов и Hacker News API, потому что это простой и хорошо известный сайт, который придерживается знакомого сценария использования. Следуя отклику на мою предыдущую статью, я также использовал async/await синтаксис введенный в Python 3.5. Я предполагал, что читатель знаком с идеями, которые здесь описаны. И в конечном счете, все примеры доступны в GitHub репозитории этой статьи.

Хорошо, давайте начнем!


Рекурсивные корутины

Создание и запуск задач тривиально в asyncio. Для таких задач API включает несколько методов в классе AbstractEventLoop, а также функции в библиотеке. Но обычно вы хотите комбинировать результаты от этих задач и обрабатывать их каким-то образом, и рекурсия — это отличный пример данной схемы, а также демонстрирует простоту корутин в сравнении с остальными средствами конкурентности.

Обычный случай для использования asyncio — это создание вебкраулера какого-то вида. Представьте, что мы просто слишком заняты чтобы проверять HackerNews, или может быть вам просто нравится хороший холивар, так что вы хотите реализовать систему, которая извлекает число комментариев для конкретного постна HN и, если оно выше порога, уведомляет вас. Вы немного погуглили и нашли документацию на HN API, просто то что нужно, однако вы заметили в документации следующее:


Хотите узнать общее число комментариев статьи? Обойдите дерево и сосчитайте их.

Вызов принят!

"""
    Рекурсивная функция решает проблему упрощая входные данные до тех пор,
    пока мы не достигнем основного тривиального случая, а затем объединяет результаты
    наверху стека.
    Предположим, что мы хотим посчитать число комментариев определенного
    поста Hacker News рекурсивно агрегируя число его потомков
"""

import asyncio
import argparse
import logging
from urllib.parse import urlparse, parse_qs
from datetime import datetime

import aiohttp
import async_timeout

LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
FETCH_TIMEOUT = 10

parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.')
parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863')
parser.add_argument('--url', type=str, help='URL of a post in HN')
parser.add_argument('--verbose', action='store_true', help='Detailed output')

logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

fetch_counter = 0

async def fetch(session, url):
    """
        Получаем URL страницу используя aiohttp, который возвращает распарсерный JSON ответ.
        Как полагается в документации aiohttp мы используем сессию повторно.
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        async with session.get(url) as response:
            return await response.json()

async def post_number_of_comments(loop, session, post_id):
    """
        Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев.
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
    post_id, url, (datetime.now() - now).total_seconds()))

    if 'kids' not in response:  # Базовый случай. Без комментариев
        return 0

    # считаем число комментариев этого поста, как количество комментариев (на первом уровне)
    number_of_comments = len(response['kids'])

    # создаем рекурсивные задачи для всех комментариев
    log.debug('{:^6} > Fetching {} child posts'.format(
    post_id, number_of_comments))
    tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']]

    # запустим задачи и получим результат
    results = await asyncio.gather(*tasks)

    # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

def id_from_HN_url(url):
    """
        Вернем значение `id` URL аргумента запроса если он передан или None.
    """
    parse_result = urlparse(url)
    try:
        return parse_qs(parse_result.query)['id'][0]
    except (KeyError, IndexError):
        return None

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id
    loop = asyncio.get_event_loop()

    with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = loop.run_until_complete(
            post_number_of_comments(loop, session, post_id))
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("-- Post {} has {} comments".format(post_id, comments))
        loop.close()

Не стесняйтесь попробовать запустить скрипт с флагом » —verbose» для более детального вывода.

    [14:47:32] > Calculating comments took 2.23 seconds and 73 fetches
    [14:47:32] -- Post 8863 has 72 comments

Давайте пропустим шаблонный код и перейдем прямо к рекурсивной корутине. Заметим, что этот код читается практически полностью так, как было бы в случае с синхронным кодом.

async def post_number_of_comments(loop, session, post_id):
    """
        Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев.
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # считаем число комментариев этого поста, как количество комментариев (на первом уровне)
    number_of_comments = len(response['kids'])

    # создаем рекурсивные задачи для всех комментариев
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # запустим задачи и получим результат
    results = await asyncio.gather(*tasks)

    # сведем комментарии наследников и добавим к количеству комментариев поста
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments


  1. Сначала получим JSON с данными поста.
  2. Рекурсивно обойдем каждого из наследников.
  3. В конце концов достигнем базового случая и вернем ноль,
    когда у поста нет откликов.
  4. При возврате от базового случая прибавим ответы на текущий пост
    к числу наследников и вернем

Это отличный пример того, что Бретт Слаткин описывает как fan-in и fan-out, мы fan-out чтобы получить данные от наследников и fan-in сводим полученные данные, чтобы расчитать число комментариев

В API asyncio есть пару способов для того, чтобы выполнять эти fan-out операции. Здесь я использую функцию gather, которая эффективно ожидает пока все корутины выполнятся и вернут список своих результатов.

Обратим внимание на то, как использование корутины также хорошо соответствует рекурсии с одной любой точкой, в которой присутствует любое число корутин, ожидающих ответы на свои запросы во время вызова функции gather и возобновляющих выполнение после того, как операция ввода/вывода завершится. Это позволяет нам выразить довольно сложное поведение в одной изящной и (легко) читаемой корутине.

«Очень просто» — скажете вы? Ладно, давайте поднимемся на ступеньку выше.


Выстрелил и забыл

Представьте, что вы хотите отправлять себе сообщение на электронную почту с постами, у которых количество комментариев больше определенного значения, и вы хотите сделать это таким же способом как мы обходили дерево постов. Мы можем просто добавить выражение «if» в конец рекурсивной функции чтобы добиться этого:

async def post_number_of_comments(loop, session, post_id):
    """
        Получаем данные для текущего поста и рекурсивно для всех его комментариев.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # Базовый случай. Без комментариев
        return 0

    # считаем комментарии этого поста как число комментариев.
    number_of_comments = len(response['kids'])

    # создаем рекурсивные задачи для всех комментариев
    tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']]

    # запускаем задачи и получаем результаты
    results = await asyncio.gather(*tasks)

    # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Логируем если число комментариев больше порогового значения
    if number_of_comments > MIN_COMMENTS:
        await log_post(response)

    return number_of_comments

async def log_post(post):
    """
        Симуляция логирования поста.
    """
    await asyncio.sleep(random() * 3)
    log.info("Post logged")

Да, я использовал asyncio.sleep. Это в последний раз. Обещаю.

[09:41:02] Post logged
[09:41:04] Post logged
[09:41:06] Post logged
[09:41:06] > Calculating comments took 6.35 seconds and 73 fetches
[09:41:06] -- Post 8863 has 72 comments

Это значительно медленнее чем раньше!
Причина в том, что, как мы обсуждали ранее, await приостанавливает выполнение корутины до тех пор, пока future не будет выполнена, но, поскольку нам не нужен результат логирования, нет реальной причины поступать таким образом.

Нам нужно «выстрелить и забыть» нашей корутиной, а поскольку мы не можем ждать ее завершения используя await, нам нужен другой путь для запуска выполнения корутины без ее ожидания. Быстро взглянув на API asyncio найдем функцию ensure_future, которая запланирует запуск корутины, обернет её в объект Task и вернёт. Помня, что раньше корутина была запланирована, цикл событий будет контролировать результат работы нашей корутины в какой-то момент в будущем, когда другая корутина будет в состоянии ожидания.

Здорово, давайте заменим await log_post следующим образом:

async def post_number_of_comments(loop, session, post_id):
    """
    Получаем данные для текущего поста и рекурсивно для всех его комментариев.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # считаем комментарии этого поста как число комментариев.
    number_of_comments = len(response['kids'])

    # создаем рекурсивные задачи для всех комментариев
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # планируем задачи и получаем результаты
    results = await asyncio.gather(*tasks)

    # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Логируем если число комментариев больше порогового значения
    if number_of_comments > MIN_COMMENTS:
        asyncio.ensure_future(log_post(response))

    return number_of_comments

[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches
[09:42:57] -- Post 8863 has 72 comments
[09:42:57] Task was destroyed but it is pending!
task:  wait_for=()]>>
[09:42:57] Task was destroyed but it is pending!
task:  wait_for=()]>>
[09:42:57] Task was destroyed but it is pending!
task:  wait_for=()]>>

Кхм, устрашающее Task was destroyed but it is pending! преследующее пользователей asyncio по всему миру. Хорошие новости в том, что мы вернулись ко времени, которое получили ранее (1.69 с.), плохие новости в том, что asyncio не нравится выход за пределы «выстрелил-и-забыл».

Проблема в том, что мы принудительно закрываем цикл событий после того, как получаем результат работы корутины post_number_of_comments, не оставляя нашей задаче log_post времени для завершения.

У нас есть две возможности:
мы либо позволяем циклу событий работать бесконечно, используя run_forever, и вручную прерываем работу скрипта, или мы используем метод all_tasks класса Task для того, чтобы найти все работающие задачи и ждем когда закончится расчет количества комментариев.

Давайте попробуем выйти из этой ситуации быстро внеся изменения после нашего вызова post_number_of_comments:

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id
    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = loop.run_until_complete(
            post_number_of_comments(loop, session, post_id))
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("-- Post {} has {} comments".format(post_id, comments))

    pending_tasks = [
        task for task in asyncio.Task.all_tasks() if not task.done()]
    loop.run_until_complete(asyncio.gather(*pending_tasks))
    loop.close()
[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches
[09:47:29] — Post 8863 has 72 comments
[09:47:30] Post logged
[09:47:31] Post logged
[09:47:32] Post logged

Сейчас мы уверены, что логирующие задачи завершены.
Предположение, что метод all_tasks отлично работает в случаях, с которыми мы имеем дело, — отличная идея, когда задачи выполняются соответствующим образом в нашем цикле событий, но в более сложных случаях может быть любое количество выполняющихся задач, источник которых может находится за пределами нашего кода.

Другой подход — навести порядок после того, как мы самостоятельно регистрируем абсолютно все корутины, которые мы планировали запустить и позволяем выполнится, отложенным ранее,
как только завершится подсчет комментариев. Как вы знаете функция ensure_future возвращает объект Task, Мы можем использовать это для регистрации наших задач с низким приоритетом. Давайте просто определим список task_registry и сохраним в нем futures:

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendents comments and add it to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        # Add the future to the registry
        task_registry.append(asyncio.ensure_future(log_post(response)))

    return number_of_comments

# (... ommitted code ...) #

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id
    loop = asyncio.get_event_loop()

    task_registry = []  # define our task registry

    with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = loop.run_until_complete(
            post_number_of_comments(loop, session, post_id))
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("-- Post {} has {} comments".format(post_id, comments))

    pending_tasks = [task for task in task_registry if not task.done()]
    loop.run_until_complete(asyncio.gather(*pending_tasks))
    loop.close()
[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches
[09:53:46] — Post 8863 has 72 comments
[09:53:46] Post logged
[09:53:48] Post logged
[09:53:49] Post logged

Мы извлекаем следующий урок — asyncio не следует рассматривать как распределенную очередь задач типа Celery. Все задачи запускаются в одном потоке и циклом событий необходимо управлять соответственно позволяя выделить время для завершения задач.

Что приводит к другому общепринятому паттерну:


Периодически запускаемые корутины

Продолжая с нашим примером о HN (и мы ранее провели отличную работу), мы решили,
что решительно важно рассчитывать число комментариев к публикации HN как только они становятся доступны и пока они находятся в списке 5 последних записей.

Быстрый взгляд на API HN показывает конечную точку, которая возвращает 500 последних записей. Отлично, так мы можем просто опрашивать эту конечную точку для получения новых публикаций и расчета числа комментариев к ним, скажем раз в пять секунд.

Хорошо, поскольку теперь мы переходим к периодическому опросу, мы можем просто использовать бесконечный while цикл, ожидать выполнения задачи опроса (вызывать await), и засыпать(вызывать sleep) на необходимый промежуток времени. Я внес несколько незначительных изменений чтобы получить топ записей вместо обращения по непосредственному URL поста.

"""
An example of periodically scheduling coroutines using an infinite loop of
awaiting and sleeping.
"""

import asyncio
import argparse
import logging
from datetime import datetime

import aiohttp
import async_timeout

LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10

parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.')
parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll')
parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for')
parser.add_argument('--verbose', action='store_true', help='Detailed output')

logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

fetch_counter = 0

async def fetch(session, url):
    """
        Получаем URL страницу используя aiohttp, который возвращает распарсенный JSON ответ.
        Как полагается в документации aiohttp мы используем сессию повторно.
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        async with session.get(url) as response:
            return await response.json()

async def post_number_of_comments(loop, session, post_id):
    """
        Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # считаем число комментариев этого поста, как количество комментариев (на первом уровне)
    number_of_comments = len(response['kids'])

    # создаем рекурсивные задачи для всех комментариев
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # планируем задачи и получаем результаты
    results = await asyncio.gather(*tasks)

    # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

async def get_comments_of_top_stories(loop, session, limit, iteration):
    """
        Получаем последние публикации на HN.
    """
    response = await fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration))

async def poll_top_stories_for_comments(loop, session, period, limit):
    """
        Периодически опрашиваем для получения новых статей и  числа комментариев.
    """
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(limit, iteration))
        await get_comments_of_top_stories(loop, session, limit, iteration)

        log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        fetch_counter = 0
        await asyncio.sleep(period)

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        loop.run_until_complete(
            poll_top_stories_for_comments(
                loop, session, args.period, args.limit))

    loop.close()
[10:14:03] Calculating comments for top 5 stories. (1)
[10:14:06] Post 13848196 has 31 comments (1)
[10:14:06] Post 13849430 has 37 comments (1)
[10:14:06] Post 13849037 has 15 comments (1)
[10:14:06] Post 13845337 has 128 comments (1)
[10:14:06] Post 13847465 has 27 comments (1)
[10:14:06] > Calculating comments took 2.96 seconds and 244 fetches
[10:14:06] Waiting for 5 seconds…
[10:14:11] Calculating comments for top 5 stories. (2)
[10:14:14] Post 13848196 has 31 comments (2)
[10:14:14] Post 13849430 has 37 comments (2)
[10:14:14] Post 13849037 has 15 comments (2)
[10:14:14] Post 13845337 has 128 comments (2)
[10:14:14] Post 13847465 has 27 comments (2)
[10:14:14] > Calculating comments took 3.04 seconds and 244 fetches
[10:14:14] Waiting for 5 seconds…

Отлично, но есть незначительная проблема: если вы обратили внимание на временную отметку,
то задача не запускается строго раз в 5 секунд, она запускается через 5 секунд после того как _завершится выполнение get_comments_of_topstories. Снова последствия использования await и блокирование пока мы не получим наши результаты обратно.
Эти особенности не создают проблемы в случае, когда задаче требуется больше времени чем пять секунд. Также, кажется ошибочным использовать _run_untilcomplete когда корутина спроектирована как бесконечная.

Хорошие новости в том, что теперь мы эксперты по _ensurefuture, и можем просто впихнуть ее в код вместо использования await

async def poll_top_stories_for_comments(loop, session, period, limit):
    """
        Периодически опрашиваем для получения новых статей и  числа комментариев.
    """
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))
        asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        fetch_counter = 0
        await asyncio.sleep(period)

    [10:55:40] Calculating comments for top 5 stories. (1)
    [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches
    [10:55:40] Waiting for 5 seconds…
    [10:55:43] Post 13848196 has 32 comments (1)
    [10:55:43] Post 13849430 has 48 comments (1)
    [10:55:43] Post 13849037 has 16 comments (1)
    [10:55:43] Post 13845337 has 129 comments (1)
    [10:55:43] Post 13847465 has 29 comments (1)
    [10:55:45] Calculating comments for top 5 stories. (2)
    [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches
    [10:55:45] Waiting for 5 seconds…
    [10:55:48] Post 13848196 has 32 comments (2)
    [10:55:48] Post 13849430 has 48 comments (2)
    [10:55:48] Post 13849037 has 16 comments (2)
    [10:55:48] Post 13845337 has 129 comments (2)
    [10:55:48] Post 13847465 has 29 comments (2)

Кхм… Ладно, хорошие новости в том, что временная отметка располагается точно через пять секунд, но что за 0.00 секунд и нет выборок И затем следующая итерация занимает ноль секунд и 260 выборок?

Это одно из последствий ухода от await, теперь мы больше не блокируем корутину и просто переходим на следующую строку, которая печатает ноль секунд и, в первый раз, ноль извлеченных сообщений. Это довольно мелкие задачи, поскольку мы можем жить без сообщений, но что если нам нужны результаты выполнения задач?

Тогда, мой друг, нам нужно прибегнуть к… callback-ам (поеживаемся ((()

Я знаю, знаю, весь смысл корутин в том, чтобы избежать callback — ов, но это потому, что драматический подзаголовок статьи — «За пределами await». Мы больше не на территории await, у нас приключения с ручным запуском задач что приводит к нашему сценарию использования. Что это вам дает? spoiler

Как мы обсуждали ранее _ensurefuture возвращает объект Future, к которому мы мы можем добавить callback используя _add_donecallback.

Прежде чем мы это сделаем, и чтобы иметь корректный подсчет выборок (fetches) мы приходим к тому, что мы должны инкапсулировать нашу корутину извлечения в класс URLFetcher. В таком случае создаем экземпляр для каждой задачи, чтобы у нас был корректный подсчет выборок. Также удаляем глобальную переменную, которая все равно вносила баг:

"""
    Пример периодического запуска корутин с использованием бесконечного
    цикла событий для запуска задач при помощи ensure_future и вызова sleep.
    Добавляя обратный вызов к объекту future, который возвращает функция ensure_future,
    мы можем корректно выводить статистику прошедшего времени и выборок,
    используя новый класс URLFetcher
    Но читаемость ухудшается.
"""

import asyncio
import argparse
import logging
from datetime import datetime

import aiohttp
import async_timeout

LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10

parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.')
parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll')
parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for')
parser.add_argument('--verbose', action='store_true', help='Detailed output')

logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

class URLFetcher():
    """
        Обеспечивает подсчет URL выборок для определенной задачи
    """

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        """
        Получаем URL страницу используя aiohttp, который возвращает распарсерный JSON ответ.
        Как полагается в документации aiohttp мы используем сессию повторно.
        """
        with async_timeout.timeout(FETCH_TIMEOUT):
            self.fetch_counter += 1
            async with session.get(url) as response:
                return await response.json()

async def post_number_of_comments(loop, session, fetcher, post_id):
    """
        Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetcher.fetch(session, url)

    # Базовый случай. Нет комментариев.
    if response is None or 'kids' not in response:
        return 0

    # считаем число комментариев этого поста, как количество комментариев (на первом уровне)
    number_of_comments = len(response['kids'])

    # создаем рекурсивные задачи для всех комментариев
    tasks = [post_number_of_comments(
        loop, session, fetcher, kid_id) for kid_id in response['kids']]

    # sпланируем задачи и получаем результаты
    results = await asyncio.gather(*tasks)

    # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

async def get_comments_of_top_stories(loop, session, limit, iteration):
    """
        Получаем топ публикаций HN.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    response = await fetcher.fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(
        loop, session, fetcher, post_id) for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        log.info("Post {} has {} comments ({})".format(
            post_id, num_comments, iteration))
    return fetcher.fetch_counter  # return the fetch count

async def poll_top_stories_for_comments(loop, session, period, limit):
    """
        Периодически опрашиваем для получения новых статей и  числа комментариев.
    """
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(limit, iteration))

        future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration))

        now = datetime.now()

        def callback(fut):
            fetch_count = fut.result()
            log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))

        future.add_done_callback(callback)

        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        await asyncio.sleep(period)

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        loop.run_until_complete(
            poll_top_stories_for_comments(
                loop, session, args.period, args.limit))

    loop.close()
    [12:23:40] Calculating comments for top 5 stories. (1)
    [12:23:40] Waiting for 5 seconds...
    [12:23:43] Post 13848196 has 38 comments (1)
    [12:23:43] Post 13849430 has 72 comments (1)
    [12:23:43] Post 13849037 has 19 comments (1)
    [12:23:43] Post 13848283 has 64 comments (1)
    [12:23:43] Post 13847465 has 34 comments (1)
    [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches
    [12:23:45] Calculating comments for top 5 stories. (2)
    [12:23:45] Waiting for 5 seconds...
    [12:23:47] Post 13848196 has 38 comments (2)
    [12:23:47] Post 13849430 has 72 comments (2)
    [12:23:47] Post 13849037 has 19 comments (2)
    [12:23:47] Post 13848283 has 64 comments (2)
    [12:23:47] Post 13847465 has 34 comments (2)
    [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches
    [12:23:50] Calculating comments for top 5 stories. (3)
    [12:23:50] Waiting for 5 seconds...

Хорошо, уже лучше, но давайте сфокусируемся на секции callback:

async def poll_top_stories_for_comments(loop, session, period, limit):
    """
        Периодически опрашиваем для получения новых статей и  числа комментариев.
    """
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        future = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        now = datetime.now()

        def callback(fut):
            fetch_count = fut.result()
            log.info(
                '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))

        future.add_done_callback(callback)

        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        await asyncio.sleep(period)

Обратим внимание на то, что callback функции необходимо принимать один аргумент, в котором передается сам объект future. Мы также возвращаем количество выборок (fetch) из экземпляра URLFetcher как результат _get_comments_of_topstories и получаем эти данные как результат future.

Видите? Я говорил вам, что это будет неплохо, зато здесь точно нет await.

Пока мы обсуждаем callback-и, при неизбежных скитаниях по документации API asyncio вам посчастливилось найти пару методов в AbstractBaseLoop с именами _calllater и _callat,
которые выглядят как что-то удобное для реализации периодической корутины. И вы будете правы, их можно использовать, нам просто нужно внести изменения в poll_top_stories_for_comments:


def poll_top_stories_for_comments(loop, session, period, limit, iteration=0):
    """
    Периодическая функция для запуска get_comments_of_top_stories.
    """
    log.info("Calculating comments for top {} stories ({})".format(
        limit, iteration))

    future = asyncio.ensure_future(
        get_comments_of_top_stories(loop, session, limit, iteration))

    now = datetime.now()

    def callback(fut):
        fetch_count = fut.result()
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_count))

    future.add_done_callback(callback)

    log.info("Waiting for {} seconds...".format(period))

    iteration += 1
    loop.call_later(
        period,
        partial(  # or call_at(loop.time() + period)
            poll_top_stories_for_comments,
            loop, session, period, limit, iteration
        )
    )

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        poll_top_stories_for_comments(
            loop, session, args.period, args.limit)

        loop.run_forever()

    loop.close()

Результаты аналогичны полученным ранее. Обратим внимание на несколько изменений:


  • Мы ушли от бесконечного цикла и засыпания к функции планирующей запустить себя после выполнения, во время которого вызывается _ensurefuture на нашей корутине запланированной выполнится.
    (Примечание переводчика: применили рекурсивный подход)
  • Поскольку _poll_top_stories_forcomments используется на каждой итерации цикла для того, чтобы запланировать запуск самой себя, мы естественно должны использовать функцию _runforever цикла событий, чтобы он был всегда запущен.

Ладно, это все хорошо и даже превосходно, но, что если Бог недоступен — наше соединение разорвалось посередине выполнения задачи? Что произойдет с нашей замечательной системой? Давайте проведем такую симуляцию добавив выброс исключения после нескольких запросов URL:


MAXIMUM_FETCHES = 5

class URLFetcher():
    """
        Обеспечивает подсчет URL выборок для определенной задачи
    """

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        """
        Получаем URL страницу используя aiohttp, который возвращает распарсерный JSON ответ.
        Как полагается в документации aiohttp мы используем сессию повторно.
        """
        with async_timeout.timeout(FETCH_TIMEOUT):
            self.fetch_counter += 1
            if self.fetch_counter > MAXIMUM_FETCHES:
                raise Exception('BOOM!')

            async with session.get(url) as response:
                return await response.json()

    [12:51:00] Calculating comments for top 5 stories. (1)
    [12:51:00] Waiting for 5 seconds…
    [12:51:01] Exception in callback poll_top_stories_for_comments..callback() at 05_periodic_coroutines.py:121
    handle: .callback() at 05_periodic_coroutines.py:121>
    Traceback (most recent call last):
     File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run
     self._callback(*self._args)
     File "05_periodic_coroutines.py”, line 122, in callback
     fetch_count = fut.result()
     File "05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories
     results = await asyncio.gather(*tasks)
     File "05_periodic_coroutines.py”, line 69, in post_number_of_comments
     response = await fetcher.fetch(session, url)
     File "05_periodic_coroutines.py”, line 58, in fetch
     raise Exception(‘BOOM!’)
    Exception: BOOM!
    [12:51:05] Calculating comments for top 5 stories. (2)
    [12:51:05] Waiting for 5 seconds…

Не так здорово, правда?

Что делать? Переходить к следующей части этой серии, где я исследую возможности, которые у нас есть для обработки ошибок и остальных задач: Паттерны asyncio корутин: Ошибки и Отмена

© Habrahabr.ru