Real-Time vs Batch: Choosing the Right Data Pipeline Architecture

Understand when to use batch processing, real-time streaming, or a hybrid approach — with architecture diagrams, working code for each pattern, and a decision framework.

Every data pipeline discussion eventually reaches the same question: should this run on a schedule or react to events in real time?

The answer depends on what you are building. A daily sales report does not need to update every second. A fraud detection system cannot wait until morning. But most teams default to one pattern — usually batch — without evaluating whether it fits.

This guide covers three pipeline architectures: batch, real-time, and hybrid. Each includes working Python code, architecture diagrams, and clear guidance on when to use it.

# The Three Architectures

flowchart TD
  subgraph Batch
    B1["Source"] -->|"Scheduled\n(cron)"| B2["Extract"]
    B2 --> B3["Transform"]
    B3 --> B4["Load"]
  end

  subgraph RealTime["Real-Time"]
    R1["Event"] -->|"Instant"| R2["Receive"]
    R2 --> R3["Process"]
    R3 --> R4["Store"]
  end

  subgraph Hybrid
    H1["Source"] -->|"Events"| H2["Queue"]
    H2 -->|"Micro-batch\n(every 5 min)"| H3["Process"]
    H3 --> H4["Store"]
  end

# Pattern 1: Batch Processing

The most common pattern. Data is collected, then processed on a schedule.

# When to Use Batch

  • Reports that users check once per day or week
  • Data that changes slowly (daily sales, monthly metrics)
  • Sources that only provide bulk exports (CSV, Excel)
  • Downstream systems that do not need instant updates

# Architecture

flowchart LR
  S1["API\n(full export)"] --> C["Collect\n(6 AM daily)"]
  S2["Database\n(snapshot)"] --> C
  S3["Excel\n(uploaded)"] --> C
  C --> T["Transform\n& Validate"]
  T --> L["Load to\nDatabase"]
  L --> R["Generate\nReport"]
  R --> D["Distribute\n(email / dashboard)"]

# Working Code

python
import pandas as pd
import sqlite3
import requests
from datetime import datetime
import logging

logger = logging.getLogger("batch_pipeline")

class BatchPipeline:
    """Standard batch pipeline — run on schedule."""

    def __init__(self, db_path="reports.db"):
        self.db_path = db_path
        self.run_timestamp = datetime.now().isoformat()

    def extract(self, api_url, headers):
        """Extract full dataset from API."""
        response = requests.get(api_url, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        df = pd.DataFrame(data["results"])
        logger.info(f"Extracted {len(df)} records from API")
        return df

    def transform(self, df):
        """Clean and aggregate data."""
        # Standardise columns
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

        # Parse dates
        if "created_at" in df.columns:
            df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")

        # Clean numerics
        for col in ["amount", "revenue", "quantity"]:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")

        # Remove duplicates
        if "id" in df.columns:
            df = df.drop_duplicates(subset=["id"], keep="last")

        # Add pipeline metadata
        df["_pipeline_run"] = self.run_timestamp

        logger.info(f"Transformed: {len(df)} rows")
        return df

    def load(self, df, table_name):
        """Load to database (replace for idempotency)."""
        conn = sqlite3.connect(self.db_path)
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        conn.close()
        logger.info(f"Loaded {len(df)} rows to {table_name}")

    def run(self, api_url, headers):
        """Execute full batch pipeline."""
        logger.info(f"Batch pipeline started: {self.run_timestamp}")
        df = self.extract(api_url, headers)
        df = self.transform(df)
        self.load(df, "daily_orders")
        logger.info("Batch pipeline completed")
        return {"rows": len(df), "status": "success"}

# Scheduling

bash
# cron — run every day at 6 AM
0 6 * * * cd /path/to/project && python batch_pipeline.py >> logs/batch.log 2>&1
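Where cron is unavailable (on Windows, or in a container without a cron daemon), a plain Python loop can drive the same schedule. This is a minimal stdlib-only sketch, not a replacement for a real scheduler; the 6 AM target mirrors the cron entry above.

```python
import time
from datetime import datetime, timedelta

def seconds_until(hour: int, minute: int = 0) -> float:
    """Seconds from now until the next occurrence of hour:minute."""
    now = datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # already past today's slot; wait for tomorrow
    return (target - now).total_seconds()

def run_daily(pipeline_run, hour=6):
    """Sleep until the next run time, execute, repeat."""
    while True:
        time.sleep(seconds_until(hour))
        pipeline_run()
```

For anything beyond a single daily job, a supervisor (systemd, Docker restart policies) should wrap the loop so a crash does not silently stop the schedule.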

# Batch Characteristics

| Property | Value |
| --- | --- |
| Latency | Minutes to hours |
| Complexity | Low |
| Infrastructure | cron + Python |
| Data freshness | Last run time |
| Failure recovery | Re-run entire pipeline |
| Cost | Minimal (runs briefly) |
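The "re-run entire pipeline" recovery property only holds because the load step is idempotent: `if_exists="replace"` means running the pipeline twice leaves the same rows as running it once. A stripped-down sketch of that property using plain `sqlite3` (the table name and schema here are illustrative):

```python
import sqlite3

def load_replace(conn, rows):
    """Idempotent load: drop and recreate the table, so a re-run
    never duplicates rows (same effect as to_sql(if_exists='replace'))."""
    conn.execute("DROP TABLE IF EXISTS daily_orders")
    conn.execute("CREATE TABLE daily_orders (id INTEGER PRIMARY KEY, amount REAL)")
    conn.executemany("INSERT INTO daily_orders VALUES (?, ?)", rows)
    conn.commit()

conn = sqlite3.connect(":memory:")
rows = [(1, 100.0), (2, 250.0)]
load_replace(conn, rows)
load_replace(conn, rows)  # simulate a re-run after a failure
count = conn.execute("SELECT COUNT(*) FROM daily_orders").fetchone()[0]
```

If the load used `if_exists="append"` instead, every re-run would double the row count, and recovery would require manual cleanup.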

# Pattern 2: Real-Time Event Processing

Data arrives as events and is processed immediately. No schedule — the pipeline reacts.

# When to Use Real-Time

  • Alerts that need to fire within seconds (fraud, outages)
  • Live dashboards that users watch continuously
  • Webhook-driven workflows (payment received, order placed)
  • Systems where stale data causes harm

# Architecture

flowchart LR
  E1["Shopify\n(webhook)"] --> W["Webhook\nReceiver"]
  E2["Stripe\n(webhook)"] --> W
  W --> V["Validate\n& Parse"]
  V --> P["Process\n(business logic)"]
  P --> S["Store\n(database)"]
  P --> A["Alert\n(if threshold)"]
  S --> D["Dashboard\n(auto-refresh)"]

# Working Code: Webhook Receiver

python
from flask import Flask, request, jsonify
import pandas as pd
import sqlite3
import hmac
import hashlib
import logging
import os
from datetime import datetime

app = Flask(__name__)
logger = logging.getLogger("realtime_pipeline")

def verify_webhook_signature(payload, signature, secret):
    """Verify webhook authenticity using HMAC signature."""
    computed = hmac.new(
        secret.encode("utf-8"),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(computed, signature)


@app.route("/webhook/order", methods=["POST"])
def handle_order_webhook():
    """Process incoming order events in real time."""
    # Verify signature
    signature = request.headers.get("X-Webhook-Signature", "")
    webhook_secret = os.environ["WEBHOOK_SECRET"]

    if not verify_webhook_signature(request.data, signature, webhook_secret):
        logger.warning("Invalid webhook signature")
        return jsonify({"error": "Invalid signature"}), 401

    event = request.get_json()

    # Process event
    try:
        process_order_event(event)
        return jsonify({"status": "processed"}), 200
    except Exception as e:
        logger.error(f"Failed to process event: {e}")
        return jsonify({"error": "Processing failed"}), 500


def process_order_event(event):
    """Process a single order event."""
    order = {
        "order_id": event["id"],
        "customer_id": event["customer"]["id"],
        "total": float(event["total_price"]),
        "created_at": event["created_at"],
        "status": event["financial_status"],
        "processed_at": datetime.now().isoformat(),
    }

    # Store immediately
    conn = sqlite3.connect("orders_realtime.db")
    pd.DataFrame([order]).to_sql("orders", conn, if_exists="append", index=False)
    conn.close()

    # Check alert thresholds
    if order["total"] > 5000:
        logger.info(f"High-value order detected: #{order['order_id']} (${order['total']})")
        # send_alert is assumed to be defined elsewhere (e.g. a Slack or email notifier)
        send_alert(f"High-value order: #{order['order_id']} — ${order['total']}")

    logger.info(f"Processed order #{order['order_id']}")

# Real-Time Characteristics

| Property | Value |
| --- | --- |
| Latency | Milliseconds to seconds |
| Complexity | Medium to high |
| Infrastructure | Web server + process manager |
| Data freshness | Immediate |
| Failure recovery | Retry queue + dead letter |
| Cost | Always-on server |
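The "retry queue + dead letter" row can be sketched in-process: retry a failing handler with exponential backoff, and hand the event to a dead-letter callback once attempts are exhausted. The function names here are illustrative, not part of the webhook code above.

```python
import time
import logging

logger = logging.getLogger("realtime_pipeline")

def process_with_retries(handler, event, max_attempts=3, base_delay=1.0, dead_letter=None):
    """Call handler(event), retrying with exponential backoff.

    On the final failure, pass the event to dead_letter (if given)
    and re-raise so the webhook route can return a 500.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(event)
        except Exception as exc:
            logger.warning(f"Attempt {attempt}/{max_attempts} failed: {exc}")
            if attempt == max_attempts:
                if dead_letter is not None:
                    dead_letter(event, exc)
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```

In-process retries only cover transient failures while the server is up; a durable retry queue (as in the hybrid pattern below) is needed to survive restarts.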

# Pattern 3: Hybrid (Micro-Batch)

Events are collected into a queue and processed in small batches every few minutes. This gives near-real-time freshness with batch-level simplicity.

# When to Use Hybrid

  • Dashboards that need updates every 5–15 minutes (not instant)
  • High-volume events where per-event processing is wasteful
  • Systems that need to deduplicate or aggregate before storing
  • Teams that want real-time value without real-time complexity

# Architecture

flowchart LR
  E1["Events"] --> Q["Redis Queue"]
  E2["Webhooks"] --> Q
  E3["API Poll"] --> Q

  Q --> W["Worker\n(every 5 min)"]
  W --> B["Batch Process\n(dedupe, aggregate)"]
  B --> S["Store"]
  S --> D["Dashboard"]
  W -.-> DLQ["Dead Letter\nQueue"]

# Working Code: Queue + Worker

python
import redis
import json
import time
import pandas as pd
import sqlite3
from datetime import datetime
import logging

logger = logging.getLogger("hybrid_pipeline")

class EventQueue:
    """Simple event queue using Redis lists."""

    def __init__(self, redis_url="redis://localhost:6379", queue_name="pipeline_events"):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.dlq_name = f"{queue_name}_dlq"

    def push(self, event):
        """Add an event to the queue."""
        self.redis.rpush(self.queue_name, json.dumps(event, default=str))

    def pop_batch(self, max_items=100):
        """Pop up to max_items from the queue."""
        pipe = self.redis.pipeline()
        pipe.lrange(self.queue_name, 0, max_items - 1)
        pipe.ltrim(self.queue_name, max_items, -1)
        results = pipe.execute()

        events = [json.loads(item) for item in results[0]]
        return events

    def send_to_dlq(self, event, error):
        """Send failed event to dead letter queue."""
        event["_error"] = str(error)
        event["_failed_at"] = datetime.now().isoformat()
        self.redis.rpush(self.dlq_name, json.dumps(event, default=str))

    def queue_depth(self):
        """Check how many events are waiting."""
        return self.redis.llen(self.queue_name)


class MicroBatchProcessor:
    """Process queued events in small batches."""

    def __init__(self, queue, db_path="hybrid_data.db"):
        self.queue = queue
        self.db_path = db_path

    def process_batch(self):
        """Pop events from queue and process as a batch."""
        events = self.queue.pop_batch(max_items=500)

        if not events:
            logger.debug("No events to process")
            return {"processed": 0}

        logger.info(f"Processing batch of {len(events)} events")

        # Convert to DataFrame for efficient processing
        df = pd.DataFrame(events)

        # Deduplicate (same event can arrive twice)
        if "event_id" in df.columns:
            before = len(df)
            df = df.drop_duplicates(subset=["event_id"], keep="last")
            dupes = before - len(df)
            if dupes > 0:
                logger.info(f"Removed {dupes} duplicate events")

        # Clean and transform
        for col in ["amount", "total", "price"]:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")

        df["_batch_processed_at"] = datetime.now().isoformat()

        # Store; if this fails, route the popped events to the DLQ
        # so they are not silently lost (pop_batch already removed them)
        try:
            conn = sqlite3.connect(self.db_path)
            df.to_sql("events", conn, if_exists="append", index=False)
            conn.close()
        except Exception as e:
            for event in events:
                self.queue.send_to_dlq(event, e)
            logger.error(f"Store failed, {len(events)} events sent to DLQ: {e}")
            return {"processed": 0, "failed": len(events)}

        logger.info(f"Batch complete: {len(df)} events stored")
        return {"processed": len(df)}

    def run_worker(self, interval_seconds=300):
        """Run the micro-batch worker on an interval."""
        logger.info(f"Worker started — processing every {interval_seconds}s")

        while True:
            try:
                self.process_batch()
                depth = self.queue.queue_depth()
                if depth > 1000:
                    logger.warning(f"Queue depth high: {depth} events waiting")
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")

            time.sleep(interval_seconds)

# Producing Events

python
# From a webhook handler (assumes the Flask app and EventQueue defined above)
from flask import request, jsonify
from datetime import datetime

queue = EventQueue()

@app.route("/webhook/event", methods=["POST"])
def receive_event():
    """Receive events and queue for batch processing."""
    event = request.get_json()
    event["_received_at"] = datetime.now().isoformat()
    queue.push(event)
    return jsonify({"status": "queued"}), 202
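A Redis server is not always available during local development and tests. An in-memory stand-in with the same `push` / `pop_batch` / `queue_depth` interface (an assumption of this guide, not a library class) lets `MicroBatchProcessor` run unchanged:

```python
import json
import threading
from collections import deque

class InMemoryQueue:
    """Drop-in stand-in for EventQueue in tests: same interface, no Redis."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def push(self, event):
        with self._lock:
            self._items.append(json.dumps(event, default=str))

    def pop_batch(self, max_items=100):
        with self._lock:
            take = min(max_items, len(self._items))
            batch = [self._items.popleft() for _ in range(take)]
        return [json.loads(item) for item in batch]

    def send_to_dlq(self, event, error):
        # Tests usually just need to see what failed
        self.push({"_dlq": True, "event": event, "error": str(error)})

    def queue_depth(self):
        with self._lock:
            return len(self._items)
```

The lock makes `pop_batch` atomic, mirroring the Redis `lrange` + `ltrim` pipeline, so multiple threads cannot pop the same event twice.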

# Hybrid Characteristics

| Property | Value |
| --- | --- |
| Latency | 1–15 minutes (configurable) |
| Complexity | Medium |
| Infrastructure | Redis + worker process |
| Data freshness | Near real-time |
| Failure recovery | DLQ + batch retry |
| Cost | Low (worker + Redis) |

# Decision Framework

flowchart TD
  Q1{"Need data in\n< 1 minute?"} -->|Yes| Q2{"High volume?\n(100+ events/sec)"}
  Q1 -->|No| Q3{"Need data in\n< 15 minutes?"}
  Q2 -->|Yes| H["Hybrid\n(micro-batch)"]
  Q2 -->|No| RT["Real-Time\n(event-driven)"]
  Q3 -->|Yes| H
  Q3 -->|No| B["Batch\n(scheduled)"]
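The flowchart translates directly into a few lines of code. This helper is purely illustrative (the function name is an assumption; the thresholds are taken from the diagram):

```python
def choose_pattern(max_latency_seconds: float, events_per_second: float = 0.0) -> str:
    """Pick a pipeline pattern following the decision flowchart above."""
    if max_latency_seconds < 60:
        # Need data in under a minute: volume decides the mechanism
        return "hybrid" if events_per_second >= 100 else "real-time"
    if max_latency_seconds < 15 * 60:
        return "hybrid"
    return "batch"
```

Note the high-volume branch: above roughly 100 events/sec, per-event processing is usually wasteful even when latency requirements are tight, which is why the flowchart routes that case to micro-batching.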

# Side-by-Side Comparison

| Factor | Batch | Hybrid | Real-Time |
| --- | --- | --- | --- |
| Latency | Hours | Minutes | Seconds |
| Complexity | Low | Medium | High |
| Infrastructure | cron | Redis + worker | Web server + queue |
| Best for | Reports, exports | Dashboards, monitoring | Alerts, fraud |
| Failure mode | Re-run entire pipeline | Retry batch | Retry individual event |
| Scaling | Larger window | More workers | Horizontal scaling |
| Testing | Easy (deterministic) | Moderate | Hard (timing-dependent) |
| Cost | Lowest | Low | Highest |

# Common Scenarios

| Scenario | Recommended pattern | Why |
| --- | --- | --- |
| Daily sales report | Batch | Users check once per day |
| Inventory dashboard | Hybrid (5 min) | Needs to be current, not instant |
| Fraud detection | Real-time | Every second of delay costs money |
| Weekly executive summary | Batch | Runs once per week |
| Order status tracker | Hybrid (1 min) | Near-instant without per-event cost |
| Price drop alerting | Real-time | Customer expectation of immediacy |
| Data warehouse load | Batch | Downstream is batch anyway |
| Live support queue | Real-time | Agents need current view |

# Migration Path

Most teams start with batch and evolve. Here is a practical migration path:

# Phase 1: Batch Only (Week 1)

python
# cron runs at 6 AM
pipeline = BatchPipeline()
pipeline.run(api_url, headers)

# Phase 2: Add Event Collection (Week 3)

python
# Webhook receiver queues events
# Batch pipeline still runs daily but also processes queued events
queue = EventQueue()
processor = MicroBatchProcessor(queue)

# Phase 3: Reduce Batch Interval (Week 5)

python
# Worker processes every 5 minutes instead of daily
processor.run_worker(interval_seconds=300)

# Phase 4: Real-Time for Critical Events (As Needed)

python
# Only high-value or alert-worthy events trigger immediate processing
# Everything else stays in the micro-batch worker
if event["total"] > 5000:
    process_immediately(event)
else:
    queue.push(event)
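A fuller sketch of that routing, where `process_immediately` stands for whatever synchronous handler you use (both names are placeholders, not part of the earlier classes):

```python
HIGH_VALUE_THRESHOLD = 5000  # mirrors the alert threshold used earlier

def route_event(event, queue, process_immediately):
    """Send alert-worthy events through the synchronous path,
    everything else to the micro-batch queue."""
    total = float(event.get("total", 0) or 0)
    if total > HIGH_VALUE_THRESHOLD:
        process_immediately(event)
        return "immediate"
    queue.push(event)
    return "queued"
```

The payoff of this split is cost control: the expensive synchronous path only runs for the small fraction of events where latency actually matters.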

# What This Replaces

| Old approach | New approach |
| --- | --- |
| Everything runs daily via cron | Right pattern for each use case |
| Building real-time when batch is fine | Decision framework prevents over-engineering |
| Custom queue implementations | Redis-backed event queue |
| No event-driven capability | Webhook receiver + queue + worker |

# Next Steps

Start with batch. It covers 80% of reporting and automation use cases. Add the hybrid pattern when users ask "why is the dashboard not up to date?" Add real-time only when latency directly impacts outcomes.

For building batch pipelines, see How to Design Data Pipelines for Reliable Reporting. For making any pipeline pattern recover from failures, see How to Build Self-Healing Data Pipelines.

Data analytics services include designing pipeline architectures that match your actual latency and freshness requirements.

Get in touch to discuss the right pipeline architecture for your data.
