Ускорение Python в 2 раза с помощью multiprocessing, async и MapReduce

8702ae7bd2d082ba36d9350507f7bb0a.png

Недостатки Python: Медленная работа и проблема с GIL

Python — один из самых популярных языков программирования в мире благодаря своей простоте и удобству в использовании. Но как и в любом языке у Python есть и свои недостатки. Главный минус это скорость. Одной из причин медленной работы языка является его динамическая природа. Python — интерпретируемый язык, что означает, что код выполняется построчно с использованием интерпретатора Python. Это приводит к тому, что Python работает медленнее, чем компилируемые языки программирования, такие как C++ или Java.

GIL (Global Interpreter Lock) — это особенность интерпретатора Python, которая оказывает влияние на его многопоточность и параллельное выполнение. GIL является блокировкой, которая позволяет только одному потоку Python выполняться в любой конкретный момент времени, даже на многоядерных процессорах.

Везде ли Python медленный?

Если погрузиться глубже в Python, то можно найти замечательную библиотеку Asyncio, которая появилась впервые в версии 3.5 (2015 год). Библиотека позволяет заниматься асинхронным программированием с применением конкурентного выполнения кода, основанного на корутинах (ознакомиться с библиотекой можно по этой ссылке). Она ускоряет Python в IO Bound нагрузке. Например, библиотека asyncpg позволяет работать с Postgres со скоростью 1 миллион строк в секунду! Так что Python и не такой уж и медленный!

А что там с CPU-bound?

Ввод-вывод — то, для чего asyncio создавалась в первую очередь, при написание кода нужно следить, чтобы в сопрограммах не было счетного кода. На первый взгляд, это серьезно ограничивает asyncio, но на самом деле библиотека более универсальна. В asyncio имеется API для взаимодействия с библиотекой Python multiprocessing.

GIL препятствует параллельному выполнению нескольких участков байт-кода. Это означает, что для любых задач, кроме ввода-вывода, многопоточность не дает никакого выигрыша в производительности,  — в отличие от таких языков, как Java и C++. Для распараллеливания счетных задач Python не предлагает никакого решения, но на самом деле решение есть — библиотека multiprocessing. Вместо запуска потоков для распараллеливания работы родительский процесс будет запускать дочерние процессы. В каждом дочернем процессе работает отдельный интерпретатор Python со своей GIL.

Меньше слов больше практики!

Характеристики машины:
Процессор: Ryzen 5 5500U, 6 ядер, 12 потоков, Максимальная частота 4 ГГц
Память: 16 ОЗУ Частота: 2667 МГц, SSD

Напишем функцию. Функция будет совсем простой — подсчет от нуля до большого числа,  –, но она позволит понять, как работает API и какого выигрыша можно достичь.

"""Два параллельных процесса"""
import time
from multiprocessing import Process

def count(count_to: int) -> int:
    start = time.time()
    counter = 0
    while counter < count_to:
        counter = counter + 1
    end = time.time()
    print(f'Закончен подсчет до {count_to} за время {end-start}')
    return counter

if __name__ == "__main__":
    start_time = time.time()

    #Создаём процесс для выполнения функции count
    to_one_hundred_million = Process(target=count, args=(100000000,))
    to_two_hundred_million = Process(target=count, args=(200000000,))
    #Запускаем процесс. Этот метод возвращает управление немедленн
    to_one_hundred_million.start()
    to_two_hundred_million.start()
    #Ждём завершения процесса. 
    #Этот метод блокирует выполнение, пока процесс не завершится
    to_one_hundred_million.join()
    to_two_hundred_million.join()
    end_time = time.time()
    print(f'Полное время работы {end_time-start_time}')

Здесь мы создаем простую функцию count, которая принимает целое число и в цикле увеличивает его на единицу, пока не дойдет до переданной верхней границы. Затем создаем два процесса — один считает до 100 000 000, другой до 200 000 000. Класс Process принимает два аргумента: target — имя подлежащей выполнению функции и args — кортеж передаваемых ей аргументов. Затем для каждого процесса вызывается метод start. Он сразу же возвращает управление и начинает выполнять процесс. В данном примере оба процесса запускаются один за другим, после чего для каждого вызывается метод join. В результате главный процесс блокируется до тех пор, пока оба дочерних не завершатся.

В итоге получаем такие результаты:
Закончен подсчет до 100000000 за время 6.757261514663696
Закончен подсчет до 200000000 за время 12.582566976547241
Полное время работы 12.697719097137451

