Event-Driven Data Pipelines with Python and Redis
Build event-driven data pipelines that react to changes in real time using Python and Redis Streams. Covers pub/sub patterns, consumer groups, and backpressure handling.

Most data pipelines run on a schedule. Extract at midnight, transform at 1 AM, load by 2 AM. But what happens when your data source updates continuously — new orders every second, sensor readings every millisecond, user events firing constantly?
Scheduled pipelines cannot keep up. You need pipelines that react to events as they arrive, process them in order, and handle failures without losing data.
This guide builds event-driven data pipelines with Python and Redis Streams — a lightweight, production-ready pattern that sits between simple cron jobs and full-scale Kafka deployments.
# Architecture Overview
```mermaid
flowchart LR
    P1["Producer\n(API / Webhook)"] --> RS["Redis Stream\n(orders_stream)"]
    P2["Producer\n(Database CDC)"] --> RS
    RS --> C1["Consumer 1\n(Transform)"]
    RS --> C2["Consumer 2\n(Transform)"]
    C1 --> W["Warehouse\n(PostgreSQL)"]
    C2 --> W
```
Producers write events to Redis Streams. Consumer groups read and process events in parallel, with automatic acknowledgement and retry for failures.
# What You Will Need
```shell
pip install redis pydantic
```
- redis — Python client for Redis (includes Streams support)
- pydantic — data validation for event schemas
You also need a running Redis instance (v5.0+ for Streams support):
```shell
# Docker (quickest way)
docker run -d --name redis-streams -p 6379:6379 redis:7-alpine
```
# Why Redis Streams Over Alternatives
| Feature | Redis Streams | Redis Pub/Sub | Kafka | RabbitMQ |
|---|---|---|---|---|
| Message persistence | Yes | No | Yes | Yes |
| Consumer groups | Yes | No | Yes | Yes |
| Replay from history | Yes | No | Yes | No |
| Setup complexity | Low | Low | High | Medium |
| Throughput | ~100K msg/s | ~500K msg/s | ~1M msg/s | ~50K msg/s |
| Best for | Small–medium pipelines | Fire-and-forget | Large-scale streaming | Task queues |
Redis Streams give you Kafka-like semantics (persistence, consumer groups, replay) without Kafka-like operational overhead. For pipelines processing up to 100K events per second, Redis Streams are the sweet spot.
# Step 1: Define Event Schemas
Start with structured events. Every event needs a type, a timestamp, and a payload:
```python
from datetime import datetime, timezone
from pydantic import BaseModel, Field
import json


class OrderEvent(BaseModel):
    """Event emitted when a new order is placed."""

    event_type: str = "order.created"
    event_id: str = Field(description="Unique event identifier")
    timestamp: str = Field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    order_id: str
    customer_id: str
    total_amount: float
    currency: str = "GBP"
    items: list[dict]

    def to_redis(self) -> dict:
        """Serialise for a Redis Stream (all field values must be strings)."""
        return {
            "event_type": self.event_type,
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "payload": self.model_dump_json(),
        }

    @classmethod
    def from_redis(cls, data: dict) -> "OrderEvent":
        """Deserialise from a Redis Stream entry."""
        payload = json.loads(data["payload"])
        return cls(**payload)
```
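Redis Stream entries are flat field-value maps in which every value must be a string, which is why `to_redis` JSON-encodes the whole model into a single `payload` field. The idea can be sketched with plain dicts, no Redis or pydantic required (the helper names here are illustrative, not part of the pipeline above):

```python
import json


def to_stream_fields(event: dict) -> dict:
    """Flatten an event into string-only fields, as XADD requires."""
    return {
        "event_type": event["event_type"],
        "event_id": event["event_id"],
        "payload": json.dumps(event),  # full event as one JSON string field
    }


def from_stream_fields(fields: dict) -> dict:
    """Recover the original event from a stream entry's fields."""
    return json.loads(fields["payload"])


event = {"event_type": "order.created", "event_id": "e1", "total_amount": 42.5}
fields = to_stream_fields(event)
assert all(isinstance(v, str) for v in fields.values())
assert from_stream_fields(fields) == event
```

Keeping a few fields (like `event_type`) duplicated outside the JSON blob lets consumers filter cheaply without parsing every payload.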
# Step 2: Producing Events
Write events to a Redis Stream:
```python
import redis
import uuid


class EventProducer:
    """Writes events to a Redis Stream."""

    def __init__(self, redis_url="redis://localhost:6379"):
        self.client = redis.from_url(redis_url, decode_responses=True)

    def emit(self, stream: str, event: OrderEvent) -> str:
        """Add an event to the stream. Returns the message ID."""
        message_id = self.client.xadd(
            stream,
            event.to_redis(),
            maxlen=100_000,  # cap stream length to prevent unbounded growth
        )
        return message_id

    def emit_batch(self, stream: str, events: list[OrderEvent]) -> list[str]:
        """Add multiple events in a pipeline (reduces round trips)."""
        pipe = self.client.pipeline()
        for event in events:
            pipe.xadd(stream, event.to_redis(), maxlen=100_000)
        return pipe.execute()
```
# Producing Events from a Webhook
```python
producer = EventProducer()


def handle_shopify_webhook(payload: dict):
    """Convert a Shopify order webhook into a stream event."""
    event = OrderEvent(
        event_id=str(uuid.uuid4()),
        order_id=str(payload["id"]),  # Shopify sends numeric IDs
        customer_id=str(payload["customer"]["id"]),
        total_amount=float(payload["total_price"]),
        currency=payload["currency"],
        items=[
            {"sku": item["sku"], "qty": item["quantity"], "price": float(item["price"])}
            for item in payload["line_items"]
        ],
    )
    message_id = producer.emit("orders_stream", event)
    print(f"Event {event.event_id} added as {message_id}")
```
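One refinement worth considering: webhook providers retry failed deliveries, so the same order can arrive twice, and with `uuid.uuid4()` each retry becomes a distinct event. A deterministic event ID derived from the order ID lets downstream consumers de-duplicate. A sketch using `uuid5` (the namespace URL here is made up for illustration):

```python
import uuid

# Illustrative namespace for this pipeline's order events
ORDER_EVENT_NS = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.com/orders")


def deterministic_event_id(order_id: str) -> str:
    """The same order ID always yields the same event ID, so a retried
    webhook delivery produces an identical, de-duplicatable event."""
    return str(uuid.uuid5(ORDER_EVENT_NS, order_id))


assert deterministic_event_id("12345") == deterministic_event_id("12345")
assert deterministic_event_id("12345") != deterministic_event_id("67890")
```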
# Step 3: Consumer Groups
Consumer groups let multiple workers process events in parallel without duplicating work:
```python
class EventConsumer:
    """Reads events from a Redis Stream using consumer groups."""

    def __init__(
        self,
        redis_url="redis://localhost:6379",
        stream="orders_stream",
        group="pipeline_workers",
        consumer_name="worker-1",
    ):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.stream = stream
        self.group = group
        self.consumer_name = consumer_name
        self._ensure_group()

    def _ensure_group(self):
        """Create the consumer group if it does not exist."""
        try:
            self.client.xgroup_create(
                self.stream, self.group, id="0", mkstream=True
            )
        except redis.exceptions.ResponseError as e:
            # BUSYGROUP means the group already exists — safe to continue
            if "BUSYGROUP" not in str(e):
                raise

    def read(self, count=10, block_ms=5000):
        """Read new events from the stream.

        Returns a list of (message_id, event_data) tuples.
        block_ms=5000 means wait up to 5 seconds for new events.
        """
        results = self.client.xreadgroup(
            groupname=self.group,
            consumername=self.consumer_name,
            streams={self.stream: ">"},  # ">" means never-delivered entries only
            count=count,
            block=block_ms,
        )
        if not results:
            return []
        # results format: [(stream_name, [(msg_id, data), ...])]
        return results[0][1]

    def acknowledge(self, message_id: str):
        """Mark a message as successfully processed."""
        self.client.xack(self.stream, self.group, message_id)
```
# Step 4: Processing Loop
Build a consumer loop that processes events and handles failures gracefully:
```python
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("consumer")


def process_event(event: OrderEvent) -> bool:
    """Transform and load a single order event.

    Returns True if successful, False if the event should be retried.
    """
    # Transform: calculate derived fields
    item_count = sum(item["qty"] for item in event.items)
    avg_item_price = event.total_amount / max(item_count, 1)

    # Load: insert into warehouse (simplified)
    logger.info(
        "Processing order %s: £%.2f, %d items, avg £%.2f/item",
        event.order_id,
        event.total_amount,
        item_count,
        avg_item_price,
    )
    return True


def run_consumer(consumer: EventConsumer):
    """Main consumer loop with error handling."""
    logger.info(
        "Consumer %s started, listening on %s",
        consumer.consumer_name,
        consumer.stream,
    )
    while True:
        messages = consumer.read(count=10, block_ms=5000)
        if not messages:
            continue  # no new events, loop again
        for message_id, data in messages:
            try:
                event = OrderEvent.from_redis(data)
                success = process_event(event)
                if success:
                    consumer.acknowledge(message_id)
                else:
                    logger.warning("Event %s needs retry", event.event_id)
            except Exception as exc:
                logger.error(
                    "Failed to process message %s: %s",
                    message_id,
                    str(exc),
                )
                # Don't acknowledge — event stays in pending list for retry
```
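The loop above blocks for up to five seconds per empty read, which is usually fine. If you want to back off harder during quiet periods, a capped exponential backoff on consecutive empty reads is a common refinement. A sketch (the specific numbers are arbitrary and should be tuned to your workload):

```python
def next_block_ms(consecutive_empty_reads: int,
                  base_ms: int = 1000, cap_ms: int = 30_000) -> int:
    """Double the blocking timeout after each empty read, up to a cap."""
    return min(base_ms * (2 ** consecutive_empty_reads), cap_ms)


assert next_block_ms(0) == 1000    # first empty read: short wait
assert next_block_ms(2) == 4000    # backing off
assert next_block_ms(10) == 30_000 # capped, never sleeps forever
```

In the loop you would reset the counter to zero whenever a read returns messages, so a busy stream is always polled at the base interval.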
# Step 5: Handling Failed Events
Events that fail processing stay in the consumer's Pending Entries List (PEL). Claim and retry them:
```python
class DeadLetterHandler:
    """Reclaim and retry failed events, or move them to a dead letter stream."""

    def __init__(self, consumer: EventConsumer, max_retries=3):
        self.consumer = consumer
        self.max_retries = max_retries
        self.client = consumer.client

    def reclaim_stale(self, min_idle_ms=60_000):
        """Reclaim events that have been pending for over min_idle_ms."""
        # xautoclaim returns (next_start_id, claimed_messages, deleted_ids)
        next_id, claimed, _deleted = self.client.xautoclaim(
            self.consumer.stream,
            self.consumer.group,
            self.consumer.consumer_name,
            min_idle_time=min_idle_ms,
            count=100,
        )
        reclaimed = []
        for msg_id, fields in claimed:
            # Look up this message's delivery count in the pending entries list
            entries = self.client.xpending_range(
                self.consumer.stream,
                self.consumer.group,
                min=msg_id,
                max=msg_id,
                count=1,
            )
            delivery_count = entries[0]["times_delivered"] if entries else 0
            if delivery_count >= self.max_retries:
                # Move to the dead letter stream
                self._move_to_dead_letter(msg_id)
                self.consumer.acknowledge(msg_id)
                logger.warning(
                    "Message %s moved to dead letter after max retries", msg_id
                )
            else:
                reclaimed.append(msg_id)
                logger.info("Reclaimed message %s for reprocessing", msg_id)
        return reclaimed

    def _move_to_dead_letter(self, message_id: str):
        """Copy a failed message to the dead letter stream for investigation."""
        messages = self.client.xrange(
            self.consumer.stream, min=message_id, max=message_id
        )
        if messages:
            _, data = messages[0]
            data["original_stream"] = self.consumer.stream
            data["original_id"] = message_id
            data["failed_at"] = datetime.now(timezone.utc).isoformat()
            self.client.xadd("dead_letters", data, maxlen=10_000)
```
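The retry-or-dead-letter decision boils down to comparing a message's delivery count (the `times_delivered` value that `XPENDING` reports per entry) against the retry budget. Pulling that decision out as a pure function keeps the policy trivially testable, independent of Redis:

```python
def should_dead_letter(times_delivered: int, max_retries: int = 3) -> bool:
    """True once a message has been delivered max_retries times or more."""
    return times_delivered >= max_retries


assert should_dead_letter(0) is False  # fresh message, let it retry
assert should_dead_letter(3) is True   # retry budget exhausted
```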
# Step 6: Backpressure and Stream Health
Monitor stream length and consumer lag to detect backpressure:
```python
def get_stream_health(client, stream, group):
    """Check stream and consumer group health metrics."""
    # Stream info
    stream_info = client.xinfo_stream(stream)
    stream_length = stream_info["length"]

    # Consumer group info
    groups = client.xinfo_groups(stream)
    group_info = next((g for g in groups if g["name"] == group), None)
    if not group_info:
        return {"error": f"Group {group} not found"}

    pending_count = group_info["pending"]
    last_delivered = group_info["last-delivered-id"]
    # Redis 7+ reports lag (entries not yet delivered to the group) directly
    lag = group_info.get("lag")

    return {
        "stream_length": stream_length,
        "pending_count": pending_count,
        "lag": lag,
        "consumers": group_info["consumers"],
        "last_delivered_id": last_delivered,
    }
```
# Health Check Thresholds
| Metric | Healthy | Warning | Critical |
|---|---|---|---|
| Pending count | < 100 | 100–1000 | > 1000 |
| Stream length | < 50K | 50K–90K | > 90K (near maxlen) |
| Consumer lag | < 500 | 500–5000 | > 5000 |
| Idle consumers | 0 | 1 | > 1 |
When pending count climbs, add more consumers. When stream length approaches maxlen, your consumers are not keeping up — investigate bottlenecks.
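The thresholds in the table translate directly into a status check you can hang an alert on. A sketch (the cut-offs are the ones above; tune them to your own traffic):

```python
def classify_metric(value: int, warn_at: int, crit_at: int) -> str:
    """Map a metric value onto the healthy / warning / critical bands."""
    if value > crit_at:
        return "critical"
    if value >= warn_at:
        return "warning"
    return "healthy"


def classify_health(pending_count: int, stream_length: int, lag: int) -> dict:
    """Apply the threshold table to the metrics from get_stream_health."""
    return {
        "pending": classify_metric(pending_count, 100, 1000),
        "stream_length": classify_metric(stream_length, 50_000, 90_000),
        "lag": classify_metric(lag, 500, 5000),
    }


status = classify_health(pending_count=42, stream_length=120_000, lag=600)
assert status == {"pending": "healthy", "stream_length": "critical", "lag": "warning"}
```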
# Wiring It All Together
```python
import os


def main():
    """Run the event-driven pipeline."""
    consumer = EventConsumer(
        stream="orders_stream",
        group="pipeline_workers",
        consumer_name=os.environ.get("CONSUMER_NAME", "worker-1"),
    )
    dead_letter = DeadLetterHandler(consumer, max_retries=3)

    # Check for stale events on startup
    reclaimed = dead_letter.reclaim_stale(min_idle_ms=60_000)
    if reclaimed:
        logger.info("Reclaimed %d stale events on startup", len(reclaimed))

    # Start the processing loop
    run_consumer(consumer)


if __name__ == "__main__":
    main()
```
# Running Multiple Workers
```shell
# Terminal 1
CONSUMER_NAME=worker-1 python consumer.py
# Terminal 2
CONSUMER_NAME=worker-2 python consumer.py
# Terminal 3
CONSUMER_NAME=worker-3 python consumer.py
```
Redis distributes events across workers automatically. Each event is delivered to exactly one consumer in the group.
# What This Replaces
| Cron-Based Pipeline | Event-Driven Pipeline |
|---|---|
| Runs on a fixed schedule (e.g. hourly) | Processes events as they arrive |
| Data is always stale by at least one interval | Near real-time data freshness |
| Entire dataset re-processed each run | Only new events processed |
| No backpressure handling | Stream length and consumer lag monitoring |
| Failed rows lost until next run | Failed events retry automatically |
| One worker, no parallelism | Consumer groups scale horizontally |
# Next Steps
Event-driven pipelines work best as part of a broader data architecture:
- Compare this approach with batch patterns in Real-Time vs Batch Pipeline Architecture
- Add structured logging to your consumers for production observability
- Handle failures with self-healing patterns that restart crashed consumers
- Test your event processors with mock streams and fixtures
If your pipelines need to react to data in real time but Kafka feels like overkill, Redis Streams are the pragmatic middle ground.
Need help designing event-driven data architectures? Get in touch or explore our data analytics services.