Integrando Celery con Django: programar una tarea periódica

En el post de hoy vamos a ver como integrar Celery con Django para programar una tarea periódica.

Qué es Celery

Celery es una cola de tareas asíncrona Open Source, basada en el envío distribuido de mensajes, que nos permite operaciones a tiempo real, pero también, operaciones programadas o periódicas. Es decir, nos permite enviar tareas a una cola, para que se ejecuten cuando sea posible.

¿Para que me sirve celery?

Hay veces que necesitamos ejecutar trabajos intensivos en recursos, que requieren tiempos de ejecución prolongados, lo que generaría un problema para cualquier aplicación donde haya interacción con el usuario, ya que bloquearía esa interacción hasta que finalizara el trabajo.

Si hablamos de aplicaciones web, donde hay diversos timeouts asociados (Web Server, navegador), Celery permite desacoplar la ejecución de estos procesos, de la aplicación con la que interactua el usuario, enviando los trabajos a un message broker, que los distribuirá en celery workers para que los ejecuten.

Las unidades de ejecución, denominadas tasks se ejecutan de forma concurrente por uno o varios servidores de trabajo, y se pueden ejecutar tanto en background, como de forma síncrona (esperando hasta que se completen)

La gracia en el entorno web es que podemos distribuir las tasks entre varios threads de trabajo, que pueden estar en diferentes servidores del que ejecuta la aplicación, para no afectar al rendimiento del servidor de la aplicación web.

Message Broker

Para enviar tareas a los threads de trabajo, Celery utiliza un sistema de envío y recepeción mensajes (cola de mensajes), lo que soluciona a través de un servicio aparte denominado message broker. El Message Broker recomendado para usar con Celery es RabbitMQ, pero también ofrece soporte limitado para usar Redis, Beanstalk, MongoDB, CouchDB y también bases de datos (usando SQLAlchemy o Django ORM).

RabbitMQ

RabbitMQ (MQ viene de Messages Queue), es un sistema de mensajería para aplicaciones basado en colas, muy potente y escalable, desarrollado con Erlang. Es muy completo en cuanto a características, estable, y de facil instalación. Desde Celery lo proponen como una elección excelente para el entorno de producción. Sí queréis indagar más en el tema, podéis mirar en Using RabbitMQ.

Instalando Celery y RabbitMQ

Recordemos que los comandos son para el supuesto de trabajo sobre Debian o Ubuntu, y usamos entornos virtuales mediante virtualenvwrapper

Instalemos RabbitMQ

$ sudo apt-get install rabbitmq-server

Tras la instalación, el broker debería estar listo y corriendo en background preparado para dirigir mensajes: Starting rabbitmq-server: SUCCESS.

Instalemos Celery

Nos movemos a nuestro entorno virtual e instalamos Celery:

$: workon mientorno
(mientorno)$:pip install celery

Estructura de un proyecto Django con Celery

Lo primero que vamos a hacer es ver como deberíamos estructurar el proyecto. Considerando que disponemos de la estructura típica, añadiremos los archivos celery.py y tasks.py como se muestra a continuación:

miProyecto
    /db.sqlite3
    /manage.py
    /miAplicacion
        /tasks.py
        /…
    /miProyecto
        /__init__.py
        /urls.py
        /wsgi.py
        /settings.py
        /celery.py

Fichero celery.py

En este fichero definiremos la instancia de Celery:

#Activamos los imports absolutos para evitar conflictos entre packages
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

#indicamos el Django settings module por defecto para nuestro programa celery (no es necesario pero asi evitamos tener que pasarle siempre al programa celery nuestro módulo settings)
os.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘miProyecto.settings‘)

app = Celery(‘miProyecto‘)

#Añadimos el Django settings module como fuente de configuración de Celery (nos permite configurar Celery directamente desde el Django settings). Al pasarlo como string nos ahorramos un problema si trabajasemos con windows.
app.config_from_object(‘django.conf:settings’)

# Si tenemos nuestras tareas en un fichero de nombre tasks.py, esto nos permite indicarle a celery que encuentre automáticamente dicho módulo dentro del proyecto. De este modo no tenemos que añadirlo a la variable CELERY_IMPORTS del settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

#ejemplo de tarea que muestra su propia información. El bind=True indica que hace referencia a su instancia de tarea actual.
@app.task(bind=True)
def debug_task(self):
    print(‘Request: {0!r}’.format(self.request))

Nótese que he resaltado en cursiva la llamada al fichero settings de nuestro proyecto así como la instanciación de Celery (en ambos casos tendremos que personalizarlo con el nombre del directorio adecuado) así commo la definición de una tarea demostrativa mediante el decorador task

