Spring Boot and Redis Streams for Real-Time Data Processing
Build scalable real-time data processing pipelines with Spring Boot and Redis Streams
Real-time data processing is a critical component of modern applications. Whether you’re handling financial transactions, IoT sensor readings, or event-based logs, you need a messaging system that is fast, reliable, and scalable.
Redis Streams is a powerful data structure introduced in Redis 5.0 that supports append-only log semantics with built-in message queues and consumer groups. Combined with Spring Boot, you can build reactive, real-time pipelines for ingesting, processing, and distributing event data efficiently.
This post explores how to integrate Redis Streams into Spring Boot applications for scalable real-time data processing.
What is Redis Streams?
Redis Streams is an append-only log that lets you:
- Publish messages to a stream
- Track consumers via consumer groups
- Acknowledge processed messages
- Replay or reprocess unacknowledged events
- Store high-throughput event data
Think of it as a lightweight Kafka alternative embedded inside Redis, ideal for microservices and serverless workflows.
Add Dependencies
Add the Spring Data Redis and Lettuce client dependencies to your pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
Also configure Redis in application.yml
:
spring:
redis:
host: localhost
port: 6379
Writing to a Redis Stream (Producer)
Use RedisTemplate
or StreamOperations
to add records to a stream.
@Autowired
private StringRedisTemplate redisTemplate;
public void publishEvent(String streamKey, Map<String, String> eventData) {
redisTemplate.opsForStream()
.add(StreamRecords.mapBacked(eventData).withStreamKey(streamKey));
}
Example event:
Map<String, String> data = Map.of(
"user", "alice",
"action", "login",
"timestamp", Instant.now().toString()
);
publishEvent("user.activity", data);
Reading from a Stream (Simple Consumer)
You can poll a Redis stream manually:
List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream()
.range("user.activity", Range.unbounded());
for (MapRecord<String, Object, Object> msg : messages) {
System.out.println("Message: " + msg.getValue());
}
This reads all messages — useful for debugging or batch reads.
Using Consumer Groups for Scalability
To consume from a stream in a scalable and fault-tolerant way, use consumer groups.
// Create the group if not exists
StreamOperations<String, Object, Object> ops = redisTemplate.opsForStream();
try {
ops.createGroup("user.activity", "activityGroup");
} catch (RedisSystemException e) {
// Group already exists
}
Now listen as a specific consumer:
List<MapRecord<String, Object, Object>> messages = ops.read(
Consumer.from("activityGroup", "consumer-1"),
StreamReadOptions.empty().count(10).block(Duration.ofSeconds(5)),
StreamOffset.create("user.activity", ReadOffset.lastConsumed())
);
After processing, acknowledge:
ops.acknowledge("user.activity", "activityGroup", messages.get(0).getId());
This guarantees at-least-once delivery.
Auto-Configure with Spring Data Redis StreamListener
Spring Data Redis offers support for listeners with StreamListener
:
@Component
public class UserActivityListener implements StreamListener<String, MapRecord<String, Object, Object>> {
@Override
public void onMessage(MapRecord<String, Object, Object> message) {
System.out.println("Received: " + message.getValue());
// process event...
}
}
Enable via StreamMessageListenerContainer
:
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> listenerContainer(
RedisConnectionFactory connectionFactory,
UserActivityListener listener) {
StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> container =
StreamMessageListenerContainer.create(connectionFactory,
StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
container.receive(Consumer.from("activityGroup", "consumer-1"),
StreamOffset.create("user.activity", ReadOffset.lastConsumed()),
listener);
container.start();
return container;
}
Retention, Trimming, and Replay
Redis Streams can automatically trim old entries:
ops.trim("user.activity", 1000); // Keep last 1000 entries
Or configure max length:
redisTemplate.opsForStream()
.add(StreamRecords.mapBacked(data).withStreamKey("user.activity"))
.withMaxLen(1000);
To replay unacknowledged messages:
List<MapRecord<String, Object, Object>> pending =
ops.readPending("user.activity", Consumer.from("activityGroup", "consumer-1"), Range.unbounded(), 10);
Best Practices
- Use consumer groups for reliable processing
- Acknowledge messages after processing to avoid redelivery
- Set stream TTL or use trimming to prevent unbounded growth
- Use meaningful stream keys (
event:order:created
, etc.) - Consider partitioning large streams across keys
Conclusion
Redis Streams and Spring Boot form a powerful duo for building real-time, event-driven applications. Whether you’re tracking user behavior, processing system logs, or building microservices communication pipelines, Redis Streams offer an elegant and efficient solution.
By using consumer groups, stream trimming, and Spring integrations, you can deliver fast, reliable, and scalable event processing with minimal infrastructure.