[Перевод] 7 распространенных ошибок, которые нужно проверить при отладке DAG-файлов Airflow

?v=1

Задачи не выполняются? DAG не работает? Журналы не находятся? У нас были те же самые проблемы. Вот список распространенных ошибок и некоторые соответствующие исправления, которые следует учитывать при отладке развертывания Airflow.

Apache Airflow стал ведущим планировщиком задач с открытым исходным кодом практически для любого вида работы, от обучения модели машинного обучения до общей оркестровки ETL. Это невероятно гибкий инструмент, который, как мы можем сказать по опыту, поддерживает критически важные проекты как для стартапов из пяти человек, так и для команд из списка Fortune 50.

С учетом сказанного, тот самый инструмент, который многие считают мощным «чистым холстом», может быстро стать обоюдоострым мечом, если вы только начинаете. И, к сожалению, нет особенно огромного богатства ресурсов и лучших практик на шаг или два выше базовых основ Apache Airflow.

Стремясь максимально заполнить этот пробел, мы собрали некоторые из наиболее распространенных проблем, с которыми сталкивается почти каждый пользователь, независимо от того, насколько опытна и многочисленна его команда. Независимо от того, являетесь ли вы новичком в Airflow или опытным пользователем, ознакомьтесь с этим списком распространенных ошибок и некоторыми соответствующими исправлениями, которые следует учитывать.


1. Ваш DAG не работает в необходимое время

Вы написали новый DAG, который должен запускаться каждый час. Вы устанавливаете почасовой интервал, начинающийся сегодня в 14:00, и устанавливаете напоминание, чтобы проверить его через пару часов. Вы проверяете его в 15:30 и обнаруживаете, что хотя ваш DAG действительно работал, в ваших журналах указано, что существует только одна записанная дата выполнения на 14:00. А что происходило в 3 часа дня?

Прежде чем вы перейдете в верхний режим исправления (вы не будете первым), будьте уверены, что это вполне ожидаемое поведение. Функциональность планировщика Airflow немного противоречит здравому смыслу (и вызывает некоторые разногласия в сообществе Airflow), но вы освоитесь. Две вещи:


  • По замыслу Airflow DAG будет выполняться по завершении своего schedule_interval.
    Это означает, что один schedule_interval выполняется ПОСЛЕ даты начала. Например, ежечасный DAG выполнит свой запуск в 2 часа дня, когда часы пробьют 3 часа дня. Причина здесь в том, что Airflow не может гарантировать, что все данные, соответствующие интервалу 2 часа дня, присутствуют до конца этого часового интервала.
    Это специфический аспект для Airflow, но важно помнить о нем, особенно если вы используете переменные и макросы по умолчанию.
  • Время в Airflow по умолчанию указано в формате UTC.
    Это не должно вызывать удивления, учитывая, что остальные ваши базы данных и API, скорее всего, также придерживаются этого формата, но это стоит уточнить.
    Не следует ожидать, что выполнение ваших DAG-файлов будет соответствовать вашему местному часовому поясу. Если вы находитесь в тихоокеанском времени США, запуск DAG с 19:00 будет соответствовать 12:00 по местному времени.
    Начиная с версии 1.10, Airflow фактически учитывает часовой пояс, но мы по-прежнему рекомендуем сохранять ваши DAGи с расчетом на временные метки UTC для согласованности.

2. Одна из ваших DAG не работает

Если рабочие процессы в вашем развертывании обычно выполняются без сбоев, но вы обнаруживаете, что один конкретный DAG не планирует задачи и не запускается вообще, это может иметь какое-то отношение к тому, как вы настроили ее для расписания.

Убедитесь, что у вас нет
datetime.now()
в значении переменой start_date.

Интуитивно понятно, что если вы скажете своему DAGу начать работу сейчас, она выполнится сейчас. НО, это не учитывает, как сам Airflow фактически читает datetime.now().

Для выполнения DAG start_date должен быть временем в прошлом, иначе Airflow будет считать, что он еще не готов к выполнению. Когда Airflow оценивает ваш DAG, он интерпретирует datetime.now() как текущую метку времени (т.е. НЕ время в прошлом) и решает, что он не готов к запуску. Поскольку это будет происходить каждый раз, когда пульс Airflow будет оценивать ваш DAG каждые 5–10 секунд, он никогда не запустится.

Чтобы правильно запустить DAG, обязательно ставьте фиксированное время в прошлом (например, datetime(2019,1,1)) и установите catchup=False (если вы не хотите запускать обратную засыпку).

