Записки разработчика: airflow->symfony-console->bitrix agents
Всем привет, случалось такое, что вам надо поставить кучу агентов битрикса на крон, а потом сидеть и разбираться — сколько они отрабатывают, отрабатывают ли вообще, когда падают или зависают?
Ну конечно случалось. Так вот, чтобы получить визуальное представление о том, что там происходит, было принято решение, вынести агенты даже не на крон, а на apache airflow. Поведаю вам, как это было реализовано.
Агенты:
По факту, агенты — это выполнение php команд по-расписанию. Но мы пойдем дальше и выделим их в отдельную сущность, в классы с расширением Agents.php
Создадим интерфейс для агентов
Далее создаем общий родитель для всех агентов
Ну и пример самого агента
moveToStage();
}
} catch (LoaderException|Exception $e) {
LoggerFacade::elk()->critical('Move import error', (new CatchSchema($e))->toArray());
}
return 'It`s done, dude!';
}
}
Что имеем: когда мы вызываем из агентов в админ-панели запись TestImportAgent::execute()
, то нам выводится сообщение 'It`s done, dude!'
а если мы вызываем из терминала TestImportAgent::console(),
то нам возвращается
'It`s done, dude!
DONE FOR AIRFLOW'
и это важно, чтобы AIRFLOW мог понять, что все выполнено корректно.
Консоль: Битриксвнедрили себе красивенькую, рабочую консольку, и можно было бы даже с ней поиграться, если бы она не была жестко завязана на модули, и не стояла устаревшая версия symfony/console
Для того, чтобы нам иметь свою красивую консольку, установим ее отдельно
Добавим local/console.php
$value) {
my_autoloader($file);
}
spl_autoload_register('my_autoloader');
foreach (get_declared_classes() as $item) {
if (is_subclass_of($item, '\Symfony\Component\Console\Command\Command')) {
$obApplication->add(new $item);
}
}
try {
$obApplication->run();
} catch (Exception $e) {
}
Что мы тут делаем:
1 Итеративно проходим по всем директориям и ищем файлы, которые заканчиваются на *Command.php
2 Берем найденные файлы, и если, классы этих файлов наследуют '\Symfony\Component\Console\Command\Command'
, то регистрируем их в симфони консоли.
3 Далее файлы с маской Command можно использовать также как в обычном symfony/console
Ну и пишем команду, которая находит, по аналогии, все файлы с маской *Agent.php, наследующие базовый аент, и выполняет
setDescription('Запустить агент"')
// the command help shown when running the command with the "--help" option
->setHelp('Запустить агент по имени класса')
->addArgument('className', InputArgument::OPTIONAL, 'Имя класса агента' )
;
}
/**
* Executes the current command.
* @param InputInterface $input Console input steam.
* @param OutputInterface $output Console output steam.
* @return int
*/
protected function execute(
Console\Input\InputInterface $input,
Console\Output\OutputInterface $output
): int
{
try {
if (!Loader::includeModule(moduleName: "...")) {
throw new \RuntimeException(message:
Loc::getMessage(code: '..._CRM_MODULE_NOT_INSTALLED')
);
}
$className = $input->getArgument('className');
// тут вынес сканирование по маске в отдельный метод
$arItems = ManagementService::create()->filesystem()->modules()->psr()->actions()->registerSplByMask('Agent');
foreach ($arItems as $item) {
if (is_subclass_of($item, 'App\Infrastructure\Cli\Agents\BaseAgent')) {
if(strstr($item, $className)){
$findClass = $item;
}
}
}
if(!empty($findClass)) {
// тут идет вызов метода
(new $findClass)->console();
} else {
throw new \InvalidArgumentException($className . ' is not exists');
}
} catch (LoaderException|Exception $e) {
$output->writeln($e->getMessage());
return Command::FAILURE;
}
return Command::SUCCESS;
}
}
Теперь нам доступен вызов любого агента, с помощью консоли
cd ./local && php ./console.php agents: run TestImportAgent
Вывод в эйрфлоу
Устанавливаем систему
Знакомство с Apache Airflow: установка и запуск первого DAGа
Привет! Меня зовут Алексей Карпов, я прикладной администратор (MLOps) отдела сопровождения моделей м…
Далее мы просто пишем даг для эйрфлоу
"""Запустить агент тестовый"""
import datetime
import pendulum
from airflow import DAG
from functions import create_task_ssh_command, dag_success_callback, container_root, php_command
agent_name = "TestAgent"
dag = DAG(
dag_id=agent_name,
schedule="0 */1 * * *",
start_date=pendulum.datetime(2024, 11, 15, tz="Europe/Moscow"),
catchup=False,
doc_md=__doc__,
max_active_runs=1,
dagrun_timeout=datetime.timedelta(minutes=6),
on_success_callback=dag_success_callback,
tags=[ 'agent', 'test'],
)
test_dag = create_task_ssh_command(dag, agent_name, f"""
cd /path/ && docker compose exec -u user_name php sh -c \"{php_command} -f {container_root}/local/console.php agents:run {agent_name} 2>&1 | tee /var/log/airflow/{agent_name}.log\"
""")
if __name__ == "__main__":
dag.run()
Скрытый текст
Ну и в functions.py пишем обработчик для результатов выполнения
from __future__ import annotations
from airflow.exceptions import AirflowFailException
from pprint import pprint
import logging
import datetime
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable
tz = datetime.timezone(datetime.timedelta(hours=3))
container_root = Variable.get("container_root")
php_command = Variable.get("php_command")
sshHook = SSHHook(ssh_conn_id='alconcrm', banner_timeout=60.0)
output = ''
DONE_STRING = 'DONE FOR AIRFLOW'
FLAG_IS_EMPTY = 'EMPTY'
FLAG_IS_DONE = 'DONE'
FLAG_IS_NOT_EMPTY = 'NOT_EMPTY'
def create_task_ssh_command(dag, task_id, task_command, task_success_callback=None):
return PythonOperator(
task_id=task_id,
op_kwargs={"command": task_command},
python_callable=task_method,
# on_success_callback=task_success_callback,
dag=dag
)
def task_method(**kwargs):
run_ssh_command(kwargs['command'])
def dag_success_callback(context):
# logging.info("pprint(vars(context)) dag_success_callback")
# logging.info(pprint(vars(context)))
# send_to_telegram(f"""
# \ud83d\ude00 Задача выполнена успешно.
# ID группы задач: {context.get('dag').safe_dag_id}
# Название группы задач: {context.get('dag').doc_md}
# Время запуска: {context.get('dag_run').start_date.astimezone(tz)}
# Время завершения: {context.get('dag_run').end_date.astimezone(tz)}
# Тип запуска: {context.get('dag_run').run_type}
# Время исполнения: {context.get('dag_run').end_date - context.get('dag_run').start_date}
# """)
return True
def run_ssh_command(command):
ssh_client = None
try:
ssh_client = sshHook.get_conn()
ssh_client.load_system_host_keys()
stdin, stdout, stderr = ssh_client.exec_command(command)
return_result = stdout.read().decode('utf-8')
logging.info(pprint(return_result))
check_output(return_result)
return return_result
finally:
if ssh_client:
ssh_client.close()
def check_output(return_result_arg):
flag = check_done_output(return_result_arg)
if flag == FLAG_IS_NOT_EMPTY:
raise AirflowFailException('Неожиданный ответ команды в терминале: '+"\n"+(output))
elif flag == FLAG_IS_EMPTY:
raise AirflowFailException('Пустой ответ команды в терминале')
def check_line(arg_line):
return arg_line.strip() == DONE_STRING
def check_done_output(arg_output):
is_empty = True
global output
for line in arg_output.splitlines():
str = line.strip();
if check_line(str):
return FLAG_IS_DONE
elif str != '':
is_empty = False
output += str + "\n"
flag = FLAG_IS_EMPTY if is_empty else FLAG_IS_NOT_EMPTY
return flag
И наслаждаемся полученной картинкой