Тестирование параллельных процессов

    image

    Вы встречались с ошибками, которые возникают время от времени в продакшне, но никак не воспроизводятся локально? Бывает, изучаешь такой баг и вдруг понимаешь, что он проявляется только при одновременном параллельном выполнении скриптов. Изучив код, понимаешь как это исправить, чтобы такого больше не повторялось. Но на такое исправление хорошо бы написать тест…

    В статье я расскажу о своем подходе к тестированию таких ситуаций. А также приведу несколько наглядных (и наверное даже классических) примеров багов, которые удобно протестировать с помощью этого подхода. Все примеры багов живые — то, что встречается в работе.

    Забегая вперед сразу скажу, что в конце статьи будет ссылка на github, куда я выложил готовое решение, позволяющее тестировать параллельные консольные процессы легко и просто.

    Пример номер один. Параллельное добавление одного и того же


    Задача. У нас есть приложение с базой данных (PostgreSQL) и нам надо наладить импорт данных из сторонней системы. Допустим, есть таблица account (id, name) и связи идентификаторов с внешней системой в таблице account_import (id, external_id). Давайте набросаем простой механизм приема сообщений.

    При приеме сообщения будем сперва проверять — есть ли такие записи у нас в базе. Если есть, то будем обновлять имеющиеся. Если нет, то будем добавлять в базу.

    $data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}'
    
    try {
        $connection->beginTransaction();
    
        // Проверим, есть ли такая запись в базе
        $stmt = $connection->prepare("SELECT id FROM account_import 
            WHERE external_id = :external_id");
        $stmt->execute([
            ':external_id' => $data['id'],
        ]);
        $row = $stmt->fetch();
    
        usleep(100000); // 0.1 sec
    
        // Если импортируемая запись в базе есть, то обновим ее
        if ($row) {
            $stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = (
                SELECT id FROM account_import WHERE external_id = :external_id
            )");
            $stmt->execute([
                ':name' => $data['name'],
                ':external_id' => $data['id'],
            ]);
            $accountId = $row['id'];
        }
        // Иначе создадим новую запись
        else {
            $stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)");
            $stmt->execute([
                ':name' => $data['name'],
            ]);
            $accountId = $connection->lastInsertId();
    
            $stmt = $connection->prepare("INSERT INTO account_import (id, external_id) 
                VALUES (:id, :external_id)");
            $stmt->execute([
                ':id' => $accountId,
                ':external_id' => $data['id'],
            ]);
        }
    
        $connection->commit();
    }
    catch (\Throwable $e) {
        $connection->rollBack();
        throw $e;
    }

    С первого взгляда выглядит хорошо. Но если данные в нашу систему могут передаваться не строго последовательно, тут можем столкнуться с проблемой. Задержка 0.1 секунды в этом примере нам нужна, чтобы гарантированно воспроизвести проблему. Что будет, если выполнить импорт одних и тех же данных параллельно? Вероятно, вместо того, чтобы данные были добавлены, а потом обновлены, будет попытка повторной вставки данных и, как следствие, ошибка нарушения первичного ключа в account_import.

    Чтобы исправить ошибку, ее хорошо бы сперва воспроизвести. А лучше всего — написать тест, который воспроизводит ошибку. Я решил для этого запускать команды асинхронно с помощью bash и написал простой скрипт для этого, который можно использовать не только в связке с PHP.

    Идея проста — запускаем в фоне несколько экземпляров команд, потом ждем когда все они завершатся, и проверяем коды выполнения. Если среди кодов выполнения есть отличные от нуля, значит мы нашли баг. В упрощенном виде скрипт будет выглядеть так:

    # Команда, которую будем проверять
    COMMAND=”echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import”
    
    # PID-ы запущенных фоновых процессов
    pids=()
    
    # Результаты выполнения фоновых процессов
    results=()
    
    # Ожидаемые результаты выполнения фоновых процессов (нули)
    expects=()
    
    # Запустим процессы в фоне и перенаправим вывод в stderr
    for i in $(seq 2)
    do
      eval $COMMAND 1>&2 & pids+=($!) ; echo -e '>>>' Process ${pids[i-1]} started 1>&2
    done
    
    # Ожидаем завершения каждого процесса и сохраняем результаты в $results
    for pid in "${pids[@]}"
    do
      wait $pid
      results+=($?)
      expects+=(0)
      echo -e '<<<' Process $pid finished 1>&2
    done
    
    # Сравним полученные результаты с ожидаемыми
    result=`( IFS=$', '; echo "${results[*]}" )`
    expect=`( IFS=$', '; echo "${expects[*]}" )`
    if [ "$result" != "$expect" ]
    then
      exit 1
    fi

    Полную версию скрипта выложил на github.

    На основе этой команды мы можем дописать к PHPUnit новые assert-ы. Тут уже все проще и я не буду подробно останавливаться на этом. Скажу только, что в вышеупомянутом проекте они реализованы. Чтобы их использовать достаточно подключить трейт AsyncTrait к вашему тесту.

    Напишем такой тест.

    use App\Command\Initializer;
    use Mnvx\PProcess\AsyncTrait;
    use Mnvx\PProcess\Command\Command;
    use PHPUnit\Framework\TestCase;
    use Symfony\Component\Console\Tester\CommandTester;
    
    class ImportCommandTest extends TestCase
    {
        use AsyncTrait;
    
        public function testImport()
        {
            $cli = Initializer::create();
            $command = $cli->find('app:delete');
    
            // Удаляем запись c external_id = 1, 
            // чтобы проверить случай параллельного добавления одной и той же записи
            $commandTester = new CommandTester($command);
            $commandTester->execute([
                'externalId' => 1,
            ]);
    
            $asnycCommand = new Command(
                'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // Тестируемая команда
                dirname(__DIR__), // Каталог, из которого будет запускаться команда
                2 // Количество запускаемых экземпляров команд
            );
            // Запуск проверки
            $this->assertAsyncCommand($asnycCommand);
        }
    }

    В результате запуска теста получим такой вывод.

    $ ./vendor/bin/phpunit
    PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
    
    F                                                                   1 / 1 (100%)
    
    Time: 230 ms, Memory: 6.00MB
    
    There was 1 failure:
    
    1) ImportCommandTest::testImport
    Failed asserting that command 
    	echo -e '{"id":1,"name":"account1"}' | ./cli app:import (path: /var/www/pprocess-playground, count: 2)
    executed in parallel.
    Output:
    
    
    >>> Process 18143 started
    >>> Process 18144 started
    Account 25 imported correctly
    
                                                                                   
      [Doctrine\DBAL\Exception\UniqueConstraintViolationException]                 
      An exception occurred while executing 'INSERT INTO account_import (id, exte  
      rnal_id) VALUES (:id, :external_id)' with params ["26", 1]:                  
      SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа   
      нарушает ограничение уникальности "account_import_pkey"                      
      DETAIL:  Ключ "(external_id)=(1)" уже существует.                            
                                                                                   
    -------
    
    app:import
    
    <<< Process 18143 finished 
    <<< Process 18144 finished 
    
    .
    
    /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
    /var/www/pprocess-playground/tests/ImportCommandTest.php:30
    
    FAILURES!
    Tests: 1, Assertions: 1, Failures: 1.

    Причину мы уже обсудили. Теперь попробуем добавить принудительную блокировку параллельного выполнения фрагмента нашего скрипта (тут используется malkusch/lock).

    $mutex = new FlockMutex(fopen(__FILE__, 'r'));
    $mutex->synchronized(function () use ($connection, $data) {
        // наш код из блока try
    });
    

    Тест пройден:

    $ ./vendor/bin/phpunit
    PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
    
    .                                                                   1 / 1 (100%)
    
    Time: 361 ms, Memory: 6.00MB
    
    OK (1 test, 1 assertion)

    Этот и другие примеры я выложил на github, если вдруг кому-то понадобится.

    Пример номер два. Подготовка данных в таблице


    Этот пример будет немного интереснее. Допустим, у нас есть таблица пользователей users (id, name) и мы желаем хранить в таблице users_active (id) список активных в настоящий момент пользователей.

    У нас будет команда, которая каждый раз будет удалять все записи из таблицы users_acitve и добавлять туда данные заново.

    try {
        $connection->beginTransaction();
    
        $connection->prepare("DELETE FROM users_active")->execute();
    
        usleep(100000); // 0.1 sec
    
        $connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();
    
        $connection->commit();
        $output->writeln('<info>users_active refreshed</info>');
    }
    catch (\Throwable $e) {
        $connection->rollBack();
        throw $e;
    }

    Тут только с первого взгляда все хорошо. На самом же деле при параллельном запуске снова получим ошибку.

    Напишем тест, чтобы ее воспроизвести.

    use Mnvx\PProcess\AsyncTrait;
    use Mnvx\PProcess\Command\Command;
    use PHPUnit\Framework\TestCase;
    
    class DetectActiveUsersCommandTest extends TestCase
    {
        use AsyncTrait;
    
        public function testImport()
        {
            $asnycCommand = new Command(
                './cli app:detect-active-users', // Тестируемая команда
                dirname(__DIR__), // Каталог, из которого будет запускаться команда
                2 // Количество запускаемых экземпляров команд
            );
            // Запуск проверки
            $this->assertAsyncCommand($asnycCommand);
        }
    }

    Запускаем тест и видим текст ошибки:

    $ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php
    PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
    
    F                                                                   1 / 1 (100%)
    
    Time: 287 ms, Memory: 4.00MB
    
    There was 1 failure:
    
    1) DetectActiveUsersCommandTest::testImport
    Failed asserting that command 
    	./cli app:detect-active-users (path: /var/www/pprocess-playground, count: 2)
    executed in parallel.
    Output:
    
    
    >>> Process 24717 started
    >>> Process 24718 started
    users_active refreshed
    <<< Process 24717 finished 
    
                                                                                   
      [Doctrine\DBAL\Exception\UniqueConstraintViolationException]                 
      An exception occurred while executing 'INSERT INTO users_active (id) VALUES  
       (3), (5), (6), (10)':                                                       
      SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа   
      нарушает ограничение уникальности "users_active_pkey"                        
      DETAIL:  Ключ "(id)=(3)" уже существует.                                     
    
    -------
    
    app:detect-active-users
    
    <<< Process 24718 finished 
    
    .
    
    /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
    /var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19
    
    FAILURES!
    Tests: 1, Assertions: 1, Failures: 1.

    По тексту ошибки понятно, что снова INSERT выполняется параллельно и это приводит к нежелательным последствиям. Попробуем сделать блокировку на уровне записей — добавим строчку после старта транзакции:

    $connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute();
    

    Запускаем тест — ошибка ушла. Наш тест запускает два экземпляра процесса. Давайте увеличим в нашем тесте количество экземпляров до 3-х и посмотрим, что будет.

    $asnycCommand = new Command(
        './cli app:detect-active-users', // Тестируемая команда
        dirname(__DIR__), // Каталог, из которого будет запускаться команда
        3 // Количество запускаемых экземпляров команд
    );
    

    И снова имеем ту же ошибку. В чем дело, мы же добавили блокировку?! Немного подумав, можно догадаться, что такая блокировка поможет только если в таблице users_active есть записи. В случае же, когда работают 3 процесса одновременно, получается картина такая — первый процесс получает блокировку. Второй и третий процесс ждут завершения транзакции первого процесса. Как только транзакция будет завершена, продолжат выполняться параллельно и второй и третий процесс, что приведет к нежелательным последствиям.

    Чтобы починить, сделаем блокировку более общую. Например,

    $connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();

    Либо вместо DELETE мы могли просто воспользоваться TRUNCATE, которая блокирует всю таблицу.

    Пример номер три. Deadlock


    Бывает, что сама по себе команда не приводит к проблемам, но одновременный вызов двух разных команд, работающих с одними и теми же ресурсами приводит к проблемам. Найти причины таких багов бывает нелегко. Но если причина найдена, то тест написать точно стоит, чтобы избежать возвращения проблемы в будущем при внесении изменений в код.

    Напишем пару таких команд. Это классический случай, когда возникает взаимная блокировка.

    Первая команда сперва обновляет запись с id=1, потом с id=2.

    try {
        $connection->beginTransaction();
    
        $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();
    
        usleep(100000); // 0.1 sec
    
        $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();
    
        $connection->commit();
        $output->writeln('<info>Completed without deadlocks</info>');
    }
    catch (\Throwable $e) {
        $connection->rollBack();
        throw $e;
    }

    Вторая команда сперва обновляет запись с id=2, потом с id=1.

    try {
        $connection->beginTransaction();
    
        $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();
    
        usleep(100000); // 0.1 sec
    
        $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();
    
        $connection->commit();
        $output->writeln('<info>Completed without deadlocks</info>');
    }
    catch (\Throwable $e) {
        $connection->rollBack();
        throw $e;
    }

    Тест будет выглядеть так.

    use Mnvx\PProcess\AsyncTrait;
    use Mnvx\PProcess\Command\CommandSet;
    use PHPUnit\Framework\TestCase;
    
    class DeadlockCommandTest extends TestCase
    {
        use AsyncTrait;
    
        public function testImport()
        {
            $asnycCommand = new CommandSet(
                [   // Тестируемые команды
                    './cli app:deadlock-one',
                    './cli app:deadlock-two'
                ],
                dirname(__DIR__), // Каталог, из которого будет запускаться команда
                1 // Количество запускаемых экземпляров команд
            );
            // Запуск проверки
            $this->assertAsyncCommands($asnycCommand);
        }
    }

    В результате запуска теста увидим причину ошибки:

    $ ./vendor/bin/phpunit tests/DeadlockCommandTest.php
    PHPUnit 6.1.1 by Sebastian Bergmann and contributors.
    
    F                                                                   1 / 1 (100%)
    
    Time: 1.19 seconds, Memory: 4.00MB
    
    There was 1 failure:
    
    1) DeadlockCommandTest::testImport
    Failed asserting that commands 
    	./cli app:deadlock-one, ./cli app:deadlock-two (path: /var/www/pprocess-playground, count: 1)
    executed in parallel.
    Output:
    
    
    >>> Process 5481 started: ./cli app:deadlock-one
    >>> Process 5481 started: ./cli app:deadlock-two
    
                                                                                   
      [Doctrine\DBAL\Exception\DriverException]                                    
      An exception occurred while executing 'UPDATE deadlock SET value = value +   
      1 WHERE id = 1':                                                             
      SQLSTATE[40P01]: Deadlock detected: 7 ОШИБКА:  обнаружена взаимоблокировка   
      DETAIL:  Процесс 5498 ожидает в режиме ShareLock блокировку "транзакция 294  
      738"; заблокирован процессом 5499.                                           
      Процесс 5499 ожидает в режиме ShareLock блокировку "транзакция 294737"; заб  
      локирован процессом 5498.                                                    
      HINT:  Подробности запроса смотрите в протоколе сервера.                     
      CONTEXT:  при изменении кортежа (0,48) в отношении "deadlock"                
    
    -------
    
    app:deadlock-two
    
    Completed without deadlocks
    <<< Process 5481 finished 
    <<< Process 5484 finished 
    
    .
    
    /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39
    /var/www/pprocess-playground/tests/DeadlockCommandTest.php:22
    
    FAILURES!
    Tests: 1, Assertions: 1, Failures: 1.

    Проблема лечится добавлением блокировки по аналогии с первым примером. Либо пересмотром структуры базы или алгоритма работы с данными.

    Резюмируем


    При параллельном исполнении кода могут возникать неожиданные ситуации, при исправлении которых полезно написать тесты. Мы рассмотрели несколько таких ситуаций и написали тесты, воспользовавшись pprocess.
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 11
    • –5
      вместо того чтобы писать какую-то муть на пхп, все ваши проблемы с параллельными процессами легко лечатся перенесением логики работы с бд в хранимые процедуры
      • +3

        Хранимые процедуры тоже можно вызывать параллельно, там тоже случаются дедлоки и прочие неожиданности, только их отладить бывает сложнее. А если СУБД MySQL, то нужна особая эрудиция, чтобы найти причину ошибки в хранимой процедуре.

        • +4

          Плохой совет, код в хранимке не продебажить, чтобы его обновлять и поддерживать нужны танцы, версионирования нормального нет. А проблема не уйдёт, внутри хранимок тоже надо организовывать синхронизацию потоков.

        • 0
          А почему бы в первом примере не написать SELECT FOR UPDATE. Разве это не исключит ошибку?
          • 0

            Если данных еще нет в базе, то FOR UPDATE не залочит запись, так как ее еще нет. Поэтому ошибка останется.

          • +1

            В postgresql есть с недавних пор INSERT ... ON CONFLICT DO, который хорошо решает вышеописанные проблемы.
            Тем не менее, спасибо за рассказанный способ тестирования.

            • 0
              В MySQL сто лет как есть INSERT… ON DUPLICATE KEY UPDATE, но суть статьи не в этом.
              • 0
                в mysql есть INSERT ... ON DUPLICATE KEY IGNORE|UPDATE но статья как я понимаю не об этом
                • 0
                  Это ж как надо не любить читать комментарии, а сразу отвечать
                  • 0
                    Проблема в одобрении комментариев: Grogina мог написать комментарий раньше sumanai, но вот при одобрении комментария меняется дата на время одобрения.
                    Разработчики явно знают про этот баг, но исправлять не спешат.
              • 0

                А почему не писать сразу Thread safe, что б потом не отлавливать racecondition?

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.