Real-Time vs Batch: Choosing the Right Data Pipeline Architecture
Understand when to use batch processing, real-time streaming, or a hybrid approach — with architecture diagrams, working code for each pattern, and a decision framework.

Every data pipeline discussion eventually reaches the same question: should this run on a schedule or react to events in real time?
The answer depends on what you are building. A daily sales report does not need to update every second. A fraud detection system cannot wait until morning. But most teams default to one pattern — usually batch — without evaluating whether it fits.
This guide covers three pipeline architectures: batch, real-time, and hybrid. Each includes working Python code, architecture diagrams, and clear guidance on when to use it.
# The Three Architectures
flowchart TD
subgraph Batch
B1["Source"] -->|"Scheduled\n(cron)"| B2["Extract"]
B2 --> B3["Transform"]
B3 --> B4["Load"]
end
subgraph RealTime["Real-Time"]
R1["Event"] -->|"Instant"| R2["Receive"]
R2 --> R3["Process"]
R3 --> R4["Store"]
end
subgraph Hybrid
H1["Source"] -->|"Events"| H2["Queue"]
H2 -->|"Micro-batch\n(every 5 min)"| H3["Process"]
H3 --> H4["Store"]
end
# Pattern 1: Batch Processing
The most common pattern. Data is collected, then processed on a schedule.
# When to Use Batch
- Reports that users check once per day or week
- Data that changes slowly (daily sales, monthly metrics)
- Sources that only provide bulk exports (CSV, Excel)
- Downstream systems that do not need instant updates
# Architecture
flowchart LR
S1["API\n(full export)"] --> C["Collect\n(6 AM daily)"]
S2["Database\n(snapshot)"] --> C
S3["Excel\n(uploaded)"] --> C
C --> T["Transform\n& Validate"]
T --> L["Load to\nDatabase"]
L --> R["Generate\nReport"]
R --> D["Distribute\n(email / dashboard)"]
# Working Code
import pandas as pd
import sqlite3
import requests
from datetime import datetime
import logging

logger = logging.getLogger("batch_pipeline")

class BatchPipeline:
    """Standard batch pipeline — run on schedule."""

    def __init__(self, db_path="reports.db"):
        self.db_path = db_path
        self.run_timestamp = datetime.now().isoformat()

    def extract(self, api_url, headers):
        """Extract full dataset from API."""
        response = requests.get(api_url, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        df = pd.DataFrame(data["results"])
        logger.info(f"Extracted {len(df)} records from API")
        return df

    def transform(self, df):
        """Clean and aggregate data."""
        # Standardise columns
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
        # Parse dates
        if "created_at" in df.columns:
            df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
        # Clean numerics
        for col in ["amount", "revenue", "quantity"]:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")
        # Remove duplicates
        if "id" in df.columns:
            df = df.drop_duplicates(subset=["id"], keep="last")
        # Add pipeline metadata
        df["_pipeline_run"] = self.run_timestamp
        logger.info(f"Transformed: {len(df)} rows")
        return df

    def load(self, df, table_name):
        """Load to database (replace for idempotency)."""
        conn = sqlite3.connect(self.db_path)
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        conn.close()
        logger.info(f"Loaded {len(df)} rows to {table_name}")

    def run(self, api_url, headers):
        """Execute full batch pipeline."""
        logger.info(f"Batch pipeline started: {self.run_timestamp}")
        df = self.extract(api_url, headers)
        df = self.transform(df)
        self.load(df, "daily_orders")
        logger.info("Batch pipeline completed")
        return {"rows": len(df), "status": "success"}
# Scheduling
# cron — run every day at 6 AM
0 6 * * * cd /path/to/project && python batch_pipeline.py >> logs/batch.log 2>&1
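If cron is unavailable (Windows hosts, some managed platforms), a minimal in-process scheduler does the same job. This is a sketch — `next_run_at` and `run_forever` are helpers introduced here, not part of `BatchPipeline`:

```python
import time
from datetime import datetime, timedelta

def next_run_at(now, hour=6):
    """Return the next daily run time at the given hour (6 AM by default)."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed — run tomorrow
    return candidate

def run_forever(pipeline_run, hour=6):
    """Sleep until the next scheduled hour, run the pipeline, repeat."""
    while True:
        wait = (next_run_at(datetime.now(), hour) - datetime.now()).total_seconds()
        time.sleep(max(wait, 0))
        pipeline_run()
```

Usage would be `run_forever(lambda: BatchPipeline().run(api_url, headers))`, kept alive by a process manager. cron remains the simpler option where it exists.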
# Batch Characteristics
| Property | Value |
|---|---|
| Latency | Minutes to hours |
| Complexity | Low |
| Infrastructure | cron + Python |
| Data freshness | Last run time |
| Failure recovery | Re-run entire pipeline |
| Cost | Minimal (runs briefly) |
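Because `load()` uses `if_exists="replace"`, the whole pipeline is safe to re-run, so failure recovery can be a thin retry wrapper. A sketch — the attempt count and back-off are arbitrary defaults:

```python
import time
import logging

logger = logging.getLogger("batch_pipeline")

def run_with_retries(pipeline_run, attempts=3, backoff_seconds=60):
    """Re-run the whole pipeline on failure — safe because the load is idempotent."""
    for attempt in range(1, attempts + 1):
        try:
            return pipeline_run()
        except Exception as e:
            logger.warning(f"Attempt {attempt}/{attempts} failed: {e}")
            if attempt == attempts:
                raise  # out of attempts — surface the error to the scheduler
            time.sleep(backoff_seconds * attempt)  # linear back-off between attempts

# Usage: run_with_retries(lambda: BatchPipeline().run(api_url, headers))
```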
# Pattern 2: Real-Time Event Processing
Data arrives as events and is processed immediately. No schedule — the pipeline reacts.
# When to Use Real-Time
- Alerts that need to fire within seconds (fraud, outages)
- Live dashboards that users watch continuously
- Webhook-driven workflows (payment received, order placed)
- Systems where stale data causes harm
# Architecture
flowchart LR
E1["Shopify\n(webhook)"] --> W["Webhook\nReceiver"]
E2["Stripe\n(webhook)"] --> W
W --> V["Validate\n& Parse"]
V --> P["Process\n(business logic)"]
P --> S["Store\n(database)"]
P --> A["Alert\n(if threshold)"]
S --> D["Dashboard\n(auto-refresh)"]
# Working Code: Webhook Receiver
from flask import Flask, request, jsonify
import pandas as pd
import sqlite3
import hmac
import hashlib
import logging
import os
from datetime import datetime

app = Flask(__name__)
logger = logging.getLogger("realtime_pipeline")

def verify_webhook_signature(payload, signature, secret):
    """Verify webhook authenticity using HMAC signature."""
    computed = hmac.new(
        secret.encode("utf-8"),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(computed, signature)

@app.route("/webhook/order", methods=["POST"])
def handle_order_webhook():
    """Process incoming order events in real time."""
    # Verify signature
    signature = request.headers.get("X-Webhook-Signature", "")
    webhook_secret = os.environ["WEBHOOK_SECRET"]
    if not verify_webhook_signature(request.data, signature, webhook_secret):
        logger.warning("Invalid webhook signature")
        return jsonify({"error": "Invalid signature"}), 401
    event = request.get_json()
    # Process event
    try:
        process_order_event(event)
        return jsonify({"status": "processed"}), 200
    except Exception as e:
        logger.error(f"Failed to process event: {e}")
        return jsonify({"error": "Processing failed"}), 500

def process_order_event(event):
    """Process a single order event.

    send_alert() is assumed to be defined elsewhere (e.g. a Slack or email notifier).
    """
    order = {
        "order_id": event["id"],
        "customer_id": event["customer"]["id"],
        "total": float(event["total_price"]),
        "created_at": event["created_at"],
        "status": event["financial_status"],
        "processed_at": datetime.now().isoformat(),
    }
    # Store immediately
    conn = sqlite3.connect("orders_realtime.db")
    pd.DataFrame([order]).to_sql("orders", conn, if_exists="append", index=False)
    conn.close()
    # Check alert thresholds
    if order["total"] > 5000:
        logger.info(f"High-value order detected: #{order['order_id']} (${order['total']})")
        send_alert(f"High-value order: #{order['order_id']} — ${order['total']}")
    logger.info(f"Processed order #{order['order_id']}")
# Real-Time Characteristics
| Property | Value |
|---|---|
| Latency | Milliseconds to seconds |
| Complexity | Medium to high |
| Infrastructure | Web server + process manager |
| Data freshness | Immediate |
| Failure recovery | Retry queue + dead letter |
| Cost | Always-on server |
# Pattern 3: Hybrid (Micro-Batch)
Events are collected into a queue and processed in small batches every few minutes. This gives near-real-time freshness with batch-level simplicity.
# When to Use Hybrid
- Dashboards that need updates every 5–15 minutes (not instant)
- High-volume events where per-event processing is wasteful
- Systems that need to deduplicate or aggregate before storing
- Teams that want real-time value without real-time complexity
# Architecture
flowchart LR
E1["Events"] --> Q["Redis Queue"]
E2["Webhooks"] --> Q
E3["API Poll"] --> Q
Q --> W["Worker\n(every 5 min)"]
W --> B["Batch Process\n(dedupe, aggregate)"]
B --> S["Store"]
S --> D["Dashboard"]
W -.-> DLQ["Dead Letter\nQueue"]
# Working Code: Queue + Worker
import redis
import json
import time
import pandas as pd
import sqlite3
from datetime import datetime
import logging

logger = logging.getLogger("hybrid_pipeline")

class EventQueue:
    """Simple event queue using Redis lists."""

    def __init__(self, redis_url="redis://localhost:6379", queue_name="pipeline_events"):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.dlq_name = f"{queue_name}_dlq"

    def push(self, event):
        """Add an event to the queue."""
        self.redis.rpush(self.queue_name, json.dumps(event, default=str))

    def pop_batch(self, max_items=100):
        """Pop up to max_items from the queue."""
        pipe = self.redis.pipeline()
        pipe.lrange(self.queue_name, 0, max_items - 1)
        pipe.ltrim(self.queue_name, max_items, -1)
        results = pipe.execute()
        events = [json.loads(item) for item in results[0]]
        return events

    def send_to_dlq(self, event, error):
        """Send failed event to dead letter queue."""
        event["_error"] = str(error)
        event["_failed_at"] = datetime.now().isoformat()
        self.redis.rpush(self.dlq_name, json.dumps(event, default=str))

    def queue_depth(self):
        """Check how many events are waiting."""
        return self.redis.llen(self.queue_name)

class MicroBatchProcessor:
    """Process queued events in small batches."""

    def __init__(self, queue, db_path="hybrid_data.db"):
        self.queue = queue
        self.db_path = db_path

    def process_batch(self):
        """Pop events from queue and process as a batch."""
        events = self.queue.pop_batch(max_items=500)
        if not events:
            logger.debug("No events to process")
            return {"processed": 0}
        logger.info(f"Processing batch of {len(events)} events")
        # Convert to DataFrame for efficient processing
        df = pd.DataFrame(events)
        # Deduplicate (same event can arrive twice)
        if "event_id" in df.columns:
            before = len(df)
            df = df.drop_duplicates(subset=["event_id"], keep="last")
            dupes = before - len(df)
            if dupes > 0:
                logger.info(f"Removed {dupes} duplicate events")
        # Clean and transform
        for col in ["amount", "total", "price"]:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")
        df["_batch_processed_at"] = datetime.now().isoformat()
        # Store
        conn = sqlite3.connect(self.db_path)
        df.to_sql("events", conn, if_exists="append", index=False)
        conn.close()
        logger.info(f"Batch complete: {len(df)} events stored")
        return {"processed": len(df)}

    def run_worker(self, interval_seconds=300):
        """Run the micro-batch worker on an interval."""
        logger.info(f"Worker started — processing every {interval_seconds}s")
        while True:
            try:
                result = self.process_batch()
                depth = self.queue.queue_depth()
                if depth > 1000:
                    logger.warning(f"Queue depth high: {depth} events waiting")
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")
            time.sleep(interval_seconds)
# Producing Events
# From a webhook handler (reuses app, request, and jsonify from the real-time section)
queue = EventQueue()

@app.route("/webhook/event", methods=["POST"])
def receive_event():
    """Receive events and queue for batch processing."""
    event = request.get_json()
    event["_received_at"] = datetime.now().isoformat()
    queue.push(event)
    return jsonify({"status": "queued"}), 202
# Hybrid Characteristics
| Property | Value |
|---|---|
| Latency | 1–15 minutes (configurable) |
| Complexity | Medium |
| Infrastructure | Redis + worker process |
| Data freshness | Near real-time |
| Failure recovery | DLQ + batch retry |
| Cost | Low (worker + Redis) |
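The "DLQ + batch retry" row implies a replay step. The pure part of that step — stripping failure metadata and counting retries before an event goes back onto the main queue — can be sketched as follows; `reset_failed_event` is a hypothetical helper, not part of the classes above:

```python
def reset_failed_event(event, max_retries=3):
    """Strip DLQ metadata and bump the retry counter; return None when exhausted."""
    retries = event.get("_retry_count", 0) + 1
    if retries > max_retries:
        return None  # leave permanently failing events in the DLQ for inspection
    cleaned = {k: v for k, v in event.items() if k not in ("_error", "_failed_at")}
    cleaned["_retry_count"] = retries
    return cleaned
```

A replay job would pop each DLQ entry, pass it through `reset_failed_event`, and `queue.push()` the non-None results back onto the main queue.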
# Decision Framework
flowchart TD
Q1{"Need data in\n< 1 minute?"} -->|Yes| Q2{"High volume?\n(100+ events/sec)"}
Q1 -->|No| Q3{"Need data in\n< 15 minutes?"}
Q2 -->|Yes| H["Hybrid\n(micro-batch)"]
Q2 -->|No| RT["Real-Time\n(event-driven)"]
Q3 -->|Yes| H
Q3 -->|No| B["Batch\n(scheduled)"]
# Side-by-Side Comparison
| Factor | Batch | Hybrid | Real-Time |
|---|---|---|---|
| Latency | Hours | Minutes | Seconds |
| Complexity | Low | Medium | High |
| Infrastructure | cron | Redis + worker | Web server + queue |
| Best for | Reports, exports | Dashboards, monitoring | Alerts, fraud |
| Failure mode | Re-run entire pipeline | Retry batch | Retry individual event |
| Scaling | Larger window | More workers | Horizontal scaling |
| Testing | Easy (deterministic) | Moderate | Hard (timing-dependent) |
| Cost | Lowest | Low | Highest |
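The decision flowchart can also be encoded as a small helper — the thresholds (60 seconds, 15 minutes, 100 events/sec) come straight from the diagram, and the function name is ours, not a library API:

```python
def choose_pattern(max_staleness_seconds, events_per_second=0):
    """Map freshness and volume requirements to a pipeline pattern."""
    if max_staleness_seconds < 60:
        # Sub-minute freshness: per-event processing, unless volume makes it wasteful
        return "hybrid" if events_per_second >= 100 else "real-time"
    if max_staleness_seconds <= 15 * 60:
        return "hybrid"
    return "batch"

# choose_pattern(5)          → "real-time" (fraud alerts)
# choose_pattern(10 * 60)    → "hybrid"    (inventory dashboard)
# choose_pattern(24 * 3600)  → "batch"     (daily report)
```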
# Common Scenarios
| Scenario | Recommended pattern | Why |
|---|---|---|
| Daily sales report | Batch | Users check once per day |
| Inventory dashboard | Hybrid (5 min) | Needs to be current, not instant |
| Fraud detection | Real-time | Every second of delay costs money |
| Weekly executive summary | Batch | Runs once per week |
| Order status tracker | Hybrid (1 min) | Near-instant without per-event cost |
| Price drop alerting | Real-time | Customer expectation of immediacy |
| Data warehouse load | Batch | Downstream is batch anyway |
| Live support queue | Real-time | Agents need current view |
# Migration Path
Most teams start with batch and evolve. Here is a practical migration path:
# Phase 1: Batch Only (Week 1)
# cron runs at 6 AM
pipeline = BatchPipeline()
pipeline.run(api_url, headers)
# Phase 2: Add Event Collection (Week 3)
# Webhook receiver queues events
# Batch pipeline still runs daily but also processes queued events
queue = EventQueue()
processor = MicroBatchProcessor(queue)
# Phase 3: Reduce Batch Interval (Week 5)
# Worker processes every 5 minutes instead of daily
processor.run_worker(interval_seconds=300)
# Phase 4: Real-Time for Critical Events (As Needed)
# Only high-value or alert-worthy events trigger immediate processing
# Everything else stays in the micro-batch worker
if event["total"] > 5000:
    process_immediately(event)
else:
    queue.push(event)
# What This Replaces
| Old approach | New approach |
|---|---|
| Everything runs daily via cron | Right pattern for each use case |
| Building real-time when batch is fine | Decision framework prevents over-engineering |
| Custom queue implementations | Redis-backed event queue |
| No event-driven capability | Webhook receiver + queue + worker |
# Next Steps
Start with batch. It covers 80% of reporting and automation use cases. Add the hybrid pattern when users ask "why is the dashboard not up to date?" Add real-time only when latency directly impacts outcomes.
For building batch pipelines, see How to Design Data Pipelines for Reliable Reporting. For making any pipeline pattern recover from failures, see How to Build Self-Healing Data Pipelines.
Data analytics services include designing pipeline architectures that match your actual latency and freshness requirements.
Get in touch to discuss the right pipeline architecture for your data.