Тестирование параллельных процессов

image

Вы встречались с ошибками, которые возникают время от времени в продакшне, но никак не воспроизводятся локально? Бывает, изучаешь такой баг и вдруг понимаешь, что он проявляется только при одновременном параллельном выполнении скриптов. Изучив код, понимаешь как это исправить, чтобы такого больше не повторялось. Но на такое исправление хорошо бы написать тест…

В статье я расскажу о своем подходе к тестированию таких ситуаций. А также приведу несколько наглядных (и наверное даже классических) примеров багов, которые удобно протестировать с помощью этого подхода. Все примеры багов живые — то, что встречается в работе.

Забегая вперед сразу скажу, что в конце статьи будет ссылка на github, куда я выложил готовое решение, позволяющее тестировать параллельные консольные процессы легко и просто.

Пример номер один. Параллельное добавление одного и того же


Задача. У нас есть приложение с базой данных (PostgreSQL) и нам надо наладить импорт данных из сторонней системы. Допустим, есть таблица account (id, name) и связи идентификаторов с внешней системой в таблице account_import (id, external_id). Давайте набросаем простой механизм приема сообщений.

При приеме сообщения будем сперва проверять — есть ли такие записи у нас в базе. Если есть, то будем обновлять имеющиеся. Если нет, то будем добавлять в базу.

$data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}'

try {
    $connection->beginTransaction();

    // Проверим, есть ли такая запись в базе
    $stmt = $connection->prepare("SELECT id FROM account_import 
        WHERE external_id = :external_id");
    $stmt->execute([
        ':external_id' => $data['id'],
    ]);
    $row = $stmt->fetch();

    usleep(100000); // 0.1 sec

    // Если импортируемая запись в базе есть, то обновим ее
    if ($row) {
        $stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = (
            SELECT id FROM account_import WHERE external_id = :external_id
        )");
        $stmt->execute([
            ':name' => $data['name'],
            ':external_id' => $data['id'],
        ]);
        $accountId = $row['id'];
    }
    // Иначе создадим новую запись
    else {
        $stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)");
        $stmt->execute([
            ':name' => $data['name'],
        ]);
        $accountId = $connection->lastInsertId();

        $stmt = $connection->prepare("INSERT INTO account_import (id, external_id) 
            VALUES (:id, :external_id)");
        $stmt->execute([
            ':id' => $accountId,
            ':external_id' => $data['id'],
        ]);
    }

    $connection->commit();
}
catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}

С первого взгляда выглядит хорошо. Но если данные в нашу систему могут передаваться не строго последовательно, тут можем столкнуться с проблемой. Задержка 0.1 секунды в этом примере нам нужна, чтобы гарантированно воспроизвести проблему. Что будет, если выполнить импорт одних и тех же данных параллельно? Вероятно, вместо того, чтобы данные были добавлены, а потом обновлены, будет попытка повторной вставки данных и, как следствие, ошибка нарушения первичного ключа в account_import.

Чтобы исправить ошибку, ее хорошо бы сперва воспроизвести. А лучше всего — написать тест, который воспроизводит ошибку. Я решил для этого запускать команды асинхронно с помощью bash и написал простой скрипт для этого, который можно использовать не только в связке с PHP.

Идея проста — запускаем в фоне несколько экземпляров команд, потом ждем когда все они завершатся, и проверяем коды выполнения. Если среди кодов выполнения есть отличные от нуля, значит мы нашли баг. В упрощенном виде скрипт будет выглядеть так:

# Команда, которую будем проверять
COMMAND=”echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import”

# PID-ы запущенных фоновых процессов
pids=()

# Результаты выполнения фоновых процессов
results=()

# Ожидаемые результаты выполнения фоновых процессов (нули)
expects=()

# Запустим процессы в фоне и перенаправим вывод в stderr
for i in $(seq 2)
do
  eval $COMMAND 1>&2 & pids+=($!) ; echo -e '>>>' Process ${pids[i-1]} started 1>&2
done

# Ожидаем завершения каждого процесса и сохраняем результаты в $results
for pid in "${pids[@]}"
do
  wait $pid
  results+=($?)
  expects+=(0)
  echo -e '<<<' Process $pid finished 1>&2
done

# Сравним полученные результаты с ожидаемыми
result=`( IFS=$', '; echo "${results[*]}" )`
expect=`( IFS=$', '; echo "${expects[*]}" )`
if [ "$result" != "$expect" ]
then
  exit 1
fi

Полную версию скрипта выложил на github.

На основе этой команды мы можем дописать к PHPUnit новые assert-ы. Тут уже все проще и я не буду подробно останавливаться на этом. Скажу только, что в вышеупомянутом проекте они реализованы. Чтобы их использовать достаточно подключить трейт AsyncTrait к вашему тесту.

Напишем такой тест.

use App\Command\Initializer;
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;

class ImportCommandTest extends TestCase
{
    use AsyncTrait;

