[Из песочницы] Простая система демонов для Yii2

В данной статье постараюсь раскрыть основные нюансы реализации системы демонов для PHP и научить консольные команды Yii2 демонизироваться.

Последние 3 года я занимаюсь разработкой и развитием достаточно большого корпоративного портала для одной группы компаний. Я, какие и многие, столкнулся с проблемой, когда решение задачи, которую требует бизнес, не укладывается ни в какие таймауты. Сделайте отчетик в excel на 300 тыс. строк, отправьте рассылку на 1500 писем и так далее. Естественно, такие задачи должны решаться фоновыми заданиями, демонами и crontab-ами. В рамках статьи я не буду приводить сравнение кронов и демонов, мы для решения подобных задач выбрали демонов. При этом важным требованием для нас стала возможность иметь доступ ко всему, что уже написано для бэкенда, соответственно, демоны должны быть продолжением фрейворка Yii2. По этой же причине нам не подошли уже готовые решения типа phpDaemon.

Под катом готовое решение для реализации демонов на Yii2, которое у меня вышло.

Тема демонов на PHP поднимается с завидной регулярностью (раз, два, три, а ребята из badoo даже перезапускают их без потери соединений). Возможно мой велосипед быстрый способ запустить демоны на популярном фреймворке будет полезен.

Немного основ


Для того, чтобы процесс стал демоном, нужно:
  1. Отвязать скрипт от консоли и стандартных потоков ввода-вывода;
  2. Завернуть исполнения основного кода в бесконечный цикл;
  3. Реализовать механизмы контроля над процессом.

Отвязываемся от консоли
Для начала закрываем стандартные потоки STDIN, STOUT, STDERR. Но PHP без них не может, поэтому первый открытый поток он сделает стандартным, так что откроем их в /dev/null.
if (is_resource(STDIN)) {
   fclose(STDIN);
   $stdIn = fopen('/dev/null', 'r');
}
if (is_resource(STDOUT)) {
    fclose(STDOUT);
    $stdOut = fopen('/dev/null', 'ab');
}
if (is_resource(STDERR)) {
    fclose(STDERR);
    $stdErr = fopen('/dev/null', 'ab');
}

Далее форкаем процесс и делаем форк основным процессом. Процесс донор — завершаем.
$pid = pcntl_fork();
if ($pid == -1) {
   $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
} elseif ($pid) {
   $this->halt(self::EXIT_CODE_NORMAL);
} else {
   posix_setsid();
}

Бесконечный цикл и контроль
Я думаю, с циклом все понято. А вот необходимые механизмы контроля стоит рассмотреть подробнее.Фиксация уже запущенных процессов
Тут все просто — после запуска демон кладет в файл со своим названием свой PID, а при завершении своей работы этот файл сносит.Обработка POSIX сигналов
Демон должен корректно обрабатывать сигналы от операционной системы, т.е. при получении сигнала SIGTERM должен плавно завершать свою работу. Достигается это несколькими вещами: первое, определяем функцию, которая будет обрабатывать полученные сигналы:
pcntl_signal(SIGTERM, ['MyClassName', 'mySignalHandlerFunction']);

Второе, в функцию обработки сигналов ставим присвоение некоторому статическому свойству класса значение true.
static function signalHandler($signo, $pid = null, $status = null)
{
   self::$stopFlag = true;
}

Ну и третье, наш бесконечный цикл теперь должен быть не такой уж бесконечный:
while (!self::$stopFlag) {
   pcntl_signal_dispatch();
}

Особенности обработки сигналов в разных версиях PHP
В PHP < 5.3.0 для распределения сигналов использовалась специальная директива declare(ticks = N). Где тик — это событие, которое случается каждые N низкоуровневых операций, выполненных парсером внутри блока declare. Распределение сигналов осуществлялось в соответствии с настройкой. Слишком маленькое значение приводило к провалу в производительности, а слишком большое — к несвоевременной обработке сигналов.