DETALLE: Imports Absolutos

En Python 2.x, cuando realizamos algo del estilo import string, primero se busca en nuestros packages relativos del proyecto, y si no encuentra nada, busca en los directorios absolutos del sys.path. Normalmente no queremos ese comportamiento, y existe la posibilidad de indicar que busque en los directorios relativos con la inclusión de un punto (from . import string), así que para asegurarnos de que los imports que hacemos sin usar el . son absolutos, habilitamos esta característica mediante la llamada from __future__ import absolute_import

Importar la instancia de Celery

La instancia app de Celery que acabamos de definir, tiene que importarse con nuestro módulo, para asegurarnos que se importa al iniciar Django para que sea utilizada por el decorador @shared_task (que veremos más adelante), por lo que la llamaremos en el fichero miProyecto/miProyecto/__init__.py como vemos a continuación:

from __future__ import absolute_import
from .celery import app as celery_app

Fichero tasks.py

Creamos un fichero de demostración de tareas dentro de miProyecto/miAplicacion/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

Decorador @shared_task

La idea es que nuestras tareas vivan en una app reusable, que por definición no puede depender del propio proyecto, así que no podremos importar la instancia directamente. En lugar de eso, utilizamos el decorador @shared_task que nos permite crear tareas sin tener ninguna instancia concreta de nuestro programa Celery.

Usando RabbitMQ como Message Broker

Anteriormente hemos tomado la decisión de unsar RabbitMQ como Message Broker, y lo hemos instalado en el sistema, pero tenemos que indicarle a Celery que lo use. Para eso, y dado que hemos establecido el archivo settings.py como fuente de configuración de Celery en pasos anteriores, solo tenemos que editar el fichero miProyecto/miProyecto/settings.py para incluir la siguiente variable:

BROKER_URL = ‘amqp://guest:guest@localhost//’

Es interesante destacar que el broker podría no estar en el mismo servidor que ejecuta las aplicaciones Django, es por eso que el broker se especifica con una URL.

Backend de Resultados: guardando los resultados en Django

Para almacenar información del estado de las tareas, Celery necesita guardar o enviar los estados a algún sitio. Hay varios backends disponibles para esto, por lo que podemos elegir entre:SQLAlchemy, Django ORM, Memcached, Redis, AMQP (RabbitMQ), y MongoDB – o incluso puedes definirte el tuyo propio.

Evidentemente, en nuestro caso vamos a usar Django ORM

Para utilizar Django como nuestro backend de resultados, deberemos seguir 4 pasos:

  • Instalar primero la librería django-celery

    (mientorno)$:pip install django-celery

  • Añadir djcelery como INSTALED_APPS en el fichero de settings (miProyecto/miProyecto/settings.py)

    INSTALLED_APPS = (
        ‘django.contrib.auth’,
        ‘django.contrib.contenttypes’,
        ‘django.contrib.sessions’,
        ‘django.contrib.sites’,
        ‘django.contrib.messages’,
        ‘django.contrib.staticfiles’,
        ‘miAplicacion’,
        ‘djcelery’,
        # otras apps que use mi proyecto,
    )

  • Crear las tablas de la base de datos de celery (tanto para guardar los resultados, como para el scheduler de tareas periódicas)

    • En caso de utilizar south

      (miEntorno)$:./manage.py migrate djcelery

    • En caso contrario

      (miEntorno)$:./manage.py syncdb

  • Configurar Celery para usar el backend django-celery, añadiendo la variable correspondiente directamente en nuestro fichero settings.py:

    • Si queremos usar la base de datos como backend:

      CELERY_RESULT_BACKEND=’djcelery.backends.database:DatabaseBackend’

    • Si queremos usar la cache como backend:

      CELERY_RESULT_BACKEND=’djcelery.backends.cache:CacheBackend’,

Iniciando el proceso de trabajo

En un entorno de producción, habría que ejecutar el worker en background como un daemon, pero para probarlo en nuestro entorno de desarrollo será suficiente con ejecutar el siguiente comando:

(miEntorno)$:celery -A miProyecto worker -l info

Probando la ejecución de tareas

Una vez hemos iniciado desde terminal el proceso de trabajo (comando anterior), si no ha habido ningún error, nos habrá salido el mensaje de que está listo, y el proceso estará vivo en ese terminal, por lo que:

  • Abrimos un nuevo terminal
  • Cargamos nuestro entorno virtual

    $:workon miEntorno

  • lanzamos la consola de django

    (miEntorno)$: ./manage.py shell

  • Y ahora podemos utilizar los métodos add, mul y xsum que hemos definido en el modulo tasks:

    >>> from miAplicacion.tasks import add
    >>> result = add.delay(2, 7)

