Celery: изучаем на реальных примерах ч.1

0361694f4c01b4a3d81438c7ca1b7a1f.jpeg

Лучший способ что-то понять — попробовать на практике, а лучшая практика — это реальные примеры. В этой статье мы разберем возможности Celery и применение основных параметров для управления задачами. Для начала пару слов про сам Celery.

Зачем же нам Celery?

Celery — это самый популярный инструмент для асинхронной обработки задач. Он позволяет выполнять задачи в фоновом режиме и гибко настраивать параметры для каждой задачи. Кроме того, Celery предоставляет возможности для планирования, работы с разными очередями и мониторинга выполнения задач.

А теперь рассмотрим шесть базовых сценариев использования.

Сценарий 1: Выполнение задачи в фоне

Начнем с самого простого — обычное выполнение задачи в фоновом режиме. Представим, что нам нужно сгенерировать тяжелый отчет по запросу и синхронное выполнение здесь не подходит (слишком долго). Поэтому мы отправляем запрос на сервер для его генерации, после чего задача встает в очередь. Когда очередь дойдет до нашей задачи — начнется генерация отчета. Теперь перейдем к реализации.

Создание асинхронной задачи начинается с определения функции, которая будет выполняться асинхронно. Для этого используем декоратор @task

import time
from celery import Celery

app = Celery('myapp', broker='pyamqp://guest@localhost//')

@app.task
def generate_report_task(arg1, arg2):
    print("Start generating report")
    time.sleep(10)
    print("Report generated")

Осталось только запустить. Есть три способа это сделать: apply_async, delay и обычный вызов call.

apply_async — это метод, который предоставляет максимальную гибкость при запуске задачи и принимает большое количество аргументов.

generate_report_task.apply_async(args=[arg1_value], kwargs={'key': 'value'})

delay — в отличие от apply_async имеет ограниченный список принимаемых аргументов. Такой способ запуска мы рассматриваем, когда нужно просто запустить задачу без необходимости передавать именованные аргументы и другие параметры.

generate_report_task.delay(arg1_value, arg2_value)

Этот метод часто используется, когда задача принимает всего несколько аргументов и нам нужно просто её запустить.

Последний способ — это обычный вызов функции. В таком случае задача будет выполнена сразу же, а не назначена в очередь.

generate_report_task(arg1_value, arg2_value)

Сценарий 2: Выполнить задачу через час

Следующая задача — пользователь создал статью и хочет опубликовать её через один час. Настало время узнать об аргументах, которые принимает apply_async.

Вариантов здесь — два. Самый простой — аргумент countdown — в переводе «обратный отсчёт». Он позволяет задать время в секундах, через которое задача станет доступна для выполнения. Как раз то, что нам нужно!

from datetime import datetime

@app.task
def publish_article(arg1, arg2):
    print(f"Publish time: {datetime.now()}")

publish_article_after = 60 * 60 # 60 минут
result = publish_article.apply_async(args=[article_id], countdown=publish_article_after)

Важно для Redis Backend

Данный способ не подойдет, если вы используете Redis в качестве брокера. Дело в том, что Redis помещает отложенные задачи в очередь unacked, из которой по истечение времени, указанного в аргументе VISIBILITY_TIMEOUT, задача будет назначена еще одному обработчику. Например, countdown у нас равен 120 минутам, а VISIBILITY_TIMEOUT по умолчанию 60. В таком случае есть риск, что задача будет назначена сразу трём обработчикам (первому сразу, второму через 60 минут, третьему — если задача через 120 минут будет еще в очереди). В результате, мы получим выполнение одной и той же задачи несколько раз. Подробнее в документации тут и тут.

Важно для RabbitMQ Backend

Параметрconsumer_timeout  по умолчанию равен 30 минутам. Не желательно устанавливать countdown больше этого времени, иначе будет возбуждено исключение PRECONDITION_FAILED. Если есть такая необходимость, необходимо увеличить время в rabbitmq.conf. Подробнее — тут.

Сценарий 3: Выполнить задачу завтра в полдень

Теперь наш пользователь хочет выложить статью завтра в полдень. Эта ситуация очень похожа на предыдущую и мы могли бы использовать countdown. Но он лучше подходит для небольших промежутков времени — через минуту или пол часа. А для назначения задачи на конкретное время намного удобнее использовать аргумент eta. Он расшифровывается как Estimated Time of Arrival, что в переводе «Ожидаемое время прибытия».

Здесь есть две важных детали:

  • при использовании Redis отложенные с помощью eta задачи столкнутся с той же проблемой, что и countdown из-за VISIBILITY_TIMEOUT.

  • eta — это не точное время, в которое будет выполнена задача. Указывая время, мы говорим Celery — «задача должна быть выполнена не раньше этого времени». Как только это время наступит — задача будет выполнена в порядке очереди и будет зависеть от количества задач в очереди.

Вот пример:

from datetime import datetime

