Как вкусно приготовить «Сельдерей» (Celery)

То, что мы больше всего уважаем в python сообществе — это небольшой разброс в подходах и довольно стабильный набор архитектурных конфигураций. В своей практике мы можем часто встретить Django проекты с довольно типовой архитектурой, когда в качестве инструмента для фоновой обработки задач используется Celery.

Celery можно любить или не любить, но избежать работы с ним практически невозможно. Однако, не все инженеры задумываются о том, что происходит в момент вызова Celery задач. И в этой статье я хочу рассказать, как именно происходит вызов и отправка сообщения, к чему может привести игнорирование того, что Celery под собой имеет транспорт, и что может произойти, если этот транспорт начнет сбоить.

сгенерированно с помощью https://shedevrum.ai

сгенерированно с помощью https://shedevrum.ai

Это база

Мы не будем концентрироваться на разработке с нуля, потому что про сам Celery и как писать задачи уже существует достаточно много хороших материалов и так же доступна очень хорошая документация на официальном сайте с примерами кода.

Итак, пусть у нас есть веб приложение на Django, и в качестве брокера сообщений мы используем RabbitMQ. RabbitMQ здесь выбран для примера, в общем случае брокер сообщений может быть любым.

Предположим, есть API метод (создание какой-либо сущности) и во время его выполнения требуется собирать дополнительную аналитику или делать замеры времени исполнения метода и т.д. И для этих второстепенных задач используются третьи сервисы. Время выполнения которых, а также их успех или неудача не должны влиять на выполнение API метода. Соответственно, мы решаем реализовать эти второстепенные задачи с помощью Celery.

Пример API метода:

class CreateTaskSimpleView(APIView):
    def post(self, request: Request) -> Response:
        result: AsyncResult = simple_task.delay(
            request.data.get("a", 1),
            request.data.get("b", 2)
        )
        return Response(
            data={"task_id": result.task_id if result else None}
        )

Где simple_task — это второстепенная задача, которая может завершиться неуспешно:

@shared_task(**settings.CELERY_DEFAULT_BIND_TASK_CONF)
def simple_task(self: Task, a: int, b: int):
    return a / b

settings.CELERY_DEFAULT_BIND_TASK_CONF  содержит настройки по-умолчанию:

CELERY_DEFAULT_BIND_TASK_CONF = {
    'bind': True,
    'queue': 'default'
}

Процесс отправки собщений в очередь брокера

В примере выше используется простой вызов Celery через метод delay. Здесь мы «напрямую» вызываем задачу без всяких возможных исключений.

На практике других способов вызова Celery можно и не встретить. И это то о чем я и упомянул в начале — не все могут задумываться, что организация отправки сообщений брокеру устроена гораздо сложнее, чем просто вызов одного метода. И что на этом этапе может что-то сломаться.

Рассмотрим процесс отправки сообщений:

84c4a21a04981e9112fb8cc9d7dea30b.png

В процессе участвуют kombu и pyamqp — это части экосистемы Celery и разрабатываются и поддерживаются проектом Celery.

Kombu — Уровень транспорта. Здесь решается какой клиент брокера использовать.

pyamqp — Непосредственно клиент для брокера сообщений.

Как видно на схеме, после вызова метода delay (или apply_async) происходит следующее:

  1. Вызов менеджера соединений для получения/создания соединения с брокером. Менеджер хранит и управляет пулом соединений.

  2. Упаковка сообщений в соответствии с протоколом брокера.

  3. Непосредственный вызов метода клиента брокера для записи сообщения в очередь RabbitMQ.

Т.е как мы видим совершается довольно много работы, даже если смотреть поверхностно. 

После того как сообщение записано, спустя какое-то время в очередь зайдет свободный обработчик (Celery.worker), заберет сообщение и начнет его обрабатывать.

Celery очень сложный внутри?

В процессе исследования работы Celery была составлена такая схема, но к сожалению история и комментарии всех стрелочек была утеряна, но надеюсь общий принцип понятен. Все очень сложно.

ff7fe260f6c1aaaeab1ad4cce89b5355.png

Не сломаться сразу

А теперь вернемся к нашему примеру и, допустим, «моргнет сеть», тогда вызов Celery задачи завершится с исключением и наш метод API вернет 500, 502 или 504 (в зависимости от конфигурации окружения). И именно таких ситуаций мы хотим избежать, так как эти задачи второстепенны и не должны влиять на основной бизнес процесс.

retry

