Как работает multiprocessing в Python под капотом

53dd4c94d3363a853d53a71497d58e59

Я довольно давно пишу на Python и во многих проектах использовал multiprocessing — пакет стандартной библиотеки языка Python, который предоставляет интерфейс для работы с процессами, очередями, пулами процессов и многими другими удобными инструментами для параллельного программирования. В какой-то момент я понял, что мне не хватает более детального понимания работы этой библиотеки.

Мне захотелось залезть в исходники multiprocessing, разобраться и заодно написать статью. Данная статья в основном рассчитана на новичков в Python и тех, кто хочет подробнее разобраться в том, как именно создаются процессы и пулы в Python и погрузиться в детали реализации.

В статье я не буду рассказывать что такое процессы и зачем они нужны. Почитать самую базу про операционные системы и процессы можно, например, тут и тут. Также важно уточнить, что весь приведенный в статье код соответствует версии Python 3.11.4

Содержание

  1. Создание нового процесса в ОС

  2. Способы создания нового процесса в multiprocessing

  3. Контекст (Context)

  4. Процесс (Process)

    1. Создание процесса через fork

    2. Создание процесса через spawn

    3. Завершение процесса

  5. Пул процессов (ProcessPool)

Создание нового процесса в ОС

Процессы в ОС создаются с помощью системных вызовов — низкоуровневых функций операционной системы, позволяющих пользовательским программам взаимодействовать с ОС.

В UNIX-подобных системах для создания нового процесса используется системный вызов fork. На самом деле существует целое семейство системных вызовов: fork, vfork, clone. Их суть очень похожа и в данной статье нам достаточно рассмотреть только fork.
Системный вызов fork создает копию текущего процесса, возвращает ноль в дочернем процессе и PID ребенка в родительском процессе. Важно сказать, что фактического копирования и выделения памяти при создании процесса не происходит. Вместо этого используется технология copy-on-write, которая создаёт копию страницы памяти только при попытке записи в эту страницу. Это позволяет уменьшить количество потребляемой процессами памяти и значительно ускорить создание процессов.

В Windows для создания процессов используется системный вызов CreateProcess из WinApi. В отличие от UNIX-подобных систем, в Windows в созданный процесс сразу загружается программа, переданная в аргументы системного вызова.

Способы создания нового процесса в multiprocessing

В пакете multiprocessing есть 3 основных метода для создания нового процесса: fork, spawn и forkserver.

fork:

  • Как понятно из названия, использует системный вызов fork для создания нового процесса

  • В multiprocessing является способом создания процессов по умолчанию на POSIX системах кроме macOS

  • Не поддерживается на windows

spawn:

  • Запускает новый процесс, используя переданную в него команду. В нашем случае будет передаваться команда для запуска процесса с интепретатором Python. В аргументы интепретатору передается путь к запускаемому файлу, а также некоторые другие служебные аргументы

  • В multiprocessing является способом создания процессов по умолчанию на macOS и windows

  • Обычно работает медленнее, чем fork

forkserver:

  • Создаётся серверный процесс, который создаёт процессы методом fork. Когда требуется новый процесс, родительский процесс соединяется с сервером и запрашивает у него форк нового процесса. Этот метод сочетает в себе скорость работы fork с хорошей надежностью (так как серверный процесс, от которого создается дочерний, имеет простое состояние)

  • Не поддерживается на windows

В данной статье я разберу только первые два метода, так как они являются самыми часто используемыми.

Контекст (Context)

Первое, с чем предстоит разобраться при погружении в реализацию multiprocessing — это класс контекста. Контекст имеет такое же API, что и сам модуль multiprocessing, однако он позволяет установить и зафиксировать способ создания новых процессов: fork, spawn или forkserver.

import multiprocessing as mp

if __name__ == '__main__':
    ctx_spawn = mp.get_context('spawn')
    ctx_fork = mp.get_context('fork')

Контекст позволяет использовать разные методы создания процесса внутри одной программы. В примере выше все процессы, созданные через объект контекста ctx_spawn, будут созданы методом spawn, а через ctx_fork, соответственно, методом fork.