Обе функции count суммарно заняли бы чуть больше 19 с, но наше приложение завершилось за 12 с. То есть, по сравнению с последовательной версией, мы выиграли примерно 7 с. Это дает неплохой выигрыш в производительности, но выглядит неэлегантно, потому что мы должны вызывать start и join для каждого запущенного процесса. Тем более мы не знаем какой процесс закончится первым.

Использование пула процессов

В прошлом примере мы вручную создавали процессы и вызывали их методы start и join для запуска и ожидания завершения. У этого подхода обнаружилось несколько недостатков, от качества кода до невозможности получить результаты, возвращенные процессом. Модуль multiprocessing предлагает API, позволяющий справиться с этими проблемами,  — пулы процессов.

Рассмотрим пример:

"""Работа пула процессов"""
from multiprocessing import Pool

def say_hello(name: str) -> str:
    return f'Привет, {name}'

if __name__ == "__main__":
    with Pool() as process_pool:#Создание пула процессов
        hi_jeff = process_pool.apply(say_hello, args=('Jeff',))
        hi_john = process_pool.apply(say_hello, args=('John',))
        print(hi_jeff)
        print(hi_john)

Здесь мы создаем пул процессов в предложении with Pool () as process_pool. Это контекстный менеджер, потому что после завершения работы с пулом нужно корректно остановить созданные Python процессы. Если этого не сделать, то возникает риск утечки ценного ресурса — процессов. При создании этого пула автоматически создается столько процессов, сколько имеется процессорных ядер на данной машине. Количество ядер можно получить от функции multiprocessing.cpu_count (). Если это не годится, то можно передать функции Pool () произвольное целое число в аргументе processes. Но обычно значение по умолчанию является неплохой отправной точкой.  Далее мы применяем метод пула процессов apply, чтобы выполнить функцию say_hello в отдельном процессе.  Работать-то работает, но есть проблема. Метод apply блокирует выполнение, пока функция не завершится. Это значит, что если бы каждое обращение к say_hello занимало 10 с, то программа работала бы примерно 20 с, потому что выполнение последовательное и все усилия сделать ее параллельной пошли насмарку. Эту проблему можно решить, воспользовавшись методом пула процессов apply_async (это асинхронный вариант Pool.apply (). Он не дожидается результата завершения работы функции.)

Использование исполнителей пула процессов с asyncio

Мы видели, как использовать пулы процессов для конкурентного выполнения счетных операций. Такие пулы хороши для простых случаев, но Python предлагает абстракцию поверх пула процессов в модуле concurrent.futures. Абстрактный класс Executor, в котором определены два метода для асинхронного выполнения работы. Первый из них, submit, принимает вызываемый объект и возвращает объект Future — это эквивалент метода Pool.apply_async. Второй метод называется map. Он принимает вызываемый объект и список аргументов, после чего асинхронно выполняет объект с каждым из этих аргументов. Возвращается итератор по результатам вызовов.
Для демонстрации работы ProcessPoolExecutor снова возьмем пример с подсчетом чисел и выполним его для нескольких малых и нескольких больших границ, чтобы посмотреть, как появляются результаты.

"""Исполнители пула процессов"""
import time
from concurrent.futures import ProcessPoolExecutor

def count(count_to: int) -> int:
    start = time.time()
    counter = 0
    while counter < count_to:
        counter = counter + 1

    end = time.time()
    print(f'Закончен подсчет до {count_to} за время {end - start}')
    return counter

if __name__ == "__main__":
    with ProcessPoolExecutor() as process_pool:
        numbers = [1, 3, 5, 22, 100000000]
        for result in process_pool.map(count, numbers):
            print(result)

Закончен подсчет до 1 за время 0.0
Закончен подсчет до 3 за время 0.0
Закончен подсчет до 5 за время 0.0
1
Закончен подсчет до 22 за время 0.0
3
5
22
Закончен подсчет до 100000000 за время 6.688919544219971
100000000

Как и раньше, объект ProcessPoolExecutor создан под управлением контекстного менеджера. Количество ресурсов по умолчанию тоже равно числу процессорных ядер, как и в случае пула процессов. После этого мы используем метод process_pool.map для выполнения функции count, указывая список верхних границ. Запустив эту программу, мы увидим, что обращения к count с небольшими верхними границами заканчиваются быстро и печатаются почти мгновенно. Но обращение с границей 100000000 занимает куда больше времени и печатается после небольших чисел. Хотя кажется, что программа выполняет подсчёт чисел одновременно, но на самом деле порядок итераций детерминирован и определяется тем, в каком порядке следуют числа в списке numbers. Это значит, что если бы первым числом было 100000000, то пришлось бы ждать завершения соответствующего вызова, и только потом появилась бы возможность напечатать другие результаты, хотя они и были вычислены раньше.

