[Из песочницы] Управление действиями процессов. Не превышение лимита RPS (QPS) API
Структурно-функциональная схема модуля
Хочу рассказать о разработанном и используемом в продакшне модуле Publisher Pulsar (github), который позволяет синхронизировать действия процессов.
Например, есть множество (десятки или сотни) процессов, независимо друг от друга обращающихся к API Google Analytics с одного IP.
При этом, GA установлен лимит в 10 queries per second с одного IP.
Если не регулировать обращения к API, то постоянно будет возвращаться ошибка превышения лимита некоторыми процессами, и либо они не смогут выполнить свою задачу без данных API, либо будут снова и снова пытаться получить данные в цикле, создавая проблему для других процессов, увеличивая количество ошибок. То есть будет хаос и отсутствие прогнозируемости в части процента корректно получаемых данных и процента ошибок при обращениях к API.
И постольку возникает задача избежать превышения лимита RPS (QPS), чтобы все процессы могли корректно получать данные.
Данный модуль как раз справляется с этим (возможно существуют другие реализации сходной логики для PHP, но они не были найдены).
Система призвана функционировать подобно пульсару — совершать регулярные («пульсирующие») рассылки подписчикам.
Общую структуру действий по использованию модуля можно описать так:
1. Указать параметры и запустить Пульсар как демон.
2. Настроить код процесса (Сервиса), обращающегося к API (прим. — выполняющего любое действие, которое необходимо синхронизировать), для коннекта к Пульсару, чтобы прежде выполнения действия (например, совершения запроса к API) процесс обращался бы к Пульсару и ждал разрешения на выполнение действия. И только после получения разрешения выполнял бы его.
В результате Пульсар согласно настройкам одновременно разрешает быть подписчиками только [например] 10 процессам (которые вышли из FIFO стека; т.е. 10-ти разрешили стать подписчиками, а остальные N находятся в ZMQ очереди).
И после того, как необходимое количество процессов стало подписчиками, им высылается разрешение, после которого они могут совершить свое действие (например, обратиться к API).
Таким образом, лимит будет соблюдаться независимо от количества параллельно работающих процессов (в пределах возможностей стека ZMQ).
3. После этого подписчик (исполнитель) должен послать в Пульсар сообщение о выполненном действии — присутствуют ли ошибки или все в порядке.
Т.к. если при выполнении действия присутствуют ошибки, связанные с количеством одновременно выполненных действий, то Пульсар может скорректировать свое поведение — временно, до нормализации ситуации (исчезновения ошибок) уменьшить число подписчиков, увеличить интервал между публикациями (разрешениями действий), или даже на время прекратить работу (в случае ошибки, требующей перерыва в действиях; например, превышения суточного лимита 403 DailyLimitExceeded).
1) Настройка и запуск Пульсара:
$pulsar = new \React\PublisherPulsar\Pulsar();
$publisherPulsarDto = new \React\PublisherPulsar\Inventory\PublisherPulsarDto();
$publisherPulsarDto->setPulsationIterationPeriod(1); // количество секунд между публикациями (в результате размер будет не меньшим, чем указанный в этом параметре; и может быть большим при некоторых условиях)
$publisherPulsarDto->setSubscribersPerIteration(10); // количество подписчиков, которым высылается разрешение на действие (в т.ч. одновременное; и это одновременность или не одновременность зависит уже от кода процесса-исполнителя/подписчика)
$publisherPulsarDto->setModuleName('react:pulsar-ga'); //произвольное имя
$publisherPulsarDto->setReplyStackCommandName('php artisan react:pulsar-reply-stack'); // Вызов субсидиарного скрипта, выполняющего роль стека для исполнителей. Код этого скрипта не требует настроек, он приведен чуть ниже. В данном случае указан путь вызова консольной команды Laravel
$publisherPulsarDto->setPerformerContainerActionMaxExecutionTime(7); // количество секунд ожидания результирующих сообщений от исполнителей для возможной коррекции поведения
$publisherPulsarDto->setLogger(\Log::getMonolog()); // чтобы использовать имеющиеся StreamHandlers. Если не сделать set, то создаст новый Logger с выводом информации в STDOUT
$publisherPulsarDto->setMaxWaitReplyStackResult(7); // количество секунд ожидания подключения нужного количества подписчиков, указанного в свойстве subscribersPerIteration выше. Если за это время нужное количество не подключится к Стеку, то Пульсар запустит процесс имитации подключения исполнителей, чтобы добрать нужное количество в виде "фантомов" и продолжить работу
$pulsarSocketsParams = new \React\PublisherPulsar\Inventory\PulsarSocketsParamsDto();
//могут быть любые свободные порты
$pulsarSocketsParams->setReplyToReplyStackSocketAddress('tcp://127.0.0.1:6261');
$pulsarSocketsParams->setPushToReplyStackSocketAddress('tcp://127.0.0.1:6262');
$pulsarSocketsParams->setPublishSocketAddress('tcp://127.0.0.1:6263');
$pulsarSocketsParams->setPullSocketAddress('tcp://127.0.0.1:6264');
$pulsarSocketsParams->setReplyStackSocketAddress('tcp://127.0.0.1:6265');
$publisherPulsarDto->setPulsarSocketsParams($pulsarSocketsParams);
$pulsar->setPublisherPulsarDto($publisherPulsarDto);
$pulsar->manage();
Код скрипта ReplyStack:
$replyStack = new \React\PublisherPulsar\ReplyStack();
$replyStack->startCommunication();
Note: важно, чтобы Пульсар был запущен раньше процессов, подключающихся к нему, иначе процессы будут стучаться в пустоту по адресам, которые еще не связаны с Пульсаром, и просто зависнут в ожидании ответа, который не придет никогда.
2) Настройка кода исполнителя (подписчика):
Включаем объект Performer пакета модуля в код (в виде свойства, если код процесса на ООП) процесса:
$performerDto = new \React\PublisherPulsar\Inventory\PerformerDto();
$performerDto->setModuleName("PerformerCommand"); // для понимания в логах какой тип исполнителей выполняет действие
$performer = new \React\PublisherPulsar\Performer($performerDto);
$performerSocketParams = new \React\PublisherPulsar\Inventory\PerformerSocketsParamsDto();
//эти адреса должны соответствовать адресам Пульсара в рамках ZMQ-парности (Publish/Subscribe, Push/Pull, Request/Reply)
$performerSocketParams->setPublisherPulsarSocketAddress('tcp://127.0.0.1:6273');
$performerSocketParams->setPushPulsarSocketAddress('tcp://127.0.0.1:6274');
$performerSocketParams->setRequestPulsarRsSocketAddress('tcp://127.0.0.1:6275');
$performer->setSocketsParams($performerSocketParams);
$this->zmqPerformer = $performer;
И далее в необходимом месте, перед вызовом целевого действия, требующего синхронизации/координации, вызываем метод, отвечающий за получение разрешения от Пульсара:
$this->zmqPerformer->connectToPulsarAndWaitPermissionToAct();
3) После выполнения целевого действия необходимо отправить результирующее сообщение о том, возникли ли ошибки. Например в таком виде:
//имеет место ошибка превышения 10 QPS
if (strpos($e->getMessage(), GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED) !== false) {
$actionResultWithError = new ActionResultingPushDto();
$actionResultWithError->setActionCompleteCorrectly(false);
$actionResultWithError->setSlowDown(true);
$actionResultWithError->setErrorMessage($e->getMessage());
$actionResultWithError->setErrorReason(GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED);
$this->zmqPerformer->pushActionResultInfo($actionResultWithError);
// превышен дневной лимит и необходимо на время уснуть
} elseif (strpos($e->getMessage(), GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED) !== false) {
$actionResultWithError = new ActionResultingPushDto();
$actionResultWithError->setActionCompleteCorrectly(false);
$sleepForPeriod = new ErrorSleepForPeriod();
$sleepForPeriod->setSleepPeriod((60 * 60 * 1000000)); //на час, в микросекундах
$actionResultWithError->setSleepForPeriod($sleepForPeriod);
$actionResultWithError->setErrorMessage($e->getMessage());
$actionResultWithError->setErrorReason(GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED);
$this->zmqPerformer->pushActionResultInfo($actionResultWithError);
//всё корректно
} else {
$this->zmqPerformer->pushActionResultInfoWithoutPulsarCorrectionBehavior();
}
Это актуально также для случая, если к Пульсару подключена только часть процессов, а другая работает в произвольном хаотичном виде. И такая механика позволит снижать количество ошибок, создаваемых совместной активностью упорядоченных и неупорядоченных процессов.
***
При этом, как уже было сказано, модуль можно использовать для любой периодической передачи информации процессам. Для этого достаточно при инициализации засетить свой класс, отнаследованный от PublisherToSubscribersDto, содержащий логику управления процессами, которые его получат.
То есть при инициализации демона в пункте 1) добавить:
$publisherToSubscribersDto = new YourNameExtendedByPublisherToSubscribersDto();
$publisherToSubscribersDto->setYourProperty();
$publisherPulsarDto->setPublisherToSubscribersDto($publisherToSubscribersDto);
И этот объект будет передаваться процессам.
***
Основная часть кода была написана на прошлой работе в компании Adventum в рамках решения коммерческих задач и публикуется с ее разрешения.