Записки разработчика: airflow->symfony-console->bitrix agents

Всем привет, случалось такое, что вам надо поставить кучу агентов битрикса на крон, а потом сидеть и разбираться — сколько они отрабатывают, отрабатывают ли вообще, когда падают или зависают?

Ну конечно случалось. Так вот, чтобы получить визуальное представление о том, что там происходит, было принято решение, вынести агенты даже не на крон, а на apache airflow. Поведаю вам, как это было реализовано.

Агенты:

По факту, агенты — это выполнение php команд по-расписанию. Но мы пойдем дальше и выделим их в отдельную сущность, в классы с расширением Agents.php

Создадим интерфейс для агентов

Далее создаем общий родитель для всех агентов

Ну и пример самого агента

moveToStage();
            }
        } catch (LoaderException|Exception $e) {
            LoggerFacade::elk()->critical('Move import error', (new CatchSchema($e))->toArray());
        }

        return 'It`s done, dude!';
    }
}

Что имеем: когда мы вызываем из агентов в админ-панели запись TestImportAgent::execute(), то нам выводится сообщение 'It`s done, dude!'
а если мы вызываем из терминала
TestImportAgent::console(), то нам возвращается
'It`s done, dude!
DONE FOR AIRFLOW'
и это важно, чтобы AIRFLOW мог понять, что все выполнено корректно.

Консоль: Битриксвнедрили себе красивенькую, рабочую консольку, и можно было бы даже с ней поиграться, если бы она не была жестко завязана на модули, и не стояла устаревшая версия symfony/console

Для того, чтобы нам иметь свою красивую консольку, установим ее отдельно

093dea06b2d8ee178f056f59b0392c2d.png

Добавим local/console.php

 $value) {
    my_autoloader($file);
}

spl_autoload_register('my_autoloader');

foreach (get_declared_classes() as $item) {
    if (is_subclass_of($item, '\Symfony\Component\Console\Command\Command')) {
        $obApplication->add(new $item);
    }
}

try {
    $obApplication->run();
} catch (Exception $e) {
}

Что мы тут делаем:
1 Итеративно проходим по всем директориям и ищем файлы, которые заканчиваются на *Command.php

2 Берем найденные файлы, и если, классы этих файлов наследуют '\Symfony\Component\Console\Command\Command', то регистрируем их в симфони консоли.

3 Далее файлы с маской Command можно использовать также как в обычном symfony/console

Ну и пишем команду, которая находит, по аналогии, все файлы с маской *Agent.php, наследующие базовый аент, и выполняет Agent: console ($params);

setDescription('Запустить агент"')
            // the command help shown when running the command with the "--help" option
            ->setHelp('Запустить агент по имени класса')
            ->addArgument('className', InputArgument::OPTIONAL, 'Имя класса агента' )
        ;
    }

    /**
     * Executes the current command.
     * @param InputInterface $input Console input steam.
     * @param OutputInterface $output Console output steam.
     * @return int
     */
    protected function execute(
        Console\Input\InputInterface $input,
        Console\Output\OutputInterface $output
    ): int
    {
        try {
            if (!Loader::includeModule(moduleName: "...")) {
                throw new \RuntimeException(message:
                    Loc::getMessage(code: '..._CRM_MODULE_NOT_INSTALLED')
                );
            }

            $className = $input->getArgument('className');
          // тут вынес сканирование по маске в отдельный метод
            $arItems = ManagementService::create()->filesystem()->modules()->psr()->actions()->registerSplByMask('Agent');

            foreach ($arItems as $item) {
                if (is_subclass_of($item, 'App\Infrastructure\Cli\Agents\BaseAgent')) {
                    if(strstr($item, $className)){
                        $findClass = $item;
                    }
                }
            }
            if(!empty($findClass)) {
              // тут идет вызов метода
                (new $findClass)->console();
            } else {
                throw new \InvalidArgumentException($className . ' is not exists');
            }

        } catch (LoaderException|Exception $e) {
            $output->writeln($e->getMessage());
            return Command::FAILURE;
        }

        return Command::SUCCESS;
    }
}

Теперь нам доступен вызов любого агента, с помощью консоли

cd ./local && php ./console.php agents: run TestImportAgent

Вывод в эйрфлоу

Устанавливаем систему