В PHP >= 5.3.0 появилась функция pcntl_signal_dispatch (), которую можно вызывать для ручного распределения сигналов, что мы и делаем после каждой итерации.
Ну и наконец, в PHP 7.1 станет доступно асинхронное распределение сигналов, что позволит почти мгновенно получать сигналы без оверхеда и ручного вызова функций.


Теперь при получении команды от операционной системы скрипт спокойно завершит текущую итерацию и выйдет из цикла.Контроль за утечками памяти
К сожалению, если демон долго трудится без перезапуска — у него начинает течь память. Интенсивность утечки зависит от того, какие функции вы используете. Из нашей практики — наиболее сильно «текли» демоны, которые работают с удаленными SOAP-сервисами через стандартный класс SoapClient. Так что за этим нужно следить и периодически их перезапускать. Дополним наш цикл условием контроля за утечками:
while (!self::$stopFlag) {
   if (memory_get_usage() > $this->memoryLimit) {
      break;
   }
   pcntl_signal_dispatch();
}

Где же код для Yii?


Исходники выложены на Github — yii2-daemon, пакет также доступен для установки через composer.

Пакет состоит всего из 2-х абстрактных классов — базовый класс DaemonController и класс WatcherDaemonController.

DaemonController

 */
abstract class DaemonController extends Controller
{

    const EVENT_BEFORE_JOB = "beforeJob";
    const EVENT_AFTER_JOB = "afterJob";

    const EVENT_BEFORE_ITERATION = "beforeIteration";
    const EVENT_AFTER_ITERATION = "afterIteration";

    /**
     * @var $demonize boolean Run controller as Daemon
     * @default false
     */
    public $demonize = false;

    /**
     * @var $isMultiInstance boolean allow daemon create a few instances
     * @see $maxChildProcesses
     * @default false
     */
    public $isMultiInstance = false;

    /**
     * @var $parentPID int main procces pid
     */
    protected $parentPID;

    /**
     * @var $maxChildProcesses int max daemon instances
     * @default 10
     */
    public $maxChildProcesses = 10;

    /**
     * @var $currentJobs [] array of running instances
     */
    protected static $currentJobs = [];

    /**
     * @var int Memory limit for daemon, must bee less than php memory_limit
     * @default 32M
     */
    protected $memoryLimit = 268435456;

    /**
     * @var boolean used for soft daemon stop, set 1 to stop
     */
    private static $stopFlag = false;

    /**
     * @var int Delay between task list checking
     * @default 5sec
     */
    protected $sleep = 5;

    protected $pidDir = "@runtime/daemons/pids";

    protected $logDir = "@runtime/daemons/logs";

    private $stdIn;
    private $stdOut;
    private $stdErr;

    /**
     * Init function
     */
    public function init()
    {
        parent::init();

        //set PCNTL signal handlers
        pcntl_signal(SIGTERM, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGINT, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGHUP, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGUSR1, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGCHLD, ['vyants\daemon\DaemonController', 'signalHandler']);
    }

    function __destruct()
    {
        $this->deletePid();
    }

    /**
     * Adjusting logger. You can override it.
     */
    protected function initLogger()
    {
        $targets = \Yii::$app->getLog()->targets;
        foreach ($targets as $name => $target) {
            $target->enabled = false;
        }
        $config = [
            'levels' => ['error', 'warning', 'trace', 'info'],
            'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->getProcessName() . '.log',
            'logVars' => [],
            'except' => [
                'yii\db\*', // Don't include messages from db
            ],
        ];
        $targets['daemon'] = new \yii\log\FileTarget($config);
        \Yii::$app->getLog()->targets = $targets;
        \Yii::$app->getLog()->init();
    }

    /**
     * Daemon worker body
     *
     * @param $job
     *
     * @return boolean
     */
    abstract protected function doJob($job);

