How to Design Data Pipelines for Reliable Reporting Systems
Design data pipelines that handle multiple sources, ensure data quality, automate scheduling, and produce reliable reports — with architecture patterns and working Python examples.

A reporting system is only as reliable as the pipeline behind it. If the data flow breaks — because a source changes format, a file is missing, or a transformation silently drops rows — the report is wrong. And nobody knows until someone says "these numbers don't look right."
This guide covers how to design data pipelines that handle real-world problems: multiple sources, data quality checks, failure recovery, and scheduling.
# Pipeline Architecture
Every reliable reporting pipeline follows this structure:
```mermaid
flowchart LR
    E["Extract\n(sources)"] --> V["Validate\n(quality)"]
    V --> T["Transform\n(clean)"]
    T --> L["Load\n(store)"]
    L --> R["Report\n(output)"]
    E -.-> Log[Logging]
    V -.-> Log
    T -.-> Log
    L -.-> Log
    R -.-> Log
    V -.-> AL["Alerting\n(on failure)"]
```
Each stage is isolated. Each stage logs what it does. Quality checks happen before transformation, not after.
# Design Principles
# 1. Fail Loudly, Not Silently
The worst pipeline failure is one you do not notice. Silent failures — where the pipeline runs but produces wrong output — are more damaging than crashes.
```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("pipeline")
```
# 2. Validate at Boundaries
Check data quality at every boundary — when data enters the system and when it moves between stages:
```python
class DataValidationError(Exception):
    """Raised when data fails quality checks."""
    pass

def validate_dataframe(df, name, rules):
    """Validate a DataFrame against a set of rules."""
    errors = []
    for rule_name, check_fn in rules.items():
        if not check_fn(df):
            errors.append(f"{name}: {rule_name}")
            logger.error(f"Validation failed: {name} — {rule_name}")
    if errors:
        raise DataValidationError("Data validation failed:\n" + "\n".join(errors))
    logger.info(f"Validation passed: {name} ({len(df)} rows)")
    return df
```
# 3. Make Every Stage Idempotent
Running the same pipeline twice with the same inputs should produce the same output. This means:
- Use `if_exists="replace"` when writing to tables
- Deduplicate before loading
- Use deterministic timestamps (pipeline run time, not wall clock)
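A minimal sketch of the idempotency idea: deduplicate on the natural key, then fully replace the target table, so re-running with the same input produces identical rows (the key columns and table name here are illustrative):

```python
import os
import sqlite3
import tempfile

import pandas as pd

def load_idempotent(df, db_path, table):
    """Deduplicate on the natural key, then replace the whole table."""
    clean = df.drop_duplicates(subset=["date", "region"], keep="last")
    conn = sqlite3.connect(db_path)
    clean.to_sql(table, conn, if_exists="replace", index=False)
    conn.close()
    return len(clean)

df = pd.DataFrame({
    "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "region": ["North", "North", "South"],
    "revenue": [100, 100, 250],
})

with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "demo.db")
    first = load_idempotent(df, db, "sales")
    second = load_idempotent(df, db, "sales")  # re-run: same result
    assert first == second == 2
```

Append-style loads (`if_exists="append"`) break this property: every re-run doubles the rows.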
# Building the Pipeline
# Stage 1: Extract
```python
import os
from datetime import datetime

import pandas as pd

class Extractor:
    """Extract data from multiple source types with logging and error handling."""

    def __init__(self):
        self.run_timestamp = datetime.now().isoformat()

    def extract_excel(self, filepath):
        """Extract from Excel with existence check."""
        if not os.path.exists(filepath):
            logger.error(f"Source file not found: {filepath}")
            raise FileNotFoundError(f"Source missing: {filepath}")
        df = pd.read_excel(filepath)
        df["_source"] = filepath
        df["_extracted_at"] = self.run_timestamp
        logger.info(f"Extracted {len(df)} rows from {filepath}")
        return df

    def extract_csv(self, filepath, encoding="utf-8"):
        """Extract from CSV with encoding handling."""
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Source missing: {filepath}")
        df = pd.read_csv(filepath, encoding=encoding)
        df["_source"] = filepath
        df["_extracted_at"] = self.run_timestamp
        logger.info(f"Extracted {len(df)} rows from {filepath}")
        return df

    def extract_database(self, db_path, query):
        """Extract from a SQLite database."""
        import sqlite3
        conn = sqlite3.connect(db_path)
        df = pd.read_sql(query, conn)
        conn.close()
        df["_source"] = db_path
        df["_extracted_at"] = self.run_timestamp
        logger.info(f"Extracted {len(df)} rows from database")
        return df

    def extract_api(self, url, headers=None, params=None):
        """Extract from a REST API with a request timeout."""
        import requests
        response = requests.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        df = pd.DataFrame(data if isinstance(data, list) else data.get("results", []))
        df["_source"] = url
        df["_extracted_at"] = self.run_timestamp
        logger.info(f"Extracted {len(df)} rows from API: {url}")
        return df
```
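The API extractor sets a timeout but does not retry on transient failures. A generic retry wrapper with exponential backoff can sit in front of any extract call; this is a sketch, and `max_attempts` and `base_delay` are illustrative defaults:

```python
import time

def with_retry(fn, max_attempts=3, base_delay=1.0):
    """Call fn(), retrying on any exception with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: fail loudly
            time.sleep(base_delay * 2 ** (attempt - 1))

# Usage against the extractor would look like:
#   df = with_retry(lambda: extractor.extract_api(url))

# Demonstration with a function that fails twice, then succeeds:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

assert with_retry(flaky, base_delay=0) == "ok"
assert calls["n"] == 3
```

In production you would usually narrow the `except` clause to network errors (e.g. `requests.RequestException`) so genuine bugs still crash immediately.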
# Stage 2: Validate
Define validation rules per source:
```python
def build_validation_rules(required_columns, min_rows=1):
    """Build a standard set of validation rules."""
    return {
        "has_required_columns": lambda df: all(col in df.columns for col in required_columns),
        f"has_at_least_{min_rows}_rows": lambda df: len(df) >= min_rows,
        "no_completely_empty_rows": lambda df: not df.dropna(how="all").empty,
        "no_duplicate_columns": lambda df: len(df.columns) == len(set(df.columns)),
    }

# Define rules per source
SALES_RULES = build_validation_rules(
    required_columns=["date", "region", "product", "revenue"],
    min_rows=10,
)
CUSTOMER_RULES = build_validation_rules(
    required_columns=["customer_id", "name", "region"],
    min_rows=1,
)
```
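To see the rules in action, here is a quick check against good and bad sample frames (a self-contained sketch; in the pipeline itself, `validate_dataframe` evaluates the same rules and raises `DataValidationError`):

```python
import pandas as pd

def build_validation_rules(required_columns, min_rows=1):
    return {
        "has_required_columns": lambda df: all(c in df.columns for c in required_columns),
        f"has_at_least_{min_rows}_rows": lambda df: len(df) >= min_rows,
        "no_completely_empty_rows": lambda df: not df.dropna(how="all").empty,
        "no_duplicate_columns": lambda df: len(df.columns) == len(set(df.columns)),
    }

rules = build_validation_rules(["date", "region", "revenue"], min_rows=2)

good = pd.DataFrame({
    "date": ["2024-01-01", "2024-01-02"],
    "region": ["North", "South"],
    "revenue": [100, 250],
})
bad = pd.DataFrame({"date": ["2024-01-01"]})  # missing columns, too few rows

failed = [name for name, check in rules.items() if not check(bad)]
assert all(check(good) for check in rules.values())
assert "has_required_columns" in failed
assert "has_at_least_2_rows" in failed
```

Because rules are plain name-to-callable pairs, adding a source-specific check (e.g. "revenue is never negative") is one dictionary entry.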
# Stage 3: Transform
```python
import numpy as np

class Transformer:
    """Transform data with logging and quality tracking."""

    def __init__(self):
        self.quality_metrics = {}

    def standardise_columns(self, df, name="dataset"):
        """Normalise column names."""
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
        return df

    def clean_strings(self, df, columns):
        """Strip whitespace and normalise casing."""
        for col in columns:
            if col in df.columns:
                df[col] = df[col].astype(str).str.strip().str.title()
                df[col] = df[col].replace({"Nan": np.nan, "None": np.nan, "N/A": np.nan})
        return df

    def parse_dates(self, df, date_columns):
        """Parse date columns with error tracking."""
        for col in date_columns:
            if col in df.columns:
                before_nulls = df[col].isna().sum()
                df[col] = pd.to_datetime(df[col], errors="coerce")
                after_nulls = df[col].isna().sum()
                failed = after_nulls - before_nulls
                if failed > 0:
                    logger.warning(f"{failed} dates could not be parsed in '{col}'")
        return df

    def fix_numerics(self, df, numeric_columns):
        """Convert to numeric with error tracking."""
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")
        return df

    def deduplicate(self, df, key_columns, name="dataset"):
        """Remove duplicates with count tracking."""
        before = len(df)
        df = df.drop_duplicates(subset=key_columns, keep="last")
        removed = before - len(df)
        if removed > 0:
            logger.info(f"Deduplicated {name}: removed {removed} rows ({before} → {len(df)})")
            self.quality_metrics[f"{name}_duplicates_removed"] = removed
        return df

    def handle_missing(self, df, fill_rules):
        """Apply missing value strategies per column.

        fill_rules: dict of column → strategy
        Strategies: 'drop', 'zero', 'unknown', 'forward_fill', or a specific value
        """
        for col, strategy in fill_rules.items():
            if col not in df.columns:
                continue
            if strategy == "drop":
                df = df.dropna(subset=[col])
            elif strategy == "zero":
                df[col] = df[col].fillna(0)
            elif strategy == "unknown":
                df[col] = df[col].fillna("Unknown")
            elif strategy == "forward_fill":
                df[col] = df[col].ffill()
            else:
                df[col] = df[col].fillna(strategy)
        return df
```
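The transform steps compose naturally. Here is a condensed run of the same operations inlined on a small messy frame (a sketch of the class's behaviour, with made-up column names), showing a bad date dropped, a bad numeric coerced, and a missing region filled:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    " Order Date ": ["2024-01-05", "not a date", "2024-01-07"],
    "Region": [" north ", "south", None],
    "Revenue": ["100", "x", "250"],
})

# Normalise column names: strip, lowercase, spaces to underscores
df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
# Coerce unparseable dates/numbers to NaT/NaN instead of crashing
df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
df["revenue"] = pd.to_numeric(df["revenue"], errors="coerce")
# Clean strings, then apply per-column missing-value strategies
df["region"] = df["region"].astype(str).str.strip().str.title().replace({"None": np.nan})
df = df.dropna(subset=["order_date"])      # 'drop' strategy on the key column
df["revenue"] = df["revenue"].fillna(0)    # 'zero' strategy
df["region"] = df["region"].fillna("Unknown")

assert len(df) == 2
assert df["revenue"].sum() == 350
```

The unparseable date row is gone (its bad revenue with it), and the surviving rows are fully typed and filled, which is exactly the state the report stage should be able to assume.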
# Stage 4: Load
```python
import sqlite3

class Loader:
    """Load transformed data to storage destinations."""

    def to_database(self, df, db_path, table_name):
        """Load to SQLite with replace semantics (idempotent)."""
        conn = sqlite3.connect(db_path)
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        conn.close()
        logger.info(f"Loaded {len(df)} rows to {db_path}:{table_name}")

    def to_excel(self, dataframes, output_path):
        """Load multiple DataFrames to Excel sheets."""
        with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
            for name, df in dataframes.items():
                sheet_name = name.replace("_", " ").title()[:31]
                df.to_excel(writer, sheet_name=sheet_name, index=False)
        logger.info(f"Exported {len(dataframes)} sheets to {output_path}")

    def to_csv(self, df, output_path):
        """Load to CSV."""
        df.to_csv(output_path, index=False)
        logger.info(f"Exported {len(df)} rows to {output_path}")
```
# Stage 5: Report
```python
def generate_report_summary(df, group_by, metrics):
    """Generate an aggregated report from clean data."""
    agg_dict = {metric_name: (column, func) for metric_name, (column, func) in metrics.items()}
    report = df.groupby(group_by).agg(**agg_dict).reset_index().round(2)
    logger.info(f"Generated report: {len(report)} rows, grouped by {group_by}")
    return report
```
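A quick usage example on an inline frame (the function is repeated here without its logging call so the snippet is self-contained):

```python
import pandas as pd

def generate_report_summary(df, group_by, metrics):
    """Aggregate df by group_by using pandas named aggregation."""
    agg_dict = {m: (col, fn) for m, (col, fn) in metrics.items()}
    return df.groupby(group_by).agg(**agg_dict).reset_index().round(2)

sales = pd.DataFrame({
    "region": ["North", "North", "South"],
    "revenue": [100.0, 200.0, 250.0],
})
report = generate_report_summary(
    sales,
    group_by=["region"],
    metrics={"revenue": ("revenue", "sum"), "orders": ("revenue", "count")},
)
assert report.loc[report["region"] == "North", "revenue"].iloc[0] == 300.0
assert report.loc[report["region"] == "South", "orders"].iloc[0] == 1
```

The `metrics` dict maps output column names to `(source_column, aggregation)` pairs, which is pandas named aggregation; the same `metrics` spec can be reused across the monthly and per-region reports.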
# Wiring It Together: The Pipeline Runner
```python
class ReportingPipeline:
    """Orchestrate the full pipeline with error handling and logging."""

    def __init__(self, db_path="reporting.db"):
        self.extractor = Extractor()
        self.transformer = Transformer()
        self.loader = Loader()
        self.db_path = db_path

    def run(self):
        """Execute the full pipeline."""
        run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        logger.info(f"Pipeline run started: {run_id}")
        try:
            # EXTRACT
            sales = self.extractor.extract_excel("data/sales.xlsx")
            customers = self.extractor.extract_csv("data/customers.csv")

            # VALIDATE
            sales = validate_dataframe(sales, "sales", SALES_RULES)
            customers = validate_dataframe(customers, "customers", CUSTOMER_RULES)

            # TRANSFORM
            sales = self.transformer.standardise_columns(sales, "sales")
            sales = self.transformer.parse_dates(sales, ["date"])
            sales = self.transformer.fix_numerics(sales, ["revenue", "quantity"])
            sales = self.transformer.clean_strings(sales, ["region", "product"])
            sales = self.transformer.deduplicate(sales, ["date", "region", "product"], "sales")
            sales = self.transformer.handle_missing(sales, {
                "date": "drop",
                "region": "drop",
                "revenue": "zero",
                "quantity": "zero",
            })

            customers = self.transformer.standardise_columns(customers, "customers")
            customers = self.transformer.clean_strings(customers, ["name", "region"])
            customers = self.transformer.deduplicate(customers, ["customer_id"], "customers")

            # COMBINE
            combined = sales.merge(customers, on="customer_id", how="left", suffixes=("", "_cust"))

            # REPORT
            monthly = generate_report_summary(
                combined,
                group_by=[combined["date"].dt.to_period("M").rename("month")],
                metrics={
                    "revenue": ("revenue", "sum"),
                    "orders": ("revenue", "count"),
                    "avg_order": ("revenue", "mean"),
                },
            )
            # Period objects are not SQLite-serialisable; store the month as text
            monthly["month"] = monthly["month"].astype(str)
            by_region = generate_report_summary(
                combined,
                group_by=["region"],
                metrics={
                    "revenue": ("revenue", "sum"),
                    "orders": ("revenue", "count"),
                },
            )

            # LOAD
            self.loader.to_database(combined, self.db_path, "fact_sales")
            self.loader.to_database(monthly, self.db_path, "report_monthly")
            self.loader.to_database(by_region, self.db_path, "report_by_region")
            self.loader.to_excel(
                {"monthly": monthly, "by_region": by_region},
                f"reports/report_{run_id}.xlsx",
            )

            logger.info(f"Pipeline run completed: {run_id}")
            return {"status": "success", "rows_processed": len(combined), "run_id": run_id}

        except DataValidationError as e:
            logger.error(f"Pipeline failed (validation): {e}")
            return {"status": "failed", "error": str(e), "run_id": run_id}
        except FileNotFoundError as e:
            logger.error(f"Pipeline failed (source missing): {e}")
            return {"status": "failed", "error": str(e), "run_id": run_id}
        except Exception as e:
            logger.error(f"Pipeline failed (unexpected): {e}")
            raise

if __name__ == "__main__":
    pipeline = ReportingPipeline()
    result = pipeline.run()
    print(f"\nResult: {result}")
```
# Scheduling
# Linux / macOS
```shell
# Run the pipeline every weekday at 6:00 AM
0 6 * * 1-5 cd /path/to/project && /usr/bin/python3 pipeline.py >> logs/cron.log 2>&1
```
# Windows
```powershell
$action = New-ScheduledTaskAction -Execute "python" -Argument "C:\pipelines\pipeline.py" -WorkingDirectory "C:\pipelines"
$trigger = New-ScheduledTaskTrigger -Weekly -DaysOfWeek Monday,Tuesday,Wednesday,Thursday,Friday -At 6am
Register-ScheduledTask -Action $action -Trigger $trigger -TaskName "ReportingPipeline"
```
# Data Quality Monitoring
Add quality metrics that accumulate over time:
```python
def log_quality_metrics(run_id, metrics, db_path):
    """Store quality metrics for trend monitoring."""
    import sqlite3
    conn = sqlite3.connect(db_path)
    # Create the log table on first use so the insert never fails cold
    conn.execute(
        "CREATE TABLE IF NOT EXISTS quality_log "
        "(run_id TEXT, metric TEXT, value REAL, timestamp TEXT)"
    )
    for metric_name, value in metrics.items():
        conn.execute(
            "INSERT INTO quality_log (run_id, metric, value, timestamp) VALUES (?, ?, ?, ?)",
            (run_id, metric_name, value, datetime.now().isoformat()),
        )
    conn.commit()
    conn.close()
```
Track metrics like:
| Metric | What it catches |
|---|---|
| Row count per source | Source went empty or grew unexpectedly |
| Null percentage per column | Data quality degradation |
| Duplicates removed | Source deduplication issues |
| Date parse failures | Format changes in source |
| Run duration | Performance regressions |
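Once metrics accumulate, a window-function query over the log surfaces trends, e.g. flagging any run whose row count collapsed versus the previous run. A sketch against an in-memory database (the table matches `quality_log` above; `LAG` requires SQLite ≥ 3.25, and the 50% threshold is illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE quality_log (run_id TEXT, metric TEXT, value REAL, timestamp TEXT)"
)
conn.executemany(
    "INSERT INTO quality_log VALUES (?, ?, ?, ?)",
    [
        ("r1", "sales_row_count", 1000.0, "2024-01-01"),
        ("r2", "sales_row_count", 980.0, "2024-01-02"),
        ("r3", "sales_row_count", 120.0, "2024-01-03"),  # suspicious drop
    ],
)

# Compare each run's value to the previous run's via a window function
query = """
SELECT run_id, value,
       value / LAG(value) OVER (ORDER BY timestamp) AS ratio
FROM quality_log
WHERE metric = 'sales_row_count'
"""
flagged = [row for row in conn.execute(query) if row[2] is not None and row[2] < 0.5]
assert [row[0] for row in flagged] == ["r3"]
conn.close()
```

Wiring this check into the alerting path turns "the source went quiet last Tuesday" from a month-late discovery into a same-morning email.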
# Architecture Patterns
# Pattern 1: Single-File Pipeline (small teams)
`pipeline.py` — everything in one file, run via cron
Best for: 1–3 sources, weekly reports, small team
# Pattern 2: Modular Pipeline (growing systems)
- `extract/` — one module per source
- `transform/` — shared cleaning + source-specific transforms
- `load/` — one module per destination
- `pipeline.py` — orchestrator
Best for: 5+ sources, daily reports, need to add sources frequently
# Pattern 3: DAG-based (complex systems)
Use Airflow, Prefect, or Dagster for:
- Dependencies between 10+ data flows
- Need for retry, backfill, monitoring dashboards
- Multiple teams contributing pipelines
For most business reporting, Pattern 1 or 2 is sufficient. Do not over-engineer.
# Next Steps
The pipeline architecture above handles the majority of real-world reporting needs. The key design decisions:
- Validate before transforming — catch problems early
- Log everything — you will need the audit trail
- Make it idempotent — safe to re-run anytime
- Fail loudly — alert on errors rather than silently producing wrong numbers
For cleaning the source data that feeds into these pipelines, see How to Clean Messy Excel Data Using Python. For building the dashboard layer on top, see How to Build a Data Dashboard Without Manual Excel Work.
Data & dashboard services include designing and building complete reporting pipeline systems.
Get in touch to discuss building reliable reporting for your data.