Примечание. Вы можете вручную запустить DAG через пользовательский интерфейс Airflow прямо на панели управления (она выглядит как кнопка Play). Ручной триггер выполняется немедленно и не прерывает регулярное планирование, хотя он будет ограничен любыми конфигурациями параллелизма, которые у вас есть на уровне DAG, уровне развертывания или уровне задачи. Когда вы посмотрите соответствующие журналы, run_id покажет manual__ вместо scheduled__.


3. Вы видите ошибку 503 при развертывании

Если вы переходите к развертыванию Airflow только для того, чтобы понять, что ваш экземпляр полностью недоступен через веб-браузер, скорее всего, это как-то связано с вашим веб-сервером.
Если вы уже обновили страницу один или два раза и продолжаете видеть ошибку 503, прочтите ниже некоторые рекомендации, связанные с веб-сервером.


Ваш веб-сервер может дать сбой

Ошибка 503 обычно указывает на проблему веб-сервера (или проблему deployment в kubernetes), основным компонентом Airflow, отвечающим за отображение состояния задачи и журналов выполнения задач в интерфейсе Airflow. Если по какой-либо причине у него недостаточно мощности или иным образом возникла проблема, это может повлиять на время загрузки пользовательского интерфейса или доступность веб-браузера.

По нашему опыту, ошибка 503 часто указывает на то, что ваш веб-сервер дает сбой (например, в Astronomer в kubernetes это называется состоянием CrashLoopBackOff). Если вы запускаете deployment в kubernetes, и вашему веб-серверу по какой-либо причине требуется больше нескольких секунд для запуска, он может не достичь периода ожидания (10 секунд по умолчанию), в котором он вылетит, прежде чем он успеет развернуться. Это вызывает повторную попытку, которая снова дает сбой и так далее.

Если ваше deployment находится в этом состоянии, возможно, ваш веб-сервер достигает предела памяти при загрузке ваших DAG (даже если ваши рабочие и планировщик продолжают выполнять задачи, как ожидалось).


Несколько замечаний


  1. Вы пытались увеличить ресурсы своего веб-сервера?
    Airflow 1.10 немного жаднее, чем Airflow 1.9, в отношении ЦП (использования памяти), поэтому мы видели недавний всплеск количества пользователей, сообщающих о 503-х ошибках. Помогает быстрое увеличение ресурсов, выделенных вашему веб-серверу.
    Если вы используете Astronomer, мы рекомендуем поддерживать размер веб-сервера на отметке минимум 5 AU (Astronomer Units).


  2. Как насчет увеличения периода ожидания веб-сервера?
    Если увеличение ресурсов веб-сервера кажется неэффективным (не сходите с ума), вы можете попробовать увеличить web_server_master_timeout или web_server_worker_timeout.
    Повышение этих значений укажет вашему веб-серверу Airflow подождать немного дольше для загрузки, прежде чем он покажет вам 503 ошибку (тайм-аут). Вы все равно можете пробовать медленную загрузку, если deployment на самом деле недостаточно мощно, но вы, вероятно, избежите попадания в 503.


  3. Вы делаете запросы вне оператора?
    Если вы выполняете вызовы API, запросы JSON или запросы к базе данных за пределами оператора с высокой частотой, вероятность тайм-аута вашего веб-сервера гораздо выше.
    Когда Airflow интерпретирует файл для поиска любых допустимых DAG, он сначала немедленно запускает весь код на верхнем уровне (то есть вне операторов). Даже если сам оператор выполняется только во время выполнения, все, что вызывается вне оператора, вызывается при каждом такте, что может быть довольно утомительным.
    Мы бы порекомендовали взять логику, которую вы в настоящее время выполняете вне оператора, и по возможности переместить ее внутрь оператора Python.



4. Задачи тасков периодически не работают

Это подводит нас к общей передовой практике, которую мы начали применять.


Будьте осторожны при использовании Sensors

Если вы используете Airflow 1.10.1 или более раннюю версию, датчики работают непрерывно и постоянно занимают слот для задач, пока не найдут то, что ищут, поэтому они имеют тенденцию вызывать проблемы с параллелизмом. Если у вас действительно никогда не бывает более нескольких задач, выполняемых одновременно, мы рекомендуем избегать их, если вы не знаете, что они не займут слишком много времени для выхода.