Исполнители пула процессов в сочетании с циклом событий

Познакомившись с тем, как работают исполнители пула процессов, посмотрим, как включить их в цикл событий asyncio. Это позволит нам использовать функции gather* и as_completed**, для управления несколькими процессами.
*asyncio.gather () — Функция позволяет вызывающей стороне группировать объекты, допускающие ожидание. Эти объекты, после группировки, можно запустить в конкурентном режиме.
**asyncio.as_completed () — Функция запускает и ждет выполнения переданных в нее задач/awaitable-объектов, и как только появляются результаты у какой-нибудь задачи, в реальном времени, начинает возвращать их в итераторе

Имея пул, мы можем использовать специальный метод цикла событий asyncio -run_in_executor. Этот метод принимает выполняемый объект и исполнитель (пула процессов или пула потоков), после чего исполняет этот объект внутри пула и возвращает допускающий ожидание объект, который можно использовать в предложении await или передать какой-нибудь функции API, например gather.

Давайте еще раз реализуем предыдущий пример, теперь с исполнителем пула процессов. Мы подадим исполнителю несколько задач подсчета и будем ждать их завершения с помощью gather. Метод run_ in_executor принимает только вызываемый объект и не позволяет задать аргументы функции. Эту трудность мы обойдем, воспользовавшись частичным применением функции, чтобы сконструировать обращения к count, не нуждающиеся в аргументах

"""Исполнитель пула процессов с asyncio"""
import asyncio
from asyncio.events import AbstractEventLoop
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import List

def count(count_to: int) -> int:
    counter = 0
    while counter < count_to:
        counter = counter + 1
    return counter

async def main():
    with ProcessPoolExecutor() as process_pool:
        #Создать частично применяемую функцию count с фиксированным аргументом
        loop: AbstractEventLoop = asyncio.get_running_loop()
        nums = [1, 3, 5, 22, 100000000]
        #Сформировать все обращения к пулу процессов, поместив их в список
        calls: List[partial[int]] = [partial(count, num) for num in nums]
        call_coros = []

        for call in calls:
            call_coros.append(loop.run_in_executor(process_pool, call))
        #Ждать получения результатов
        results = await asyncio.gather(*call_coros)

        for result in results:
            print(result)

if __name__ == "__main__":
    asyncio.run(main())

Сначала мы, как и раньше, создаем исполнитель пула процессов.  Затем получаем цикл событий asyncio, поскольку run_in_executor — метод класса AbstractEventLoop. Затем с помощью частичного применения функции вызываем count с каждым числом из списка nums в качестве аргумента, поскольку прямой вызов с аргументом невозможен. Сформированные вызовы функции count можно передать исполнителю. Мы обходим эти вызовы в цикле, вызывая loop.run_in_executor для каждого и сохраняя полученные в ответ объекты, допускающие ожидание, в списке call_coros. Затем передаем этот список функции asyncio.gather и ждем завершения всех вызовов.

Решение задачи с помощью MapReduce и asyncio

Теперь подходим к самому сладкому. Чтобы понять, задачи какого типа можно решить применением техники MapReduce, рассмотрим гипотетическую проблему. А затем применим полученные знания к решению похожей задачи с большим набором данных, находящимся в свободном доступе. Предположим, что наш сайт получает большой объем текстовых данных через поле «Отзывы и предложения» в форме на портале технической поддержки. Поскольку наш сайт пользуется успехом, объем этого набора данных, содержащего отзывы клиентов, уже измеряется терабайтами и с каждым днем растет.

Простое решение — запустить один процесс и в цикле обработать все комментарии, запоминая, сколько раз встретилось каждое слово. Это будет работать, но, поскольку набор данных велик, его последовательный просмотр может занять много времени.

Именно для такого рода задач предназначена технология MapReduce. В модели программирования MapReduce большой набор данных сначала разбивается на меньшие части. Затем мы можем решить задачу для поднабора данных, а не для всего набора — это называется отображением (mapping), поскольку мы «отображаем» данные на частичный результат. После того как задачи для всех поднаборов решены, мы можем объединить результаты в окончательный ответ. Этот шаг называется редукцией (reducing), потому что «редуцируем» (сводим) несколько ответов в один. Подсчет частоты вхождения слов в большой набор текстовых данных — каноническая задача MapReduce. Если набор данных достаточно велик, то его разбиение на меньшие части может дать выигрыш в производительности, поскольку все операции отображения можно выполнять параллельно, как показано на этом рисунке:

Большой набор данных разбивается на разделы, после чего функция отображения  порождает промежуточные результаты, которые затем объединяются в окончательный

Большой набор данных разбивается на разделы, после чего функция отображения порождает промежуточные результаты, которые затем объединяются в окончательный

Чтобы лучше понять, как работает MapReduce, разберем конкретный пример. Предположим, что имеется файл, каждая строка которого содержит текстовые данные. Мы хотим подсчитать, сколько раз в этом наборе встречается каждое слово.
Сначала нужно разбить данные на меньшие порции. Для простоты примем за порцию одну строку. Затем нужно определить операцию отображения. Поскольку нам нужны частоты слов, будем разбивать строку текста по пробелам. Это даст нам массив слов в строке. После этого его можно обойти в цикле, запоминая различные встретившиеся слова в словаре.
Наконец, нужно определить операцию редукции. Она принимает один или несколько результатов операций отображения и объединяет их в окончательный ответ. В этом примере нам нужно взять два построенных операцией отображения словаря и объединить их в один. Если слово существует в обоих словарях, счетчики его вхождений складываются; если нет, мы копируем счетчик вхождений в результирующий словарь. Определив операции, мы можем применить операцию map к каждой строке текста, а затем операцию reduce к пара результатов отображения. Взглянем на код, который делает это для наших строк.

"""Однопоточная модель MapReduce"""
import functools
from typing import Dict

def map_frequency(text: str) -> Dict[str, int]:
    words = text.split(' ')
    frequencies = {}
    for word in words:
        if word in frequencies:
            #Если слово уже есть в словаре, то прибавить единицу к счетчику
            frequencies[word] = frequencies[word] + 1
        else:
            # Если слова еще нет в словаре, счетчик равным единице
            frequencies[word] = 1 
    return frequencies

def merge_dictionaries(first: Dict[str, int], second: Dict[str, int]) -> Dict[str, int]:
    merged = first
    for key in second:
        if key in merged:
            # Если слово встречается в обоих словарях, сложить счетчики
            merged[key] = merged[key] + second[key]
        else:
            # Если слово не встречается в обоих словарях, скопировать счетчик
            merged[key] = second[key]
    return merged

lines = [
"Я люблю вечерний пир",
"Где веселье председатель",
"А свобода мой кумир",
"За столом законодатель",
"Где до утра слово пей!"
"Заглушает крики песен",
"Где просторен круг гостей",
"А кружок бутылок тесен"
]
# Для каждой строки текста выполнить операцию map
mapped_results = [map_frequency(line) for line in lines]

for result in mapped_results:
    print(result)
# Редуцировать все промежуточные счетчики в окончательный результат
print(functools.reduce(merge_dictionaries, mapped_results))

К каждой строке текста мы применяем операцию map, что дает счетчики частот для каждой строки. Затем частичные результаты отображения можно объединить. Мы применяем нашу функцию слияния merge_dictionaries в сочетании с библиотечной функцией functools. reduce. В результате получается следующая картина:

{'Я': 1, 'люблю': 1, 'вечерний': 1, 'пир': 1}
{'Где': 1, 'веселье': 1, 'председатель': 1}
{'А': 1, 'свобода': 1, 'мой': 1, 'кумир': 1}
{'За': 1, 'столом': 1, 'законодатель': 1}
{'Где': 1, 'до': 1, 'утра': 1, 'слово': 1, 'пей!': 1, 'Заглушает': 1, 'крики': 1, 'песен': 1}
{'Где': 1, 'просторен': 1, 'круг': 1, 'гостей': 1}
{'А': 1, 'кружок': 1, 'бутылок': 1, 'тесен': 1}
Финальный результат:
{'Я': 1, 'люблю': 1, 'вечерний': 1, 'пир': 1, 'Где': 3, 'веселье': 1, 'председатель': 1, 'А': 2, 'свобода': 1, 'мой': 1, 'кумир': 1, 'За': 1, 'столом': 1, 'законодатель': 1, 'до': 1, 'утра': 1, 'слово': 1, 'пей!': 1, 'Заглушает': 1, 'крики': 1, 'песен': 1, 'просторен': 1, 'круг': 1, 'гостей': 1, 'кружок': 1, 'бутылок': 1, 'тесен': 1}

