[Из песочницы] Celery taskcls: новый декоратор, новые возможности

Привет, Хабр! Я расскажу тебе историю своего профессионального подгорания.

Так вышло, что я терпеть не могу рутинных однообразных действий. У меня за плечами несколько проектов, использующих Celery. Каждый раз, когда задача становится сложнее вывода 2 + 2 = 5, шаблон решения сводится к созданию класса, выполняющего задачу, и функции-стартера, с которой умеет работать Celery — бойлерплейта. В этой статье я расскажу, как я боролся с бойлерплейтом, и что из этого вышло.

Logo

Рассмотрим рядовую таску Celery. Есть класс, исполняющий задачу, и функция-стартер, выполняющая инстанцирование класса и запуск одного его метода, в котором реализована вся логика задачи и унаследована обработка ошибок:

class MyTask(
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        data = self.do_something()
        response = self.remote_call(data)
        parsed = self.parser(response)
        return self.process(parsed)

@app.task(bind=True)
def my_task(self, arg1, arg2):
    instance = MyTask(
        celery_task=self,
        arg1=arg1,
        arg2=arg2,
    )
    return instance.full_task()

При этом метод full_task включает в себя вызов main, однако также занимается обработкой ошибок, логгированием и прочей чушью, не имеющей прямого отношения к основной задаче.

В корне тасккласса лежит простая идея: в базовом классе можно определить метод класса task, в нём реализовать поведение функции-стартера, а после наследоваться:

class BaseTask:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except:
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, task, **kwargs):
        self = cls(
            celery_task=celery_task,
            **kwargs,
        )
        return self.full_task()

Вся вспомогательная скукотища собрана в базовом классе. Больше к ней не возвращаемся. Реализуем логику задачи:

@app.taskcls(bind=True)
class MyTask(
    BaseTask,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        data = self.do_something()
        response = self.remote_call(data)
        parsed = self.parser(response)
        return self.process(parsed)

Больше никакой шелухи, уже намного лучше. Однако что же с точкой входа?

MyTask.task.delay(...)

MyTask.task обладает всеми методами обычной таски: delay, apply_async, и, вообще говоря, ей и является.

Теперь аргументы декоратора. Особенно весело тащить bind = True в каждую таску. Можно ли передать аргументы по умолчанию через базовый класс?

class BaseTask:
    class MetaTask:
        bind = True

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except:
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, task, **kwargs):
        self = cls(
            celery_task=celery_task,
            **kwargs,
        )
        return self.full_task()

Вложенный класс MetaTask содержит аргументы по умолчанию и будет доступен всем дочерним классам. Интересно, что и его можно унаследовать:

class BaseHasTimeout(BaseTask):
    class MetaTask(BaseTask.MetaTask):
        timeout = 42

Наивысшим приоритетом обладают аргументы, переданные декоратору @app.taskcls:

@app.taskcls(timeout=20)
class MyTask(
    BaseHasTimeout,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        ...

В итоге timeout для задачи будет 20.

В web-приложениях часто есть необходимость из view запустить таску. В случае высокой сцепленности view и таски их можно совместить:

class BaseViewTask:
    @classmethod
    def task(cls, **kwargs):
        # Somehow init View class manually
        self = cls(...)
        return self.celery()

@app.taskcls
class MyView(
    BaseViewTask,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
    APIView,
):
    queryset = MyModel.objects.all()

    def get_some_data(self, *args, **kwargs):  # common methed
        return self.queryset.filtert(...)

    def get(self, request):
        data = self.get_some_data(request.field)  # used in request handling
        return Response(json.dumps(data))

    def post(self, request):
        self.task.delay(...)
        return Response(status=201)

    def celery(self):
        data = self.get_some_data(...)  # also used in background task
        return self.do_something(data)

Кстати, именно для исключения коллизии имён вложенный класс называется MetaTask, а не Meta, как в django.

Эта функциональность ожидается в Celery 4.5. Однако я также подготовил пакет, позволяющий попрбовать декоратор taskcls уже сегодня. Идея пакета сводится к тому, что при обновлении Celery до версии 4.5 вы сможете убрать его импорт не меняя более ни строчки кода.

© Habrahabr.ru