Flask and Celery: Building Distributed Task Queues for Scalable Applications
Enhance Flask applications with Celery for asynchronous task processing and scalability
Introduction
Modern web applications often need to run long tasks, such as sending emails, processing files, or performing heavy computations. Flask handles each request synchronously and has no built-in mechanism for background work, so a slow task holds up the response. Celery fills that gap: it is a distributed task queue that lets a Flask application hand work to separate worker processes and respond immediately.
In this post, we will integrate Celery with Flask using Redis as the message broker and result backend, then trigger tasks, check their status, and scale the workers.
Setting Up Flask and Celery
Installing Dependencies
To get started, install Flask, Celery, and the Redis client library for Python using pip:
pip install flask celery redis
The redis package from pip is only the Python client. Ensure that the Redis server itself is installed and running:
redis-server
Configuring Celery in Flask
Create a Flask application and configure Celery with Redis as both the broker and the result backend:
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Pass the result backend too, so task states and results can be read back later.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'],
                backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
Now, Celery is ready to handle background tasks.
Defining and Executing Celery Tasks
Creating an Asynchronous Task
Define a sample Celery task inside your Flask app:
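import time

@celery.task
def long_running_task(n):
    # Stand-in for real work: block for n seconds, then report back.
    time.sleep(n)
    return f"Task completed in {n} seconds"
This function simulates a long-running task. Calling it directly runs it in the current process; it runs on a worker only when it is queued with delay() or apply_async().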
Triggering Tasks from Flask
Modify your Flask app to trigger Celery tasks:
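from flask import request, jsonify

@app.route('/run-task', methods=['POST'])
def run_task():
    # Fall back to an empty dict when the body is missing or not valid JSON.
    data = request.get_json(silent=True) or {}
    task = long_running_task.apply_async(args=[data.get('duration', 5)])
    # 202 Accepted: the task has been queued, not finished.
    return jsonify({"task_id": task.id, "status": "Task started"}), 202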
Running Celery Worker
Start a Celery worker to process tasks in the background:
celery -A app.celery worker --loglevel=info
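This command assumes the code above lives in app.py, so that app.celery points at the Celery instance. Once the worker is running, it picks up queued tasks from Redis and executes them in the background.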
Monitoring Task Status
Checking Task Results
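Add a Flask route that looks up a task by its Celery task ID and returns its state and result:
@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):
    task = long_running_task.AsyncResult(task_id)
    # On failure, task.result holds the exception, which is not JSON serializable.
    result = task.result if task.successful() else None
    return jsonify({"task_id": task_id, "status": task.state, "result": result})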
Scaling Celery with Multiple Workers
Running Multiple Workers
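To scale task execution, raise the number of worker processes with the --concurrency option:
celery -A app.celery worker --concurrency=4 --loglevel=info
This starts one worker with four child processes, so up to four tasks run at the same time. Without the option, Celery defaults to the number of CPU cores. For capacity beyond a single worker, start additional workers on the same or other machines, each with a unique node name set via the -n option.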
Conclusion
Integrating Celery with Flask moves long-running operations out of the request cycle, so endpoints respond quickly while workers do the heavy lifting. With Redis as the message broker and result backend, capacity grows by adding worker processes or machines.
Start building distributed task queues with Flask and Celery today! 🚀