Первое что может прийти в голову — нас спасет опция retry, однако если прочитать официальную документацию внимательнее, то станет ясно, что эта опция для обработчиков (Celery.worker) в непосредственный момент исполнения задачи. Через retry мы указываем сколько попыток и с какими паузами обработчику можно пытаться выполнить задачу. Но сейчас мы говорим про момент отправки сообщения в очередь и опция retry здесь никак не защищает.

На самом деле у Celery есть опции «retry» для отправки сообщений. Но мы этими опциями слабо управляем и они конфигурируются на уровне транспорта. По умолчанию у клиента брокета есть 100 попыток, но их тоже может не хватить и мы все равно получим исключение, в случае если брокер полностью недоступен.

Получается, явного retry в методе delay не существует.

try/catch

Таким образом первым рабочим вариантом защиты будет оборачивание вызова Celery задачи в try/catch.

Например так:

def wrapped_simple_task(a: int, b: int) -> AsyncResult:
    try:
        return simple_task.delay(a, b)
    except Exception as ex:
        logger.error("error while calling celery.delay: %s",ex)

И теперь задачу можно вызывать используя враппер:

class CreateTaskWrappedView(APIView):
    def post(self, request: Request) -> Response:
        result: AsyncResult = wrapped_simple_task(
            request.data.get("a", 1),
            request.data.get("b", 2)
        )
        return Response(
            data={"task_id": result.task_id if result else None}
        )

И на этом моменте можно было бы и остановиться. Однако, при работе на проекте с большим легаси изменение вызова всех Celery задач будет трудозатратным, даже если использовать декоратор.

Но есть более элегантное решение. У Celery отличное API и мы можем переопределить и указать базовый класс задач Celery при старте приложения.

Например:

celery_task_cls = os.environ.get('CELERY_TASK_CLS', 'celery.app.task.PatchedTask')
app = Celery('example', task_cls=celery_task_cls)

Где наш класс celery.app.task.PatchedTask переопределяет метод delay (или apply_async) и нам не потребуется вносить изменения во всех местах вызова Celery задач.

celery.app.task.PatchedTask

class PatchedTask(celery.app.task.Task):
    try_apply_async = True  # wrap to try_except by default behaviour
    propagate_exception = True  # propagate exception by default

    @contextmanager
    def wrap_connection_exceptions(self):
        connection_succeed = True
        try:
            yield
        except transport_errors as exc:
            connection_succeed = False
            raise exc
        finally:
            logger.debug("celery.task.connection.succeed | %s", connection_succeed)

    @contextmanager
    def wrap_apply_async_exceptions(self):
        apply_succeed = True
        try:
            with self.wrap_connection_exceptions():
                yield
        except Exception as e:
            apply_succeed = False
            logger.error("celery.task.apply_async.failed | %s", self.name)
            if self.propagate_exception:
                raise CeleryTaskApplyException(e)
        finally:
            logger.debug("celery.task.apply_succeed | %s", apply_succeed)

    def apply_async(
            self,
            args=None,
            kwargs=None,
            task_id=None,
            producer=None,
            link=None,
            link_error=None,
            shadow=None,
            **options,
    ):
        logger.debug("%s called by apply_async", self.name)

        if get_connection().in_atomic_block:
            logger.warning("celery.task.apply_async.in_atomic_block | %s", self.name)

        if not self.try_apply_async:
            return super().apply_async(
                args, kwargs, task_id, producer, link, link_error, shadow, **options
            )

        with self.wrap_apply_async_exceptions():
            return super().apply_async(
                args, kwargs, task_id, producer, link, link_error, shadow, **options
            )

Лучше спросите у DevOps

сгенерированно с помощью https://shedevrum.ai

В современном мире приложения и веб сервисы не запускаются одной простой python командой. Для запуска используется Web Server Gateway (например uwsgi или gunicorn) и приложение может работать в своем docker контейнере. Также для раздачи статики и реализации reverse proxy скорее всего будет использоваться Nginx и он тоже может быть запущен в своем docker контейнере. В свою очередь, все это может работать на отдельном POD или Виртуальной Машине (мне кажется, что аллегория с бургером здесь будет уместа).

e5fccbb66a8fe44f4bd921a1bc2ac0e7.png

Соответственно в зависимости от количества «ингридиентов» и нагрузки на приложение мы можем получить разные варианты развития событий, в случае внезапной катастрофы.

Рассмотрим, что случится если мы используем вызовы Celery задач в транзакции, и у нас возникнут проблемы с отправкой сообщений в брокер. Как это может негативно повлиять на СУБД и на приложение в целом.

