Using Celery with Flask

In this post, I will talk about using Celery with Flask. I’m going to assume that you have completed this tutorial on Celery.

Basic Flask app structure

Assume that we have written a nicely working Flask app which now needs to do some backend processing.

Here’s our basic Flask app structure:

+--myapp_dir/
   +--app/
      +--static/
      +--templates/
      +--utils/
      +--views/
      +--__init__.py
   +--config.py
   +--run.py
   +--requirements.txt

Include Celery

With Celery added to the mix, the structure looks something like this:

+--myapp_dir/
   +--app/
      +--static/
      +--templates/
      +--utils/
      +--views/
      +--app_tasks.py
      +--__init__.py
   +--config.py
   +--run.py
   +--requirements.txt
   +--celery_tasks.py

Notice that we have now added two new files: celery_tasks.py and app_tasks.py.

app_tasks.py

This file holds our Celery tasks. Here is some sample code:

import os

from celery import Celery

# Tell Celery which module holds its configuration (celeryconfig.py).
os.environ['CELERY_CONFIG_MODULE'] = 'celeryconfig'

app = Celery()

@app.task
def add(x, y):
    return x + y

@app.task
def print_results(res=None):
    if res:
        print("Results: %s" % res)
    else:
        print("Bello Merld")

Here are a couple of things worth noticing about this code:

  • os.environ['CELERY_CONFIG_MODULE'] = 'celeryconfig' – This line expects a module named celeryconfig (that is, a celeryconfig.py file) to be importable, for example from the same directory as your app_tasks.py. There are three ways of handling configuration in Celery, and they are documented here.
  • return x + y – This looks like a very simple function return, but when you don’t have a result backend, trying to read the returned value raises NotImplementedError.

So, is something wrong with the code?

The code is not wrong, but there are two pieces of advice you might want to bear in mind. These are more like my own best practices.

Advice 1 – Do not create a separate settings file when using Celery with Flask

If you look back at the directory structure, you will find that we already have a file called config.py. Re-use your config.py and do not spread settings across multiple files.

Example usage:

app = Celery()
app.config_from_object('config.ProductionConfig')

I tend to create my config.py in the following way:

class Config(object):
    # All defaults come here.
    MYSQL_HOST = '1.2.3.4'
    MYSQL_USER = 'user'
    MYSQL_PASSWORD = 'password'

class ProductionConfig(Config):
    # All production specific settings can be overridden here.
    MYSQL_HOST = '127.0.0.1'
    MYSQL_USER = 'master_user'
    MYSQL_PASSWORD = 'master_password'

class DevelopmentConfig(Config):
    # All development specific settings can be overridden here.
    MYSQL_HOST = '192.168.1.3'

This structure allows me to quickly switch between my Production and Development configurations.
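One way to make that switch without editing code is to pick the config class from an environment variable. The following is only a minimal sketch: the variable name APP_ENV, the CONFIG_PATHS mapping, and the config_path helper are all hypothetical names introduced for illustration, not part of Celery or Flask.

```python
import os

# Hypothetical mapping from an environment name to the dotted path of a
# config class in config.py. APP_ENV is an assumed variable name.
CONFIG_PATHS = {
    "production": "config.ProductionConfig",
    "development": "config.DevelopmentConfig",
}


def config_path(env_name=None):
    """Return the dotted path of the config class for an environment."""
    if env_name is None:
        # Fall back to the development settings when APP_ENV is not set.
        env_name = os.environ.get("APP_ENV", "development")
    try:
        return CONFIG_PATHS[env_name]
    except KeyError:
        raise ValueError("Unknown environment: %r" % env_name)


# The returned string has the same form as the argument passed to
# app.config_from_object(...) in the example usage.
print(config_path("production"))
```

With something like this in place, the same code can run against either configuration depending on how the process is started.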

Here’s how I can add new settings directly or update existing settings –

app = Celery()
app.config_from_object('config.ProductionConfig')
app.conf.update(CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/')

Advice 2 – Include a Celery results backend before you return anything

Set the following variable – CELERY_RESULT_BACKEND. The error that you see when you don’t set it is –

NotImplementedError: No result backend configured. Please see the documentation for more information.

Note that implementing a results backend could be dangerous as described in my other post.

Updated app_tasks.py

Based on the advice above, let us modify app_tasks.py.

from celery import Celery

app = Celery()
app.config_from_object('config.ProductionConfig')
app.conf.update(CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/')

# Ensure that task results expire quickly (after 600 seconds) so that they
# don't slow Redis down in case of high usage.
app.conf.update(CELERY_TASK_RESULT_EXPIRES=600)

@app.task
def add(x, y):
    return x + y

@app.task
def print_results(res=None):
    if res:
        print("Results: %s" % res)
    else:
        print("Bello Merld")

celery_tasks.py

In this file, we import all the tasks from app_tasks. This helps us maintain Celery tasks per app.

from app.app_tasks import *

Call Celery Tasks in Flask

To use these tasks elsewhere in Flask, we could do the following:

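from flask import jsonify

from app.app_tasks import print_results

# Here `app` is the Flask application object, not the Celery app.
@app.route('/fetch_results/')
def fetch_results():
    # do something that produces a result
    res = 'some result'  # placeholder value
    # Queue the task for a worker; delay() returns immediately.
    print_results.delay(res)
    return jsonify(message=res)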

In a later post, I will cover how to deploy Celery with Flask.