Разобравшись с основами технологии MapReduce на модельной задаче, посмотрим, как применить ее к реальному набору данных, для которого библиотека multiprocessing может дать выигрыш в производительности.

Большой набор данных

Нам нужен достаточно большой набор данных, чтобы продемонстрировать все преимущества сочетания MapReduce с библиотекой multiprocessing. Если набор данных слишком мал, то мы, скорее всего, увидим не преимущества, а падение производительности из-за накладных расходов на управление процессами.
Набор данных Google Books Ngram достаточен для наших целей.

Набор данных Google Books Ngram состоит из n-грамм, взятых из 8  000  000 книг, начиная с 1500 года. Это более шести процентов от всех изданных в мире книг. Подсчитано, сколько раз каждая уникальная n-грамма встречается в текстах, и результаты сгруппированы по годам. В этом наборе данных присутствуют n-граммы для n от 1 до 5, представленные в формате с табуляторами. Каждая строка набора содержит n-грамму, год ее появления, сколько раз она встречалась и в скольких книгах. Рассмотрим первые несколько строк набора униграмм для слова aardvark:

Aardvark 1822 2 1
Aardvark 1824 3 1
Aardvark 1827 10 7

Это означает, что в 1822 году слово aardvark (трубкозуб) дважды встретилось в одной книге. А в 1827 году оно встретилось десять раз в семи книгах. В наборе данных есть гораздо больше строк для слова aardvark (например, в 2007 году оно встретилось 1200 раз), что доказывает все более частое упоминание трубкозубов в литературе с течением времени. В этом примере мы подсчитаем количество вхождений одиночных слов (униграмм), начинающихся с буквы a. Этот набор данных занимает приблизительно 1,8 Гб. Мы агрегируем данные по количеству упоминаний каждого слова в литературе, начиная с 1500 года. И воспользуемся этим для ответа на вопрос: «Сколько раз слово aardvark встречалось в литературе с 1500-го года?» (Скачать файл можно по этой ссылке или этой)

Применение asyncio для отображения и редукции

Чтобы было с чем сравнивать, напишем сначала синхронную версию подсчета частот слов. Прежде всего нужно загрузить весь набор данных в память. А затем построить словарь, в котором будет храниться отображение слов на количество вхождений. Для каждой строки файла проверяется, имеется ли уже данное слово в словаре. Если да, то его счетчик в словаре увеличивается на единицу, а иначе слово добавляется в словарь со счетчиком, равным 1.

"""Подсчет частот слов"""
import gzip
import time

freqs = {}

with gzip.open('googlebooks-eng-all-1gram-20120701-a.gz', 'rb') as f:
    lines = f.read().decode('utf-8').splitlines()

    start = time.time()
    for line in lines:
        data = line.split('\t')
        word = data[0]
        count = int(data[2])
        if word in freqs:
            freqs[word] = freqs[word] + count
        else:
            freqs[word] = count

    print(f"Aardvark встречается {freqs['Aardvark']} раз.")
    end = time.time()

print(f'{end-start:.4f}')

Во время, затраченное на счетную операцию, мы будем включать только время подсчета частот, не учитывая время загрузки файла. Было затрачено 47.8429 секунды.

Посмотрим, удастся ли улучшить этот результат, воспользовавшись библиотеками multiprocessing и asyncio. При запуске на машине с меньшим числом ядер или других ресурсов результат может быть иным. Первым делом разобьем наш набор данных на меньшие порции. Определим генератор разбиения, который принимает большой список данных и выделяет порции произвольного размера.

def partition(data: List, 
              chunk_size: int) -> Generator[List[str], None, None]:
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

Этот генератор можно использовать для создания порций размера chunk_size. Нам он понадобится для создания порций, передаваемых функциям отображения, которые затем будут запущены параллельно. Далее определим функцию отображения. Она мало чем отличается от функции из предыдущего примера, разве что адаптирована к нашему набору данных

def map_frequencies(chunk: List[str]) -> Dict[str, int]:
    counter = {}
    for line in chunk:
        word, _, count, _ = line.split('\t')
        if counter.get(word):
            counter[word] = counter[word] + int(count)
        else:
            counter[word] = int(count)
    return counter

Пока что оставим ту же операцию редукции, что в предыдущем примере. Теперь у нас есть все необходимое для распараллеливания операций отображения. Мы создадим пул процессов, разобьем данные на порции и для каждой порции выполним функцию map_frequencies, забрав ресурс («исполнитель») из пула. Остается только один вопрос: как выбрать размер порции?  

