[Перевод] Сколько ядер CPU можно использовать параллельно в Python?

51c1393d8c9ee7191a77c3ddb2cbe8c4.png

При выполнении параллельной программы, активно задействующей CPU, нам часто необходимо, чтобы пул потоков или процессов имел размер, сопоставимый с количеством ядер CPU на машине. Если потоков меньше, то вы будете использовать все преимущества ядер, если меньше, то программа начнёт работать медленнее, так как несколько потоков будет конкурировать за одно ядро. Ну, или такова ситуация в теории.

Как же проверить, сколько ядер есть у компьютера? И действительно ли это хороший совет?

Оказывается, на удивление сложно определить, сколько потоков выполнять:

  • В стандартной библиотеке Python есть множество API для получения этой информации, но ни одного из них недостаточно.

  • Хуже того, из-за таких функций CPU, как параллельность на уровне команд и одновременной многопоточности (Hyper-threading в CPU Intel), количество ядер, которое можно эффективно использовать, зависит от того кода, который напишете вы!

Давайте разберёмся, почему так сложно определить, сколько ядер CPU может использовать программа, а затем подумаем над потенциальным решением.

Получение количества ядер CPU из Python

Если вы читали документацию стандартной библиотеки Python, то знаете, что в ней есть стандартная функция os.cpu_count(), возвращающая «количество логических CPU». Что значит «логических»? Мы вернёмся к этому чуть позже.

В документации также говорится, что »len(os.sched_getaffinity(0)) получает количество логических CPU, которым ограничен вызывающий поток текущего процесса». Ограничение процесса конкретными ядрами обеспечивается привязкой планировщика (scheduler affinity).

К сожалению, этого API тоже недостаточно. Например, в Linux есть  API cgroups, используемый для реализации Docker и других контейнерных систем; он имеет множество способов ограничения нагрузки на CPU. В примере ниже мы ограничиваем CPU эквивалентом 2,25 ядер; механизм отличается, но результат будет схожим:

$ docker run -i -t --cpus=2.25 python:3.12-slim
Python 3.12.1 (main, Dec  9 2023, 00:21:37) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> os.cpu_count()
20
>>> len(os.sched_getaffinity(0))
20

Одновременно мы можем использовать эквивалент 2,25 ядер, но ни один API не знает об этом.

Что такое логический CPU?

Опции операционной системы — лишь начало наших проблем, но прежде чем рассматривать пример, мы должны понять, что такое физические и логические ядра CPU. В моём компьютере установлен процессор Intel i7–12700K, обладающий:

  • 12 физическими ядрами (8 высокопроизводительных ядер и 4 менее мощных).

  • 20 логическими ядрами.

Современные ядра CPU способны параллельно исполнять несколько команд. Но что произойдёт, если CPU «зависнет», ожидая загрузки каких-то данных из ОЗУ? Он не сможет выполнять никакую работу, пока этого не произойдёт.

Чтобы позволить использовать эти потенциально впустую растрачиваемые ресурсы, вычислительные ресурсы физического ядра CPU могут предоставляться операционной системе как несколько ядер. В моём CPU каждое из восьми быстрых ядер можно представить как два ядра, что даёт суммарно 16 логических ядер. Пары логических ядер будут совместно использовать вычислительные ресурсы одного физического ядра. Например, если логическое ядро не полностью использует внутренние АЛУ, допустим, потому что оно ожидает загрузки из памяти, то код, выполняемый парным логическим ядром, всё равно сможет использовать эти простаивающие ресурсы.

Эта технология называется одновременной многопоточностью (Hyper-threading в терминологии Intel). Если у вас PC, то эту функцию часто можно отключить в BIOS.

Это объяснение очень неточное, и настоящая реализация может быть разной на различных моделях CPU, даже от одного производителя. Но главное здесь то, что логические ядра не совпадают с физическими ядрами, и этого объяснения достаточно для наших целей.

Итак, у нас возникает новый вопрос. Если забыть о привязке планировщика и тому подобном, нужно ли нам использовать количество физических или логических ядер в качестве размера пула потоков?

Неожиданный пример параллельности

Рассмотрим две функции, компилируемые в машинный код при помощи Numba. Чтобы обеспечить параллельность, мы отключим GIL.

Обе функции делают одно и то же, но одна гораздо быстрее другой. Мы можем запустить эти функции параллельно в нескольких потоках и теоретически получать линейный рост производительности, пока не закончатся ядра, просто благодаря параллельной обработке большего количества изображений.

from numba import njit
import numpy as np

@njit(nogil=True)
def slow_threshold(img, noise_threshold):
    noise_threshold = img.dtype.type(noise_threshold)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            result[i, j] = img[i, j] // 256
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            if result[i, j] < noise_threshold // 256:
                result[i, j] = 0
    return result

