Практический гайд по процессам и потокам (и не только) в Python
За то время что я занимаюсь менторством заметил, что большинство вопросов новичков связаны с темами: конкурентность, параллелизм, асинхронность. Подобные вопросы часто задают на собеседованиях, а в работе эти знания позволяют писать более эффективные и производительные системы. Поэтому я решил систематизировать свои знания и понимание темы в виде статьи.
Цель — c помощью примеров рассказать:
О потоках, процессах и корутинах.
О Global Interpreter Lock и накладываемых им ограничениях.
CPU и I/O bound нагрузке.
В чем польза от ThreadPool и ProcessPool.
Задача №1: Работа с JSON или CPU-bound task
Рассмотрим функцию:
# CPU bound task: generate list and dump it into JSON
def work(size):
json.dumps(list(range(size)))
Она делает 2 вещи: генерирует список и превращает его в JSON. Подобный код — отличный пример CPU bound задачи — задачи, скорость выполнения которой зависит от мощности процессора.
Представим что нам нужно выполнить эту тяжеловесную задачу N раз. Последовательный алгоритм будет выглядеть так:
# Do work sequentially, one by one
def sequential(size, count):
for _ in range(count):
work(size)
Очевидно, что последовательное выполнение кода неэффективно. Какие варианты ускорить программу нам доступны?
Вариант №1 — создать по одному потоку на каждый вызов функции и попробовать добиться параллелизма:
# Do work using OS Threads
def run_threads(size, executionUnitsCount):
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(work, [size] * executionUnitsCount)
Вариант №2 — вместо потоков создавать процесс:
# Do work using OS Processes
def run_processes(size, executionUnitsCount):
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(work, [size] * executionUnitsCount)
В обоих случаях используются executor классы из пакета сoncurrent.futures
. Если использовать пакеты multithreading
и multiprocessing
то код бы выглядел бы вот так:
Hidden text
# Do work using OS Threads
def run_threads(size, executionUnitsCount):
threads = [threading.Thread(target=work, args=(size,)) for _ in range(executionUnitsCount)]
for t in threads:
t.start()
for t in threads:
t.join()
# Do work using OS Processes
def run_processes(size, executionUnitsCount):
processes = [multiprocessing.Process(target=work, args=(size,)) for _ in range(executionUnitsCount)]
for p in processes:
p.start()
for p in processes:
p.join()
Код тестирующий производительность всех реализаций:
if __name__ == '__main__':
# Disable GC for better benchmarking and avoid pauses
gc.disable()
jsonSize = 1000000
testCases = [ (jsonSize, i) for i in range(1, 11)]
variants = [sequential, run_threads, run_processes]
for i, t in enumerate(testCases):
size, executionUnitsCount = t
print(f"Parallelism: {executionUnitsCount}, JSON Size: {size}")
for j, variant in enumerate(variants):
start = time.perf_counter()
variant(size, executionUnitsCount)
end = time.perf_counter()
print(f"{variant.__name__}, elapsed: {round(end - start, 2)}")
print()
Результаты:
Результаты выполнения теста. Ось X — количество задач. Ось Y — время выполнения в секундах
Какие выводы можно сделать?
Использование потоков для параллельного исполнения CPU bound задач не дает преимущества, результаты сопоставимы с кодом не использующим потоки и работающим последовательно. С чем это связано? Все дело в GIL — механизме синхронизации потоков, именно он не позволяет программе «набрать мощность» и выполнять потоки параллельно.
На процессы GIL не распространяется и создав дочерние процессы мы начинаем утилизировать ресурсы процессора по максимуму и как следствие видим уменьшение времени исполнения на графике.
Максимальный параллелизм которого можно добиться c помощью процессов равен количеству физических ядер CPU. На графике можно заметить что время исполнения теста Processes растет примерно лесенкой с шагом 4 — это количество физических ядер CPU на моем ноутбуке.
Задача №2: Работа с внешним API или IO-bound task
Помимо CPU bound существуют задачи IO bound. В такой задаче производительность зависит не от процессора, а от подсистемы ввода-вывода, а также устройств с которыми мы взаимодействуем, например файловая система или сеть.
В примере ниже я буду эмулировать IO нагрузку через sleep (представьте что вместо sleep — запрос по HTTP к API вашего любимого сервиса).
def work(latency):
time.sleep(latency)
Задача: сделать N вызовов функции (например, мы сервис агрегатор услуг и ходим к партнерам за данными). Последовательный вариант выполнения задачи будет выглядеть так:
def sequential(latency, count):
for _ in range(count):
work(latency)
Конкурентная реализация на потоках:
# Do work using OS Threads
def run_threads(latency, executionUnitsCount):
with concurrent.futures.ThreadPoolExecutor(max_workers=executionUnitsCount) as executor:
executor.map(work, [latency] * executionUnitsCount)
Конкурентная реализация на процессах:
# Do work using OS Processes
def run_processes(latency, executionUnitsCount):
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(work, [latency] * executionUnitsCount)
Конкурентная реализация на основе asyncio:
def run_async_io(latency, executionUnitsCount):
asyncio.run(async_io_tasks(latency, executionUnitsCount))
async def async_work(latency):
await asyncio.sleep(latency)
async def async_io_tasks(latency, executionUnitsCount):
tasks = [asyncio.create_task(async_work(latency)) for _ in range(executionUnitsCount)]
await asyncio.gather(*tasks)
Бенчмарк (Чтобы показать разницу между подходами тестируем от 1000 до 2000 конкурентных задач) :
if __name__ == '__main__':
testCases = [ (0.1, i) for i in range(1000, 20001, 200)]
variants = [run_threads, run_async_io, run_processes]
for i, t in enumerate(testCases):
latency, executionUnitsCount = t
print(f"Parallelism\: {executionUnitsCount}")
for j, variant in enumerate(variants):
start = time.perf_counter()
r = variant(latency, executionUnitsCount)
end = time.perf_counter()
print(f"{case.__name__}, elapsed: {round(end - start, 2)}")
print()
print("\n".join(map(str,table)))
Результаты:
Результаты выполнения теста. Ось X — количество конкурентных задач (процессов и потоков). Ось Y — время выполнения в секундах
Увеличим масштаб и сравним потоки и корутины:
Результаты выполнения теста. Ось X — количество конкурентных задач (процессов и потоков). Ось Y — время выполнения в секундах
Какие выводы можно сделать?
Процессы в чистом виде непригодны для подобного класса задач, слишком большой оверхед на создание процесса и его убийство, взаимодействовать с ОС так часто — дорогое удовольствие.
Потоки справились намного лучше процессов, и GIL в данной задаче не стал помехой, так как он запрещает «исполняться» нескольким процессом одновременно, а в случае IO нагрузки процессы дольше находятся в ожидании чем в работе. Но оверхед по сравнению с корутинами присутствует так как мы взаимодействуем с ОС и аллоцируем память.
Корутины — абсолютный лидер, за счет того что ими управляет рантайм языка, а не ОС, к тому же с точки зрения памяти корутине нужно ее меньше чем потоку.
Есть ли ограничения по количеству процессов/потоков/корутин?
В примерах выше мы запускали разное количество задач и у любопытного читателя может возникнуть вопрос: Сколько можно создавать процессов и потоков в программе? Чем мы платим за их создание? Ответ довольно прозаичен: мы платим оперативной памятью нашей машины. Для каждого процесса и потока ОС резервирует определенное количество памяти, а так как память конечна то без остановки создавая процесс или поток мы рано или поздно столкнемся с OOM (Out of memory error).
Чтобы избегать подобных ошибок в production системах ограничивается количество активных процессов и потоков с помощью популярных во многих фреймворках механизмов ThreadPool или ProcessPool.
Итоги
В данной статье я показал практическое применение потоков, процессов и корутин. На примерах разобрались в каких задач применим тот или иной инструмент. Для того чтобы упростить код были выбраны простые синтетические примеры, но при этом не противоречащие production коду который мы встречаем ежедневно в работе.
Полный код из статьи доступен на GitHub.
Статьи для дополнительного ознакомления:
Спасибо что прочитали до конца, буду рад любой конструктивной обратной связи в комментариях!