Под каждый из методов создания процессов реализован класс контекста, наследованный от BaseContext. Эти классы отличаются только классом процесса, который используется для порождения дочерних процессов:

class ForkContext(BaseContext):  
    _name = 'fork'  
    Process = ForkProcess  # Все созданные процессы будут иметь тип ForkProcess
  
class SpawnContext(BaseContext):  
    _name = 'spawn'  
    Process = SpawnProcess  # Все созданные процессы будут иметь тип SpawnProcess

BaseContext, в свою очередь, просто реализует API как у пакета multiprocessing. Исходный код можно посмотреть тут.

Для создания процессов можно и не использовать контекст, а напрямую создавать multiprocessing.Process. В этом случае просто используется контекст по умолчанию, который лежит в глобальной переменной _default_context. Дефолтный контекст определяется в зависимости от типа вашей ОС. Для windows и macOS — это spawn, а для остальных — fork. Это можно наглядно увидеть в исходном коде:

if sys.platform != 'win32':
	# тут объявляются классы контекстов и процессов для POSIX систем
	...

	_concrete_contexts = {  
	    'fork': ForkContext(),  
	    'spawn': SpawnContext(),  
	    'forkserver': ForkServerContext(),  
	}  
	if sys.platform == 'darwin':  
	    # на macOS используется spawn по умолчанию
		_default_context = DefaultContext(_concrete_contexts['spawn'])  
	else:  
		# на всех остальных POSIX системах используется fork
	    _default_context = DefaultContext(_concrete_contexts['fork'])
else:
	# тут объявляются класс контекста и процесса для windows
	...
	
	_concrete_contexts = {  
	    'spawn': SpawnContext(),  
	}  
	# на windows используется spawn по умолчанию
	_default_context = DefaultContext(_concrete_contexts['spawn'])

Внимательные читатели заметили класс DefaultContext— этот класс контекста просто использует дефолтный для текущей ОС метод создания процессов.

Процесс (Process)

Рассмотрим самый простой пример создания процесса с помощью модуля multiprocessing и попробуем разобраться, что происходит под капотом.

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

В этом примере класс Process принимает параметры target — функция, которую нужно запустить в дочернем процессе, и args — параметры для этой функции. Давайте посмотрим на объявление класса Process:

class Process(process.BaseProcess):  
    _start_method = None 
     
    @staticmethod  
    def _Popen(process_obj):  
        return _default_context.get_context().Process._Popen(process_obj)  
  
    @staticmethod  
    def _after_fork():  
        return _default_context.get_context().Process._after_fork()

Process является классом процесса, используемым по умолчанию, он наследуется от BaseProcess — базового для всех процессов. Еще есть классы ForkProcess, SpawnProcess, ForkServerProcess — реализации под определённый метод создания процесса. Все классы реализуют статический метод _Popen для создания нового процесса и _after_fork для чистки после создания дочернего процесса.
Можно заметить, что в Process метод создания процесса не установлен. Это потому, что он берется из контекста по умолчанию. _default_context.get_context() возвращает объект типа ForkContext, SpawnContext или ForkServerContext. У контекста есть атрибут Process, у которого уже вызываются методы _Popen и _after_fork, имеющие конкретную реализацию под тип запуска.

Вернемся к нашему примеру. При вызове p.start() внутри класса процесса вызывается статический метод _Popen(). В нём инициализируется класс Popen (см. код ниже), который отвечает за создание процесса и взаимодействие с ним.

class ForkProcess(process.BaseProcess):  
    _start_method = 'fork'  
    @staticmethod  
    def _Popen(process_obj):  
        from .popen_fork import Popen  
        return Popen(process_obj)

class SpawnProcess(process.BaseProcess):  
    _start_method = 'spawn'  
    @staticmethod  
    def _Popen(process_obj):  
        from .popen_spawn_posix import Popen  
        return Popen(process_obj)  
  
    @staticmethod  
    def _after_fork():  
        # process is spawned, nothing to do  
        pass

Создание процесса через fork

Сначала рассмотрим как создаются процессы методом fork. Как мы уже знаем, при старте процесса инициализируется класс Popen, который имеет реализацию под каждый из способов создания процессов. Для fork реализацию можно найти в файле popen_fork.py.