А именно:

  • С учетом количества попыток отправки сообщений сама операция try/catch может занимать существенное время. В этом случае можно легко не уместиться в statement_timeout (в случае postgresql). Что приведет к ошибке 500 Internal Server Error.  

    Существенное это сколько?

    У клиента брокера по умолчанию 100 попыток установить соединение. К тому же timeout увеличивается с каждой попыткой. Тогда операция try/catch может ожидать 100 * Хy ms. Пусть у нас таких вызовов больше чем 1, например N и сама операция метода API тоже требует времени и занимает M ms. Итого получаем M + N * (Хy * 100) ms

  • Также в случае, если используется uwsgi сервер и harakiri сработает раньше, то мы получим ошибку 502 Bad Gateway. 

  • Или если используется nginx и proxy_connect_timeout (proxy_read_timeout, proxy_send_timeout) сработает раньше, то мы получим ошибку 504 Gateway Timeout.

  • Также в зависимости от окружения и настроек healthcheck (если в метод healthcheck добавлена проверка доступности брокера сообщений) может сработать аварийная перезагрузка POD или Виртуальной Машины (ВМ). 

  • В худшем случае из-за некорректного завершения процесса (выключение POD/ВМ) мы получим зависшее соединение с СУБД. Что повлечет за собой увеличение нагрузки на саму СУБД, на менеджер соединений (например pgbouncer), и, если БД является основным источником данных, приложение может стать полностью недоступным.

  • Также POD/ВМ может вообще больше не запустить успешно, потому что по умолчанию broker_connection_retry_on_startup = True и если брокер недоступен мы получим задержку при попытке запуска и может сработать timeout ожидания Readiness probe в случае K8S и мы получим новый цикл попытки и как следствие можем увидеть CrashLoopBackOff.

951b5fdec11f123cc8117799fda30ab6.png

Итого

Недоступность брокера может стать причиной повышенной нагрузки на БД, что может послужить триггером для healthckech и за этим может последовать цикличное отключение POD/ВМ и как результат полная недоступность приложения. Т.е может получиться лавинообразное отключение системы. И самое сложное то, что SRE команде будет трудно определить руткоз проблемы, потому что пострадавших узлов будет очень много.

Как всего этого избежать и минимизировать риски

Как всегда и бывает — серебряной пули не существует. И вызов Celery задач в try/catch не спасет нас полностью, если не учесть следующее:  

Не забывать про «правило быстрых транзакций» — обращение к БД и особенно выполнение транзакции должно происходить максимально быстро. И конечно же не проводить долгие вычисления под транзакцией, или особенно делать вызовы внешних сервисов, а Celery обращается к брокеру, о чем многие забывают.

В примере выше, в измененном классе Task я добавил проверку и логирование, что поможет выявить все места, где Celery вызывается в транзакции. После этого нужно перенести все вызовы за пределы транзакции, либо использовать метод transaction.on_commit ().

Так же имеет смысл пересмотреть необходимость использования проверки работы Celery в методе healthcheck. По хорошему приложение не должно зависеть от работоспособности брокера сообщений и лучше добавить логгирование и алерты и реализовать паттерн fallback на случай отказа какого-либо транспорта или узла системы. Но если все же основная функциональность зависит от Celery, то лучше подумать о реализации паттерна Outbox (Например про это есть статья на habr).

Если в healthcheck присутствует проверка доступности брокера сообщений, то стоит обратить внимание на ее реализацию. Бывает что эта логика опирается на проверку пула менеджера соединений, что не корректно. Потому что пул может быть не пустой, но все соединения «мертвые», и пока из пула не выброшены. Их инвалидация произойдет при первой попытке вызвать задачу Celery. Соответственно может получиться ситуация, когда healthcheck будет отвечать 200 OK, потому что пул не пустой, но приложение уже не будет работать.

Бонус

Так же для наглядной демонстрации я подготовил небольшой проект на githab со всеми примерами выше.

Ссылка на репозиторий

Как использовать:

  • Скачать

  • Запустить make docker-up-all

  • Устроить «катастрофу» make disaster

    • Убедиться что метод без обертки отвечает 500 make call-task-simple

    • Убедиться что метод с оберткой ответчает 200 make call-task-wrapped

  • «Полечить» make heal

  • Все почистить make docker-down

Спасибо

Хочется сказать отдельное спасибо моей жене и главному редактору @avasileva86 за поддержку и непосильную помощь в процессе написания этой статьи.

© Habrahabr.ru