Например, если работник может одновременно запускать только X задач, а у вас работает три датчика (sensors?), то вы сможете запускать только X-3 задачи в любой заданный момент. Имейте в виду, что если вы постоянно используете датчик (sensors?), это ограничивает то, как и когда может произойти перезапуск планировщика (иначе датчик (sensors?) выйдет из строя).

В зависимости от вашего варианта использования мы предлагаем рассмотреть следующее:


  1. Создайте DAG, который запускается с более частым интервалом.
    Возможно, что задан тычок — и пропускает последующие задачи, если файл не найден.

    2. Триггер лямбда-функции


Примечание: новая функция датчика Airflow v1.10.2 mode = reschedule решает эту проблему. Если у вас больше датчиков, чем рабочих слотов, датчик теперь будет переведен в новое состояние up_for_reschedule, что разблокирует рабочий слот.


5. Задачи выполняются, но становятся бутылочным горлышком

Если все выглядит так, как ожидалось, но вы обнаруживаете, что ваши задачи становятся бутылочным горлышком, мы рекомендуем внимательнее присмотреться к двум вещам: Ваши переменные Env и конфигурации, связанные с параллелизмом + ваши ресурсы Worker и Scheduler.


1. Проверьте свои переменные Env и связанные с параллелизмом (Concurrency) конфигурации

Какие именно эти значения должны быть установлены (и что может стать потенциальным узким местом), зависит от вашей настройки — например, вы запускаете несколько DAG одновременно или один DAG с сотнями одновременных задач? С учетом сказанного, их точная настройка, безусловно, может помочь решить проблемы с производительностью. Вот список того, что вы можете найти:

1. Параллелизм (параллелизм)


  • Это определяет, сколько экземпляров задач может активно выполняться параллельно (parallel) в нескольких DAG с учетом ресурсов, доступных в любой момент времени на уровне развертывания. Думайте об этом как о «максимально активных задачах в любом месте».


  • ENV AIRFLOW__CORE__PARALLELISM=18

2. Concurrency DAG (dag_concurrency)


  • Это определяет, сколько экземпляров задач ваш планировщик может запланировать одновременно для каждой DAG. Думайте об этом как о «максимальном количестве задач, которые можно запланировать за один раз для каждой DAG».


  • ENV AIRFLOW__CORE__DAG_CONCURRENCY=16

3. Количество слотов задач без пула (Nonpooledtaskslotcount)


  • Когда пулы не используются, задачи запускаются в «пуле по умолчанию», размер которого определяется этим элементом конфигурации.


  • ENV AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=256

4. Максимальное количество активных запусков на DAG (maxactiverunsperdag)


  • Это говорит само за себя, но он определяет максимальное количество активных запусков DAG на DAG.


  • ENV AIRFLOW__CORE__MAX_ACTIVE_RUNS 3

5. Concurrency воркеров (worker_concurrency)


  • Это определяет, сколько задач каждый воркер может запускать в любой момент времени. Например, CeleryExecutor по умолчанию будет одновременно выполнять не более 16 задач. Думайте об этом как о «Сколько задач каждый из моих воркеров может взять на себя в любой момент времени».


  • Важно отметить, что это количество, естественно, будет ограничено dagconcurrency. Если у вас есть 1 воркер и вы хотите, чтобы он соответствовал мощности вашего развертывания, workerconcurrency = parallelism.


  • ENV AIRFLOW__CELERY__WORKER_CONCURRENCY=9

6. Параллелизм (параллелизм)


  • Не путать с приведенными выше настройками. «Параллелизм» здесь устанавливается на индивидуальном уровне DAG и определяет количество задач, которые могут выполняться одновременно в одном DAG. Это также может потребовать настройки, но это не будет работать, если оно определено как часть файла airflow.cfg.

Совет от профессионала: если вы рассматриваете возможность установить низкое число конфигураций параллелизма на уровне DAG или развертывания для защиты от ограничений скорости API, мы рекомендуем вместо этого использовать «пулы» — они позволят вам ограничить параллелизм на уровне задачи и выиграть t, ограничивать планирование или выполнение за пределами задач, которые в этом нуждаются.


2. Попробуйте увеличить масштаб планировщика или добавить воркера

Если задачи становятся узкими местами и все ваши конфигурации concurrency выглядят нормально, возможно, ваш Планировщик недостаточно мощный или ваше развертывание (deployment) может использовать другого воркера. Если вы используете Astronomer, мы обычно рекомендуем 5 AU в качестве минимума по умолчанию для Scheduler и 10 AU для ваших рабочих Celery, если они у вас есть.