    public function testImport()
    {
        $cli = Initializer::create();
        $command = $cli->find('app:delete');

        // Удаляем запись c external_id = 1, 
        // чтобы проверить случай параллельного добавления одной и той же записи
        $commandTester = new CommandTester($command);
        $commandTester->execute([
            'externalId' => 1,
        ]);

        $asnycCommand = new Command(
            'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // Тестируемая команда
            dirname(__DIR__), // Каталог, из которого будет запускаться команда
            2 // Количество запускаемых экземпляров команд
        );
        // Запуск проверки
        $this->assertAsyncCommand($asnycCommand);
    }
}

В результате запуска теста получим такой вывод.

$ ./vendor/bin/phpunit
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

F                                                                   1 / 1 (100%)

Time: 230 ms, Memory: 6.00MB

There was 1 failure:

1) ImportCommandTest::testImport
Failed asserting that command 
	echo -e '{"id":1,"name":"account1"}' | ./cli app:import (path: /var/www/pprocess-playground, count: 2)
executed in parallel.
Output:


>>> Process 18143 started
>>> Process 18144 started
Account 25 imported correctly

                                                                               
  [Doctrine\DBAL\Exception\UniqueConstraintViolationException]                 
  An exception occurred while executing 'INSERT INTO account_import (id, exte  
  rnal_id) VALUES (:id, :external_id)' with params ["26", 1]:                  
  SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа   
  нарушает ограничение уникальности "account_import_pkey"                      
  DETAIL:  Ключ "(external_id)=(1)" уже существует.                            
                                                                               
-------

app:import

<<< Process 18143 finished 
<<< Process 18144 finished 

.

/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
/var/www/pprocess-playground/tests/ImportCommandTest.php:30

FAILURES!
Tests: 1, Assertions: 1, Failures: 1.

Причину мы уже обсудили. Теперь попробуем добавить принудительную блокировку параллельного выполнения фрагмента нашего скрипта (тут используется malkusch/lock).

$mutex = new FlockMutex(fopen(__FILE__, 'r'));
$mutex->synchronized(function () use ($connection, $data) {
    // наш код из блока try
});

Тест пройден:
$ ./vendor/bin/phpunit
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

.                                                                   1 / 1 (100%)

Time: 361 ms, Memory: 6.00MB

OK (1 test, 1 assertion)

Этот и другие примеры я выложил на github, если вдруг кому-то понадобится.

Пример номер два. Подготовка данных в таблице


Этот пример будет немного интереснее. Допустим, у нас есть таблица пользователей users (id, name) и мы желаем хранить в таблице users_active (id) список активных в настоящий момент пользователей.

У нас будет команда, которая каждый раз будет удалять все записи из таблицы users_acitve и добавлять туда данные заново.

try {
    $connection->beginTransaction();

    $connection->prepare("DELETE FROM users_active")->execute();

    usleep(100000); // 0.1 sec

    $connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();

    $connection->commit();
    $output->writeln('users_active refreshed');
}
catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}

Тут только с первого взгляда все хорошо. На самом же деле при параллельном запуске снова получим ошибку.

Напишем тест, чтобы ее воспроизвести.

use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\Command;
use PHPUnit\Framework\TestCase;

class DetectActiveUsersCommandTest extends TestCase
{
    use AsyncTrait;

    public function testImport()
    {
        $asnycCommand = new Command(
            './cli app:detect-active-users', // Тестируемая команда
            dirname(__DIR__), // Каталог, из которого будет запускаться команда
            2 // Количество запускаемых экземпляров команд
        );
        // Запуск проверки
        $this->assertAsyncCommand($asnycCommand);
    }
}

Запускаем тест и видим текст ошибки:

$ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

F                                                                   1 / 1 (100%)

Time: 287 ms, Memory: 4.00MB

There was 1 failure:

1) DetectActiveUsersCommandTest::testImport
Failed asserting that command 
	./cli app:detect-active-users (path: /var/www/pprocess-playground, count: 2)
executed in parallel.
Output:


>>> Process 24717 started
>>> Process 24718 started
users_active refreshed
<<< Process 24717 finished 

                                                                               
  [Doctrine\DBAL\Exception\UniqueConstraintViolationException]                 
  An exception occurred while executing 'INSERT INTO users_active (id) VALUES  
   (3), (5), (6), (10)':                                                       
  SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа   
  нарушает ограничение уникальности "users_active_pkey"                        
  DETAIL:  Ключ "(id)=(3)" уже существует.                                     

-------

app:detect-active-users

<<< Process 24718 finished 

.

/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
/var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19

FAILURES!
Tests: 1, Assertions: 1, Failures: 1.

По тексту ошибки понятно, что снова INSERT выполняется параллельно и это приводит к нежелательным последствиям. Попробуем сделать блокировку на уровне записей — добавим строчку после старта транзакции:

$connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute();

Запускаем тест — ошибка ушла. Наш тест запускает два экземпляра процесса. Давайте увеличим в нашем тесте количество экземпляров до 3-х и посмотрим, что будет.