Простого ответа на этот вопрос не существует. Есть эвристическое правило — сбалансированный подход:  порция не должна быть ни слишком большой, ни слишком маленькой. Маленькой она не должна быть потому, что созданные порции сериализуются (в формате pickle) и раздаются исполнителям, после чего исполнители десериализуют  их. Процедура сериализации и десериализации может занимать много времени, сводя на нет весь выигрыш, если производится слишком часто. Например, размер порции, равный 2, — заведомо неудачное решение, потому что потребовалось бы почти 1 000 000 операций сериализации и десериализации.

Но и слишком большая порция — тоже плохо, поскольку это не даст нам в полной мере задействовать возможности компьютера. Например, если имеется 10 ядер, но всего две порции, то мы ничем не загружаем восемь ядер, которые могли бы работать параллельно.
Для этого примера я выбрал размер порции 20 000. Если вы будете применять такой подход в своих задачах обработки данных, то пробуйте разные размеры порций, пока не найдете подходящий для имеющейся машины и набора данных, или разработайте эвристический алгоритм определения правильного размера. Теперь объединим все эти части с пулом процессов и методом run_in_executor цикла событий для распараллеливания операций отображения.

"""Распараллеливание с помощью MapReduce и пула процессов"""
import asyncio
import concurrent.futures
import functools
import time
from typing import Dict, List
import gzip
from typing import Generator


def partition(data: List, 
              chunk_size: int) -> Generator[List[str], None, None]:
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

def map_frequencies(chunk: List[str]) -> Dict[str, int]:
    counter = {}
    for line in chunk:
        word, _, count, _ = line.split('\t')
        if counter.get(word):
            counter[word] = counter[word] + int(count)
        else:
            counter[word] = int(count)
    return counter

def merge_dictionaries(first: Dict[str, int], second: Dict[str, int]) -> Dict[str, int]:
    merged = first
    for key in second:
        if key in merged:
            merged[key] = merged[key] + second[key]
        else:
            merged[key] = second[key]
    return merged


async def main(partition_size: int):
    with gzip.open('googlebooks-eng-all-1gram-20120701-a.gz', 'rb') as f:
        contents = f.read().decode('utf-8').splitlines()
        loop = asyncio.get_running_loop()
        tasks = []
        start = time.time()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            for chunk in partition(contents, partition_size):
                #Для каждой порции выполнить операцию отображения в отдельном процессе
                task = loop.run_in_executor(pool, 
                                            functools.partial(map_frequencies, chunk))
                tasks.append(task)
            #Ждать завершения всех операций отображения
            intermediate_results = await asyncio.gather(*tasks)
            #Редуцировать промежуточные результаты в окончательный
            final_result = functools.reduce(merge_dictionaries, intermediate_results)
        print(f"Aardvark встречается {final_result['Aardvark']} раз.")
        end = time.time()
        print(f'Время MapReduce: {(end - start):.4f} секунд')

if __name__ == "__main__":
    asyncio.run(main(partition_size=20000))

В сопрограмме main мы создаем пул процессов и разбиваем данные на порции. Для каждой порции функция map_frequencies исполняется в отдельном процессе. Затем с помощью asyncio.gather ждем завершения построения промежуточных словарей. Когда все операции отображения завершатся, мы выполняем операцию редукции для получения окончательного результата.

Эта программа завершается примерно за 23.5326 с, т. е. мы получили выигрыш в 2 раза по сравнению с последовательной версией!!!. Неплохо, учитывая, что дополнительного кода мы написали не так уж много! Можете поэкспериментировать с машинами, имеющими больше ядер, и посмотреть, нельзя ли еще улучшить производительность этого алгоритма.

Какой по итогу вывод?

В этой статье я рассмотрел способы выполнять несколько задач одновременно в Python, используя пул процессов. Мы изучили, как создавать пул процессов и запускать функции параллельно. Мы также рассмотрели использование asyncio для одновременного выполнения задач и ожидания результатов. Мы обсудили применение модели программирования MapReduce с использованием пула процессов и asyncio для эффективного решения задач. Многопоточную программу которая ускорила код в 2 раза!

Я надеюсь, что каждый читатель этой статьи найдет что-то ценное для себя: новую информацию или обновит свои прошлые знания.

Вдохновлялся книгой: Asyncio и конкурентное программирование на Python

© Habrahabr.ru