Многопоточные вычисления в PHP: pthreads

Недавно я попробовал pthreads и был приятно удивлен — это расширение, которое добавляет в PHP возможность работать с несколькими самыми настоящими потоками. Никакой эмуляции, никакой магии, никаких фейков — все по-настоящему.

8ajqfk.gif

Я рассматриваю такую задачу. Есть пул заданий, которые надо побыстрее выполнить. В PHP есть и другие инструменты для решения этой задачи, тут они не упоминаются, статья именно про pthreads.

Стоит отметить, что автор расширения, Joe Watkins, в своих статьях предупреждает, что многопоточность — это всегда не просто и надо быть к этому готовым.

Кто не испугался, идем далее.


Что такое pthreads

Pthreads — это объектно-ориентированное API, которое дает удобный способ для организации многопоточных вычислений в PHP. API включает в себя все инструменты, необходимые для создания многопоточных приложений. PHP-приложения могут создавать, читать, писать, исполнять и синхронизировать потоки с помощью объектов классов Threads, Workers и Threaded.


Что внутри pthreads

Иерархия основных классов, которые мы только что упомянули, представлена на диаграмме.
h_1463432662_1438062_84b64df97a.png

Threaded — основа pthreads, дает возможность параллельного запуска кода. Предоставляет методы для синхронизации и другие полезные методы.

Thread. Можно создать поток, отнаследовавшись от Thread и реализовав метод run (). Метод run () начинает выполняться, причем в отдельном потоке, в момент, когда вызывается метод start (). Это можно инициировать только из контекста, который создает поток. Объединить потоки можно тоже только в этом-же контексте.

Worker. Персистентное состояние, которое в большинстве случаев используется разными потоками. Доступно, пока объект находится в области видимости или до принудительного вызова shutdown ().

Помимо этих классов есть еще класс Pool. Pool — пул (контейнер) Worker-ов можно использовать для распределения Threaded объектов по Worker-ам. Pool — наиболее простой и эффективный способ организовать несколько потоков.

Не будем сильно грустить над теорией, а сразу попробуем все это на примере.


Пример

Можно решать разные задачи в несколько потоков. Мне было интересно решить одну конкретную и как мне кажется весьма типовую задачу. Напомню ее еще раз. Есть пул заданий, их надо побыстрее выполнить.

Так давайте приступим. Для этого создадим провайдер данных MyDataProvider (Threaded), он будет один и общий для всех потоков.

/**
 * Провайдер данных для потоков
 */
class MyDataProvider extends Threaded
{
    /**
     * @var int Сколько элементов в нашей воображаемой БД
     */
    private $total = 2000000;

    /**
     * @var int Сколько элементов было обработано
     */
    private $processed = 0;

    /**
     * Переходим к следующему элементу и возвращаем его
     * 
     * @return mixed
     */
    public function getNext()
    {
        if ($this->processed === $this->total) {
            return null;
        }

        $this->processed++;

        return $this->processed;
    }
}

Для каждого потока у нас будет MyWorker (Worker), где будет храниться ссылка на провайдер.

/**
 * MyWorker тут используется, чтобы расшарить провайдер между экземплярами MyWork.
 */
class MyWorker extends Worker
{
    /**
     * @var MyDataProvider
     */
    private $provider;

    /**
     * @param MyDataProvider $provider
     */
    public function __construct(MyDataProvider $provider)
    {
        $this->provider = $provider;
    }

    /**
     * Вызывается при отправке в Pool.
     */
    public function run()
    {
        // В этом примере нам тут делать ничего не надо
    }

    /**
     * Возвращает провайдера
     * 
     * @return MyDataProvider
     */
    public function getProvider()
    {
        return $this->provider;
    }
}

Сама обработка каждой задачи пула, (пусть это будет некая ресурсоемкая операция), наше узкое горлышко, ради которого мы и затеяли многопоточность, будет в MyWork (Threaded).

/**
 * MyWork это задача, которая может выполняться параллельно
 */
class MyWork extends Threaded
{

    public function run()
    {
        do {
            $value = null;

            $provider = $this->worker->getProvider();

            // Синхронизируем получение данных
            $provider->synchronized(function($provider) use (&$value) {
               $value = $provider->getNext();
            }, $provider);

            if ($value === null) {
                continue;
            }

            // Некая ресурсоемкая операция
            $count = 100;
            for ($j = 1; $j <= $count; $j++) {
                sqrt($j+$value) + sin($value/$j) + cos($value);
            }
        }
        while ($value !== null);
    }

}

Обратите внимание, что данные из провайдера забираем в synchronized(). Иначе есть вероятность часть данных обработать более 1 раза, или пропустить часть данных.
Теперь заставим все это работать с помощью Pool.

require_once 'MyWorker.php';
require_once 'MyWork.php';
require_once 'MyDataProvider.php';