class Popen(object):  
    method = 'fork'  
  
    def __init__(self, process_obj):  
        util._flush_std_streams()  
        self.returncode = None  
        self.finalizer = None  
        # при инициализации вызывается метод _launch
        self._launch(process_obj)

	# другие методы
	# ...

	def _launch(self, process_obj):  
	    code = 1  
	    # создаем две пары дескриптором
	    parent_r, child_w = os.pipe()
	    child_r, parent_w = os.pipe()  
	    self.pid = os.fork()  # создаем новый процесс
	    if self.pid == 0:
		    # в эту ветвь заходит в дочернем процессе
	        try:  
	            os.close(parent_r)  
	            os.close(parent_w)  
	            code = process_obj._bootstrap(parent_sentinel=child_r)  
	        finally:  
	            os._exit(code)  
	    else:  
		    # в эту ветвь заходит в родительском процессе
	        os.close(child_w)  
	        os.close(child_r)  
	        self.finalizer = util.Finalize(self, util.close_fds,  
	                                       (parent_r, parent_w,))  
	        self.sentinel = parent_r

process_obj тут — это объект процесса multiprocessing.Process.

Для создания процесса используется функция os.fork(), которая использует системный вызов fork. os.fork(), как и системный вызов fork, возвращает ноль в дочернем процессе и PID ребенка в родительском процессе. fork полностью копирует родительский процесс, поэтому выполнение программы продолжается с той же инструкции.
В дочернем процессе вызывается метод _bootstrap объекта процесса, внутри которого как раз и вызывается нужная пользовательская функция.

Теперь мы посмотрели весь путь от p.start() до запуска таргетной функции. Давайте резюмируем. Внутри p.start() берется дефолтный контекст и создаётся класс Popen, имеющий реализацию под каждый из методов создания новых процессов. В случае с fork для создания нового процесса используется функция os.fork(), создающая копию родительского процесса. В дочернем процессе вызывается таргетная функция, а родительский процесс чистит ненужные объекты и завершает выполнение метода p.start().

Создание процесса через spawn

Как запускается процесс при использовании SpawnProcess? В пакете multiprocessing есть реализации метода spawn как под windows, так и под POSIX системы. Смысл их работы один и тот же, просто используются разные интерфейсы для работы с ОС. Поэтому для простоты рассмотрим реализацию под POSIX системы. Весь код процесса почти совпадает с реализацией для fork, основное отличие в классе Popen, используемом для создания новых процессов:

class Popen(popen_fork.Popen):  
    method = 'spawn'  
    DupFd = _DupFd  
  
    def __init__(self, process_obj):  
        self._fds = []  
        super().__init__(process_obj)  
  
    # другие методы
	# ...
	
    def _launch(self, process_obj):  
        from . import resource_tracker  
        tracker_fd = resource_tracker.getfd()  
        self._fds.append(tracker_fd)  
        prep_data = spawn.get_preparation_data(process_obj._name)  
        fp = io.BytesIO()  
        set_spawning_popen(self)  
        try:  
            reduction.dump(prep_data, fp)  # сериализация словаря с информацией о процессе
            reduction.dump(process_obj, fp) # сериализация объекта процесса
        finally:  
            set_spawning_popen(None)  
  
        parent_r = child_w = child_r = parent_w = None  
        try:  
            parent_r, child_w = os.pipe() # создаем 4 дескриптора для обмена данными
            child_r, parent_w = os.pipe()  
            cmd = spawn.get_command_line(tracker_fd=tracker_fd,  
                                         pipe_handle=child_r)  
            self._fds.extend([child_r, child_w])  
            self.pid = util.spawnv_passfds(spawn.get_executable(),  
                                           cmd, self._fds)  
            self.sentinel = parent_r  
            with open(parent_w, 'wb', closefd=False) as f:  
                f.write(fp.getbuffer())  
        finally:  
            fds_to_close = []  
            for fd in (parent_r, parent_w):  
                if fd is not None:  
                    fds_to_close.append(fd)  
            self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)  
  
            for fd in (child_r, child_w):  
                if fd is not None:  
                    os.close(fd)