    /**
     * Base action, you can\t override or create another actions
     * @return bool
     * @throws NotSupportedException
     */
    final public function actionIndex()
    {
        if ($this->demonize) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
            } elseif ($pid) {
                $this->cleanLog();
                $this->halt(self::EXIT_CODE_NORMAL);
            } else {
                posix_setsid();
                $this->closeStdStreams();
            }
        }
        $this->changeProcessName();

        //run loop
        return $this->loop();
    }

    /**
     * Set new process name
     */
    protected function changeProcessName()
    {
        //rename process
        if (version_compare(PHP_VERSION, '5.5.0') >= 0) {
            cli_set_process_title($this->getProcessName());
        } else {
            if (function_exists('setproctitle')) {
                setproctitle($this->getProcessName());
            } else {
                \Yii::error('Can\'t find cli_set_process_title or setproctitle function');
            }
        }
    }

    /**
     * Close std streams and open to /dev/null
     * need some class properties
     */
    protected function closeStdStreams()
    {
        if (is_resource(STDIN)) {
            fclose(STDIN);
            $this->stdIn = fopen('/dev/null', 'r');
        }
        if (is_resource(STDOUT)) {
            fclose(STDOUT);
            $this->stdOut = fopen('/dev/null', 'ab');
        }
        if (is_resource(STDERR)) {
            fclose(STDERR);
            $this->stdErr = fopen('/dev/null', 'ab');
        }
    }

    /**
     * Prevent non index action running
     *
     * @param \yii\base\Action $action
     *
     * @return bool
     * @throws NotSupportedException
     */
    public function beforeAction($action)
    {
        if (parent::beforeAction($action)) {
            $this->initLogger();
            if ($action->id != "index") {
                throw new NotSupportedException(
                    "Only index action allowed in daemons. So, don't create and call another"
                );
            }

            return true;
        } else {
            return false;
        }
    }

    /**
     * Возвращает доступные опции
     *
     * @param string $actionID
     *
     * @return array
     */
    public function options($actionID)
    {
        return [
            'demonize',
            'taskLimit',
            'isMultiInstance',
            'maxChildProcesses',
        ];
    }

    /**
     * Extract current unprocessed jobs
     * You can extract jobs from DB (DataProvider will be great), queue managers (ZMQ, RabbiMQ etc), redis and so on
     *
     * @return array with jobs
     */
    abstract protected function defineJobs();

    /**
     * Fetch one task from array of tasks
     *
     * @param Array
     *
     * @return mixed one task
     */
    protected function defineJobExtractor(&$jobs)
    {
        return array_shift($jobs);
    }

    /**
     * Main Loop
     *
     * * @return boolean 0|1
     */
    final private function loop()
    {
        if (file_put_contents($this->getPidPath(), getmypid())) {
            $this->parentPID = getmypid();
            \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' started.');
            while (!self::$stopFlag) {
                if (memory_get_usage() > $this->memoryLimit) {
                    \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' .
                        getmypid() . ' used ' . memory_get_usage() . ' bytes on ' . $this->memoryLimit .
                        ' bytes allowed by memory limit');
                    break;
                }
                $this->trigger(self::EVENT_BEFORE_ITERATION);
                $this->renewConnections();
                $jobs = $this->defineJobs();
                if ($jobs && !empty($jobs)) {
                    while (($job = $this->defineJobExtractor($jobs)) !== null) {
                        //if no free workers, wait
                        if ($this->isMultiInstance && (count(static::$currentJobs) >= $this->maxChildProcesses)) {
                            \Yii::trace('Reached maximum number of child processes. Waiting...');
                            while (count(static::$currentJobs) >= $this->maxChildProcesses) {
                                sleep(1);
                                pcntl_signal_dispatch();
                            }
                            \Yii::trace(
                                'Free workers found: ' .
                                ($this->maxChildProcesses - count(static::$currentJobs)) .
                                ' worker(s). Delegate tasks.'
                            );
                        }
                        pcntl_signal_dispatch();
                        $this->runDaemon($job);
                    }
                } else {
                    sleep($this->sleep);
                }
                pcntl_signal_dispatch();
                $this->trigger(self::EVENT_AFTER_ITERATION);
            }

            \Yii::info('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' is stopped.');

            return self::EXIT_CODE_NORMAL;
        }
        $this->halt(self::EXIT_CODE_ERROR, 'Can\'t create pid file ' . $this->getPidPath());
    }

    /**
     * Delete pid file
     */
    protected function deletePid()
    {
        $pid = $this->getPidPath();
        if (file_exists($pid)) {
            if (file_get_contents($pid) == getmypid()) {
                unlink($this->getPidPath());
            }
        } else {
            \Yii::error('Can\'t unlink pid file ' . $this->getPidPath());
        }
    }

    /**
     * PCNTL signals handler
     *
     * @param $signo
     * @param null $pid
     * @param null $status
     */
    final static function signalHandler($signo, $pid = null, $status = null)
    {
        switch ($signo) {
            case SIGINT:
            case SIGTERM:
                //shutdown
                self::$stopFlag = true;
                break;
            case SIGHUP:
                //restart, not implemented
                break;
            case SIGUSR1:
                //user signal, not implemented
                break;
            case SIGCHLD:
                if (!$pid) {
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                while ($pid > 0) {
                    if ($pid && isset(static::$currentJobs[$pid])) {
                        unset(static::$currentJobs[$pid]);
                    }
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                break;
        }
    }

    /**
     * Tasks runner
     *
     * @param string $job
     *
     * @return boolean
     */
    final public function runDaemon($job)
    {
        if ($this->isMultiInstance) {
            $this->flushLog();
            $pid = pcntl_fork();
            if ($pid == -1) {
                return false;
            } elseif ($pid !== 0) {
                static::$currentJobs[$pid] = true;

                return true;
            } else {
                $this->cleanLog();
                $this->renewConnections();
                //child process must die
                $this->trigger(self::EVENT_BEFORE_JOB);
                $status = $this->doJob($job);
                $this->trigger(self::EVENT_AFTER_JOB);
                if ($status) {
                    $this->halt(self::EXIT_CODE_NORMAL);
                } else {
                    $this->halt(self::EXIT_CODE_ERROR, 'Child process #' . $pid . ' return error.');
                }
            }
        } else {
            $this->trigger(self::EVENT_BEFORE_JOB);
            $status = $this->doJob($job);
            $this->trigger(self::EVENT_AFTER_JOB);

            return $status;
        }
    }

    /**
     * Stop process and show or write message
     *
     * @param $code int -1|0|1
     * @param $message string
     */
    protected function halt($code, $message = null)
    {
        if ($message !== null) {
            if ($code == self::EXIT_CODE_ERROR) {
                \Yii::error($message);
                if (!$this->demonize) {
                    $message = Console::ansiFormat($message, [Console::FG_RED]);
                }
            } else {
                \Yii::trace($message);
            }
            if (!$this->demonize) {
                $this->writeConsole($message);
            }
        }
        if ($code !== -1) {
            \Yii::$app->end($code);
        }
    }

    /**
     * Renew connections
     * @throws \yii\base\InvalidConfigException
     * @throws \yii\db\Exception
     */
    protected function renewConnections()
    {
        if (isset(\Yii::$app->db)) {
            \Yii::$app->db->close();
            \Yii::$app->db->open();
        }
    }

    /**
     * Show message in console
     *
     * @param $message
     */
    private function writeConsole($message)
    {
        $out = Console::ansiFormat('[' . date('d.m.Y H:i:s') . '] ', [Console::BOLD]);
        $this->stdout($out . $message . "\n");
    }

    /**
     * @param string $daemon
     *
     * @return string
     */
    public function getPidPath($daemon = null)
    {
        $dir = \Yii::getAlias($this->pidDir);
        if (!file_exists($dir)) {
            mkdir($dir, 0744, true);
        }
        $daemon = $this->getProcessName($daemon);

        return $dir . DIRECTORY_SEPARATOR . $daemon;
    }

    /**
     * @return string
     */
    public function getProcessName($route = null)
    {
        if (is_null($route)) {
            $route = \Yii::$app->requestedRoute;
        }

        return str_replace(['/index', '/'], ['', '.'], $route);
    }

    /**
     *  If in daemon mode - no write to console
     *
     * @param string $string
     *
     * @return bool|int
     */
    public function stdout($string)
    {
        if (!$this->demonize && is_resource(STDOUT)) {
            return parent::stdout($string);
        } else {
            return false;
        }
    }

    /**
     * If in daemon mode - no write to console
     *
     * @param string $string
     *
     * @return int
     */
    public function stderr($string)
    {
        if (!$this->demonize && is_resource(\STDERR)) {
            return parent::stderr($string);
        } else {
            return false;
        }
    }

    /**
     * Empty log queue
     */
    protected function cleanLog()
    {
        \Yii::$app->log->logger->messages = [];
    }

    /**
     * Empty log queue
     */
    protected function flushLog($final = false)
    {
        \Yii::$app->log->logger->flush($final);
    }
}


WatcherDaemonController

 */
abstract class WatcherDaemonController extends DaemonController
{
    /**
     * @var string subfolder in console/controllers
     */
    public $daemonFolder = 'daemons';

    /**
     * @var boolean flag for first iteration
     */
    protected $firstIteration = true;

    /**
     * Prevent double start
     */
    public function init()
    {
        $pid_file = $this->getPidPath();
        if (file_exists($pid_file) && ($pid = file_get_contents($pid_file)) && file_exists("/proc/$pid")) {
            $this->halt(self::EXIT_CODE_ERROR, 'Another Watcher is already running.');
        }
        parent::init();
    }

    /**
     * Job processing body
     *
     * @param $job array
     *
     * @return boolean
     */
    protected function doJob($job)
    {
        $pid_file = $this->getPidPath($job['daemon']);

        \Yii::trace('Check daemon ' . $job['daemon']);
        if (file_exists($pid_file)) {
            $pid = file_get_contents($pid_file);
            if ($this->isProcessRunning($pid)) {
                if ($job['enabled']) {
                    \Yii::trace('Daemon ' . $job['daemon'] . ' running and working fine');

                    return true;
                } else {
                    \Yii::warning('Daemon ' . $job['daemon'] . ' running, but disabled in config. Send SIGTERM signal.');
                    if (isset($job['hardKill']) && $job['hardKill']) {
                        posix_kill($pid, SIGKILL);
                    } else {
                        posix_kill($pid, SIGTERM);
                    }

                    return true;
                }
            }
        }
        \Yii::error('Daemon pid not found.');
        if ($job['enabled']) {
            \Yii::trace('Try to run daemon ' . $job['daemon'] . '.');
            $command_name = $job['daemon'] . DIRECTORY_SEPARATOR . 'index';
            //flush log before fork
            $this->flushLog(true);
            //run daemon
            $pid = pcntl_fork();
            if ($pid === -1) {
                $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() returned error');
            } elseif ($pid === 0) {
                $this->cleanLog();
                \Yii::$app->requestedRoute = $command_name;
                \Yii::$app->runAction("$command_name", ['demonize' => 1]);
                $this->halt(0);
            } else {
                $this->initLogger();
                \Yii::trace('Daemon ' . $job['daemon'] . ' is running with pid ' . $pid);
            }
        }
        \Yii::trace('Daemon ' . $job['daemon'] . ' is checked.');

        return true;
    }

    /**
     * @return array
     */
    protected function defineJobs()
    {
        if ($this->firstIteration) {
            $this->firstIteration = false;
        } else {
            sleep($this->sleep);
        }

        return $this->getDaemonsList();
    }

    /**
     * Daemons for check. Better way - get it from database
     * [
     *  ['daemon' => 'one-daemon', 'enabled' => true]
     *  ...
     *  ['daemon' => 'another-daemon', 'enabled' => false]
     * ]
     * @return array
     */
    abstract protected function getDaemonsList();

    /**
     * @param $pid
     *
     * @return bool
     */
    public function isProcessRunning($pid)
    {
        return file_exists("/proc/$pid");
    }
}


DaemonController
Это родительский класс для всех демонов. Вот минимальный пример демона:

Функция defineJobs () должна возвращать набор задач для выполнения. По-умолчанию ожидается, что она будет возвращать массив. Если вы хотите возвращать, скажем MongoCursor, потребуется еще переопределить defineJobExtractor (). Функция doJob () должна получать на вход одну задачу для выполнения, производить с ней необходимые операции и помечать данную задачу в источнике как отработанную, чтобы она не упала второй раз.

Возможные параметры и настройки:

  • demonize — данный параметр определяет будет ли скрипт демонизироваться или работать как консольное приложение. Параметр доступен для задания из консоли: --demonize=1
  • isMultiInstance и maxChildProcesses — определяет можно ли демону создавать свои собственные копии и какое их максимальное количество может одновременно работать. Данная функция позволяет выполнять несколько задач параллельно. doJob будет выполняться в дочерних процессах, а родительский процесс будет только делегировать задачи своим потомкам и следить, чтобы их количество не превышало допустимый максимум. Весьма полезно, если ресурсов сервера хватает для того, чтобы выполнять несколько достаточно продолжительных по времени задач. По-умолчанию такое поведение выключено. Параметры так же доступны из консоли: --isMultiInstance=1 --maxChildProcesses=2
  • memoryLimit — порог потребления демоном памяти, если демон в режиме ожидания превысит данный порог, то он благородно совершить сиппоку. Как уже было обозначено ранее, для уменьшения размера потребляемой демонами памяти в результате утечек.
  • sleep — время в секундах, на которое демон будет засыпать между проверками наличия задач. Демон отправится спать только если defineJob вернет empty и пока есть задачи демон спать не будет. Поэтому defineJobs не должна возвращать статический список задач, иначе демон будет молотить их без конца и отдыха.
  • pidDir и logDir  — пути для хранения логов и pid-ов, поддерживают алиасы Yii. По-умолчанию »@runtime/daemons/pids» и »@runtime/daemons/logs»
Проблема потери соединений
При осуществлении операции fork () установленные в родительском процессе соединения перестают работать в дочерних процессах. Для того, чтобы избежать этой проблемы, после всех форков проставлен вызов функции renewConnections (). По-умолчанию, данная функция переподключает только Yii::$app→db, но вы можете переопределить ее, и добавить прочие источники, соединение с которыми нужно поддерживать в дочерних процессах.Логгирование
Демоны перенастраивают стандартный логгер Yii под себя. Если вас не устраивает поведение по-умолчанию — переопределите функцию initLogger ().WatcherDaemonController
Это почти готовый демон-наблюдатель. Задача данного демона следить за другими демонами, запускать и останавливать их при необходимости. Он не может стартовать дважды, поэтому можно смело поставить его запуск в crontab. Для того, чтобы начать его использовать, нужно в console/controllers создать папку daemons и положить класс вида:
 'daemons/test', 'enabled' => true]
        ];
    }
}

Требуется определить лишь одну функцию — getDaemonsList (), которая вернет список демонов за которыми нужно следить. В самом простом виде — это зашитый в код массив, но в таком случае вы не будете иметь возможности менять список «на лету». Положите список демонов в базу или отдельный файлик и получайте его каждый раз оттуда. В таком случае, watcher сможет включить или выключить демон без собственного перезапуска.

Заключение


В данный момент у нас более 50-ти демонов, выполняющих разнообразные задачи, начиная от отправки почтовых сообщений и заканчивая генерацией отчетов и актуализацией данных между разными системами.

Демоны работают с разными источниками задач — MySQL, RabbitMQ и даже удаленными веб-сервисами. Полет нормальный.
Безусловно, демоны на php не сравнятся с теми же демонами на Go. Но высокая скорость разработки, возможность повторного использования уже написанного кода и отсутствие необходимости учить команду другому языку перевешивают минусы.

Комментарии (1)

  • 9 августа 2016 в 19:01

    +2

    , а чем не угодил супервизор и обычные команды yii-шные?

© Habrahabr.ru