$threads = 8;

// Создадим провайдер. Этот сервис может например читать некие данные
// из файла или из БД
$provider = new MyDataProvider();

// Создадим пул воркеров
$pool = new Pool($threads, 'MyWorker', [$provider]);

$start = microtime(true);

// В нашем случае потоки сбалансированы. 
// Поэтому тут хорошо создать столько потоков, сколько процессов в нашем пуле.
$workers = $threads;
for ($i = 0; $i < $workers; $i++) {
    $pool->submit(new MyWork());
}

$pool->shutdown();

printf("Done for %.2f seconds" . PHP_EOL, microtime(true) - $start);

Получается довольно элегантно на мой взгляд. Этот пример я выложил на гитхаб.

Вот и все! Ну почти все. На самом деле есть то, что может огорчить пытливого читателя. Все это не работает на стандартном PHP, скомпилированным с опциями по умолчанию. Чтобы насладиться многопоточностью, надо, чтобы в вашем PHP был включен ZTS (Zend Thread Safety).


Настройка PHP

В документации сказано, что PHP должен быть скомпилирован с опцией --enable-maintainer-zts. Я не пробовал сам компилировать, вместо этого нашел пакет для Debian, который и установил себе.

sudo add-apt-repository ppa:ondrej/php-zts
sudo apt update
sudo apt-get install php7.0-zts php7.0-zts-dev

Таким образом у меня остался прежний PHP, который запускается из консоли обычным образом, с помощью команды php. Соответственно, веб сервер использует его-же. И появился еще один PHP, который можно запускать из консоли через php7.0-zts.

После этого можно ставить расширение pthreads.

git clone https://github.com/krakjoe/pthreads.git
./configure
make -j8
sudo make install
echo "extension=pthreads.so" > /etc/pthreads.ini
sudo cp pthreads.ini /etc/php/7.0-zts/cli/conf.d/pthreads.ini

Вот теперь все. Ну… почти все. Представьте, что вы написали мультипоточный код, а PHP на машине у коллеги не настроен соответствующим образом? Конфуз, не правда ли? Но выход есть.


pthreads-polyfill

Тут снова спасибо Joe Watkins за пакет pthreads-polyfill. Суть решения такова: в этом пакете содержатся те-же классы, что и в расширении pthreads, они позволяют выполниться вашему коду, даже если не установлено расширение pthreads. Просто код будет выполнен в один поток.
Чтобы это заработало, вы просто подключаете через composer этот пакет и больше ни о чем не думаете. Там происходит проверка, установлено ли расширение. Если расширение установлено, то на этом работа polyfill заканчивается. Иначе подключаются классы-«заглушки», чтобы код работал хотя бы в 1 поток.


Проверим

Давайте теперь посмотрим, действительно ли обработка происходит в несколько потоков и оценим выигрыш от использования этого подхода.
Я буду менять значение $threads из примера выше и смотреть, что получается.

Информация о процессоре, на котором запускал тесты

$ lscpu
CPU(s):                8
Потоков на ядро:       2
Ядер на сокет:         4
Model name:            Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz

Посмотрим диаграмму загрузки ядер процессора. Тут все соответствует ожиданиям.

$threads = 1

$threads = 1

$threads = 2

$threads = 2

$threads = 4

$threads = 4

$threads = 8

$threads = 8

А теперь самое главное, ради чего все это. Сравним время выполнения.


$threads Примечание Время выполнения, секунд
PHP без ZTS
1 без pthreads, без polyfill 265.05
1 polyfill 298.26
PHP с ZTS
1 без pthreads, без polyfill 37.65
1 68.58
2 26.18
3 16.87
4 12.96
5 12.57
6 12.07
7 11.78
8 11.62

Из первых двух строк видно, что при использовании polyfill мы потеряли примерно 13% производительности в этом примере, это относительно линейного кода на совсем простом PHP «без всего».

Далее, PHP с ZTS. Не обращайте внимание на такую большую разницу во времени выполнения в сравнении с PHP без ZTS (37.65 против 265.05 секунд), я не пытался привести к общему знаменателю настройки PHP. В случае без ZTS у меня включен XDebug например.

Как видно, при использовании 2-х потоков скорость выполнения программы примерно в 1.5 раза выше, чем в случае с линейным кодом. При использовании 4-х потоков — в 3 раза.

Можно обратить внимание, что хоть процессор и 8-ядерный, время выполнения программы почти не менялось, если использовалось более 4 потоков. Похоже, это связано с тем, что физических ядра у моего процессора 4. Для наглядности изобразил табличку в виде диаграммы.

h_1463351999_2264822_d5568fd65e.png


Резюме

В PHP возможна вполне элегантная работа с многопоточностью с использованием расширения pthreads. Это дает ощутимый прирост производительности.

© Habrahabr.ru