Celery предоставляет не только базовый функциональнал для выполнения задач, но и множество дополнительных возможностей. Далее речь пойдет о наиболее интересных из них.
Обработка ошибок
При выполнении задач могут возникать ошибки. Celery предоставляет механизм обработки ошибок, который позволяет перехватывать и обрабатывать ошибки в задачах.
from celery.exceptions import Retry
@task(bind=True, max_retries=3)
def my_task(self):
try:
# выполнение задачи
except Exception as exc:
self.retry(exc=exc, countdown=5)
В этом примере мы использовали декоратор task()
для определения задачи и указали максимальное количество повторных попыток выполнения задачи, если возникнет ошибка. Если в процессе выполнения задачи возникает исключение, мы вызываем метод retry()
с указанием причины ошибки и времени задержки перед следующей попыткой выполнения задачи. Важно также то, что для вызова retry()
мы использовали параметр bind=True в декораторе, позволивший обрашаться к методу задачи из его тела.
Распределение задач на несколько воркеров
Для обеспечения высокой производительности и масштабируемости Celery позволяет распределять задачи на несколько воркеров. Для этого необходимо запустить несколько воркеров с помощью команды celery worker
.
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
Таким образом, задачи отправляются в очередь, а затем один или несколько воркеров из пула получают задачи из нее и начинают их выполнение. Также количество воркеров в пуле можно настроить в конфигурационном файле Celery.
Кэширование результатов задач
Если задача возвращает некоторый результат, то его можно закэшировать, чтобы при последующих вызовах задачи не происходило повторного вычисления результата. Для этого Celery предоставляет механизм кэширования результатов задач.
from celery import Task
class CachedTask(Task):
cache = {}
def run(self, *args, **kwargs):
key = self.get_cache_key(*args, **kwargs)
if key not in self.cache:
self.cache[key] = super().run(*args, **kwargs)
return self.cache[key]
def get_cache_key(self, *args, **kwargs):
return str(args) + str(kwargs)
В этом примере мы определяем класс задачи CachedTask
, который кэширует результаты выполнения задач. Мы определяем переменную класса cache
, которая будет использоваться для хранения результатов задач. Метод run()
перехватывает вызов задачи и проверяет, есть ли результат в кэше. Если результат есть в кэше, то он возвращается, иначе задача выполняется и результат сохраняется в кэше.
Мониторинг и управление
Celery также предоставляет множество инструментов для мониторинга и управления задачами. Например, Celery Flower - это веб-интерфейс для мониторинга и управления выполнением задач.
Чтобы установить и запустить Celery Flower, необходимо выполнить следующие команды:
pip install flower
celery flower --app=your_project_name --broker=amqp://guest@localhost//
Здесь мы устанавливаем пакет flower
и запускаем Celery Flower с помощью команды celery flower
. Мы указываем имя проекта и URL брокера сообщений. После запуска Celery Flower будет доступен по адресу http://localhost:5555
.
В интерфейсе Celery Flower вы можете просмотреть список запущенных задач, мониторить их выполнение, просматривать журналы задач и многое другое.
Ограничения времени выполнения задач
Celery позволяет устанавливать ограничения времени выполнения задач. Если задача не завершается в указанное время, то она будет отменена. Для этого можно использовать опцию time_limit
при запуске задачи.
from myapp.tasks import my_task
# Ограничение времени выполнения задачи до 10 секунд
result = my_task.apply_async(args=[data], time_limit=10)