Тестирование параллельных процессов
Вы встречались с ошибками, которые возникают время от времени в продакшне, но никак не воспроизводятся локально? Бывает, изучаешь такой баг и вдруг понимаешь, что он проявляется только при одновременном параллельном выполнении скриптов. Изучив код, понимаешь как это исправить, чтобы такого больше не повторялось. Но на такое исправление хорошо бы написать тест…
В статье я расскажу о своем подходе к тестированию таких ситуаций. А также приведу несколько наглядных (и наверное даже классических) примеров багов, которые удобно протестировать с помощью этого подхода. Все примеры багов живые — то, что встречается в работе.
Забегая вперед сразу скажу, что в конце статьи будет ссылка на github, куда я выложил готовое решение, позволяющее тестировать параллельные консольные процессы легко и просто.
Пример номер один. Параллельное добавление одного и того же
Задача. У нас есть приложение с базой данных (PostgreSQL) и нам надо наладить импорт данных из сторонней системы. Допустим, есть таблица
account (id, name)
и связи идентификаторов с внешней системой в таблице account_import (id, external_id)
. Давайте набросаем простой механизм приема сообщений.При приеме сообщения будем сперва проверять — есть ли такие записи у нас в базе. Если есть, то будем обновлять имеющиеся. Если нет, то будем добавлять в базу.
$data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}'
try {
$connection->beginTransaction();
// Проверим, есть ли такая запись в базе
$stmt = $connection->prepare("SELECT id FROM account_import
WHERE external_id = :external_id");
$stmt->execute([
':external_id' => $data['id'],
]);
$row = $stmt->fetch();
usleep(100000); // 0.1 sec
// Если импортируемая запись в базе есть, то обновим ее
if ($row) {
$stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = (
SELECT id FROM account_import WHERE external_id = :external_id
)");
$stmt->execute([
':name' => $data['name'],
':external_id' => $data['id'],
]);
$accountId = $row['id'];
}
// Иначе создадим новую запись
else {
$stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)");
$stmt->execute([
':name' => $data['name'],
]);
$accountId = $connection->lastInsertId();
$stmt = $connection->prepare("INSERT INTO account_import (id, external_id)
VALUES (:id, :external_id)");
$stmt->execute([
':id' => $accountId,
':external_id' => $data['id'],
]);
}
$connection->commit();
}
catch (\Throwable $e) {
$connection->rollBack();
throw $e;
}
С первого взгляда выглядит хорошо. Но если данные в нашу систему могут передаваться не строго последовательно, тут можем столкнуться с проблемой. Задержка 0.1 секунды в этом примере нам нужна, чтобы гарантированно воспроизвести проблему. Что будет, если выполнить импорт одних и тех же данных параллельно? Вероятно, вместо того, чтобы данные были добавлены, а потом обновлены, будет попытка повторной вставки данных и, как следствие, ошибка нарушения первичного ключа в account_import.
Чтобы исправить ошибку, ее хорошо бы сперва воспроизвести. А лучше всего — написать тест, который воспроизводит ошибку. Я решил для этого запускать команды асинхронно с помощью bash и написал простой скрипт для этого, который можно использовать не только в связке с PHP.
Идея проста — запускаем в фоне несколько экземпляров команд, потом ждем когда все они завершатся, и проверяем коды выполнения. Если среди кодов выполнения есть отличные от нуля, значит мы нашли баг. В упрощенном виде скрипт будет выглядеть так:
# Команда, которую будем проверять
COMMAND=”echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import”
# PID-ы запущенных фоновых процессов
pids=()
# Результаты выполнения фоновых процессов
results=()
# Ожидаемые результаты выполнения фоновых процессов (нули)
expects=()
# Запустим процессы в фоне и перенаправим вывод в stderr
for i in $(seq 2)
do
eval $COMMAND 1>&2 & pids+=($!) ; echo -e '>>>' Process ${pids[i-1]} started 1>&2
done
# Ожидаем завершения каждого процесса и сохраняем результаты в $results
for pid in "${pids[@]}"
do
wait $pid
results+=($?)
expects+=(0)
echo -e '<<<' Process $pid finished 1>&2
done
# Сравним полученные результаты с ожидаемыми
result=`( IFS=$', '; echo "${results[*]}" )`
expect=`( IFS=$', '; echo "${expects[*]}" )`
if [ "$result" != "$expect" ]
then
exit 1
fi
Полную версию скрипта выложил на github.
На основе этой команды мы можем дописать к PHPUnit новые assert-ы. Тут уже все проще и я не буду подробно останавливаться на этом. Скажу только, что в вышеупомянутом проекте они реализованы. Чтобы их использовать достаточно подключить трейт AsyncTrait
к вашему тесту.
Напишем такой тест.
use App\Command\Initializer;
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
class ImportCommandTest extends TestCase
{
use AsyncTrait;
public function testImport()
{
$cli = Initializer::create();
$command = $cli->find('app:delete');
// Удаляем запись c external_id = 1,
// чтобы проверить случай параллельного добавления одной и той же записи
$commandTester = new CommandTester($command);
$commandTester->execute([
'externalId' => 1,
]);
$asnycCommand = new Command(
'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // Тестируемая команда
dirname(__DIR__), // Каталог, из которого будет запускаться команда
2 // Количество запускаемых экземпляров команд
);
// Запуск проверки
$this->assertAsyncCommand($asnycCommand);
}
}
В результате запуска теста получим такой вывод.
$ ./vendor/bin/phpunit
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
F 1 / 1 (100%)
Time: 230 ms, Memory: 6.00MB
There was 1 failure:
1) ImportCommandTest::testImport
Failed asserting that command
echo -e '{"id":1,"name":"account1"}' | ./cli app:import (path: /var/www/pprocess-playground, count: 2)
executed in parallel.
Output:
>>> Process 18143 started
>>> Process 18144 started
Account 25 imported correctly
[Doctrine\DBAL\Exception\UniqueConstraintViolationException]
An exception occurred while executing 'INSERT INTO account_import (id, exte
rnal_id) VALUES (:id, :external_id)' with params ["26", 1]:
SQLSTATE[23505]: Unique violation: 7 ОШИБКА: повторяющееся значение ключа
нарушает ограничение уникальности "account_import_pkey"
DETAIL: Ключ "(external_id)=(1)" уже существует.
-------
app:import
<<< Process 18143 finished
<<< Process 18144 finished
.
/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
/var/www/pprocess-playground/tests/ImportCommandTest.php:30
FAILURES!
Tests: 1, Assertions: 1, Failures: 1.
Причину мы уже обсудили. Теперь попробуем добавить принудительную блокировку параллельного выполнения фрагмента нашего скрипта (тут используется malkusch/lock).
$mutex = new FlockMutex(fopen(__FILE__, 'r'));
$mutex->synchronized(function () use ($connection, $data) {
// наш код из блока try
});
Тест пройден:
$ ./vendor/bin/phpunit
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
. 1 / 1 (100%)
Time: 361 ms, Memory: 6.00MB
OK (1 test, 1 assertion)
Этот и другие примеры я выложил на github, если вдруг кому-то понадобится.
Пример номер два. Подготовка данных в таблице
Этот пример будет немного интереснее. Допустим, у нас есть таблица пользователей
users (id, name)
и мы желаем хранить в таблице users_active (id)
список активных в настоящий момент пользователей.У нас будет команда, которая каждый раз будет удалять все записи из таблицы users_acitve
и добавлять туда данные заново.
try {
$connection->beginTransaction();
$connection->prepare("DELETE FROM users_active")->execute();
usleep(100000); // 0.1 sec
$connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();
$connection->commit();
$output->writeln('users_active refreshed ');
}
catch (\Throwable $e) {
$connection->rollBack();
throw $e;
}
Тут только с первого взгляда все хорошо. На самом же деле при параллельном запуске снова получим ошибку.
Напишем тест, чтобы ее воспроизвести.
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\Command;
use PHPUnit\Framework\TestCase;
class DetectActiveUsersCommandTest extends TestCase
{
use AsyncTrait;
public function testImport()
{
$asnycCommand = new Command(
'./cli app:detect-active-users', // Тестируемая команда
dirname(__DIR__), // Каталог, из которого будет запускаться команда
2 // Количество запускаемых экземпляров команд
);
// Запуск проверки
$this->assertAsyncCommand($asnycCommand);
}
}
Запускаем тест и видим текст ошибки:
$ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
F 1 / 1 (100%)
Time: 287 ms, Memory: 4.00MB
There was 1 failure:
1) DetectActiveUsersCommandTest::testImport
Failed asserting that command
./cli app:detect-active-users (path: /var/www/pprocess-playground, count: 2)
executed in parallel.
Output:
>>> Process 24717 started
>>> Process 24718 started
users_active refreshed
<<< Process 24717 finished
[Doctrine\DBAL\Exception\UniqueConstraintViolationException]
An exception occurred while executing 'INSERT INTO users_active (id) VALUES
(3), (5), (6), (10)':
SQLSTATE[23505]: Unique violation: 7 ОШИБКА: повторяющееся значение ключа
нарушает ограничение уникальности "users_active_pkey"
DETAIL: Ключ "(id)=(3)" уже существует.
-------
app:detect-active-users
<<< Process 24718 finished
.
/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
/var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19
FAILURES!
Tests: 1, Assertions: 1, Failures: 1.
По тексту ошибки понятно, что снова INSERT выполняется параллельно и это приводит к нежелательным последствиям. Попробуем сделать блокировку на уровне записей — добавим строчку после старта транзакции:
$connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute();
Запускаем тест — ошибка ушла. Наш тест запускает два экземпляра процесса. Давайте увеличим в нашем тесте количество экземпляров до 3-х и посмотрим, что будет.
$asnycCommand = new Command(
'./cli app:detect-active-users', // Тестируемая команда
dirname(__DIR__), // Каталог, из которого будет запускаться команда
3 // Количество запускаемых экземпляров команд
);
И снова имеем ту же ошибку. В чем дело, мы же добавили блокировку?! Немного подумав, можно догадаться, что такая блокировка поможет только если в таблице users_active
есть записи. В случае же, когда работают 3 процесса одновременно, получается картина такая — первый процесс получает блокировку. Второй и третий процесс ждут завершения транзакции первого процесса. Как только транзакция будет завершена, продолжат выполняться параллельно и второй и третий процесс, что приведет к нежелательным последствиям.
Чтобы починить, сделаем блокировку более общую. Например,
$connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();
Либо вместо DELETE
мы могли просто воспользоваться TRUNCATE
, которая блокирует всю таблицу.
Пример номер три. Deadlock
Бывает, что сама по себе команда не приводит к проблемам, но одновременный вызов двух разных команд, работающих с одними и теми же ресурсами приводит к проблемам. Найти причины таких багов бывает нелегко. Но если причина найдена, то тест написать точно стоит, чтобы избежать возвращения проблемы в будущем при внесении изменений в код.
Напишем пару таких команд. Это классический случай, когда возникает взаимная блокировка.
Первая команда сперва обновляет запись с id=1, потом с id=2.
try {
$connection->beginTransaction();
$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();
usleep(100000); // 0.1 sec
$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();
$connection->commit();
$output->writeln('Completed without deadlocks ');
}
catch (\Throwable $e) {
$connection->rollBack();
throw $e;
}
Вторая команда сперва обновляет запись с id=2, потом с id=1.
try {
$connection->beginTransaction();
$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();
usleep(100000); // 0.1 sec
$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();
$connection->commit();
$output->writeln('Completed without deadlocks ');
}
catch (\Throwable $e) {
$connection->rollBack();
throw $e;
}
Тест будет выглядеть так.
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\CommandSet;
use PHPUnit\Framework\TestCase;
class DeadlockCommandTest extends TestCase
{
use AsyncTrait;
public function testImport()
{
$asnycCommand = new CommandSet(
[ // Тестируемые команды
'./cli app:deadlock-one',
'./cli app:deadlock-two'
],
dirname(__DIR__), // Каталог, из которого будет запускаться команда
1 // Количество запускаемых экземпляров команд
);
// Запуск проверки
$this->assertAsyncCommands($asnycCommand);
}
}
В результате запуска теста увидим причину ошибки:
$ ./vendor/bin/phpunit tests/DeadlockCommandTest.php
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
F 1 / 1 (100%)
Time: 1.19 seconds, Memory: 4.00MB
There was 1 failure:
1) DeadlockCommandTest::testImport
Failed asserting that commands
./cli app:deadlock-one, ./cli app:deadlock-two (path: /var/www/pprocess-playground, count: 1)
executed in parallel.
Output:
>>> Process 5481 started: ./cli app:deadlock-one
>>> Process 5481 started: ./cli app:deadlock-two
[Doctrine\DBAL\Exception\DriverException]
An exception occurred while executing 'UPDATE deadlock SET value = value +
1 WHERE id = 1':
SQLSTATE[40P01]: Deadlock detected: 7 ОШИБКА: обнаружена взаимоблокировка
DETAIL: Процесс 5498 ожидает в режиме ShareLock блокировку "транзакция 294
738"; заблокирован процессом 5499.
Процесс 5499 ожидает в режиме ShareLock блокировку "транзакция 294737"; заб
локирован процессом 5498.
HINT: Подробности запроса смотрите в протоколе сервера.
CONTEXT: при изменении кортежа (0,48) в отношении "deadlock"
-------
app:deadlock-two
Completed without deadlocks
<<< Process 5481 finished
<<< Process 5484 finished
.
/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39
/var/www/pprocess-playground/tests/DeadlockCommandTest.php:22
FAILURES!
Tests: 1, Assertions: 1, Failures: 1.
Проблема лечится добавлением блокировки по аналогии с первым примером. Либо пересмотром структуры базы или алгоритма работы с данными.
Резюмируем
При параллельном исполнении кода могут возникать неожиданные ситуации, при исправлении которых полезно написать тесты. Мы рассмотрели несколько таких ситуаций и написали тесты, воспользовавшись pprocess.
Комментарии (2)
25 апреля 2017 в 09:37
0↑
↓
вместо того чтобы писать какую-то муть на пхп, все ваши проблемы с параллельными процессами легко лечатся перенесением логики работы с бд в хранимые процедуры25 апреля 2017 в 09:42
0↑
↓
Хранимые процедуры тоже можно вызывать параллельно, там тоже случаются дедлоки и прочие неожиданности, только их отладить бывает сложнее. А если СУБД MySQL, то нужна особая эрудиция, чтобы найти причину ошибки в хранимой процедуре.