process_obj тут также является объектом процесса multiprocessing.Process.

Разберем основные части:

prep_data = spawn.get_preparation_data(process_obj._name)

Сначала собирается информация о родительском процессе, которая потребуется дочернему процессу для десериализации объекта родителя. Эта информация включает в себя путь к интерпретируемому файлу, аргументы запуска, директорию, откуда запускался файл, и др.

reduction.dump(prep_data, fp)
reduction.dump(process_obj, fp)

Затем сериализуется словарь с информацией о родительском процессе и сам объект процесса. reduction.dump внутри вызывает стандартный pickle.dump. Pickle — модуль языка Python, позволяющий преобразовывать объекты языка в поток байтов (сериализовывать) и, соответственно, десериализовывать. Этот модуль используется в пакете multiprocessing для передачи Python-объектов между процессами.

cmd = spawn.get_command_line(tracker_fd=tracker_fd,  
							 pipe_handle=child_r)  
self._fds.extend([child_r, child_w])  
self.pid = util.spawnv_passfds(spawn.get_executable(),  
							   cmd, self._fds) 

Дальше создается команда для запуска интепретатора, в которую передаются все нужные аргументы. Затем util.spawnv_passfds запускает эту команду в новом процессе. Также туда передаются файловые дескрипторы, которые должны остаться открытыми в новом процессе.

with open(parent_w, 'wb', closefd=False) as f:  
	f.write(fp.getbuffer())  

Затем в дочерний процесс передаётся сериализованная информация о процессе. В дочернем процессе десериализуется переданный объект процесса и запускается таргетная функция. Profit!

Давайте углубимся и посмотрим как именно создается процесс в функции util.spawnv_passfds:

# Start a program with only specified fds kept open  
def spawnv_passfds(path, args, passfds):  
    import _posixsubprocess  
    import subprocess  
    passfds = tuple(sorted(map(int, passfds)))  
    errpipe_read, errpipe_write = os.pipe()  
    try:  
        return _posixsubprocess.fork_exec(  
            args, [path], True, passfds, None, None,  
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,  
            False, False, -1, None, None, None, -1, None,  
            subprocess._USE_VFORK)  
    finally:  
        os.close(errpipe_read)  
        os.close(errpipe_write)

Для создания нового процесса используется метод fork-exec. Создание нового процесса выполняется двумя системными вызовами. Сначала fork создает дочерний процесс, который копирует процесс родителя. Затем в дочернем процессе вызывается системный вызов exec (на самом деле системного вызова exec нет, под этим мы будем иметь в виду семейство из нескольких подобных системных вызовов, так как их суть в одном и том же). Exec запускает новый исполняемый файл в контексте уже существующего процесса, заменяя предыдущий исполняемый файл. Таким образом, исполняемый файл меняется внутри одного процесса.

Завершение процесса

Вернемся к изначальному примеру:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Теперь мы знаем, что происходит, когда мы создаем процесс и вызываем у него метод start(). А что происходит при вызове join()?

При вызове join() родительский процесс дожидается завершения дочернего. Сначала происходит несколько проверок, а затем вызывается метод wait у уже знакомого нам класса Popen:

class Popen(object):
	# другие методы
	# ...
	
	def poll(self, flag=os.WNOHANG):  
	    if self.returncode is None:  
	        try:  
		        # если процесс еще не завершился, то ждем
	            pid, sts = os.waitpid(self.pid, flag)  
	        except OSError:  
	            # Child process not yet created. See #1731717  
	            # e.errno == errno.ECHILD == 10            return None  
	        if pid == self.pid:  
	            self.returncode = os.waitstatus_to_exitcode(sts)  
	    return self.returncode  
	  
	def wait(self, timeout=None):  
		# проверяем завершился ли уже процесс
	    if self.returncode is None:  
	        if timeout is not None:  
	            from multiprocessing.connection import wait  
	            if not wait([self.sentinel], timeout):  
	                return None  
	        # This shouldn't block if wait() returned successfully.  
	        return self.poll(os.WNOHANG if timeout == 0.0 else 0)  
	    return self.returncode

