Introduction

Modern web applications often need to run long tasks, such as sending emails, processing files, or handling complex computations. Flask's request handlers run synchronously, and the framework has no built-in way to push that work into the background, so a slow job blocks the request until it finishes. This is where Celery comes in: a powerful distributed task queue that lets Flask applications execute tasks asynchronously and respond to users immediately.

In this post, we will walk through integrating Celery with Flask, defining and triggering asynchronous tasks, monitoring their status, and scaling execution using Redis as the message broker.

Setting Up Flask and Celery

Installing Dependencies

To get started, install Flask, Celery, and Redis using pip:

pip install flask celery redis

Ensure that Redis is installed and running:

redis-server
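
If you are not sure whether the server is up, redis-cli gives a quick health check and should answer PONG:

redis-cli ping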

Configuring Celery in Flask

Create a Flask application and configure Celery with Redis as both the message broker and the result backend:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(
    app.name,
    broker=app.config['CELERY_BROKER_URL'],
    backend=app.config['CELERY_RESULT_BACKEND'],
)
celery.conf.update(app.config)

Now, Celery is ready to handle background tasks.
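
One caveat worth knowing: Celery tasks run in the worker process, outside any Flask request, so code that depends on Flask extensions (a database session, for instance) needs an application context. A common pattern is to subclass the base task so every task runs inside one; a minimal sketch:

class ContextTask(celery.Task):
    def __call__(self, *args, **kwargs):
        # Wrap every task invocation in the Flask application context
        with app.app_context():
            return self.run(*args, **kwargs)

celery.Task = ContextTask

With this in place, tasks defined afterwards can use app-bound objects just as a view function would.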

Defining and Executing Celery Tasks

Creating an Asynchronous Task

Define a sample Celery task inside your Flask app:

import time

@celery.task
def long_running_task(n):
    # Simulate slow work by sleeping for n seconds
    time.sleep(n)
    return f"Task completed in {n} seconds"

This function simulates a long-running task that executes asynchronously.
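
For tasks that take a while, it is often useful to report progress as they run. One way to do that (the task name and the PROGRESS state below are our own conventions, not Celery built-ins) is to bind the task so it can call self.update_state:

import time

@celery.task(bind=True)
def long_task_with_progress(self, n):
    for i in range(n):
        time.sleep(1)
        # Publish a custom 'PROGRESS' state; meta is exposed as task.info
        self.update_state(state='PROGRESS', meta={'current': i + 1, 'total': n})
    return f"Task completed in {n} seconds"

The status endpoint we build later can then expose current and total to the client.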

Triggering Tasks from Flask

Modify your Flask app to trigger Celery tasks:

from flask import request, jsonify

@app.route('/run-task', methods=['POST'])
def run_task():
    data = request.get_json()
    # Queue the task and return right away; the worker does the actual work
    task = long_running_task.apply_async(args=[data.get('duration', 5)])
    return jsonify({"task_id": task.id, "status": "Task started"}), 202
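
With the server running locally on the default port 5000, you can trigger a task from the command line:

curl -X POST http://localhost:5000/run-task \
     -H "Content-Type: application/json" \
     -d '{"duration": 10}'

The response comes back immediately with HTTP 202 and the task ID, while the actual work happens in the worker.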

Running Celery Worker

Start a Celery worker to process tasks in the background:

celery -A app.celery worker --loglevel=info

Once the worker is running, Celery will handle the background tasks asynchronously.
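
The worker and the web server are separate processes, so start Flask in another terminal (this assumes the module above is saved as app.py):

flask --app app run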

Monitoring Task Status

Checking Task Results

Add a Flask endpoint that looks up a task’s state and result by its ID:

@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):
    # Look up the task by ID; state and result come from the Redis backend
    task = long_running_task.AsyncResult(task_id)
    return jsonify({"task_id": task_id, "status": task.state, "result": task.result})
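
You can poll this endpoint with the ID returned by /run-task:

curl http://localhost:5000/task-status/<task_id>

Typical states are PENDING, SUCCESS, and FAILURE (STARTED is also reported if you enable the task_track_started setting). Note that Celery returns PENDING for unknown task IDs as well as for queued ones, so a mistyped ID looks the same as a task that has not started.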

Scaling Celery with Multiple Workers

Running Multiple Workers

To scale task execution, first raise the worker's concurrency. The --concurrency option controls how many child processes a single worker uses to run tasks in parallel:

celery -A app.celery worker --concurrency=4 --loglevel=info

This lets a single worker execute up to four tasks at once. To scale out further, start additional workers as shown below.
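
To actually run multiple workers, give each its own node name with the -n flag (%h expands to the machine's hostname), on one machine or on several machines pointed at the same Redis broker:

celery -A app.celery worker -n worker1@%h --loglevel=info
celery -A app.celery worker -n worker2@%h --loglevel=info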

Conclusion

Integrating Celery with Flask provides a clean way to handle long-running operations asynchronously. With Redis as the message broker, you can add workers as load grows while keeping your request handlers fast and responsive.

Start building distributed task queues with Flask and Celery today! 🚀