Celery для новичков
Привет, Хабр!
Celery — это асинхронная распределенная очередь задач, написанная на Python, она предназначена для обработки сообщений в реальном времени при помощи многозадачности. Используя Celery, можно организовать выполнение задач в фоновом режиме, не загружая основной поток приложения.
Используя Celery можно легко организовать выполнение фоновых задач.
Установка осуществляется через pip.
Основные возможности Celery
Определение задач
Создадим экземпляр Celery в файле celery_app.py
:
from celery import Celery
app = Celery('example', broker='your_broker_url_here')
Можно определить фоновые задачи с помощью декоратора @app.task
. К примеру функция, которая просто складывает два числа:
@app.task
def add(x, y):
return x + y
add
— это асинхронная задача. Можно вызвать её с помощью add.delay(x, y)
.
Celery предлагает параметры для настройки задач:
ignore_result
Если не нужен результат выполнения задачи, есть параметр ignore_result
:
@app.task(ignore_result=True)
def add(x, y):
return x + y
rate_limit
ограничивает скорость выполнения задач. Например, если нужно, чтобы задача add
выполнялась чаще, чем 10 раз в минуту, можно настроить rate_limit
:
@app.task(rate_limit='10/m')
def add(x, y):
return x + y
retry
Задача может иногда терпеть неудачу из-за проблем (которые вечно бывают):
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def add(self, x, y):
try:
# Попытка выполнения задачи
return x + y
except SomeTemporaryException as exc:
# Запланировать повторное выполнение задачи
raise self.retry(exc=exc)
Задача попытается выполниться до 3 раз с интервалом в 60 секунд, если возникнет временная ошибка.
Вызов задач
Метод delay()
— это самый простой способ вызвать задачу асинхронно. Под капотом он использует apply_async()
:
from tasks import add
# Вызов задачи асинхронно
result = add.delay(4, 4)
apply_async()
предлагает больше гибкости, позволяя указать различные параметры выполнения: время и приоритет выполнения, а также колбэки и errbacks:
result = add.apply_async((4, 4), countdown=10)
add
будет запланирована к выполнению через 10 секунд после вызова.
signature()
создает подпись задачи, которую можно использовать для создания сложных раб. процессов:
from celery import signature
sig = signature('tasks.add', args=(2, 2), immutable=True)
sig.delay()
chain()
позволяет соединить несколько задач в одну последовательность, где результат одной задачи передается в качестве аргумента следующей:
from celery import chain
# (4 + 4) -> (8 * 10)
res = chain(add.s(4, 4), multiply.s(10))()
group()
используется для параллельного выполнения набора задач. Он возвращает специальный объект GroupResult
, который позволяет отслеживать выполнение группы задач:
from celery import group
# выполняет add(2, 2) и add(4, 4) параллельно
group_result = group(add.s(2, 2), add.s(4, 4))()
chord()
— это комбинация group()
и chain()
, позволяющая выполнить группу задач параллельно и затем вызвать callback-задачу с результатами группы:
from celery import chord
# cначала выполняет add(2, 2) и add(4, 4) параллельно, затем результаты передаются в multiply()
result = chord([add.s(2, 2), add.s(4, 4)])(multiply.s(2))
Различные примеры применения
Масштабируемая система обработки изображений
from celery import Celery
import PIL
from PIL import Image
app = Celery('image_processor', broker='pyamqp://guest@localhost//')
@app.task
def resize_image(image_path, output_path, size):
with Image.open(image_path) as img:
img.thumbnail(size, PIL.Image.ANTIALIAS)
img.save(output_path)
@app.task
def crop_image(image_path, output_path, crop_box):
with Image.open(image_path) as img:
cropped_img = img.crop(crop_box)
cropped_img.save(output_path)
# юзаем
resize_image.delay('path/to/image.jpg', 'path/to/resized_image.jpg', (800, 600))
crop_image.delay('path/to/image.jpg', 'path/to/cropped_image.jpg', (100, 100, 400, 400))
Асинхронная рассылка уведомлений
Celery часто юзают для асинхронной рассылки электронных писем, SMS, или push-уведомлений пользователям. Пример отправки электронных писем с помощью SMTP:
from celery import Celery
import smtplib
app = Celery('notifications', broker='pyamqp://guest@localhost//')
@app.task
def send_email(recipient, subject, body):
server = smtplib.SMTP('smtp.example.com', 587)
server.starttls()
server.login("your_email@example.com", "your_password")
message = f"Subject: {subject}\n\n{body}"
server.sendmail("your_email@example.com", recipient, message)
server.quit()
# пример использования
send_email.delay("user@example.com", "Welcome!", "Thank you for registering with us.")
Асинхронное выполнение длительных задач на примере генерации отчетов
from celery import Celery
import time
app = Celery('reports', broker='pyamqp://guest@localhost//')
@app.task
def generate_report(report_id, parameters):
# операции по извлечению данных, их обработке и анализе
time.sleep(60) # имитация длительной операции
# сохранение или отправка отчета
return f"Report {report_id} generated with parameters {parameters}"
# использование
generate_report.delay("report_123", {"param1": "value1", "param2": "value2"})
Интеграция с Django
Cоздаем проект django и добавляем приложение:
django-admin startproject myproject
cd myproject
django-admin startapp myapp
Создаем файл celery.py
в корневой директории проекта Django (на уровне settings.py
):
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
Добавляем следующие строкиsettings.py
, чтобы указать Celery использовать Redis в качестве брокера сообщений:
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
Переходим в приложение, созданное внутри проекта Django, и создайте файл tasks.py
. Например, создадим задачу для сложения двух чисел:
# myapp/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
Теперь можно юзать эту задачу асинхронно из моделей или любой другой части Django. Например, добавим вызов задачи:
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
Для выполнения асинхронных задач необходимо запускаем Celery worker:
celery -A myproject worker --loglevel=info
Это запустит worker, который будет слушать и выполнять асинхронные задачи.
Можно еще подрубить мониторинг задач и воркеров с flower:
celery -A myproject flower
Напомню, что в рамках онлайн-курсов OTUS вы можете изучить самые популярные ЯП, а также зарегистрироваться на ряд бесплатных мероприятий.