$asnycCommand = new Command(
    './cli app:detect-active-users', // Тестируемая команда
    dirname(__DIR__), // Каталог, из которого будет запускаться команда
    3 // Количество запускаемых экземпляров команд
);

И снова имеем ту же ошибку. В чем дело, мы же добавили блокировку?! Немного подумав, можно догадаться, что такая блокировка поможет только если в таблице users_active есть записи. В случае же, когда работают 3 процесса одновременно, получается картина такая — первый процесс получает блокировку. Второй и третий процесс ждут завершения транзакции первого процесса. Как только транзакция будет завершена, продолжат выполняться параллельно и второй и третий процесс, что приведет к нежелательным последствиям.

Чтобы починить, сделаем блокировку более общую. Например,

$connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();

Либо вместо DELETE мы могли просто воспользоваться TRUNCATE, которая блокирует всю таблицу.

Пример номер три. Deadlock


Бывает, что сама по себе команда не приводит к проблемам, но одновременный вызов двух разных команд, работающих с одними и теми же ресурсами приводит к проблемам. Найти причины таких багов бывает нелегко. Но если причина найдена, то тест написать точно стоит, чтобы избежать возвращения проблемы в будущем при внесении изменений в код.

Напишем пару таких команд. Это классический случай, когда возникает взаимная блокировка.

Первая команда сперва обновляет запись с id=1, потом с id=2.

try {
    $connection->beginTransaction();

    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();

    usleep(100000); // 0.1 sec

    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();

    $connection->commit();
    $output->writeln('Completed without deadlocks');
}
catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}

Вторая команда сперва обновляет запись с id=2, потом с id=1.

try {
    $connection->beginTransaction();

    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();

    usleep(100000); // 0.1 sec

    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();

    $connection->commit();
    $output->writeln('Completed without deadlocks');
}
catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}

Тест будет выглядеть так.

use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\CommandSet;
use PHPUnit\Framework\TestCase;

class DeadlockCommandTest extends TestCase
{
    use AsyncTrait;

    public function testImport()
    {
        $asnycCommand = new CommandSet(
            [   // Тестируемые команды
                './cli app:deadlock-one',
                './cli app:deadlock-two'
            ],
            dirname(__DIR__), // Каталог, из которого будет запускаться команда
            1 // Количество запускаемых экземпляров команд
        );
        // Запуск проверки
        $this->assertAsyncCommands($asnycCommand);
    }
}

В результате запуска теста увидим причину ошибки:

$ ./vendor/bin/phpunit tests/DeadlockCommandTest.php
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

F                                                                   1 / 1 (100%)

Time: 1.19 seconds, Memory: 4.00MB

There was 1 failure:

1) DeadlockCommandTest::testImport
Failed asserting that commands 
	./cli app:deadlock-one, ./cli app:deadlock-two (path: /var/www/pprocess-playground, count: 1)
executed in parallel.
Output:


>>> Process 5481 started: ./cli app:deadlock-one
>>> Process 5481 started: ./cli app:deadlock-two

                                                                               
  [Doctrine\DBAL\Exception\DriverException]                                    
  An exception occurred while executing 'UPDATE deadlock SET value = value +   
  1 WHERE id = 1':                                                             
  SQLSTATE[40P01]: Deadlock detected: 7 ОШИБКА:  обнаружена взаимоблокировка   
  DETAIL:  Процесс 5498 ожидает в режиме ShareLock блокировку "транзакция 294  
  738"; заблокирован процессом 5499.                                           
  Процесс 5499 ожидает в режиме ShareLock блокировку "транзакция 294737"; заб  
  локирован процессом 5498.                                                    
  HINT:  Подробности запроса смотрите в протоколе сервера.                     
  CONTEXT:  при изменении кортежа (0,48) в отношении "deadlock"                

-------

app:deadlock-two

Completed without deadlocks
<<< Process 5481 finished 
<<< Process 5484 finished 

.

/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39
/var/www/pprocess-playground/tests/DeadlockCommandTest.php:22

FAILURES!
Tests: 1, Assertions: 1, Failures: 1.

Проблема лечится добавлением блокировки по аналогии с первым примером. Либо пересмотром структуры базы или алгоритма работы с данными.

Резюмируем


При параллельном исполнении кода могут возникать неожиданные ситуации, при исправлении которых полезно написать тесты. Мы рассмотрели несколько таких ситуаций и написали тесты, воспользовавшись pprocess.

Комментарии (2)

  • 25 апреля 2017 в 09:37

    0

    вместо того чтобы писать какую-то муть на пхп, все ваши проблемы с параллельными процессами легко лечатся перенесением логики работы с бд в хранимые процедуры
    • 25 апреля 2017 в 09:42

      0

      Хранимые процедуры тоже можно вызывать параллельно, там тоже случаются дедлоки и прочие неожиданности, только их отладить бывает сложнее. А если СУБД MySQL, то нужна особая эрудиция, чтобы найти причину ошибки в хранимой процедуре.

© Habrahabr.ru