Introduction

In modern applications, event-driven architectures let services communicate asynchronously, which makes systems easier to scale and more resilient to failure. One of the most popular tools for building event-driven workflows is Apache Kafka, a distributed platform for real-time data streaming and event processing.

In this guide, we’ll learn how to integrate Flask with Kafka to produce and consume messages efficiently.

Prerequisites

Before we begin, ensure you have:

  • Python 3 installed
  • Apache Kafka running locally or on a cloud service
  • Flask, kafka-python, and confluent-kafka installed

Install Required Dependencies

pip install flask kafka-python confluent-kafka

Setting Up Apache Kafka

Step 1: Download and Start Kafka

Download Kafka from Apache Kafka’s official website. Extract it and navigate to the Kafka directory.

Start Zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka broker in a second terminal:

bin/kafka-server-start.sh config/server.properties

Create a Kafka topic:

bin/kafka-topics.sh --create --topic flask-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
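
To confirm that the topic was created, you can list the topics registered on the broker:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092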

Producing Messages from Flask

To publish events, we’ll expose a Flask endpoint that hands incoming requests to a Kafka producer. Save the code below as app.py (the module name Gunicorn expects later).

Step 2: Initialize Flask and Kafka Producer

from flask import Flask, request, jsonify
from kafka import KafkaProducer
import json

app = Flask(__name__)

# Producer that serializes Python dicts to JSON-encoded bytes before sending
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

@app.route("/publish", methods=["POST"])
def publish():
    data = request.json
    # send() is asynchronous: the message is batched and delivered in the background
    producer.send("flask-events", value=data)
    return jsonify({"message": "Event published", "data": data})

if __name__ == "__main__":
    app.run(debug=True, port=5000)
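
Note that producer.send() returns a future and does not raise delivery errors inline. If the endpoint should confirm delivery before responding, one option is to block on that future inside the route; a minimal sketch (it trades request latency for certainty):

# Inside the /publish route: wait for the broker to acknowledge the write
future = producer.send("flask-events", value=data)
try:
    metadata = future.get(timeout=10)  # raises a Kafka error on failure
    return jsonify({"message": "Event published", "partition": metadata.partition, "offset": metadata.offset})
except Exception as exc:
    return jsonify({"error": str(exc)}), 500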

Step 3: Test the Kafka Producer

Run the Flask server and send a test event:

curl -X POST http://127.0.0.1:5000/publish -H "Content-Type: application/json" -d '{"event": "user_registered", "user_id": 123}'
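
If everything is wired up correctly, the endpoint echoes the payload it published, roughly (key order may differ):

{"data": {"event": "user_registered", "user_id": 123}, "message": "Event published"}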

Consuming Messages with Flask

Now, let’s build a standalone Kafka consumer to process the events. Save the following as consumer.py.

Step 4: Initialize Kafka Consumer

from kafka import KafkaConsumer
import json

# Consumer that starts from the earliest available offset and decodes JSON values
consumer = KafkaConsumer(
    "flask-events",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

print("Listening for messages...")

for message in consumer:
    print(f"Received event: {message.value}")

Step 5: Run the Kafka Consumer

python consumer.py

You should see events being printed when new messages are published.
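
As written, the consumer has no group_id, so offsets are never committed and every restart re-reads the topic from the beginning. If you want resumable consumption (and the ability to run several consumer processes that share the work), a sketch with an assumed group name:

consumer = KafkaConsumer(
    "flask-events",
    bootstrap_servers="localhost:9092",
    group_id="flask-events-consumers",  # assumed name; any stable string works
    auto_offset_reset="earliest",       # only applies when the group has no committed offset
    enable_auto_commit=True,            # periodically commit offsets in the background
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)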


Enhancing with Confluent Kafka

For higher throughput and richer error handling, you can switch to confluent-kafka, which wraps the librdkafka C client:

Install Confluent Kafka

pip install confluent-kafka

Using Confluent Kafka Producer

# In app.py, swap the kafka-python producer for confluent-kafka's Producer
from confluent_kafka import Producer

# confluent-kafka is configured with librdkafka-style keys
conf = {"bootstrap.servers": "localhost:9092"}
producer = Producer(conf)

@app.route("/publish", methods=["POST"])
def publish():
    data = request.json
    # produce() only enqueues the message; flush() blocks until it is delivered
    producer.produce("flask-events", key="event_key", value=json.dumps(data))
    producer.flush()
    return jsonify({"message": "Event published"})
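
Because produce() only enqueues the message, failures surface asynchronously. A common pattern is to pass a delivery callback and poll the producer so the callback runs; a sketch (the callback name is arbitrary):

def delivery_report(err, msg):
    # Called once per message with either an error or the delivered message's metadata
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

producer.produce(
    "flask-events",
    key="event_key",
    value=json.dumps(data),
    on_delivery=delivery_report,
)
producer.poll(0)  # serve delivery callbacks for previously produced messages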

Deploying to Production

For production, consider:

  • Using Kafka clusters instead of a single broker
  • Deploying Flask with Gunicorn
  • Using Docker to containerize the services

Running Flask with Gunicorn

gunicorn -w 4 app:app
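
By default Gunicorn binds to 127.0.0.1:8000. To listen on all interfaces with an explicit port (the port below is just an example), pass a bind address:

gunicorn -w 4 -b 0.0.0.0:8000 app:app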

Running Kafka in Docker
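
docker-compose up -d expects a docker-compose.yml describing ZooKeeper and a Kafka broker. Here is one possible minimal single-broker file using the Confluent community images; treat the image tags and listener settings as assumptions to adapt to your environment, then bring the stack up:

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1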

docker-compose up -d

Conclusion

Integrating Flask with Apache Kafka allows us to build real-time event-driven applications. We covered:

✅ Setting up Kafka
✅ Producing messages from Flask
✅ Consuming messages with Kafka
✅ Optimizing with Confluent Kafka

🚀 Start building scalable event-driven applications with Flask and Kafka today!