Real-Time vs Batch: Choosing the Right Data Pipeline Architecture

9 min read · Data & Dashboards

Understand when to use batch processing, real-time streaming, or a hybrid approach — with architecture diagrams, working code for each pattern, and a decision framework.


Every data pipeline discussion eventually reaches the same question: should this run on a schedule or react to events in real time?

The answer depends on what you are building. A daily sales report does not need to update every second. A fraud detection system cannot wait until morning. But most teams default to one pattern — usually batch — without evaluating whether it fits.

This guide covers three pipeline architectures: batch, real-time, and hybrid. Each includes working Python code, architecture diagrams, and clear guidance on when to use it.

Who This Is For

  • Data engineers choosing between scheduled jobs and event-driven processing for a new project
  • Technical leads making architecture decisions that will be expensive to change later
  • Vibe coders whose batch scripts are too slow but who are unsure if they need a full streaming setup
  • Anyone who has been told “just use Kafka” without understanding when that is overkill

You need basic Python familiarity. The guide explains each architecture pattern in plain English before showing the implementation — so even if you are not writing the code, you will understand the trade-offs.

The Three Architectures

Pattern 1: Batch Processing

The most common pattern. Data is collected, then processed on a schedule.

When to Use Batch

  • Reports that users check once per day or week
  • Data that changes slowly (daily sales, monthly metrics)
  • Sources that only provide bulk exports (CSV, Excel)
  • Downstream systems that do not need instant updates

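Architecture

Source API → extract() → transform() → load() → SQLite table, triggered by cron on a fixed schedule.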

Working Code

import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime

import pandas as pd
import requests

logger = logging.getLogger("batch_pipeline")

class BatchPipeline:
    """Standard batch pipeline, run on a schedule."""

    def __init__(self, db_path="reports.db"):
        self.db_path = db_path
        self.run_timestamp = datetime.now().isoformat()

    def extract(self, api_url, headers):
        """Extract the full dataset from the API."""
        response = requests.get(api_url, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        # Assumes the API wraps records in a top-level "results" list
        df = pd.DataFrame(data["results"])
        logger.info(f"Extracted {len(df)} records from API")
        return df

    def transform(self, df):
        """Clean and deduplicate the data."""
        # Standardise column names: " Created At" -> "created_at"
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

        # Parse dates; unparseable values become NaT
        if "created_at" in df.columns:
            df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")

        # Clean numerics; unparseable values become NaN
        for col in ["amount", "revenue", "quantity"]:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")

        # Remove duplicates, keeping the last occurrence of each id
        if "id" in df.columns:
            df = df.drop_duplicates(subset=["id"], keep="last")

        # Add pipeline metadata
        df["_pipeline_run"] = self.run_timestamp

        logger.info(f"Transformed: {len(df)} rows")
        return df

    def load(self, df, table_name):
        """Load to the database, replacing the table so re-runs are idempotent."""
        # closing() guarantees the connection is closed even if to_sql raises
        with closing(sqlite3.connect(self.db_path)) as conn:
            df.to_sql(table_name, conn, if_exists="replace", index=False)
        logger.info(f"Loaded {len(df)} rows to {table_name}")

    def run(self, api_url, headers):
        """Execute the full batch pipeline."""
        logger.info(f"Batch pipeline started: {self.run_timestamp}")
        df = self.extract(api_url, headers)
        df = self.transform(df)
        self.load(df, "daily_orders")
        logger.info("Batch pipeline completed")
        return {"rows": len(df), "status": "success"}


if __name__ == "__main__":
    # Entry point for cron. API_URL, API_TOKEN and Bearer auth are illustrative
    # assumptions; use whatever your source API actually requires.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    pipeline = BatchPipeline()
    pipeline.run(
        os.environ["API_URL"],
        {"Authorization": f"Bearer {os.environ['API_TOKEN']}"},
    )
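The load step uses `if_exists="replace"` so that running the pipeline twice leaves the same table rather than doubling it. This small self-contained check, using an in-memory SQLite database and made-up rows, shows the difference between `replace` and `append` when the same run executes twice. The helper names `load` and `row_count` are illustrative, not part of the pipeline above.

```python
import sqlite3

import pandas as pd


def load(df, conn, table_name, mode):
    # mode is passed straight through to pandas: "replace" or "append"
    df.to_sql(table_name, conn, if_exists=mode, index=False)


def row_count(conn, table_name):
    return conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]


# Made-up sample rows
df = pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})
conn = sqlite3.connect(":memory:")

# Simulate the same run executing twice (e.g. a manual re-run after a failure)
for _ in range(2):
    load(df, conn, "orders_replace", "replace")
    load(df, conn, "orders_append", "append")

replace_rows = row_count(conn, "orders_replace")  # table rebuilt each time
append_rows = row_count(conn, "orders_append")    # rows accumulate
```

With `replace`, the second run leaves 3 rows; with `append`, it leaves 6. That is why the batch pattern can recover from a failure simply by re-running.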

Scheduling

# cron — run every day at 6 AM
0 6 * * * cd /path/to/project && python batch_pipeline.py >> logs/batch.log 2>&1

Batch Characteristics

| Property | Value |
|---|---|
| Latency | Minutes to hours |
| Complexity | Low |
| Infrastructure | cron + Python |
| Data freshness | Last run time |
| Failure recovery | Re-run entire pipeline |
| Cost | Minimal (runs briefly) |
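Because the load step replaces the table, re-running the whole pipeline is safe, which is what makes "re-run entire pipeline" a workable recovery strategy. A minimal sketch of automating that re-run with exponential backoff follows; `run_with_retries` and its parameters are hypothetical, and the `sleep` argument is injectable only so the example runs without actually waiting.

```python
import logging
import time

logger = logging.getLogger("batch_pipeline")


def run_with_retries(run_fn, max_attempts=3, base_delay=60.0, sleep=time.sleep):
    """Re-run the whole pipeline on failure, doubling the wait each time."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # give up; the final error ends up in the cron log
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("Attempt %d failed (%s); retrying in %.0fs", attempt, exc, delay)
            sleep(delay)


# Simulated flaky run: fails twice (e.g. API timeouts), then succeeds
calls = {"count": 0}


def flaky_run():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("API timed out")
    return {"rows": 120, "status": "success"}


waits = []  # records the delays instead of sleeping
result = run_with_retries(flaky_run, sleep=waits.append)
```

Against the real pipeline, the call would look like `run_with_retries(lambda: BatchPipeline().run(api_url, headers))`; constructing a fresh `BatchPipeline` per attempt gives each attempt its own `run_timestamp`.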

Pattern 2: Real-Time Event Processing

Data arrives as events and is processed immediately. No schedule — the pipeline reacts.

When to Use Real-Time

  • Alerts that need to fire within seconds (fraud, outages)
  • Live dashboards that users watch continuously
  • Webhook-driven workflows (payment received, order placed)
  • Systems where stale data causes harm

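Architecture

Event source → HTTP POST to /webhook/order → signature check → process_order_event() → SQLite insert + high-value alert.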

Working Code: Webhook Receiver

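import hashlib
import hmac
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime

import pandas as pd
from flask import Flask, jsonify, request

app = Flask(__name__)
logger = logging.getLogger("realtime_pipeline")

def verify_webhook_signature(payload, signature, secret):
    """Verify webhook authenticity using a hex HMAC-SHA256 signature."""
    computed = hmac.new(
        secret.encode("utf-8"),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison avoids leaking timing information
    return hmac.compare_digest(computed, signature)


def send_alert(message):
    """Hypothetical alert hook: swap in Slack, email, PagerDuty, etc."""
    logger.warning(f"ALERT: {message}")


@app.route("/webhook/order", methods=["POST"])
def handle_order_webhook():
    """Process incoming order events in real time."""
    # Verify the signature against the raw request body, not re-serialised JSON
    signature = request.headers.get("X-Webhook-Signature", "")
    webhook_secret = os.environ["WEBHOOK_SECRET"]

    if not verify_webhook_signature(request.data, signature, webhook_secret):
        logger.warning("Invalid webhook signature")
        return jsonify({"error": "Invalid signature"}), 401

    event = request.get_json(silent=True)
    if event is None:
        return jsonify({"error": "Body must be JSON"}), 400

    try:
        process_order_event(event)
    except (KeyError, TypeError, ValueError) as e:
        # Malformed payload: retrying will not fix it, so answer 4xx rather than 5xx
        logger.error(f"Rejected malformed event: {e}")
        return jsonify({"error": "Malformed event"}), 400
    except Exception as e:
        # Likely transient (e.g. database locked): 5xx lets the sender retry
        logger.error(f"Failed to process event: {e}")
        return jsonify({"error": "Processing failed"}), 500
    return jsonify({"status": "processed"}), 200


def process_order_event(event):
    """Process a single order event."""
    # Field names assume an e-commerce order payload; adjust to your provider
    order = {
        "order_id": event["id"],
        "customer_id": event["customer"]["id"],
        "total": float(event["total_price"]),
        "created_at": event["created_at"],
        "status": event["financial_status"],
        "processed_at": datetime.now().isoformat(),
    }

    # Store immediately
    with closing(sqlite3.connect("orders_realtime.db")) as conn:
        pd.DataFrame([order]).to_sql("orders", conn, if_exists="append", index=False)

    # Check alert thresholds
    if order["total"] > 5000:
        logger.info(f"High-value order detected: #{order['order_id']} (${order['total']})")
        send_alert(f"High-value order #{order['order_id']}: ${order['total']}")

    logger.info(f"Processed order #{order['order_id']}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Flask's built-in server is for local testing; in production run the app
    # under a WSGI server such as gunicorn, managed by a process manager
    app.run(port=5000)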
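To test the receiver locally you need to produce a signature the handler will accept. The sketch below assumes the sender signs the raw body with HMAC-SHA256 and sends the hex digest in `X-Webhook-Signature`, which is what `verify_webhook_signature` checks; real providers differ in header name and encoding (hex vs base64), so check your provider's documentation. `sign_payload`, the secret, and the sample payload are hypothetical.

```python
import hashlib
import hmac
import json


def sign_payload(payload: bytes, secret: str) -> str:
    """Compute the hex HMAC-SHA256 signature the handler expects."""
    return hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()


def verify_webhook_signature(payload, signature, secret):
    """Same check the Flask handler performs."""
    return hmac.compare_digest(sign_payload(payload, secret), signature)


secret = "test-secret"  # hypothetical; must match WEBHOOK_SECRET on the server
body = json.dumps({
    "id": 1001,
    "customer": {"id": 42},
    "total_price": "120.00",
    "created_at": "2024-01-15T09:30:00Z",
    "financial_status": "paid",
}).encode("utf-8")
signature = sign_payload(body, secret)

valid = verify_webhook_signature(body, signature, secret)
tampered = verify_webhook_signature(body.replace(b"120.00", b"999.00"), signature, secret)

# Against a running server (not executed here):
# requests.post("http://localhost:5000/webhook/order", data=body,
#               headers={"X-Webhook-Signature": signature, "Content-Type": "application/json"})
```

The signature covers exact bytes, which is why the handler verifies `request.data` rather than re-serialising the parsed JSON: a different key order or whitespace would produce a different digest.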

Real-Time Characteristics

| Property | Value |
|---|---|
| Latency | Milliseconds to seconds |
| Complexity | Medium to high |
| Infrastructure | Web server + process manager |
| Data freshness | Immediate |
| Failure recovery | Retry queue + dead letter |
| Cost | Always-on server |
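Retries mean the same event can be delivered more than once, and the handler above appends every delivery, so a retried webhook becomes a duplicate row. A sketch of an idempotent store, assuming `order_id` is unique per order, replaces the pandas append with a plain SQLite insert that ignores repeats. `init_db` and `store_order` are hypothetical helper names.

```python
import sqlite3


def init_db(conn):
    # order_id as PRIMARY KEY lets SQLite enforce one row per order
    conn.execute(
        """CREATE TABLE IF NOT EXISTS orders (
               order_id TEXT PRIMARY KEY,
               customer_id TEXT,
               total REAL,
               status TEXT
           )"""
    )


def store_order(conn, order):
    """Insert an order once; redeliveries with the same order_id are ignored."""
    with conn:  # commits on success, rolls back on error (does not close)
        cur = conn.execute(
            "INSERT OR IGNORE INTO orders (order_id, customer_id, total, status) "
            "VALUES (:order_id, :customer_id, :total, :status)",
            order,
        )
    return cur.rowcount == 1  # True only when a new row was written


conn = sqlite3.connect(":memory:")
init_db(conn)
order = {"order_id": "1001", "customer_id": "42", "total": 120.0, "status": "paid"}
first = store_order(conn, order)
retry = store_order(conn, order)  # same webhook delivered again
rows = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

`INSERT OR IGNORE` keeps the first version of an order. If later deliveries carry updates (e.g. a status change), an upsert (`INSERT ... ON CONFLICT(order_id) DO UPDATE`) is the alternative.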

Pattern 3: Hybrid (Micro-Batch)

Events are collected into a queue and processed in small batches every few minutes. This gives near-real-time freshness with batch-level simplicity.

When to Use Hybrid

  • Dashboards that need updates every 5–15 minutes (not instant)
  • High-volume events where per-event processing is wasteful
  • Systems that need to deduplicate or aggregate before storing
  • Teams that want real-time value without real-time complexity

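Architecture

Webhook → Redis list (pipeline_events) → worker pops a batch every interval → dedupe + clean → SQLite; failed events go to a dead letter queue (pipeline_events_dlq).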

Working Code: Queue + Worker

import json
import logging
import sqlite3
import time
from contextlib import closing
from datetime import datetime

import pandas as pd
import redis

logger = logging.getLogger("hybrid_pipeline")

class EventQueue:
    """Simple event queue using Redis lists."""

    def __init__(self, redis_url="redis://localhost:6379", queue_name="pipeline_events"):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.dlq_name = f"{queue_name}_dlq"

    def push(self, event):
        """Add an event to the tail of the queue."""
        self.redis.rpush(self.queue_name, json.dumps(event, default=str))

    def pop_batch(self, max_items=100):
        """Pop up to max_items from the head of the queue."""
        # redis-py pipelines run as a MULTI/EXEC transaction by default, so the
        # read and the trim happen atomically and two workers never get the same items
        pipe = self.redis.pipeline()
        pipe.lrange(self.queue_name, 0, max_items - 1)
        pipe.ltrim(self.queue_name, max_items, -1)
        items, _ = pipe.execute()
        return [json.loads(item) for item in items]

    def send_to_dlq(self, event, error):
        """Send a failed event to the dead letter queue for inspection or replay."""
        event["_error"] = str(error)
        event["_failed_at"] = datetime.now().isoformat()
        self.redis.rpush(self.dlq_name, json.dumps(event, default=str))

    def queue_depth(self):
        """Check how many events are waiting."""
        return self.redis.llen(self.queue_name)


class MicroBatchProcessor:
    """Process queued events in small batches."""

    def __init__(self, queue, db_path="hybrid_data.db", batch_size=500):
        self.queue = queue
        self.db_path = db_path
        self.batch_size = batch_size

    def process_batch(self):
        """Pop events from the queue and process them as one batch."""
        events = self.queue.pop_batch(max_items=self.batch_size)

        if not events:
            logger.debug("No events to process")
            return {"popped": 0, "processed": 0}

        logger.info(f"Processing batch of {len(events)} events")

        try:
            # json_normalize flattens nested objects ({"customer": {"id": 1}} becomes
            # a "customer.id" column), which SQLite cannot store as-is
            df = pd.json_normalize(events)

            # Deduplicate within this batch (the same event can arrive twice)
            if "event_id" in df.columns:
                before = len(df)
                df = df.drop_duplicates(subset=["event_id"], keep="last")
                dupes = before - len(df)
                if dupes > 0:
                    logger.info(f"Removed {dupes} duplicate events")

            # Clean and transform
            for col in ["amount", "total", "price"]:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors="coerce")

            df["_batch_processed_at"] = datetime.now().isoformat()

            # Store. Note: the table's columns are fixed by the first append, so a
            # batch carrying new fields fails here and lands in the DLQ
            with closing(sqlite3.connect(self.db_path)) as conn:
                df.to_sql("events", conn, if_exists="append", index=False)
        except Exception as e:
            # The events are already off the main queue; park them in the DLQ
            # instead of losing them
            logger.error(f"Batch failed, moving {len(events)} events to DLQ: {e}")
            for event in events:
                self.queue.send_to_dlq(event, e)
            return {"popped": len(events), "processed": 0}

        logger.info(f"Batch complete: {len(df)} events stored")
        return {"popped": len(events), "processed": len(df)}

    def run_worker(self, interval_seconds=300):
        """Run the micro-batch worker on an interval."""
        logger.info(f"Worker started, processing every {interval_seconds}s")

        while True:
            try:
                # Drain the queue each cycle: keep popping while full batches come
                # back, so a backlog larger than batch_size does not keep growing
                while True:
                    result = self.process_batch()
                    if result["popped"] < self.batch_size:
                        break
                depth = self.queue.queue_depth()
                if depth > 1000:
                    logger.warning(f"Queue depth high: {depth} events waiting")
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")

            time.sleep(interval_seconds)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    MicroBatchProcessor(EventQueue()).run_worker(interval_seconds=300)
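Unit-testing the worker does not have to require a Redis server. A minimal sketch of a test double with the same method names as `EventQueue` (`push`, `pop_batch`, `send_to_dlq`, `queue_depth`), backed by a `deque`, follows. `InMemoryEventQueue` is a hypothetical name, and unlike Redis it is not shared between processes, so it only suits single-process tests.

```python
import json
from collections import deque
from datetime import datetime


class InMemoryEventQueue:
    """Test double exposing the same methods as EventQueue, backed by a deque."""

    def __init__(self):
        self._items = deque()
        self.dlq = []  # plain list standing in for the Redis dead letter queue

    def push(self, event):
        # Store JSON strings so events come back as freshly parsed dicts, as from Redis
        self._items.append(json.dumps(event, default=str))

    def pop_batch(self, max_items=100):
        batch = []
        while self._items and len(batch) < max_items:
            batch.append(json.loads(self._items.popleft()))
        return batch

    def send_to_dlq(self, event, error):
        event["_error"] = str(error)
        event["_failed_at"] = datetime.now().isoformat()
        self.dlq.append(event)

    def queue_depth(self):
        return len(self._items)


queue = InMemoryEventQueue()
for event_id in ["a", "b", "a", "c"]:
    queue.push({"event_id": event_id, "amount": "10.5"})

batch = queue.pop_batch(max_items=3)  # takes a, b, a; leaves c waiting
# Keep the last occurrence of each event_id, like drop_duplicates(keep="last")
deduped = list({event["event_id"]: event for event in batch}.values())

queue.send_to_dlq({"event_id": "bad"}, ValueError("unparseable amount"))
```

In a pytest test, passing this queue to `MicroBatchProcessor` together with a temporary `db_path` (for example `str(tmp_path / "test.db")`) would let `process_batch()` run end to end without Redis.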

Producing Events

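# From a webhook handler (assumes the EventQueue class above is saved as hybrid_pipeline.py)
# Verify the webhook signature here too, as in the real-time handler
from datetime import datetime

from flask import Flask, jsonify, request

from hybrid_pipeline import EventQueue

app = Flask(__name__)
queue = EventQueue()

@app.route("/webhook/event", methods=["POST"])
def receive_event():
    """Receive events and queue them for batch processing."""
    event = request.get_json(silent=True)
    if not isinstance(event, dict):
        return jsonify({"error": "Body must be a JSON object"}), 400
    event["_received_at"] = datetime.now().isoformat()
    queue.push(event)
    return jsonify({"status": "queued"}), 202  # 202 Accepted: stored, not yet processed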
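The micro-batch processor only deduplicates when events carry an `event_id`. If your provider does not supply one, a deterministic ID derived from the payload content is one option. The sketch below is a hypothetical helper, not part of the pipeline above; in `receive_event` it would be called as `event.setdefault("event_id", derive_event_id(event))` before `_received_at` is added, otherwise a redelivered event would hash differently.

```python
import hashlib
import json


def derive_event_id(event):
    """Deterministic ID from payload content (hypothetical helper).

    sort_keys and fixed separators make the same content always serialise
    to the same bytes, regardless of key order.
    """
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


first = {"order_id": 101, "status": "paid", "total": 120.0}
redelivery = {"total": 120.0, "status": "paid", "order_id": 101}  # same content, new key order
update = {"order_id": 101, "status": "refunded", "total": 120.0}

same = derive_event_id(first) == derive_event_id(redelivery)
different = derive_event_id(first) != derive_event_id(update)
```

The trade-off: two genuinely distinct events with byte-identical payloads would collapse into one. Most event payloads include a timestamp or sequence number, which avoids this.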

Hybrid Characteristics

| Property | Value |
|---|---|
| Latency | 1–15 minutes (configurable) |
| Complexity | Medium |
| Infrastructure | Redis + worker process |
| Data freshness | Near real-time |
| Failure recovery | DLQ + batch retry |
| Cost | Low (worker + Redis) |
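The configurable latency comes from two knobs: the worker interval (`interval_seconds=300` by default) and the batch size (`max_items=500` in `process_batch`). A back-of-envelope sketch, with a hypothetical traffic figure of 200 events per minute, shows how they interact. `cycle_sizing` is an illustrative name.

```python
import math


def cycle_sizing(events_per_minute, interval_seconds, max_items):
    """Back-of-envelope sizing for one micro-batch cycle."""
    events_per_cycle = events_per_minute * interval_seconds / 60
    # How many pop_batch calls one cycle needs to clear everything that arrived
    pops_needed = math.ceil(events_per_cycle / max_items) if events_per_cycle else 0
    # An event arriving just after a cycle starts waits about one full interval
    # (plus processing time) before it is stored
    worst_case_wait_s = interval_seconds
    return events_per_cycle, pops_needed, worst_case_wait_s


sizing = cycle_sizing(events_per_minute=200, interval_seconds=300, max_items=500)
```

At 200 events per minute and a 5-minute interval, about 1,000 events accumulate per cycle, so one cycle needs two pops of 500. A worker that pops only once per cycle would fall further behind every interval; shortening the interval lowers worst-case staleness but does not change total throughput.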

Decision Framework

Side-by-Side Comparison

| Factor | Batch | Hybrid | Real-Time |
|---|---|---|---|
| Latency | Hours | Minutes | Seconds |
| Complexity | Low | Medium | High |
| Infrastructure | cron | Redis + worker | Web server + queue |
| Best for | Reports, exports | Dashboards, monitoring | Alerts, fraud |
| Failure recovery | Re-run entire pipeline | Retry batch | Retry individual event |
| Scaling | Larger window | More workers | Horizontal scaling |
| Testing | Easy (deterministic) | Moderate | Hard (timing-dependent) |
| Cost | Lowest | Low | Highest |

Common Scenarios

| Scenario | Recommended pattern | Why |
|---|---|---|
| Daily sales report | Batch | Users check once per day |
| Inventory dashboard | Hybrid (5 min) | Needs to be current, not instant |
| Fraud detection | Real-time | Every second of delay costs money |
| Weekly executive summary | Batch | Runs once per week |
| Order status tracker | Hybrid (1 min) | Near-instant without per-event cost |
| Price drop alerting | Real-time | Customer expectation of immediacy |
| Data warehouse load | Batch | Downstream is batch anyway |
| Live support queue | Real-time | Agents need current view |
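The tables above can be condensed into a first-pass rule of thumb. The sketch below maps the maximum acceptable staleness of the data to a pattern; the thresholds (under a minute for real-time, under an hour for hybrid) are illustrative assumptions read off the latency rows, not fixed rules, and `recommend_pattern` is a hypothetical name. The `source_supports_events` flag reflects the "When to Use Batch" point that bulk-export-only sources can only be processed in batches.

```python
def recommend_pattern(max_staleness_seconds, source_supports_events=True):
    """Map a freshness requirement to batch, hybrid, or real-time.

    Thresholds are illustrative: real-time for seconds, hybrid for minutes,
    batch for an hour or more.
    """
    if not source_supports_events:
        # Bulk exports (CSV, Excel) can only be processed in batches
        return "batch"
    if max_staleness_seconds < 60:
        return "real-time"
    if max_staleness_seconds < 60 * 60:
        return "hybrid"
    return "batch"


# Scenarios from the Common Scenarios table
examples = {
    "Fraud detection": recommend_pattern(5),
    "Order status tracker": recommend_pattern(60),
    "Inventory dashboard": recommend_pattern(5 * 60),
    "Daily sales report": recommend_pattern(24 * 60 * 60),
}
```

Latency is only the first filter. A scenario whose delay is cheap, such as a data warehouse load, should stay on batch even when fresher data would be technically possible.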

Migration Path

Most teams start with batch and evolve. Here is a practical migration path:

Phase 1: Batch Only (Week 1)

# cron runs at 6 AM
pipeline = BatchPipeline()
pipeline.run(api_url, headers)

Phase 2: Add Event Collection (Week 3)

# Webhook receiver queues events
# Batch pipeline still runs daily but also processes queued events
queue = EventQueue()
processor = MicroBatchProcessor(queue)
processor.process_batch()  # called from the same daily cron job

Phase 3: Reduce Batch Interval (Week 5)

# Worker processes every 5 minutes instead of daily
processor.run_worker(interval_seconds=300)

Phase 4: Real-Time for Critical Events (As Needed)

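# Only high-value or alert-worthy events trigger immediate processing
# Everything else stays in the micro-batch worker
# (process_immediately is a placeholder for a real-time handler such as process_order_event)
if float(event.get("total", 0)) > 5000:
    process_immediately(event)
else:
    queue.push(event)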
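The Phase 4 split can be made testable by passing the two destinations in as callables. A minimal sketch follows, assuming events carry a numeric or numeric-string `total` and reusing the 5000 threshold from the real-time handler; `route_event` and `HIGH_VALUE_THRESHOLD` are hypothetical names.

```python
HIGH_VALUE_THRESHOLD = 5000  # same threshold the real-time handler alerts on


def route_event(event, process_immediately, enqueue):
    """Send high-value events down the fast path and everything else to the queue.

    process_immediately and enqueue are injected callables (e.g. a real-time
    handler and EventQueue.push), which keeps the routing logic testable.
    """
    try:
        total = float(event.get("total") or 0)
    except (TypeError, ValueError):
        # Unparseable totals take the batch path, where to_numeric coerces them to NaN
        total = 0.0
    if total > HIGH_VALUE_THRESHOLD:
        process_immediately(event)
        return "immediate"
    enqueue(event)
    return "queued"


immediate, queued = [], []
for event in [{"id": 1, "total": "7500.00"}, {"id": 2, "total": 42}, {"id": 3}]:
    route_event(event, immediate.append, queued.append)
```

Keeping the fast path narrow means only a small share of traffic pays the always-on cost of real-time processing, while the rest keeps the micro-batch worker's simpler failure handling.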

What This Replaces

| Old approach | New approach |
|---|---|
| Everything runs daily via cron | Right pattern for each use case |
| Building real-time when batch is fine | Decision framework prevents over-engineering |
| Custom queue implementations | Redis-backed event queue |
| No event-driven capability | Webhook receiver + queue + worker |

Next Steps

Start with batch. It covers most reporting and automation use cases. Add the hybrid pattern when users start asking “why is the dashboard not up to date?” Add real-time only when latency directly affects outcomes.

For building the batch pipelines, see How to Design Data Pipelines for Reliable Reporting. For making any pipeline pattern recover from failures, see How to Build Self-Healing Data Pipelines. For feeding web data into these pipelines, see Web Scraping to Structured Data: Building Reliable Extraction Pipelines. For optimising the SQL queries that power batch reporting, see SQL for Data Engineers: Window Functions, CTEs, and Query Optimization.

Data analytics services include designing pipeline architectures that match your actual latency and freshness requirements.

Get in touch to discuss the right pipeline architecture for your data.
