Celery and Django: Creating Asynchronous Tasks

Nowadays, data processing and analysis are increasingly required within web applications. Unfortunately, the required execution time can sometimes be too long to handle requests synchronously. In this tutorial we discover how to use Celery in a Django project to create asynchronous tasks for our needs.


Web applications nowadays need to be faster and faster to meet user needs. Studies show that a site loading time of more than 4 seconds can significantly reduce user engagement and consequently a site’s revenue. Think about when you’re using an e-commerce site, for example. If the navigation is complex and page loading is slow, you won’t be enticed to buy on that platform unless it’s strictly necessary!

Although more and more speed is required, the amount of data that needs to be processed is increasing all the time. The calculations that need to be done sometimes take many seconds, if not minutes or hours. It is therefore unthinkable to handle some calculations synchronously. Instead, you have to use a distributed model that optimizes performance and at the same time does not block the main application.

When you need to set up asynchronous and distributed tasks and program them in Python, the predominant framework is Celery.

Celery is a package that implements the message queuing model for distributed computing on one or more nodes by exploiting the Advanced Message Queuing Protocol (AMQP), an open standard application-level protocol for developing message-oriented middleware.
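To make the message queuing model more concrete, here is a minimal sketch that uses only the Python standard library. It is not Celery and involves no AMQP broker: an in-process queue stands in for the broker, threads stand in for worker nodes, and the names broker, worker and square are invented for this illustration.

```python
import queue
import threading

broker = queue.Queue()          # stands in for the message broker
results = {}                    # stands in for a result backend
results_lock = threading.Lock()


def square(x):
    """Stand-in for a slow computation."""
    return x * x


def worker():
    """Consume task messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            broker.task_done()
            break
        task_id, func, args = message
        value = func(*args)
        with results_lock:
            results[task_id] = value
        broker.task_done()


# Start three workers, mirroring a small worker pool.
workers = [threading.Thread(target=worker) for _ in range(3)]
for thread in workers:
    thread.start()

# The producer only enqueues messages; it never runs the task itself.
for task_id in range(5):
    broker.put((task_id, square, (task_id,)))

# One sentinel per worker tells each of them to stop.
for _ in workers:
    broker.put(None)

broker.join()
for thread in workers:
    thread.join()

print(sorted(results.items()))
```

The producer and the workers share nothing but the queue, which is the property that lets a real broker spread the same work across several machines.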

Celery is not only a framework for executing asynchronous and distributed tasks, but it can also be used for many other purposes. For example, it’s possible to schedule the execution of some tasks in a cron-like way, or to use Celery chords to execute map-reduce style distributed tasks on large amounts of data.

In this article we’ll see how to configure the latest version of Celery 5.x both in standalone mode and inside Django applications.

Prerequisites

To make the work highly transportable from one location to another we will use Docker and Docker Compose. We therefore recommend reading the articles Introduction to Docker and Docker compose – how to orchestrate different containers to get an idea of the technology we will use.

Within our Docker project we are going to create several containers. Celery 5.x requires Python 3.6 or higher. We also need to choose a broker, that is, the middleware that carries messages between the various Python services in a continuous and distributed manner. It is important to choose the right broker from the beginning, based on its features and the context in which it operates.

The brokers supported by Celery 5.x are as follows:

  • RabbitMQ is complete, stable, durable and easy to install. It is usually the recommended choice for a production environment.
  • Redis provides the same functionality as RabbitMQ, but is more susceptible to data loss in the event of a sudden outage or power failure.
  • AWS SQS is the message queue management service provided by Amazon. The service can be used for free within certain usage limits, but its configuration is slightly more complex. It is suitable for microservice-based applications that require high reliability and must handle a high workload.

In addition to these, Zookeeper is also supported on an experimental basis. Since it has no dedicated support in Celery, it is not recommended for production environments.

To simplify the configuration, in this tutorial we will use Redis. If you want to try other brokers, you only need to change the connection parameters and install any additional packages they require. Please refer to the official Celery guide for the various broker configurations. In the case of Redis it is also possible to use a cloud version of the service, Redis Labs. Among the various options there is a free plan that is ideal for testing.
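As a rough illustration of what changing the connection parameters means, the sketch below lists the usual shape of the broker URL for the three brokers mentioned above. The hosts, ports and guest credentials are placeholder assumptions for a local setup, not values taken from this project. Redis and SQS support also needs the matching extras, for example pip install "celery[redis]" or pip install "celery[sqs]".

```python
from urllib.parse import urlsplit

# Typical broker URL shapes; hosts and credentials are placeholders.
BROKER_URLS = {
    "redis": "redis://localhost:6379/0",
    "rabbitmq": "amqp://guest:guest@localhost:5672//",
    "sqs": "sqs://",  # credentials are usually taken from the AWS environment
}


def describe(url):
    """Return (scheme, host, port) so the differences are easy to compare."""
    parts = urlsplit(url)
    return parts.scheme, parts.hostname, parts.port


for name, url in BROKER_URLS.items():
    print(name, describe(url))
```

Whichever URL you pick is the value that ends up in the broker setting of the Celery configuration; the rest of the project stays the same.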

Django project structure

Before setting up Docker we need to create the basic structure of our Django project. To do this we use pipenv to create a minimal Python environment with the essential packages to set up the project structure. We start with the following command:

pipenv shell 

This creates a small Python environment that we can configure without changing the system configuration. To create the project we will only need Django. All other packages will be installed inside the Docker container that will host our application. We install the package using the command:

pip install Django 

Once the installation is finished we can proceed with creating the project and the apps we need. In this tutorial we will call the project celerytutorial. Let’s then create an app called bitcoins inside which we’re going to define a task to retrieve the price of bitcoins. You can find all the Django commands in the official documentation. Below are the ones used for our project.

django-admin startproject celerytutorial
cd celerytutorial
python manage.py startapp bitcoins 

Inside the project folder we are going to add the folders _config, _static and _templates. These will serve us respectively to indicate the packages to install for python and javascript, the static files of our site (css, javascript, images), and finally the templates of the various applications. You can use, of course, the structure you prefer.

Docker Setup

In this tutorial we will use a Docker Compose configuration for development, so we will use the Django development server to run our application. If you were to deploy the application in a production environment, you would add a web server such as Nginx or Apache.

The docker-compose file to place in the main project folder is as follows.

version: '3'

services:
  celerytutorial:
    container_name: celerytutorial
    build: ./celerytutorial/
    entrypoint: ['sh', '/data/web/celerytutorial_setup.sh']
    volumes:
      - ./celerytutorial:/data/web
      - celerytutorial_static:/assets
      - celerytutorialstatus:/celerytutorial_status
    working_dir: /data/web
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - celerytutorialmongodb1
      

  celerytutorialmongodb3:
    image: mongo:4
    restart: always
    container_name: celerytutorialmongodb3
    volumes:
      - celerytutorialmongodata3:/data/db
    expose:
      - "27017"
    entrypoint: [ "/usr/bin/mongod", "--replSet", "rscelerytutorial", "--bind_ip_all", "--wiredTigerCacheSizeGB", "1" ]
    
  celerytutorialmongodb2:
    image: mongo:4
    restart: always
    container_name: celerytutorialmongodb2
    volumes:
      - celerytutorialmongodata2:/data/db
    expose:
      - "27017"
    entrypoint: [ "/usr/bin/mongod", "--replSet", "rscelerytutorial", "--bind_ip_all", "--wiredTigerCacheSizeGB", "1"]

  celerytutorialmongodb1:
    image: mongo:4
    restart: always
    container_name: celerytutorialmongodb1
    volumes:
      - celerytutorialmongodata1:/data/db
    expose:
      - "27017"
    ports:
      - "27018:27017"
    entrypoint: [ "/usr/bin/mongod", "--replSet", "rscelerytutorial", "--bind_ip_all", "--wiredTigerCacheSizeGB", "1"]
    
  celerytutorialmongosetup:
    image: "mongo-setup"
    build: "./mongo-setup"
    container_name: "celerytutorialmongosetup"
    depends_on:
      - celerytutorialmongodb1
    volumes:
      - celerytutorialstatus:/data/

  celerytutorialredis:
    container_name: "celerytutorialredis"
    image: redis:alpine
    expose:
      - "6379"
    restart: always
    sysctls:
      net.core.somaxconn: '4096' 
        
  celerycelerytutorial:
    container_name: "celerycelerytutorial"
    build: ./celerytutorial/
    command: celery -A celerytutorial worker -l INFO --concurrency=3 
    working_dir: /code
    volumes:
      - ./celerytutorial:/code
    depends_on:
      - celerytutorialmongodb1
      - celerytutorialredis

  
  celerytutorialflower:
    container_name: celerytutorialflower
    image: mher/flower
    command: ["flower", "--broker=redis://celerytutorialredis:6379/0", "--port=5555"]  
    ports:  
      - "5555:5555"


volumes:
    celerytutorial_static:
    celerytutorialmongodata1:
    celerytutorialmongodata2:
    celerytutorialmongodata3:
    celerytutorialstatus: 

Let’s analyze the various containers in detail. 

The container named celerytutorial is related to the application developed in Django. The service will be made available on port 8000 of the host.

The image build is defined in a Dockerfile included in the Django project directory. This Dockerfile uses the multi-stage build approach, which allows different images to be used during the various installation phases. Specifically, we use the node image to install the JavaScript libraries and the python image for the Django project. The output of the JavaScript libraries installation is copied to the Python-based image. This way there is no need to install node in the final image, making it more streamlined.

The javascript libraries and python packages useful for the project are defined in special files inside the _config/packaging directory of the Django project.

The contents of the Dockerfile are shown below.

# Install node modules
FROM node AS build

RUN mkdir /assets
WORKDIR /assets
COPY _config/packaging/package.json /assets
RUN npm install


FROM python:3.7
ENV PYTHONUNBUFFERED=1

# Copy installed modules into this new container
COPY --from=build /assets /assets

RUN mkdir /data
WORKDIR /data
COPY _config/packaging/requirements.txt /data/
RUN pip install --upgrade pip
RUN pip install -r requirements.txt 

The Django container entrypoint is a shell script that launches the Django development server. This script can be modified to initialize the project. For example, database migrations can be applied and/or the superuser can be created. If the project is used in a production environment, additional commands can be inserted to launch the application using a WSGI server (e.g. gunicorn). The script used in this tutorial is as follows.

#!/bin/bash

echo 'Launching the celerytutorial container...\n\n'


if [ ! -f /celerytutorial_status/celerytutorial-init.flag ]; then
    echo "Init celerytutorial"

    mkdir -p /celerytutorial_status/
    touch /celerytutorial_status/celerytutorial-init.flag

else
    echo "celerytutorial already initialized"
fi

echo 'Starting development server'
python manage.py runserver 0.0.0.0:8000 

The celerytutorialmongodbX containers are the different instances of the MongoDB replica set. In this tutorial, we will not directly use MongoDB to store data, but it will come in handy in future articles.

The celerytutorialmongosetup container is also used to configure the MongoDB replica set. 

For more details on how to implement the replica set in Docker environment we refer you to the article MongoDB and Docker – How to create and configure a replica set.

Celery needs to rely on a broker to manage the message queue. We use the redis image from Docker Hub to define the celerytutorialredis container.

To launch Celery we use another container, celerycelerytutorial, based on the same build as the Django project. Keeping the web application and the worker in separate containers means that a failure of one does not bring down the other. In a production environment you can also deploy the containers on different machines to balance the workload, for example with Docker Swarm. The container command starts a Celery worker with a concurrency of 3, that is, three worker processes that run tasks in parallel.

Finally, we add the Flower service. This service allows us, through a web application, to monitor the tasks executed by Celery. Please refer to the official documentation for more details.

Django Configuration

The integration of Celery within a Django project is done through some code changes.

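First we need to update the Django project settings to define the Redis connection and the Celery configuration parameters. Since Redis is available, we can also use it for the Django cache; the cache backend shown here is provided by the django-redis package, which must be listed among the project requirements. The code to insert is as follows.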

REDIS_HOST = 'redis://celerytutorialredis:6379'
CELERY_BROKER_URL = REDIS_HOST 
CELERY_RESULT_BACKEND = REDIS_HOST
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'


CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        # use database #1 (e.g., docker-compose exec celerytutorialredis redis-cli -n 1)
        "LOCATION": "redis://celerytutorialredis:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}

To define the Celery application you need to create a celery.py file in the same folder as the settings. Inside the file you define the settings import and the command to automatically discover the tasks defined in the various apps of the project. Below is the content of the file.

import os

from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerytutorial.settings')

from django.conf import settings

app = Celery('celerytutorial')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() 
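The namespace='CELERY' argument means that only settings whose names start with CELERY_ are handed to Celery, where they correspond to the lowercase option names. The helper below is not Celery's own code, only a small approximation of that mapping, and the name celery_keys_from_settings is invented for this sketch.

```python
def celery_keys_from_settings(settings, namespace="CELERY"):
    """Approximate how prefixed Django settings map to Celery option names."""
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


# A cut-down stand-in for the Django settings module shown earlier.
django_settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "redis://celerytutorialredis:6379",
    "CELERY_RESULT_BACKEND": "redis://celerytutorialredis:6379",
    "CELERY_TASK_SERIALIZER": "json",
}

celery_options = celery_keys_from_settings(django_settings)
print(celery_options)
```

The practical consequence is that a setting written without the prefix, such as BROKER_URL, would simply be ignored by this configuration.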

To make sure the Celery app is loaded whenever Django starts, you must update the __init__.py file in the folder where the settings.py file is located as follows.

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app'] 

Task creation

To define a task, you need to create a tasks.py file inside the Django app that owns it, here bitcoins. In our case, we write a task to retrieve the value of bitcoins provided by the CoinDesk service.

The task will connect to the public api and return a message with the current value of bitcoins in the requested currency. By default the currency is the euro. Below is the code. 

import requests

from celerytutorial.celery import app


@app.task
def getBitcoins(currency_code='EUR'):
    """Return a message with the latest Bitcoin price in the given currency."""
    try:
        bpi_url = 'https://api.coindesk.com/v1/bpi/currentprice.json'
        # A timeout keeps the worker from hanging on a stalled connection.
        response = requests.get(bpi_url, timeout=10)
        if response.status_code != 200:
            raise Exception(f'GET {bpi_url} returned unexpected response code: {response.status_code}')

        data = response.json()

        price_data = 'Latest Bitcoin Price {} {}'.format(currency_code, data.get('bpi').get(currency_code).get('rate_float'))

        return price_data
    except Exception as e:
        # Network errors and unknown currency codes both end up here.
        return f'Something went wrong: {e}'
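Because the task mixes the HTTP call with the message formatting, it can be handy to check the formatting step on its own, without network access. The sketch below is one possible way to do that: format_price is a hypothetical helper, and the sample payload is hand-written with made-up numbers, mirroring only the bpi / currency / rate_float nesting that the task reads.

```python
def format_price(data, currency_code="EUR"):
    """Build the price message from an already decoded API payload."""
    entry = data.get("bpi", {}).get(currency_code)
    if entry is None:
        # Explicit message instead of an AttributeError on a missing currency.
        return f"Something went wrong: unknown currency {currency_code}"
    return "Latest Bitcoin Price {} {}".format(currency_code, entry.get("rate_float"))


# Made-up sample data; only the structure matters here.
sample = {"bpi": {"EUR": {"rate_float": 100.5}, "USD": {"rate_float": 110.25}}}

print(format_price(sample))
print(format_price(sample, "GBP"))
```

A helper like this could be called from the task after the response has been decoded, so that the Celery task itself stays a thin wrapper around the request.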

This way it will be possible to call the task from any view of our application asynchronously.

In the next tutorials we will see how to modify this task to automate it and save the results in MongoDB.
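As a rough preview of the automation step, a periodic schedule could be declared in the Django settings along these lines. This fragment is an assumption, not part of the project described here: the entry name is invented, the 60-second interval is arbitrary, and the task path presumes that the bitcoins app is listed in INSTALLED_APPS so that the task is registered under its default name.

```python
# Hypothetical addition to settings.py; not part of the original project.
CELERY_BEAT_SCHEDULE = {
    "fetch-bitcoin-price-every-minute": {
        # Default task name: module path plus function name.
        "task": "bitcoins.tasks.getBitcoins",
        # Seconds between two runs.
        "schedule": 60.0,
        # Positional arguments passed to the task.
        "args": ("EUR",),
    },
}
```

A schedule like this only takes effect when a beat process runs alongside the worker, for example with celery -A celerytutorial beat -l INFO in its own container.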
