Как интегрировать синхронный код в асинхронный. Инструкция
Когда создаешь новое приложение, особенно если оно должно быстро обрабатывать данные, использование библиотеки asyncio — это хороший выбор. Она позволяет работать с неблокирующими библиотеками, asyncpg и aiohttp. Однако чаще всего программисты работают с уже существующим кодом, который использует блокирующие библиотеки. Поэтому большую часть времени может занять адаптация и модернизация старого кода, так как асинхронный код не дружит с синхронным (им мешает GIL). GIL (Global Interpreter Lock) — это механизм, который предотвращает одновременное выполнение нескольких потоков в Python. Это означает, что даже если у вас есть многопоточное приложение, только один поток может выполнять Python-код в любой момент времени. Поэтому можно запускать дополнительный поток для выполнения операции ввода-вывода.
Погружение в суть
В Python для работы с потоками (параллельным выполнением задач) используется модуль threading
. Он позволяет запускать несколько задач одновременно, что может быть полезно, когда нужно выполнять операции ввода-вывода, такие, как чтение или запись файлов. Python работает с одним потоком в каждый момент времени из-за глобальной блокировки (GIL). Это значит, что только один участок кода может выполняться одновременно. Однако во время операций ввода-вывода GIL может быть освобождён, и это позволяет использовать многопоточность более эффективно. Давайте рассмотрим простой пример, где мы создадим несколько потоков и посмотрим, как работает Thread.
from threading import Thread
import time
def synchronous_code():
time.sleep(2) # Имитация задержки синхронного кода
def main():
start = time.time()
# Создание потоков
threads = [Thread(target=synchronous_code) for _ in range(10)]
# Запуск потоков
for thread in threads:
thread.start()
# Ожидание завершения всех потоков
for thread in threads:
thread.join()
end = time.time()
print(f'Выполнение запросов завершено за {end - start:.4f} с')
if __name__ == "__main__":
main()
Код завершается за 2 секунды, так как мы запускаем два параллельных потока. Конечно, не очень удобно использовать for для создания, запуска и завершения потоков, поэтому можно использовать пул потоков.
from concurrent.futures import ThreadPoolExecutor, wait
import time
def synchronous_code():
time.sleep(2) # Имитация задержки в 2 секунды, например, для выполнения какого-то запроса
def main():
start = time.time()
with ThreadPoolExecutor() as executor: # Создаем пул потоков
# Отправляем 10 задач на выполнение в пул потоков, каждая задача вызывает функцию
synchronous_code
futures = [executor.submit(synchronous_code) for _ in range(10)]
# Ожидаем завершения всех задач
wait(futures)
end = time.time()
print(f'Выполнение запросов завершено за {end - start:.4f} с')
if __name__ == "__main__":
main()
Этот код тоже завершился за 2 секунды, но если мы создадим не 10 задач, а 17 задач, то код будет выполнен за 4 секунды, но если выполнить 16 задач, то код по-прежнему будет выполнен за 2 секунды. А почему так? В python 3.8 значение max_workers
по умолчанию равно min (32, os.cpu_count () + 4). Это значение сохраняет не менее 5 рабочих потоков для задач, связанных с вводом/выводом. Оно использует не более 32 ядер процессора для задач, связанных с процессором, которые освобождают GIL. Это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах. Так, например, у меня 12 логических ядер + 4 по формуле. В итоге в пуле 16 потоков.
А что будет, если…?
Что будет, если потоки будут ломиться к глобальной переменной? А ничего хорошего не будет. Мы погрузимся в состояние гонки. Это ситуация, когда два или более потоков (или процессов) одновременно пытаются получить доступ к общему ресурсу, и результат зависит от того, в каком порядке они выполняются. Это может привести к непредсказуемым результатам и ошибкам.
from concurrent.futures import ThreadPoolExecutor, wait
import time
help_me = 0
def synchronous_code():
global help_me
for _ in range(100):
current_value = help_me
time.sleep(0.0001)
help_me = current_value + 1
def main():
global help_me
with ThreadPoolExecutor() as executor:
futures = [executor.submit(synchronous_code) for _ in range(10)]
wait(futures)
print(f"Фактическое значение: {help_me}")
if __name__ == "__main__":
main()
Результат всегда будет не предсказуемый. В этом коде он будет ± 100. Небольшая задержка в коде (time.sleep(0.0001))
добавлена с целью увеличить вероятность гонок данных между потоками. Без задержки потоки могут выполняться слишком быстро, и результат может быть более предсказуемым (например, все потоки могут завершиться до того, как другие потоки успеют начать работу). Задержка позволяет потокам «пересекаться» и взаимодействовать друг с другом, что создает условия для гонок данных. Состояние гонки можно избежать блокировкой.
Избегаем гонки
Многопоточный код подвержен состоянию гонки при использовании разделяемых данных, потому что управлять порядком выполнения потоков мы не можем. Всякий раз, когда два потока или процесса потенциально могут изменить разделяемый элемент данных, следует использовать блокировку для синхронизации доступа (да, да, такой парадокс, чтобы синхронный код интегрировать в асинхронный, нужно синхронизировать многопоточный код).
Для этого нам понадобится реализация класса Lock
в модуле threading
. Нужно лишь импортировать Lock
из threading
и окружить критические секции вызовами with lock
.
from concurrent.futures import ThreadPoolExecutor, wait
import time
import threading
help_me = 0
lock = threading.Lock() # Создаем блокировку
def synchronous_code():
global help_me
for _ in range(100):
with lock: # Захватываем блокировку перед изменением переменной
current_value = help_me
time.sleep(0.0001)
help_me = current_value + 1
def main():
global help_me
with ThreadPoolExecutor() as executor:
futures = [executor.submit(synchronous_code) for _ in range(10)]
wait(futures)
print(f"Фактическое значение: {help_me}")
if __name__ == "__main__": # Исправлено на правильное имя
main()
Создаем неблокирующий REST API
Парадигма REST широко используется в современной разработке веб-приложений. REST API пригоден для взаимодействия с любыми клиентами: от мобильного телефона до браузера, нужно лишь изменить представление данных на стороне клиента. При проектировании REST API мы будем возвращать данные в формате JSON, поскольку так чаще всего и делают, но в принципе можно выбрать любой формат, отвечающий конкретным потребностям. Напишем на Aiohttp свой простой api:
from aiohttp import web
from aiohttp.web_response import Response
from aiohttp.web_request import Request
import random
routes = web.RouteTableDef()
@routes.get('/random')
async def handle(request: Response) -> Request:
result = random.randint(1, 1000)
return web.json_response(data={'random_values': result})
if __name__ == '__main__':
app = web.Application()
app.add_routes(routes)
web.run_app(app, host='127.0.0.1', port=8000)
Тут более-менее все очевидно, создаём роутер, выдаём случайное число и отправляем json
формат. Допустим, у нас есть синхронная функция, которая блокирует наш поток.(Возможно это будет взаимодействие с БД).
def blocking_task():
time.sleep(1)
value = random.randint(1, 1000)
return value
Ситуация не из приятных. Функция выполняется 1 секунду. Это очень долго и если 100 пользователей решат одновременно достучаться до нашего API, то самый последний запрос будет выполняться долго. Тут нам помогает ThreadPoolExecutor
. Это удобный инструмент из модуля concurrent.futures
, который позволяет легко управлять пулом потоков для выполнения задач параллельно. Он особенно полезен, когда необходимо выполнять множество операций ввода-вывода или сетевых запросов, которые могут блокировать выполнение программы.
def blocking_task():
# Эта функция выполняет блокирующую задачу, которая занимает 1 секунду
time.sleep(1)
value = random.randint(1, 1000)
return value
async def run_in_executor(executor):
# Получаем текущий цикл событий
loop = asyncio.get_event_loop()
# Запускаем блокирующую задачу в пуле потоков и ждем ее завершения
result = await loop.run_in_executor(executor, blocking_task)
return result
@routes.get('/random')
async def handle(request: Response) -> Request:
# Запускаем блокирующую задачу и ждем результат
result = await run_in_executor(executor)
return web.json_response(data={'random_values': result})
В нашу программу мы добавили одну функцию run_in_executor
. Что под капотом этой функции?
loop = asyncio.get_event_loop()
Здесь мы получаем текущий цикл событий. Цикл событий управляет выполнением асинхронных задач и обработкой событий.
result = await loop.run_in_executor(executor, blocking_task)
loop.run_in_executor(executor, blocking_task)
принимает два аргумента:
— executor
— это объект ThreadPoolExecutor, который управляет пулом потоков.
— blocking_task
— это функция, которую мы хотим выполнить в отдельном потоке.
Функция run_in_executor
запускает blocking_task
в одном из потоков пула и возвращает результат выполнения этой функции.
Давайте посмотрим еще один пример, чтобы продемонстрировать, как можно обрабатывать несколько параллельных запросов, не блокируя основной поток. В этом примере мы создадим API, который будет выполнять несколько блокирующих задач одновременно, используя asyncio.gather()
. Функция asyncio.gather
позволяет группировать объекты, допускающие ожидание (то есть async функции). Эти объекты после группировки можно запустить в конкурентном режиме.
Предположим, у нам нужно вызвать синхронную функцию несколько раз и мы хотим вернуть результаты всех этих операций в одном ответе.
@routes.get('/random')
async def handle_batch(request: Request) -> Response:
with ThreadPoolExecutor() as executor:
# Создаем список задач
tasks = [run_in_executor(executor) for _ in range(3)]
# Запускаем все задачи параллельно и ждем их завершения
results = await asyncio.gather(*tasks)
return web.json_response(data={'random_values_1': results[0],
'random_values_2': results[1],
'random_values_3': results[2],
})
Также можно вызывать несколько разных синхронных функций.
def blocking_task():
time.sleep(.5)
value = random.randint(1, 1000)
return value
def blocking_task_1():
time.sleep(1)
value = random.randint(1000, 2000)
return value
async def run_in_executor_1(executor):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, blocking_task)
return result
async def run_in_executor_2(executor):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, blocking_task_1)
return result
async def run_in_executor_main(executor):
values = await asyncio.gather(
run_in_executor_1(executor),
run_in_executor_2(executor)
)
return {values[0] : values[1]}
@routes.get('/random')
async def handle_batch(request: Request) -> Response:
with ThreadPoolExecutor() as executor:
tasks = [run_in_executor_main(executor) for _ in range(3)]
results = await asyncio.gather(*tasks)
return web.json_response(data={'random_values_1': results[0],
'random_values_2': results[1],
'random_values_3': results[2],
})
Наш код выглядит менее привлекательным, но он результативнее! Используя asyncio.gather()
, мы можем эффективно обрабатывать несколько параллельных запросов к нашему API без блокировки основного потока. Это позволяет значительно повысить производительность приложения, особенно при наличии множества блокирующих операций. Теперь наш REST API способен обрабатывать множество запросов одновременно, что делает его более отзывчивым и эффективным.
Итоги
Использование ThreadPoolExecutor
для запуска синхронных функций в асинхронном коде открывает новые горизонты для разработки высокопроизводительных приложений. Мы рассмотрели ключевые аспекты, начиная с основ работы ThreadPoolExecutor
. Это позволяет эффективно управлять потоками и ресурсами. Также мы обсудили, как можно предотвратить состояния гонки, что критически важно для обеспечения целостности данных и стабильности приложения. Создание неблокирующего REST API демонстрирует, как можно интегрировать синхронные операции в асинхронную архитектуру, обеспечивая при этом отзывчивость и масштабируемость. Применяя эти принципы на практике, вы сможете улучшить производительность ваших приложений и создать более надежные решения для пользователей.
Автор статьи: Алексей Любимов
НЛО прилетело и оставило здесь промокод для читателей нашего блога:
-15% на заказ любого VDS (кроме тарифа Прогрев) — HABRFIRSTVDS.