Modern machine learning applications increasingly require real-time data pipelines that can perform feature engineering, model inference, and continuous learning on streaming data. Apache Pulsar, with its scalable messaging, multi-topic support, and native serverless compute, is uniquely positioned to serve as the backbone for advanced ML pipelines.

In this post, we’ll explore how to use Apache Pulsar for real-time ML pipelines, covering ingestion, transformation, model serving, and feedback loops — all within a streaming-first architecture.


Why Pulsar for Machine Learning Pipelines?

Apache Pulsar offers key features that make it ideal for ML:

  • High-throughput streaming from real-time sources
  • Multi-tenant architecture to isolate data domains
  • Native functions for low-latency model inference
  • Built-in schema validation and schema versioning
  • Flexibility to plug in Flink, Spark, or custom ML runtimes

Together, these capabilities support streaming-first ML workflows, from data collection through model improvement.


Architecture Overview

[Devices / APIs / Kafka / Sensors]
↓
[Ingestion Topics (raw-events)]
↓
[Pulsar Functions / Flink / Spark]
↓
[Feature Topics (engineered-data)]
↓
[Model Inference Services / Pulsar Functions]
↓
[Prediction Topics]
↓
[Feedback Topics for Retraining]

Each stage is modular and scalable — allowing real-time ML without relying solely on batch ETL.


Step 1: Real-Time Ingestion

Use Pulsar producers or Kafka source connectors to push raw event data into Pulsar:

bin/pulsar-client produce persistent://ml/raw/sensor-input \
  --messages '{"deviceId": "1234", "temp": 37.4, "ts": 1689912010}'

Attach an Avro or JSON schema to the topic so the broker can validate messages and track schema evolution.
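For example, here is a minimal sketch of a Python producer that attaches a JSON schema to the ingestion topic using the pulsar-client library; the SensorEvent record and broker URL are assumptions for illustration:

import pulsar
from pulsar.schema import Record, String, Float, Integer, JsonSchema

# Hypothetical record type matching the payload above
class SensorEvent(Record):
    deviceId = String()
    temp = Float()
    ts = Integer()

client = pulsar.Client('pulsar://localhost:6650')  # assumed broker URL
producer = client.create_producer(
    'persistent://ml/raw/sensor-input',
    schema=JsonSchema(SensorEvent))

producer.send(SensorEvent(deviceId='1234', temp=37.4, ts=1689912010))
client.close()

Once a schema is attached, the broker rejects clients whose schema is incompatible, and each accepted change is stored as a new schema version.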


Step 2: Feature Extraction with Pulsar Functions

Pulsar Functions run lightweight compute directly on streams. The example below uses the Python SDK to normalize a temperature reading and add an alert flag (save it as normalize.py):

import json
from pulsar import Function

class Normalize(Function):
    def process(self, input, context):
        data = json.loads(input)
        # Scale the raw reading and flag values above the alert threshold
        data['temp_scaled'] = (data['temp'] - 20) / 10
        data['alert'] = data['temp_scaled'] > 1.5
        return json.dumps(data)

Deploy with:

bin/pulsar-admin functions create \
  --tenant ml --namespace features --name temp-transform \
  --inputs persistent://ml/raw/sensor-input \
  --output persistent://ml/features/sensor-transformed \
  --py normalize.py \
  --classname normalize.Normalize
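
To sanity-check the deployment, you can tail the output topic with the CLI (the subscription name is arbitrary; -n 0 consumes indefinitely):

bin/pulsar-client consume persistent://ml/features/sensor-transformed \
  -s test-sub -n 0
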

Step 3: Real-Time Model Inference

There are two options:

A. Use Pulsar Functions with embedded models (for fast, lightweight inference):

import json
from joblib import load
from pulsar import Function

class Classify(Function):
    def __init__(self):
        # Load the trained model once per function instance, not per message
        self.model = load("model.pkl")

    def process(self, input, context):
        features = json.loads(input)
        prediction = self.model.predict([[features['temp_scaled']]])
        features['prediction'] = int(prediction[0])
        return json.dumps(features)

B. Route to external model servers via REST using a sink connector or async consumer.
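
For the external-server route, here is a minimal consumer-side sketch; the model-server URL, its response shape, and the predictions topic name are assumptions for illustration:

import json
import pulsar
import requests

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
    'persistent://ml/features/sensor-transformed',
    subscription_name='external-inference')
# Hypothetical topic for scored events
producer = client.create_producer('persistent://ml/predictions/sensor-scored')

while True:
    msg = consumer.receive()
    try:
        features = json.loads(msg.data())
        # Assumed endpoint and response field on the external model server
        resp = requests.post('http://model-server:8080/predict', json=features, timeout=2)
        features['prediction'] = resp.json()['prediction']
        producer.send(json.dumps(features).encode('utf-8'))
        consumer.acknowledge(msg)
    except Exception:
        # Redeliver on failure instead of silently dropping the event
        consumer.negative_acknowledge(msg)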


Step 4: Prediction and Feedback Loop

Send predictions to downstream systems or dashboards via:

  • Sink connectors such as Elasticsearch or PostgreSQL (see the sketch after this list)
  • REST API triggers
  • ML monitoring tools (Seldon, Prometheus, etc.)
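
As a sketch of the sink-connector route, the built-in Elasticsearch sink can ship scored events into an index; the predictions topic, index name, and config values below are assumptions for illustration:

bin/pulsar-admin sinks create \
  --sink-type elastic_search \
  --name predictions-to-es \
  --inputs persistent://ml/predictions/sensor-scored \
  --sink-config '{"elasticSearchUrl": "http://localhost:9200", "indexName": "predictions"}'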

Capture outcomes (e.g., user feedback, actual results) into feedback topics:

persistent://ml/feedback/model-accuracy

Use these to continuously evaluate and retrain models.
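
For instance, a minimal sketch that pairs a prediction with its observed outcome; the field names are assumptions:

import json
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('persistent://ml/feedback/model-accuracy')

# Hypothetical feedback record: the earlier prediction plus the ground truth
feedback = {'deviceId': '1234', 'prediction': 1, 'actual': 0, 'ts': 1689912010}
producer.send(json.dumps(feedback).encode('utf-8'))
client.close()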


Integration with Stream Processing Frameworks

Pulsar integrates with the broader stream-processing and ML ecosystem:

  • Apache Flink: Streaming joins, aggregations, windowed processing
  • Apache Spark Structured Streaming: Real-time transformations and MLlib scoring
  • Airflow / MLflow: Orchestration and experiment tracking

Use Flink to join event streams with reference data (user profiles, geo-tags, etc.) before inference.


Performance and Scaling Considerations

  • Use Key_Shared subscriptions to preserve per-key ordering while parallelizing across users/devices (see the sketch after this list)
  • Enable batching on producers for higher throughput (consumers unpack batches transparently)
  • Keep intermediate features in the Pulsar Functions state API, which is backed by Apache BookKeeper
  • Monitor function execution time with Pulsar’s built-in metrics
  • Scale functions horizontally across partitions and tenants
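
As a minimal sketch of the first two items, assuming the Python client and the feature topic from earlier:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Key_Shared: many consumers share one subscription, per-key order preserved
consumer = client.subscribe(
    'persistent://ml/features/sensor-transformed',
    subscription_name='scoring-workers',
    consumer_type=pulsar.ConsumerType.KeyShared)

# Producer-side batching: trade a few milliseconds of latency for throughput
producer = client.create_producer(
    'persistent://ml/features/sensor-transformed',
    batching_enabled=True,
    batching_max_publish_delay_ms=10)

For Key_Shared routing to be meaningful, producers should set a partition key on each message (for example, the deviceId).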

Real-World Use Cases

  • Fraud Detection: Score transactions in <100ms using streaming features
  • Personalized Ads: Update user vectors and deliver real-time content
  • Predictive Maintenance: Ingest sensor readings and trigger alerts
  • Dynamic Pricing: Feed real-time demand data into pricing algorithms

Conclusion

Apache Pulsar provides a highly adaptable platform for real-time machine learning pipelines, combining messaging, transformation, inference, and feedback within a single architecture. With serverless Pulsar Functions, scalable topic patterns, and integration with leading ML ecosystems, Pulsar helps bring machine learning models from batch to real-time — enabling smarter decisions at scale.

If you’re building the next generation of intelligent applications, Pulsar should be a core part of your ML infrastructure.