Event-Driven Data Pipelines with Python and Redis

Build event-driven data pipelines that react to changes in real time using Python and Redis Streams. Covers pub/sub patterns, consumer groups, and backpressure handling.

Most data pipelines run on a schedule. Extract at midnight, transform at 1 AM, load by 2 AM. But what happens when your data source updates continuously — new orders every second, sensor readings every millisecond, user events firing constantly?

Scheduled pipelines cannot keep up. You need pipelines that react to events as they arrive, process them in order, and handle failures without losing data.

This guide builds event-driven data pipelines with Python and Redis Streams — a lightweight, production-ready pattern that sits between simple cron jobs and full-scale Kafka deployments.

# Architecture Overview

mermaid
flowchart LR
  P1["Producer\n(API / Webhook)"] --> RS["Redis Stream\n(orders_stream)"]
  P2["Producer\n(Database CDC)"] --> RS
  RS --> C1["Consumer 1\n(Transform)"]
  RS --> C2["Consumer 2\n(Transform)"]
  C1 --> W["Warehouse\n(PostgreSQL)"]
  C2 --> W

Producers write events to Redis Streams. Consumer groups read and process events in parallel, with automatic acknowledgement and retry for failures.

# What You Will Need

bash
pip install redis pydantic

  • redis — Python client for Redis (includes Streams support)
  • pydantic — data validation for event schemas

You also need a running Redis instance (v5.0+ for Streams support):

bash
# Docker (quickest way)
docker run -d --name redis-streams -p 6379:6379 redis:7-alpine

# Why Redis Streams Over Alternatives

| Feature | Redis Streams | Redis Pub/Sub | Kafka | RabbitMQ |
| --- | --- | --- | --- | --- |
| Message persistence | Yes | No | Yes | Yes |
| Consumer groups | Yes | No | Yes | Yes |
| Replay from history | Yes | No | Yes | No |
| Setup complexity | Low | Low | High | Medium |
| Throughput | ~100K msg/s | ~500K msg/s | ~1M msg/s | ~50K msg/s |
| Best for | Small–medium pipelines | Fire-and-forget | Large-scale streaming | Task queues |

Redis Streams give you Kafka-like semantics (persistence, consumer groups, replay) without Kafka-like operational overhead. For pipelines processing up to 100K events per second, Redis Streams are the sweet spot.
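Replay works because every stream entry gets a monotonically increasing ID of the form `<unix-ms>-<sequence>` (for example `1526919030474-55`). A quick standard-library sketch of how those IDs order, which is also why reading from ID `0` replays a stream from the beginning:

```python
def parse_stream_id(stream_id: str) -> tuple[int, int]:
    """Split a Redis stream ID like '1526919030474-55' into (ms, seq)."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)

# Entries sharing a millisecond timestamp are ordered by sequence number,
# so Python tuple comparison matches Redis's ordering exactly.
a = parse_stream_id("1526919030474-55")
b = parse_stream_id("1526919030474-56")
c = parse_stream_id("1526919030475-0")
assert a < b < c
```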

# Step 1: Define Event Schemas

Start with structured events. Every event needs a type, a timestamp, and a payload:

python
from datetime import datetime, timezone
from pydantic import BaseModel, Field
import json

class OrderEvent(BaseModel):
    """Event emitted when a new order is placed."""
    event_type: str = "order.created"
    event_id: str = Field(description="Unique event identifier")
    timestamp: str = Field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    order_id: str
    customer_id: str
    total_amount: float
    currency: str = "GBP"
    items: list[dict]

    def to_redis(self) -> dict:
        """Serialise for Redis Stream (all values must be strings)."""
        return {
            "event_type": self.event_type,
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "payload": self.model_dump_json(),
        }

    @classmethod
    def from_redis(cls, data: dict) -> "OrderEvent":
        """Deserialise from Redis Stream entry."""
        payload = json.loads(data["payload"])
        return cls(**payload)
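The same serialise/deserialise pattern, sketched with only the standard library (a hypothetical `MiniEvent` dataclass standing in for the pydantic model), shows why the full event rides along as a single JSON string: Redis Stream fields are flat string key/value pairs, so nested structures like `items` must be encoded into one value.

```python
import json
import uuid
from dataclasses import dataclass, asdict

@dataclass
class MiniEvent:
    """Hypothetical stdlib stand-in for the pydantic OrderEvent above."""
    event_id: str
    order_id: str
    total_amount: float

    def to_redis(self) -> dict[str, str]:
        # Nest the whole event as one JSON payload; top-level fields stay
        # flat strings so Redis can store them as stream entry fields.
        return {"event_id": self.event_id, "payload": json.dumps(asdict(self))}

    @classmethod
    def from_redis(cls, data: dict[str, str]) -> "MiniEvent":
        return cls(**json.loads(data["payload"]))

event = MiniEvent(event_id=str(uuid.uuid4()), order_id="A-1001", total_amount=42.5)
assert MiniEvent.from_redis(event.to_redis()) == event  # lossless round trip
```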

# Step 2: Producing Events

Write events to a Redis Stream:

python
import redis
import uuid

class EventProducer:
    """Writes events to a Redis Stream."""

    def __init__(self, redis_url="redis://localhost:6379"):
        self.client = redis.from_url(redis_url, decode_responses=True)

    def emit(self, stream: str, event: OrderEvent) -> str:
        """Add an event to the stream. Returns the message ID."""
        message_id = self.client.xadd(
            stream,
            event.to_redis(),
            maxlen=100_000,  # cap stream length to prevent unbounded growth
        )
        return message_id

    def emit_batch(self, stream: str, events: list[OrderEvent]) -> list[str]:
        """Add multiple events in a pipeline (reduces round trips)."""
        pipe = self.client.pipeline()
        for event in events:
            pipe.xadd(stream, event.to_redis(), maxlen=100_000)
        return pipe.execute()
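For very large batches it can help to bound how many commands you buffer in a single pipeline round trip. A stdlib chunking helper (the size of 500 is an illustrative choice, not a Redis limit):

```python
from itertools import islice

def chunked(items, size=500):
    """Yield successive fixed-size chunks from any iterable."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

# Usage sketch: send large batches in bounded pipelines, e.g.
#   for chunk in chunked(events, 500):
#       producer.emit_batch("orders_stream", chunk)
assert [len(c) for c in chunked(range(1200), 500)] == [500, 500, 200]
```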

# Producing Events from a Webhook

python
producer = EventProducer()

def handle_shopify_webhook(payload: dict):
    """Convert a Shopify order webhook into a stream event."""
    event = OrderEvent(
        event_id=str(uuid.uuid4()),
        order_id=str(payload["id"]),  # Shopify sends numeric IDs; the schema expects strings
        customer_id=str(payload["customer"]["id"]),
        total_amount=float(payload["total_price"]),
        currency=payload["currency"],
        items=[
            {"sku": item["sku"], "qty": item["quantity"], "price": float(item["price"])}
            for item in payload["line_items"]
        ],
    )

    message_id = producer.emit("orders_stream", event)
    print(f"Event {event.event_id} added as {message_id}")

# Step 3: Consumer Groups

Consumer groups let multiple workers process events in parallel without duplicating work:

python
class EventConsumer:
    """Reads events from a Redis Stream using consumer groups."""

    def __init__(
        self,
        redis_url="redis://localhost:6379",
        stream="orders_stream",
        group="pipeline_workers",
        consumer_name="worker-1",
    ):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.stream = stream
        self.group = group
        self.consumer_name = consumer_name
        self._ensure_group()

    def _ensure_group(self):
        """Create the consumer group if it does not exist."""
        try:
            self.client.xgroup_create(
                self.stream, self.group, id="0", mkstream=True
            )
        except redis.exceptions.ResponseError as e:
            # BUSYGROUP means the group already exists; safe to continue
            if "BUSYGROUP" not in str(e):
                raise

    def read(self, count=10, block_ms=5000):
        """Read pending events from the stream.

        Returns a list of (message_id, event_data) tuples.
        block_ms=5000 means wait up to 5 seconds for new events.
        """
        results = self.client.xreadgroup(
            groupname=self.group,
            consumername=self.consumer_name,
            streams={self.stream: ">"},
            count=count,
            block=block_ms,
        )

        if not results:
            return []

        # results format: [(stream_name, [(msg_id, data), ...])]
        return results[0][1]

    def acknowledge(self, message_id: str):
        """Mark a message as successfully processed."""
        self.client.xack(self.stream, self.group, message_id)

# Step 4: Processing Loop

Build a consumer loop that processes events and handles failures gracefully:

python
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("consumer")

def process_event(event: OrderEvent) -> bool:
    """Transform and load a single order event.

    Returns True if successful, False if the event should be retried.
    """
    # Transform: calculate derived fields
    item_count = sum(item["qty"] for item in event.items)
    avg_item_price = event.total_amount / max(item_count, 1)

    # Load: insert into warehouse (simplified)
    logger.info(
        "Processing order %s: £%.2f, %d items, avg £%.2f/item",
        event.order_id,
        event.total_amount,
        item_count,
        avg_item_price,
    )
    return True

def run_consumer(consumer: EventConsumer):
    """Main consumer loop with error handling."""
    logger.info("Consumer %s started, listening on %s", consumer.consumer_name, consumer.stream)

    while True:
        messages = consumer.read(count=10, block_ms=5000)

        if not messages:
            continue  # no new events, loop again

        for message_id, data in messages:
            try:
                event = OrderEvent.from_redis(data)
                success = process_event(event)

                if success:
                    consumer.acknowledge(message_id)
                else:
                    logger.warning("Event %s needs retry", event.event_id)

            except Exception as exc:
                logger.error(
                    "Failed to process message %s: %s",
                    message_id,
                    str(exc),
                )
                # Don't acknowledge — event stays in pending list for retry

# Step 5: Handling Failed Events

Events that fail processing stay in the consumer's Pending Entries List (PEL). Claim and retry them:

python
class DeadLetterHandler:
    """Reclaim and retry failed events, or move to dead letter stream."""

    def __init__(self, consumer: EventConsumer, max_retries=3):
        self.consumer = consumer
        self.max_retries = max_retries
        self.client = consumer.client

    def reclaim_stale(self, min_idle_ms=60_000):
        """Reclaim events that have been pending for over min_idle_ms."""
        # xautoclaim returns (next_start_id, claimed_messages, deleted_ids)
        _next_id, claimed, _deleted = self.client.xautoclaim(
            self.consumer.stream,
            self.consumer.group,
            self.consumer.consumer_name,
            min_idle_time=min_idle_ms,
            count=100,
        )

        reclaimed = []
        for msg_id, _fields in claimed:
            # Check this message's delivery count via XPENDING in its range
            # form (the summary form returns a group-wide total, not a
            # per-message count)
            pending = self.client.xpending_range(
                self.consumer.stream,
                self.consumer.group,
                min=msg_id,
                max=msg_id,
                count=1,
            )
            delivery_count = pending[0]["times_delivered"] if pending else 0

            if delivery_count >= self.max_retries:
                # Move to dead letter stream
                self._move_to_dead_letter(msg_id)
                self.consumer.acknowledge(msg_id)
                logger.warning("Message %s moved to dead letter after max retries", msg_id)
            else:
                reclaimed.append(msg_id)
                logger.info("Reclaimed message %s for reprocessing", msg_id)

        return reclaimed

    def _move_to_dead_letter(self, message_id: str):
        """Move a failed message to the dead letter stream for investigation."""
        messages = self.client.xrange(
            self.consumer.stream, min=message_id, max=message_id
        )
        if messages:
            _, data = messages[0]
            data["original_stream"] = self.consumer.stream
            data["original_id"] = message_id
            data["failed_at"] = datetime.now(timezone.utc).isoformat()
            self.client.xadd("dead_letters", data, maxlen=10_000)
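When a reclaimed event is retried, spacing retries out avoids hammering a failing downstream. A minimal exponential-backoff sketch you could apply before reprocessing (the base and cap values are illustrative, not part of the handler above):

```python
def retry_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff: 1s, 2s, 4s, ... capped at 60s."""
    return min(cap, base * (2 ** attempt))

# First few retries double each time, then the cap takes over.
assert [retry_delay(n) for n in range(4)] == [1.0, 2.0, 4.0, 8.0]
assert retry_delay(10) == 60.0
```

In practice the delivery count from `xpending_range` can serve as the `attempt` number.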

# Step 6: Backpressure and Stream Health

Monitor stream length and consumer lag to detect backpressure:

python
def get_stream_health(client, stream, group):
    """Check stream and consumer group health metrics."""
    # Stream info
    stream_info = client.xinfo_stream(stream)
    stream_length = stream_info["length"]

    # Consumer group info
    groups = client.xinfo_groups(stream)
    group_info = next((g for g in groups if g["name"] == group), None)

    if not group_info:
        return {"error": f"Group {group} not found"}

    pending_count = group_info["pending"]
    last_delivered = group_info["last-delivered-id"]

    return {
        "stream_length": stream_length,
        "pending_count": pending_count,
        # Redis 7+ reports lag (entries not yet delivered to the group)
        # directly; on older versions this key is absent
        "lag": group_info.get("lag"),
        "consumers": group_info["consumers"],
        "last_delivered_id": last_delivered,
    }

# Health Check Thresholds

| Metric | Healthy | Warning | Critical |
| --- | --- | --- | --- |
| Pending count | < 100 | 100–1000 | > 1000 |
| Stream length | < 50K | 50K–90K | > 90K (near maxlen) |
| Consumer lag | < 500 | 500–5000 | > 5000 |
| Idle consumers | 0 | 1 | > 1 |

When pending count climbs, add more consumers. When stream length approaches maxlen, your consumers are not keeping up — investigate bottlenecks.
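The thresholds above can be encoded as a small pure-Python check over a `get_stream_health()` result (the cut-offs mirror the table; tune them for your own workload):

```python
def classify(value: float, warn: float, crit: float) -> str:
    """Map a metric value to healthy / warning / critical."""
    if value > crit:
        return "critical"
    if value >= warn:
        return "warning"
    return "healthy"

def health_status(metrics: dict) -> dict:
    """Apply the threshold table to stream health metrics."""
    return {
        "pending": classify(metrics["pending_count"], 100, 1000),
        "stream_length": classify(metrics["stream_length"], 50_000, 90_000),
        "lag": classify(metrics.get("lag") or 0, 500, 5000),
    }

status = health_status({"pending_count": 250, "stream_length": 10_000, "lag": 6000})
assert status == {"pending": "warning", "stream_length": "healthy", "lag": "critical"}
```

Wiring this into an alerting hook (a log line, a Slack webhook) turns the table into an operational check rather than documentation.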

# Wiring It All Together

python
import os

def main():
    """Run the event-driven pipeline."""
    consumer = EventConsumer(
        stream="orders_stream",
        group="pipeline_workers",
        # Read the worker name from the environment so multiple workers
        # can run from the same script (see "Running Multiple Workers")
        consumer_name=os.environ.get("CONSUMER_NAME", "worker-1"),
    )
    dead_letter = DeadLetterHandler(consumer, max_retries=3)

    # Check for stale events on startup
    reclaimed = dead_letter.reclaim_stale(min_idle_ms=60_000)
    if reclaimed:
        logger.info("Reclaimed %d stale events on startup", len(reclaimed))

    # Start the processing loop
    run_consumer(consumer)

if __name__ == "__main__":
    main()

# Running Multiple Workers

bash
# Terminal 1
CONSUMER_NAME=worker-1 python consumer.py

# Terminal 2
CONSUMER_NAME=worker-2 python consumer.py

# Terminal 3
CONSUMER_NAME=worker-3 python consumer.py

Redis distributes events across workers automatically. Each event is delivered to exactly one consumer in the group.

# What This Replaces

| Cron-Based Pipeline | Event-Driven Pipeline |
| --- | --- |
| Runs on a fixed schedule (e.g. hourly) | Processes events as they arrive |
| Data is always stale by at least one interval | Near real-time data freshness |
| Entire dataset re-processed each run | Only new events processed |
| No backpressure handling | Stream length and consumer lag monitoring |
| Failed rows lost until next run | Failed events retry automatically |
| One worker, no parallelism | Consumer groups scale horizontally |

# Next Steps

Event-driven pipelines work best as one part of a broader data architecture, feeding warehouses, dashboards, and downstream automations.

If your pipelines need to react to data in real time but Kafka feels like overkill, Redis Streams are the pragmatic middle ground.

Need help designing event-driven data architectures? Get in touch or explore our data analytics services.
