How to create a Celery task that fills out fields using Django

Posted on Nov 29, 2020
Last updated on Jan 19, 2023

Hi everyone!

It’s been way too long, I know. This time, I wanted to talk about asynchronicity in Django, but first, let’s set the stage:

Imagine you are working in a library and you have to develop an app that allows users to register new books using a barcode scanner. The system has to read the ISBN code and use an external resource to fill in the information (title, pages, authors, etc.). You don’t need the complete book information to continue, so the request shouldn’t have to wait for the external resource to answer.

How can you process the external request asynchronously? 🤔

For that, we need Celery.

What is Celery?

Celery is a “distributed task queue”. From their website:

> Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

So Celery can get messages from external processes via a broker (like Redis), and process them.

The best thing is: Django can connect to Celery very easily, and Celery can access Django models without any problem. Sweet!
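
To make the "task queue" idea concrete, here is a tiny in-process analogy using only the standard library. It is not how Celery is implemented: the `queue.Queue` stands in for the broker (Redis in this post), the thread stands in for a Celery worker, and all the names are made up for illustration.

```python
import queue
import threading

# In-process stand-in for the broker (Redis in this post).
broker = queue.Queue()
results = []


def worker():
    """Pull messages off the queue and process them, like a worker would."""
    while True:
        message = broker.get()
        if message is None:  # sentinel value: stop the worker
            broker.task_done()
            break
        task_name, isbn = message
        results.append(f"{task_name} processed {isbn}")
        broker.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# The "web" side only enqueues messages and moves on.
for isbn in ["9780345418913", "9780451524935"]:
    broker.put(("get_books_information", isbn))

broker.put(None)
broker.join()  # we wait here only so the example can print the results
thread.join()
print(results)
```

The important part is that the code putting messages on the queue never does the slow work itself. With Celery, the worker usually lives in a separate process (or machine), which is what makes the queue "distributed".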

Let’s code!

Let’s assume our project structure is the following:

- app/
  - manage.py
  - app/
    - __init__.py
    - settings.py
    - urls.py

Celery

First, we need to set up Celery in Django. Thankfully, Celery has excellent documentation, but the entire process can be summarized as follows:

In app/app/celery.py:

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings")

app = Celery("app")

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    """A debug celery task"""
    print(f"Request: {self.request!r}")

What’s going on here?

  • First, we set the DJANGO_SETTINGS_MODULE environment variable.
  • Then, we instantiate our Celery app and store it in the app variable.
  • Then, we tell Celery to read its configuration from the Django settings, using only the keys that start with the CELERY prefix. We will see this later in the post.
  • Finally, we call Celery’s autodiscover_tasks. Celery is now going to look for tasks.py files in the Django apps.

In /app/app/__init__.py:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ("celery_app",)

Finally in /app/app/settings.py:

...
# Celery
CELERY_BROKER_URL = env.str("CELERY_BROKER_URL")
CELERY_TIMEZONE = env.str("CELERY_TIMEZONE", "America/Montevideo")
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "django-cache"
...

Here, we can see that the CELERY prefix is used for all Celery configurations, because in celery.py we told Celery the prefix was CELERY.
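
If the prefix rule feels a bit magical, this small sketch shows the naming convention: a Celery option such as broker_url is written as CELERY_BROKER_URL in the Django settings. This is only an illustration of the rule, not Celery's actual code, and `celery_settings` and `example_settings` are names I made up for the example.

```python
def celery_settings(django_settings, namespace="CELERY"):
    """Return the Celery options found in a dict of Django-style settings."""
    prefix = f"{namespace}_"
    return {
        key[len(prefix):].lower(): value
        for key, value in django_settings.items()
        if key.startswith(prefix)
    }


# Hypothetical settings, mirroring the ones defined above.
example_settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "redis://redis:6379",
    "CELERY_TIMEZONE": "America/Montevideo",
    "CELERY_RESULT_BACKEND": "django-db",
}

options = celery_settings(example_settings)
print(options)
```

Settings without the prefix (like DEBUG here) are simply ignored, which is why Django and Celery options can share the same settings.py without clashing.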

With this, Celery is fully configured. 🎉

Django

First, let’s create a core app. This is going to be used for everything common in the app.

$ python manage.py startapp core

In core/models.py, let’s define the following models:

"""
Models
"""
import uuid

from django.db import models


class TimeStampMixin(models.Model):
    """
    A base model that all the other models inherit from.
    This is to add created_at and updated_at to every model.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        """Setting up the abstract model class"""

        abstract = True


class BaseAttributesModel(TimeStampMixin):
    """
    A base model that sets up all the attribute models
    """

    name = models.CharField(max_length=255)
    outside_url = models.URLField()

    def __str__(self):
        return self.name

    class Meta:
        abstract = True

Then, let’s create a new app for our books:

$ python manage.py startapp books

And on books/models.py, let’s create the following models:

"""
Books models
"""
from django.db import models

from core.models import TimeStampMixin, BaseAttributesModel


class Author(BaseAttributesModel):
    """Defines the Author model"""


class People(BaseAttributesModel):
    """Defines the People model"""


class Subject(BaseAttributesModel):
    """Defines the Subject model"""


class Book(TimeStampMixin):
    """Defines the Book model"""

    isbn = models.CharField(max_length=13, unique=True)
    title = models.CharField(max_length=255, blank=True, null=True)
    pages = models.IntegerField(default=0)
    publish_date = models.CharField(max_length=255, blank=True, null=True)
    outside_id = models.CharField(max_length=255, blank=True, null=True)
    outside_url = models.URLField(blank=True, null=True)
    author = models.ManyToManyField(Author, related_name="books")
    person = models.ManyToManyField(People, related_name="books")
    subject = models.ManyToManyField(Subject, related_name="books")

    def __str__(self):
        return f"{self.title} - {self.isbn}"

Author, People, and Subject are all BaseAttributesModel, so their fields come from the class we defined on core/models.py.

For Book we add all the fields we need, plus many-to-many relationships with Author, People, and Subject, because:

  • A book can have many authors, and an author can have many books

Example: 27 Books by Multiple Authors That Prove the More, the Merrier

  • A book can feature many people, and a person can appear in many books

Example: Ron Weasley is in several Harry Potter books

  • A book can have many subjects, and a subject can cover many books

Example: A book can be a comedy, fiction, and mystery at the same time

Let’s create books/serializers.py:

"""
Serializers for the Books
"""
from django.db.utils import IntegrityError
from rest_framework import serializers

from books.models import Book, Author, People, Subject


class AuthorInBookSerializer(serializers.ModelSerializer):
    """Serializer for the Author objects inside Book"""

    class Meta:
        model = Author
        fields = ("id", "name")


class PeopleInBookSerializer(serializers.ModelSerializer):
    """Serializer for the People objects inside Book"""

    class Meta:
        model = People
        fields = ("id", "name")


class SubjectInBookSerializer(serializers.ModelSerializer):
    """Serializer for the Subject objects inside Book"""

    class Meta:
        model = Subject
        fields = ("id", "name")


class BookSerializer(serializers.ModelSerializer):
    """Serializer for the Book objects"""

    author = AuthorInBookSerializer(many=True, read_only=True)
    person = PeopleInBookSerializer(many=True, read_only=True)
    subject = SubjectInBookSerializer(many=True, read_only=True)

    class Meta:
        model = Book
        fields = "__all__"


class BulkBookSerializer(serializers.Serializer):
    """Serializer for bulk book creating"""

    isbn = serializers.ListField()

    def create(self, validated_data):
        return_dict = {"isbn": []}
        for isbn in validated_data["isbn"]:
            try:
                Book.objects.create(isbn=isbn)
                return_dict["isbn"].append(isbn)
            except IntegrityError:
                # The ISBN already exists (isbn is unique), so we skip it
                pass

        return return_dict

    def update(self, instance, validated_data):
        """The update method needs to be overwritten on
        serializers.Serializer. Since we don't need it, let's just
        pass it"""
        pass


class BaseAttributesSerializer(serializers.ModelSerializer):
    """A base serializer for the attributes objects"""

    books = BookSerializer(many=True, read_only=True)


class AuthorSerializer(BaseAttributesSerializer):
    """Serializer for the Author objects"""

    class Meta:
        model = Author
        fields = ("id", "name", "outside_url", "books")


class PeopleSerializer(BaseAttributesSerializer):
    """Serializer for the People objects"""

    class Meta:
        model = People
        fields = ("id", "name", "outside_url", "books")


class SubjectSerializer(BaseAttributesSerializer):
    """Serializer for the Subject objects"""

    class Meta:
        model = Subject
        fields = ("id", "name", "outside_url", "books")

The most important serializer here is BulkBookSerializer. It receives a list of ISBNs and creates a Book for each one in the DB, skipping the ones that already exist.
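
One thing the serializer above doesn't do is check that each entry actually looks like an ISBN. If you want that, a check-digit test is cheap to add before creating the Book. Here's a minimal sketch for ISBN-13 only, assuming the ISBNs arrive as strings; `is_valid_isbn13` is a helper I made up for this example and it's not part of the project.

```python
def is_valid_isbn13(isbn):
    """Return True if `isbn` is a 13-digit string with a correct check digit."""
    if not isinstance(isbn, str) or len(isbn) != 13:
        return False
    if not (isbn.isascii() and isbn.isdigit()):
        return False
    # Weights alternate 1, 3, 1, 3, ...; the weighted sum must be a multiple of 10.
    total = sum(
        int(digit) * (3 if index % 2 else 1)
        for index, digit in enumerate(isbn)
    )
    return total % 10 == 0


print(is_valid_isbn13("9780345418913"))
print(is_valid_isbn13("9780345418914"))  # same number with the last digit changed
```

A natural place to call something like this would be inside the loop in create, but where (and whether) to reject bad codes is a design choice that depends on your scanner and your users.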

On books/views.py, we can set the following views:

"""
Views for the Books
"""
from rest_framework import viewsets, mixins, generics
from rest_framework.permissions import AllowAny

from books.models import Book, Author, People, Subject
from books.serializers import (
    BookSerializer,
    BulkBookSerializer,
    AuthorSerializer,
    PeopleSerializer,
    SubjectSerializer,
)


class BookViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list Books and retrieve books by ID
    """

    permission_classes = (AllowAny,)
    queryset = Book.objects.all()
    serializer_class = BookSerializer


class AuthorViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list Authors and retrieve authors by ID
    """

    permission_classes = (AllowAny,)
    queryset = Author.objects.all()
    serializer_class = AuthorSerializer


class PeopleViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list People and retrieve people by ID
    """

    permission_classes = (AllowAny,)
    queryset = People.objects.all()
    serializer_class = PeopleSerializer


class SubjectViewSet(
    viewsets.GenericViewSet,
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
):
    """
    A view to list Subject and retrieve subject by ID
    """

    permission_classes = (AllowAny,)
    queryset = Subject.objects.all()
    serializer_class = SubjectSerializer


class BulkCreateBook(generics.CreateAPIView):
    """A view to bulk create books"""

    permission_classes = (AllowAny,)
    queryset = Book.objects.all()
    serializer_class = BulkBookSerializer

Easy enough: we have endpoints for getting books, authors, people, and subjects, plus an endpoint to post a list of ISBN codes.

We can check Swagger to see all the endpoints created.

Now, how are we going to get all the data? 🤔

Creating a Celery task

Now that we have our project structure done, we need to create the asynchronous task Celery is going to run to populate our fields.

To get the information, we are going to use the OpenLibrary API.

First, we need to create books/tasks.py:

"""
Celery tasks
"""
import requests
from celery import shared_task

from books.models import Book, Author, People, Subject


def get_book_info(isbn):
    """Gets a book information by using its ISBN.
    More info here https://openlibrary.org/dev/docs/api/books"""
    return requests.get(
        f"https://openlibrary.org/api/books?jscmd=data&format=json&bibkeys=ISBN:{isbn}",
        timeout=10,  # don't let an unresponsive API hang the worker forever
    ).json()


def generate_many_to_many(model, iterable):
    """Generates the many to many relationships to books"""
    return_items = []
    for item in iterable:
        relation = model.objects.get_or_create(
            name=item["name"], outside_url=item["url"]
        )
        return_items.append(relation)
    return return_items


@shared_task
def get_books_information(isbn):
    """Gets a book information"""

    # First, we get the book information by its isbn
    book_info = get_book_info(isbn)

    if len(book_info) > 0:
        # Then, we need to access the json itself. Since the first key is dynamic,
        # we get it by accessing the json keys
        key = list(book_info.keys())[0]
        book_info = book_info[key]

        # Since the book was created on the Serializer, we get the book to edit
        book = Book.objects.get(isbn=isbn)

        # Set the fields we want from the API into the Book
        book.title = book_info["title"]
        book.publish_date = book_info["publish_date"]
        book.outside_id = book_info["key"]
        book.outside_url = book_info["url"]

        # For the optional fields, we fall back to a default if the key is missing
        book.pages = book_info.get("number_of_pages", 0)
        authors = book_info.get("authors", [])
        people = book_info.get("subject_people", [])
        subjects = book_info.get("subjects", [])

        # And generate the appropriate many_to_many relationships
        authors_info = generate_many_to_many(Author, authors)
        people_info = generate_many_to_many(People, people)
        subjects_info = generate_many_to_many(Subject, subjects)

        # Once the relationships are generated, we save them in the book instance
        for author in authors_info:
            book.author.add(author[0])

        for person in people_info:
            book.person.add(person[0])

        for subject in subjects_info:
            book.subject.add(subject[0])

        # Finally, we save the Book
        book.save()

    else:
        raise ValueError("Book not found")
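
The parsing part of the task can be tried out without Django, Celery, or the network. The sketch below repeats the same idea (take the first, dynamic key, then read the optional fields with a default) on a hand-written dictionary. The payload is hypothetical: it only mimics the keys the task reads and is not real API output, and `extract_book_fields` is a name I made up.

```python
def extract_book_fields(payload):
    """Flatten an OpenLibrary-style response into a few plain fields."""
    if not payload:
        raise ValueError("Book not found")
    # The only top-level key is dynamic, so we take the first value.
    info = next(iter(payload.values()))
    return {
        "title": info["title"],
        "pages": info.get("number_of_pages", 0),
        "authors": [author["name"] for author in info.get("authors", [])],
        "subjects": [subject["name"] for subject in info.get("subjects", [])],
    }


# Hypothetical payload, shaped like the fields the task reads.
sample = {
    "ISBN:0000000000000": {
        "title": "Example Title",
        "authors": [
            {"name": "Jane Doe", "url": "https://example.com/authors/jane"}
        ],
    }
}

fields = extract_book_fields(sample)
print(fields)
```

Keeping the parsing in a plain function like this also makes it easy to unit test, since the Celery task then only has to fetch the data and save the result.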

So when are we going to run this task? We need to run it in the serializer.

On books/serializers.py:

from books.tasks import get_books_information
...
class BulkBookSerializer(serializers.Serializer):
    """Serializer for bulk book creating"""

    isbn = serializers.ListField()

    def create(self, validated_data):
        return_dict = {"isbn": []}
        for isbn in validated_data["isbn"]:
            try:
                Book.objects.create(isbn=isbn)
                # We need to add this line
                get_books_information.delay(isbn)
                #################################
                return_dict["isbn"].append(isbn)
            except IntegrityError:
                # The ISBN already exists (isbn is unique), so we skip it
                pass

        return return_dict

    def update(self, instance, validated_data):
        pass

To trigger the Celery task, we call our function through its delay method, which was added by the shared_task decorator. This sends a message to the broker so that a Celery worker runs the task in the background, since we don’t need the result right now.

Docker configuration

There are a lot of moving parts we need for this to work, so I created a docker-compose configuration to help with the stack. I’m using the package django-environ to handle all environment variables.

On docker-compose.yml:

version: "3.7"

x-common-variables: &common-variables
  DJANGO_SETTINGS_MODULE: "app.settings"
  CELERY_BROKER_URL: "redis://redis:6379"
  DEFAULT_DATABASE: "psql://postgres:postgres@db:5432/app"
  DEBUG: "True"
  ALLOWED_HOSTS: "*,test"
  SECRET_KEY: "this-is-a-secret-key-shhhhh"

services:
  app:
    build:
      context: .
    volumes:
      - ./app:/app
    environment:
      <<: *common-variables
    ports:
      - 8000:8000
    command: >
      sh -c "python manage.py migrate &&
             python manage.py runserver 0.0.0.0:8000"      
    depends_on:
      - db
      - redis

  celery-worker:
    build:
      context: .
    volumes:
      - ./app:/app
    environment:
      <<: *common-variables
    command: celery --app app worker -l info
    depends_on:
      - db
      - redis

  db:
    image: postgres:12.4-alpine
    environment:
      - POSTGRES_DB=app
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

  redis:
    image: redis:6.0.8-alpine

This is going to set up our app, DB, Redis, and most importantly our celery-worker instance. To run Celery, we need to execute:

$ celery --app app worker -l info

So we are going to run that command in a separate Docker container.

Testing it out

If we run

$ docker-compose up

in our project’s root folder, the project should come up as usual. You should be able to open http://localhost:8000/admin and enter the admin panel.

To test the app, you can use a curl command from the terminal:

curl -X POST "http://localhost:8000/books/bulk-create" -H  "accept: application/json" \
    -H  "Content-Type: application/json" -d "{  \"isbn\": [ \"9780345418913\", \
    \"9780451524935\", \"9780451526342\", \"9781101990322\", \"9780143133438\"   ]}"

The curl call took 147 ms, according to my terminal.
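
If you'd rather test from Python than from curl, the same request can be built with the standard library. This is only a sketch: it builds the request without sending it, and it assumes the same URL as the curl command, which depends on how you wired up your URLs.

```python
import json
import urllib.request

isbns = [
    "9780345418913",
    "9780451524935",
    "9780451526342",
    "9781101990322",
    "9780143133438",
]

request = urllib.request.Request(
    "http://localhost:8000/books/bulk-create",
    data=json.dumps({"isbn": isbns}).encode("utf-8"),
    headers={"Accept": "application/json", "Content-Type": "application/json"},
    method="POST",
)

# Uncomment to actually send it once the stack is running:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```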

The request should return almost instantly, creating one new book and one new Celery task for each ISBN in the list. You can also see task results in the Django admin using the django-celery-results package; check its documentation for details.

Screenshots: Celery tasks list (using django-celery-results), created and processed books list, single book information, people in books, authors, and themes.

You can also interact with the endpoints to search by author, theme, people, and book. The exact URLs depend on how you set up your URL configuration.

That’s it!

This surely was a LONG one, but it has been a very good one in my opinion. I’ve used Celery in the past for multiple things, from sending emails in the background to triggering scraping jobs and running scheduled tasks (like a Unix cron job).

You can check the complete project in my GitLab here: https://gitlab.com/rogs/books-app

If you have any questions, let me know! I always answer emails and/or messages.