@njit(nogil=True)
def fast_threshold(img, noise_threshold):
    noise_threshold = np.uint8(noise_threshold // 256)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            value = img[i, j] >> 8
            value = (
                0 if value < noise_threshold else value
            )
            result[i, j] = value
    return result

Мы запустим функцию для изображения и замерим, как долго она выполняется:

rng = np.random.default_rng(12345)

def make_image(size=256):
    noise = rng.integers(0, high=1000, size=(size, size), dtype=np.uint16)
    signal = rng.integers(0, high=5000, size=(size, size), dtype=np.uint16)
    # A noisy, hard to predict image:
    return noise | signal

NOISY_IMAGE = make_image()
assert np.array_equal(
    slow_threshold(NOISY_IMAGE, 1000),
    fast_threshold(NOISY_IMAGE, 1000)
)

Вот сколько времени требуется для выполнения каждой из функций на одном ядре:

%timeit slow_threshold(NOISY_IMAGE, 1000)
90.6 µs ± 77.7 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

и

%timeit fast_threshold(NOISY_IMAGE, 1000)
24.6 µs ± 10.8 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Вам интересно, почему быстрая функция намного быстрее? Возможно, стоит прочитать книгу об оптимизации низкоуровневого кода, над которой я работаю.

Масштабируемся до нескольких ядер

Теперь, когда у нас есть пара функций, можно обработать список изображений при помощи пула потоков; при этом каждый поток будет обрабатывать по 10 изображений за раз:

from multiprocessing.dummy import Pool as ThreadPool

def apply_in_thread_pool(
    num_threads, function, images
):
    with ThreadPool(num_threads) as pool:
        result = pool.map(
            lambda img: function(img, 1000),
            images,
            chunksize=10
        )
        assert len(result) == len(images)

Далее мы создадим графики времени выполнения при разных количествах потоков различных функций при помощи библиотеки benchit (можно также использовать perfplot, но учтите, что она имеет лицензию GPL):

import benchit
benchit.setparams(rep=1)

# Через каждый поток будет обработано 4000 изображений:
IMAGES = [make_image() for _ in range(4000)]

def slow_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, slow_threshold, IMAGES)

def fast_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, fast_threshold, IMAGES)

# Измеряем показатели двух функций с использованием от 1 до 24 потоков:
timings = benchit.timings(
    [slow_threshold_in_pool, fast_threshold_in_pool],
    range(1, 25),
    input_name="Number of threads"
)
timings.plot(logy=True, logx=False)

Graph showing slow vs fast runtime. Both variants have a run time that declines as the number of threads increases, up to a certain minimum runtime. Past that minimum, adding more threads actually slows things down. Slow and fast have different optimal number of threads, though!

График времени исполнения медленной и быстрой функций. У обоих вариантов время исполнения снижается с увеличением количества потоков до определённого минимального времени. Ниже этого минимума добавление потоков замедляет работу. Однако у медленной и быстрой функций оптимальное количество потоков различается!

Обратите внимание, что время исполнения снижается с увеличением количества потоков… до определённой точки. После неё время исполнения снова начинает повышаться. Этого мы и ожидали. Но есть и нечто неожиданное: оптимальное количество потоков для двух функций различается.

timings.to_dataframe().idxmin(axis="rows")

Функции

Оптимальное количество потоков

slow_threshold

19

fast_threshold

9

Оптимальный уровень параллельности также зависит от вашего кода

Медленная функция, по сути, способна использовать преимущества всех логических ядер. Возможно, один поток не использует полностью всю доступную вычислительную мощь конкретного физического ядра, так что логические ядра обеспечивают бОльшую параллельность.

И наоборот, более быстрая функция может использовать преимущества не более чем девяти ядер; при увеличении их количества начинается замедление. Возможно, она упирается в какое-то другое узкое место, не в вычислительные ресурсы, например, в пропускную способность памяти.

Не существует размера пула потоков, оптимального для обеих функций.

Другой подход: эмпирические измерения

При определении оптимального количества потоков мы столкнулись с несколькими проблемами:

  1. Сложно получить точное количество ядер с учётом всевозможных способов, которыми операционная система способна ограничивать используемые ресурсы CPU.

  2. Оптимальный уровень параллельности, то есть количество потоков зависит от рабочей нагрузки.

  3. Количество ядер — не единственное узкое место.

  4. Бонусная проблема: если вы исполняете код в облаке, то используете «vCPU», что бы это ни значило. Например, в разных инстансах используются разные модели CPU.

Есть и другой подход: определять оптимальное количество потоков эмпирически, во время исполнения. В показанном выше примере мы измеряли оптимальное количество потоков для конкретного фрагмента кода. Если у вас длительная задача по обработке данных, долго исполняющую один и тот же код в нескольких потоках, то можно сделать так же. То есть потратить немного времени в начале процесса, чтобы эмпирически замерить оптимальное количество потоков, возможно, с какими-то эвристиками для компенсации шума.

Если в среде исполнения вы используете эмпирические измерения, то вам не нужно беспокоиться о том, почему оптимально конкретное количество потоков. Вне зависимости от оборудования, конфигурации операционной системы или облачной среды вы будете использовать оптимальный уровень параллельности.

© Habrahabr.ru