Там проверяется, завершился ли уже процесс — если да, то просто возвращается returncode дочернего процесса. Если нет, то дожидается завершения дочернего процесса, используя функцию os.waitpid().

Пул процессов

Помимо процессов, пакет multiprocessing предоставляет множество полезных классов и функций для работы с процессами. Один из самых часто используемых инструментов — пул процессов. Пул (англ. Pool) позволяет распараллелить выполнение функции на множестве значений, используя для этого несколько процессов.

При создании объекта Pool указывается количество рабочих процессов, которые будут выполнять задачи. Также ему можно передать контекст, который будет использован для запуска процессов. В рамках данной статьи нас интересуют три основных метода класса Pool:

  • Pool.apply() — вызывает функцию с аргументами

  • Pool.apply_async() — асинхронный вариант Pool.apply(). То есть apply_async() не дожидается результата завершения работы функции

  • Pool.map() — многопроцессорный аналог встроенной функции map(), которая применяет функцию к любой последовательности, поддерживающей итерирование, и возвращает список результатов работы этой функции.

  • Pool.map_async() — асинхронный вариант map()

Пример использования пула процессов:

import time  
from multiprocessing.pool import Pool  
  
def wait_and_return(x):  
    time.sleep(1)  
    return x  
  
if __name__ == "__main__":  
    with Pool(4) as pool:  
        result = pool.map(wait_and_return, [1,2,3,4])  
        print(result)

Программа выведет [1, 2, 3, 4]. Но несмотря на то, что суммарное время ожидания должно равняться четырем секундам, программа отработает за одну секунду. Это связано с тем, что аргументы для функции распределяются по четырем рабочим процессам, которые выполняются параллельно.

Пул процессов имеет очередь заданий, в которую добавляются новые задания при вызове методов apply_async(), map() и др. Если упростить, то можно сказать, что из этой очереди рабочие процессы забирают задания. В качестве очереди используется multiprocessing.Queue, которая позволяет безопасно передавать данные между процессами. После выполнения задания рабочий процесс складывает результат в общую для всех воркеров очередь результатов.

Разберем метод apply_async(). Он принимает функцию с аргументами для нее и складывает в очередь заданий. Метод возвращает объект класса ApplyResult. По своей сути данный класс имеет тип Future — инкапсулированный результат выполнения некоторой операции, которая еще не завершилась. При попытке получить результат (ApplyResult.get()) процесс блокируется до момента, когда операция завершится и будет получен результат.

map() принимает функцию и список аргументов, на которых необходимо запустить функцию. Метод также возвращает футуру MapResult, результат которой можно получить через блокирующий метод MapResult.get(). Задания разбиваются на чанки, которые отправляются в очередь с заданиями. Важно уточнить, что в очереди лежат не отдельные задания, а списки из заданий. Соответственно каждый рабочий процесс из очереди вытаскивает не одно отдельное задание, а список (чанк) заданий. Для apply_async() размер чанка равен одному.

Давайте разберемся, как именно создаются рабочие процессы и распределяются задания. При инициализации пула создаётся три потока:

  • _worker_handler — в этом потоке создаются процессы и мониторится количество текущих рабочих процессов. Когда какие-то из рабочих процессов завершаются, то этот поток создаёт новые, чтобы количество рабочих процессов всегда равнялось установленному при инициализации значению.

  • _task_handler — в этом потоке обрабатываются задания из очереди, в которую кладутся переданные пользователем задания при вызове apply_async(), map() и других методов. Затем задания из этой очереди передаются в другую очередь, которую уже обрабатывают рабочие процессы. Рабочие процессы складывают результаты в одну очередь с результатами работы.

  • _result_handler — в этом потоке собираются результаты завершенных заданий из общей очереди и записываются в объект результата ApplyResult (или MapResult).

Заключение

Я надеюсь, что данная статья помогла вам получить более глубокое понимание устройства процессов и использования пакета multiprocessing в Python. Помните, что понимание основ работы с процессами открывает новые возможности для оптимизации и ускорения выполнения ваших программ. Спасибо за внимание и удачи!

Полезные материалы

© Habrahabr.ru