# Получим время для примера. В нормальной ситуации - 
# нам придет аргумент с временем публикации
now = datetime.now()
tomorrow = now + timedelta(days=1)

publish_article_datetime = datetime(tomorrow.year, tomorrow.month, tomorrow.day, 12, 0, 0)

result = publish_article.apply_async(args=["some_value"], eta=publish_article_datetime)

Сценарий 4: Выставить максимальное время выполнения задачи

Разберем на примере задачи по генерации отчёта. Мы знаем, что она не должна занимать больше часа (для примера). Если такое вдруг случилось — скорее всего что-то не так. Необходимо завершить задачу по таймауту, а потом — разобраться в причинах.

Для этого мы будем использовать аргументы soft_time_limit и time_limit. После наступления soft_time_limit в задаче будет возбуждено исключение SoftTimeLimitExceeded. Если задача не завершилась и наступает time_limit, выполнение задачи будет приостановлено.

Первый вариант — передать аргументы в apply_async

@app.task()

def generate_report():

    try:
        time.sleep(60 * 2)
    except SoftTimeLimitExceeded:
        print("Soft time limit exception")
        time.sleep(60 * 3)

soft_time_limit = 60 * 1

hard_time_limit = 60 * 2

result = my_task.apply_async(args=[some_value], soft_time_limit=soft_time_limit, time_limit=hard_time_limit )

Второй вариант — сразу указать ограничения в аргументах декоратора.

@app.task(time_limit=60 * 60, soft_time_limit=59 * 60) # 60/59 min
def generate_report():
    try:
        time.sleep(60 * 2)
    except SoftTimeLimitExceeded:
        print("Soft time limit exception")
        time.sleep(60 * 3)

result = my_task.apply_async(args=[some_value])

В результате, после запуска задачи через одну минуту мы увидим в консоли «Soft time limit exception», а еще через минуту задача будет принудительно завершена.

Сценарий 5: Отмена выполнения задачи по истечение времени

Возможна ситуация, когда определенная задача теряет свою актуальность, если не выполнена в течение какого то времени. Снова рассмотрим на примере генерации отчёта. Пользователь отправил запрос на генерацию отчёта. Задача попала в очередь и за час ни один обработчик не смог её обработать. В таком случае нам нужно отменить «просроченную» задачу. Для этого применяется аргументexpires. Он принимает либо число в секундах, либо объект datetime.

Например:

# Генерация будет отменена через час
generate_report.apply_async((10, 10), expires=3600)

# Генерация будет отменена, если через день задача не начнет выполняться
from datetime import datetime, timedelta, timezone
generate_report.apply_async((10, 10), kwargs,
                expires=datetime.now(timezone.utc) + timedelta(days=1))

Сценарий 6: Повторное выполнение задачи при возникновении ошибки

По какой-то причине в задаче может возникнуть ошибка. В таком случае нам может понадобится механизм повторного выполнения (retry).

Для начала разберем аргументы, которые помогут нам с детальной настройкой. Декоратор @app.task принимает:

  • default_retry_delay[int]: время до следующей попытки в секундах.

  • max_retries[int]: максимальное количество попыток.

  • autoretry_for[list | tuple]: Принимает список или кортеж с исключениями. Автоматический повтор при возникновении ошибки из переданного списка.

  • retry_backoff[bool|int]: при включении, задержка будет расти экспотенциально. Первая повторная попытка будет иметь задержку 1 секунду, вторая повторная попытка будет иметь задержку 2 секунды, третья будет иметь задержку в 4 секунды, четвертая будет иметь задержку в 8 секунд,

  • retry_backoff_max[int]: устанавливает максимальную задержку в секундах. Рекомендуется использовать всегда при использовании retry_backoff, чтобы избежать слишком больших задержек.

  • retry_jitter[bool]: задает случайную задержку. Принцип расчета new_num_of_seconds = random.randrange(retry_backoff + 1) . Соответственно время задержки будет случайным числом от 0 до retry_backoff

Теперь перейдем к коду. Есть два основных способа задействовать механизм retry.
Первый — использовать метод .retry(). Мы можем вызывать его по какому-либо условию.

@app.task(default_retry_delay=30, max_retries=3) # 60/59 min
def generate_report():
    some_condition = some_logic()
    if some_condition:
      generate_report.retry()

result = my_task.apply_async(args=[some_value])

Второй способ — это передать список ошибок при которых нужно выполнить задачу повторно.

@celery_app.task(autoretry_for=(GenerateReportError, SaveReportError, ), default_retry_delay=30,  max_retries=5)
def generate_report():
  ...

Заключение

Отлично! Мы с вами рассмотрели шесть основных сценариев. Надеюсь, что это послужит крепкой основой для ваших будущих задач и дальнейшего погружения в Celery.

Стоит отметить, что в этой статье мы не касались вопросов, связанных с периодическими задачами и использованием celery beat. Это широкая тема, которая заслуживает отдельной статьи, и я надеюсь подробно рассмотреть ее в будущем материале.

© Habrahabr.ru