Llegado este punto podemos ver como en el otro terminal sale un mensaje que dice Received task: miAplicacion.tasks.add

Podemos mirar el valor resultante desde nuestra consola django con:

>>> result.ready()
True
>>> result.get()
9

Método delay()

Para llamar a una tarea, debemos usar el nombre de la misma (siguiendo nuestro ejemplo, add) y llamar al método delay, junto con los argumentos de la tarea.
Así pues, si la tarea add la hemos definido como:

@shared_task
def add(x,y)

A la hora de llamarla como tarea con los argumentos x=1, e y=3, ejecutaremos:

>>>add.delay(1,3)

Tareas Periódicas

Para ejecutar tareas periódicas, Celery utiliza Celery Beats, que es un planificador que nos permitirá lanzar tasks a intérvalos regulares que serán ejecutados por los workers disponibles en el cluster.

Algo que hay que recordar al usar Celery Beats es su planteamiento centralizado: Tienes que garantizar que solo haya funcionando un único planificador a la vez para un mismo calendario, ya que en caso contrario se podrían dar casos de tareas duplicadas.

En este sentido hay que tener en cuenta que las tareas se pueden llegar a solapar si su tiempo de ejecución es superior al intérvalo planificado por lo que habrá que vigilar con este tema, y en caso de ser un punto conflictivo, habría que diseñar una estratégia mediante locks

Entradas

Para planificar una tarea periódicamente, tenemos que añadir una entrada en el parámetro CELERYBEAT_SCHEDULE de los settings.

En nuestro caso, vamos a ejecutar cada 30 segundos la tarea add que hemos definido anteriormente para realizar la suma de los valores 15 y 35, por lo que abrimos nuestro fichero miProyecto/miProyecto/settings.py y añadimos:

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    ‘add-every-30-seconds’: {
        ‘task’: ‘miAplicacion.tasks.add’,
        ‘schedule’: timedelta(seconds=30),
        ‘args’: (15, 35)
    },
}

#Si no usamos CELERY_TIMEZONE, se usará por defecto el TIME_ZONE que use nuestra app Django
CELERY_TIMEZONE = ‘Europe/London’

Además del nombre de la tarea, la frecuencia de ejecución, y los args (esto permite listas y tuplas), podemos pasarle un diccionario como kwargs, opciones de ejecución, etc

Consideraciones de tiempo

La zona horaria por defecto de Celery es UTC, pero como véis, podemos usar la variable CELERY_TIMEZONE en los ajustes para cambiarlo.
Además, en el caso de Django, si no se especifica el CELERY_TIMEZONE se toma por defecto el TIME_ZONE que hayamos definido en nuestros settings, que es el que usará nuestro proyecto Django.

En caso de efectuar cambios en la zona horaria, habrá que actualizar la base de datos de Celery Beat para que se actualice el planificador. Para eso, hacemos:

(miEntorno)$:./manage.py shell
>>> from djcelery.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

Planificaciones

No sólo podemos definir con qué frecuencia debe ejecutarse la tarea, si no que podemos decirle que se ejecute un día y hora concretos, mediante el tipo de planificador crontab

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    ‘add-every-monday-morning’: {
        ‘task’: ‘miAplicacion.tasks.add’,
        ‘schedule’: crontab(hour=7, minute=30, day_of_week=1),
        ‘args’: (15, 35),
    },
}

En la documentación de Celery para Celery Beat, hay una tabla detallada que explica la sintaxis para crontab, y muestra lo complejas que se pueden hacer las planificaciones (cosas del estilo de ejecútate cada 10 minutos, pero solo de 3-4 am, 5-6 pm 9-10 pm los Jueves y los Viernes)

Lanzando el Planificador

Para lanzar el servicio celery-beats, hay un planificador por defecto (que guarda el registro de las últimas ejecuciones en una base de datos local), pero se pueden usar otros, indicándolo con el parámetro -S

En nuestro caso, utilizaremos el planificador que nos proporciona django-celery, que guarda las planificaciones en la BBDD de Django, y que nos permitirá añadir, modificar o elminar dichas planificaciones desde el propio panel de administración Django Admin.

Para lanzar nuestro planificador, ejecutamos:

(miEntorno)$:celery -A proj beat -S djcelery.schedulers.DatabaseScheduler

Entorno de producción

Para lanzar tanto el worker como el planificador en el entorno de producción, deberíamos ejecutarlos como daemons.

Ampliaré este tema cuando tenga algo más de tiempo, pero por ahora puedo dirigiros al tutorial de daemonización de Celery

Espero que este tutorial haya sido de ayuda. ¡Saludos!