Working with Flask and Apache Kafka for Event-Driven Systems
Building scalable event-driven architectures using Flask and Kafka
Introduction
In modern applications, event-driven architectures allow systems to be asynchronous, scalable, and resilient. One of the most popular tools for handling event-driven workflows is Apache Kafka, which enables real-time data streaming and event processing.
In this guide, we’ll learn how to integrate Flask with Kafka to produce and consume messages efficiently.
Prerequisites
Before we begin, ensure you have:
- Python 3 installed
- Apache Kafka running locally or on a cloud service
Flask
,kafka-python
, andconfluent-kafka
installed
Install Required Dependencies
pip install flask kafka-python confluent-kafka
Setting Up Apache Kafka
Step 1: Download and Start Kafka
Download Kafka from Apache Kafka’s official website. Extract it and navigate to the Kafka directory.
Start Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka Broker:
bin/kafka-server-start.sh config/server.properties
Create a Kafka topic:
bin/kafka-topics.sh --create --topic flask-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Producing Messages from Flask
To publish events, we’ll set up a Flask producer.
Step 2: Initialize Flask and Kafka Producer
from flask import Flask, request, jsonify
from kafka import KafkaProducer
import json
app = Flask(__name__)
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
@app.route("/publish", methods=["POST"])
def publish():
data = request.json
producer.send("flask-events", value=data)
return jsonify({"message": "Event published", "data": data})
if __name__ == "__main__":
app.run(debug=True, port=5000)
Step 3: Test the Kafka Producer
Run the Flask server and send a test event:
curl -X POST http://127.0.0.1:5000/publish -H "Content-Type: application/json" -d '{"event": "user_registered", "user_id": 123}'
Consuming Messages with Flask
Now, let’s build a Kafka consumer to process events.
Step 4: Initialize Kafka Consumer
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"flask-events",
bootstrap_servers="localhost:9092",
auto_offset_reset="earliest",
value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)
print("Listening for messages...")
for message in consumer:
print(f"Received event: {message.value}")
Step 5: Run the Kafka Consumer
python consumer.py
You should see events being printed when new messages are published.
Enhancing with Confluent Kafka
For better performance and error handling, use confluent-kafka
:
Install Confluent Kafka
pip install confluent-kafka
Using Confluent Kafka Producer
from confluent_kafka import Producer
conf = {"bootstrap.servers": "localhost:9092"}
producer = Producer(conf)
@app.route("/publish", methods=["POST"])
def publish():
data = request.json
producer.produce("flask-events", key="event_key", value=json.dumps(data))
producer.flush()
return jsonify({"message": "Event published"})
Deploying to Production
For production, consider:
- Using Kafka clusters instead of a single broker
- Deploying Flask with Gunicorn
- Using Docker to containerize the services
Running Flask with Gunicorn
gunicorn -w 4 app:app
Running Kafka in Docker
docker-compose up -d
Conclusion
Integrating Flask with Apache Kafka allows us to build real-time event-driven applications. We covered:
✅ Setting up Kafka
✅ Producing messages from Flask
✅ Consuming messages with Kafka
✅ Optimizing with Confluent Kafka
🚀 Start building scalable event-driven applications with Flask and Kafka today!