Увеличите ли вы свои текущие ресурсы или добавите дополнительного работника, во многом зависит от вашего варианта использования, но мы обычно рекомендуем следующее:


  • Если вы выполняете относительно большое количество легких задач в DAG и с относительно высокой частотой, вам, вероятно, лучше иметь 2 или 3 «легких» воркеров для распределения работы.
  • Если вы выполняете меньше, но более тяжелые задачи с меньшей частотой, вам, вероятно, будет лучше с одним, но «более тяжелым» воркером, который может более эффективно выполнять эти задачи.

Для получения дополнительной информации о различиях между Executors ознакомьтесь с нашим Airflow Executors: Explained Guide.


6. У вас отсутствуют журналы

Вообще говоря, журналы не отображаются из-за процесса, который умер на одном из ваших рабочих процессов.

Вы можете увидеть что-то вроде следующего:

Failed to fetch log file from worker. Invalid URL 'http://:8793/log/staging_to_presentation_pipeline_v5/redshift_to_s3_Order_Payment_17461/2019-01-11T00:00:00+00:00/1.log': No host supplied

Несколько действий, которые стоит попробовать:


  1. Повторите (удалите) задачу, если это возможно, чтобы увидеть, появляются ли журналы.

    Это очистит / сбросит задачи и предложит снова их запустить


  2. Измените log_fetch_timeout_sec на значение более 5 секунд (по умолчанию).

    Это количество времени (в секундах), в течение которого веб-сервер будет ожидать начального рукопожатия (handshake) при получении журналов от других воркеров.


  3. Дайте вашим воркерам немного больше прав

    Если вы используете Astronomer, вы можете сделать это на вкладке Configure пользовательского интерфейса Astronomer.


  4. Вы ищете журнал, сделанный более 15 дней назад?

    Если вы используете Astronomer, период хранения журнала — это переменная среды, которую мы жестко запрограммировали на нашей платформе. На данный момент у вас не будет доступа к журналам, которым более 15 дней.


  5. Вы можете выполнить команду в одном из своих воркеров Celery, чтобы найти там файлы журнала.

    Эта функция предназначена только для корпоративных клиентов или людей, использующих Kubernetes.

    После настройки Kubectl можно запустить: kubectl exec -it {worker_name} bash

    Файлы журнала должны быть в ~/logs. Оттуда они будут разделены на DAG/TASK/RUN.


  6. Задачи медленно планируются или вообще перестали планироваться.

    Если ваши задачи выполняются медленнее, чем обычно, вы захотите проверить, как часто вы устанавливаете свой планировщик для перезапуска. К сожалению, у Airflow есть хорошо известная проблема, из-за которой производительность планировщика со временем ухудшается и требуется быстрый перезапуск для обеспечения оптимальной производительности.

    Частота перезапусков определяется в вашем airflow.cfg как run_duration. Значение run_duration, равное -1, означает, что вы никогда не хотите, чтобы ваш планировщик перезапускался, тогда как run_duration, равное 3600, будет перезапускать ваш планировщик каждый час. Ознакомьтесь с этим сообщением на форуме для получения дополнительной информации. Обычно мы перезапускаем наши собственные планировщики примерно раз в день, но частота, с которой вы это можете делать, очень сильно зависит от вашего конкретного варианта использования.


Если вы используете Astronomer, вы можете перезапустить планировщик следующим образом:


  • Вставьте AIRFLOW__SCHEDULER__RUN_DURATION={num_seconds_between_restarts} в качестве переменной среды на странице настройки пользовательского интерфейса Astronomer для установки повторяющегося перезапуска ИЛИ
  • Запустите astro airflow deploy через интерфейс командной строки для немедленного перезапуска всего (если вы используете Celery, вы можете воспользоваться периодом отсрочки прекращения воркера, который вы можете использовать здесь, чтобы свести к минимуму существующие немедленные сбои в выполнении задач)

Этот список основан на нашем опыте оказания помощи клиентам Astronomer в решении основных проблем с Airflow, но мы хотим услышать ваше мнение. Не стесняйтесь обращаться к нам по адресу people@astronomer.io, если мы пропустили что-то, что, по вашему мнению, было бы полезно включить.

Если у вас есть дополнительные вопросы или вы ищете поддержку Airflow от нашей команды, свяжитесь с нами здесь.

© Habrahabr.ru