Знакомство с Apache Airflow: установка и запуск первого DAGа

Привет! Меня зовут Алексей Карпов, я прикладной администратор (MLOps) отдела сопровождения моделей м…

habr.com

Далее мы просто пишем даг для эйрфлоу

"""Запустить агент тестовый"""
import datetime
import pendulum
from airflow import DAG
from functions import create_task_ssh_command, dag_success_callback, container_root, php_command

agent_name = "TestAgent"

dag = DAG(
    dag_id=agent_name,
    schedule="0 */1 * * *",
    start_date=pendulum.datetime(2024, 11, 15, tz="Europe/Moscow"),
    catchup=False,
    doc_md=__doc__,
    max_active_runs=1,
    dagrun_timeout=datetime.timedelta(minutes=6),
    on_success_callback=dag_success_callback,
    tags=[ 'agent', 'test'],
)

test_dag = create_task_ssh_command(dag, agent_name, f"""
cd /path/ && docker compose exec -u user_name php sh -c \"{php_command} -f {container_root}/local/console.php agents:run {agent_name} 2>&1 | tee /var/log/airflow/{agent_name}.log\"
""")

if __name__ == "__main__":
    dag.run()

Скрытый текст

Ну и в functions.py пишем обработчик для результатов выполнения

from __future__ import annotations
from airflow.exceptions import AirflowFailException

from pprint import pprint
import logging
import datetime
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable

tz = datetime.timezone(datetime.timedelta(hours=3))
container_root = Variable.get("container_root")
php_command = Variable.get("php_command")

sshHook = SSHHook(ssh_conn_id='alconcrm', banner_timeout=60.0)
output = ''
DONE_STRING = 'DONE FOR AIRFLOW'
FLAG_IS_EMPTY = 'EMPTY'
FLAG_IS_DONE = 'DONE'
FLAG_IS_NOT_EMPTY = 'NOT_EMPTY'

def create_task_ssh_command(dag, task_id, task_command, task_success_callback=None):
    return PythonOperator(
        task_id=task_id,
        op_kwargs={"command": task_command},
        python_callable=task_method,
        # on_success_callback=task_success_callback,
        dag=dag
    )


def task_method(**kwargs):
    run_ssh_command(kwargs['command'])

def dag_success_callback(context):
    # logging.info("pprint(vars(context)) dag_success_callback")
    # logging.info(pprint(vars(context)))
	#     send_to_telegram(f"""
	#         \ud83d\ude00 Задача выполнена успешно.
	#         ID группы задач: {context.get('dag').safe_dag_id}
	#         Название группы задач: {context.get('dag').doc_md}
	#         Время запуска: {context.get('dag_run').start_date.astimezone(tz)}
	#         Время завершения: {context.get('dag_run').end_date.astimezone(tz)}
	#         Тип запуска: {context.get('dag_run').run_type}
	#         Время исполнения: {context.get('dag_run').end_date - context.get('dag_run').start_date}
	#     """)
	return True

def run_ssh_command(command):
    ssh_client = None
    try:
        ssh_client = sshHook.get_conn()
        ssh_client.load_system_host_keys()
        stdin, stdout, stderr = ssh_client.exec_command(command)
        return_result = stdout.read().decode('utf-8')
        logging.info(pprint(return_result))
        check_output(return_result)
        return return_result
    finally:
        if ssh_client:
            ssh_client.close()


def check_output(return_result_arg):
    flag = check_done_output(return_result_arg)
    if flag == FLAG_IS_NOT_EMPTY:
        raise AirflowFailException('Неожиданный ответ команды в терминале: '+"\n"+(output))
    elif flag == FLAG_IS_EMPTY:
        raise AirflowFailException('Пустой ответ команды в терминале')


def check_line(arg_line):
    return arg_line.strip() == DONE_STRING


def check_done_output(arg_output):
    is_empty = True
    global output
    for line in arg_output.splitlines():
        str = line.strip();
        if check_line(str):
            return FLAG_IS_DONE
        elif str != '':
            is_empty = False
            output += str + "\n"
    flag = FLAG_IS_EMPTY if is_empty else FLAG_IS_NOT_EMPTY
    return flag

И наслаждаемся полученной картинкой

64e5731f622cfb937da7b3fdc5af9ff1.png

© Habrahabr.ru