Массовая асинхронная обработка запросов с последующей синхронной CPU-bound логикой

c49a73ea12d20636ffd18d38c526b149

В данной статье мы рассмотрим задачу массовой асинхронной обработки запросов с последующей синхронной и ресурсоёмкой (CPU-bound) логикой. Главная сложность в том, что асинхронный код отлично справляется с большим количеством запросов к внешним сервисам, но CPU-bound вычисления в той же среде могут существенно снизить пропускную способность. Решение — вынести тяжёлую обработку в отдельный пул процессов.

Суть задачи

  1. Имеется список кортежей (url, json_data), которых может быть до 1 000 000:

request_samples = [
    ("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}),
    ("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}),
    # ... до 1 000 000 подобных кортежей
]
  1. Для каждого такого кортежа необходимо:

  • Отправить запрос на указанный url с параметрами json=json_data и получить base_resp.

  • Из base_resp извлечь данные для трёх сервисных запросов (к url_1, url_2, url_3).

  • Получить ответы resp1, resp2, resp3 от этих сервисов.

  • Передать resp1, resp2, resp3 в функцию business_logic (resp1, resp2, resp3), которая выполняет синхронную и CPU-bound обработку.

  • Результат business_logic добавляется к итоговому списку.

  1. Цель — выполнить всю обработку максимально эффективно, не блокируя асинхронный event loop при выполнении тяжелой CPU-bound логики.

Почему ProcessPoolExecutor?

  • CPU-bound обработка: Если операция действительно нагружает CPU, то использование ThreadPoolExecutor может быть неэффективным из-за GIL (Global Interpreter Lock) в CPython.

  • ProcessPoolExecutor запускает обработку в отдельных процессах, что позволяет выполнять CPU-bound задачи параллельно на разных ядрах, минуя ограничения GIL.

Архитектура решения

  • Используем asyncio и aiohttp для асинхронной загрузки данных.

  • После получения трех ответов (resp1, resp2, resp3) для каждого запроса используем ProcessPoolExecutor и run_in_executor, чтобы выполнить business_logic в отдельном процессе.

  • Собираем результаты в единый список.

import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor

def business_logic(resp1, resp2, resp3):
    # Синхронная, CPU-bound обработка данных.
    # Здесь может быть сложная логика:
    # обработка списков, вычисления, агрегации и т.д.
    aggregated = {
        "item_count": len(resp1["items"]) + len(resp2["items"]) + len(resp3["items"]),
        "details": resp1["details"] + resp2["details"] + resp3["details"]
    }
    return aggregated

async def fetch(session, url, json_data):
    async with session.post(url, json=json_data) as response:
        return await response.json()

async def process_single_request(session, url, json_payload, url_1, url_2, url_3, executor):
    # 1. Запрос к основному API
    base_resp = await fetch(session, url, json_payload)

    # Предположим, извлекаем из base_resp список товаров
    items_for_services = base_resp.get("data", {}).get("items", [])
    service_payload = {"items": items_for_services}

    # 2. Запросы к трем сервисным API
    resp1 = await fetch(session, url_1, service_payload)
    resp2 = await fetch(session, url_2, service_payload)
    resp3 = await fetch(session, url_3, service_payload)

    # 3. Вызов синхронной CPU-bound логики в отдельном процессе
    loop = asyncio.get_event_loop()
    aggregated_result = await loop.run_in_executor(
        executor, business_logic, resp1, resp2, resp3
    )

    return aggregated_result

async def main(request_samples, url_1, url_2, url_3, max_parallel=1000):
    results = []
    conn = aiohttp.TCPConnector(limit=max_parallel)

    # Инициализируем пул процессов
    # Число процессов можно подобрать на основе числа CPU ядер
    executor = ProcessPoolExecutor(max_workers=4)

    async with aiohttp.ClientSession(connector=conn) as session:
        tasks = [
            asyncio.create_task(
                process_single_request(session, url, payload, url_1, url_2, url_3, executor)
            )
            for url, payload in request_samples
        ]

        for fut in asyncio.as_completed(tasks):
            result = await fut
            results.append(result)

    return results

if __name__ == "__main__":
    # Пример исходных данных
    request_samples = [
        ("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}),
        ("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}),
        # ... и далее до 1 000 000
    ]

    url_1 = "https://service-a.example.com/details"
    url_2 = "https://service-b.example.com/prices"
    url_3 = "https://service-c.example.com/inventory"

    final_results = asyncio.run(main(request_samples, url_1, url_2, url_3))
    # final_results теперь будет содержать список агрегированных результатов, по одному на каждую запись из request_samples

Итоги и преимущества

  • Асинхронный ввод-вывод обеспечивает высокую пропускную способность при работе с внешними API.

  • ProcessPoolExecutor решает проблему CPU-bound логики, позволяя эффективно параллелить тяжелые вычисления.

  • Подобный конвейер можно масштабировать, изменяя количество процессов, размер порций запросов и добавляя дополнительные оптимизации (кеширование, rate limiting и т.д.). Таким образом, предложенный подход позволяет эффективно совмещать массовую асинхронную загрузку данных с синхронными, ресурсоёмкими вычислительными задачами.

© Habrahabr.ru