Ecommerce Data Pipeline: Reporting Architecture That Scales
Design a data pipeline architecture for ecommerce reporting that handles multi-platform ingestion, incremental loads, dimension modelling, and automated report generation — from raw API data to business-ready dashboards.
Ecommerce reporting starts simple. Pull yesterday's orders from Shopify, dump them in a spreadsheet, email it. Then someone asks for inventory data. Then returns. Then customer lifetime value. Then multi-channel comparisons. Within months, you have twelve scripts, four spreadsheets, and nobody trusts the numbers because each report calculates revenue differently.
The fix is architecture: a pipeline that ingests raw data from every platform into a single warehouse, transforms it through consistent business logic, and produces reports that everyone trusts because they all use the same source of truth.
# Who This Is For
- Ecommerce operators running stores across Shopify, WooCommerce, or Amazon who need unified reporting
- Data engineers designing their first ecommerce analytics pipeline
- Vibe coders building a data platform for an ecommerce client and wanting a proven architecture
- CTOs evaluating whether to build or buy their analytics infrastructure
Familiarity with Python and SQL is helpful. The architecture patterns apply regardless of your specific tech stack.
# The Full Architecture
```mermaid
flowchart TD
    subgraph Sources["Data Sources"]
        SHOP[Shopify API]
        WOO[WooCommerce API]
        GA[Google Analytics]
        ADS[Ad Platforms]
    end
    subgraph Ingest["Ingestion Layer"]
        EXT["Platform\nExtractors"]
        RAW["Raw Data\nStore"]
    end
    subgraph Transform["Transformation Layer"]
        STAGE["Staging\n(Clean & Validate)"]
        DIM["Dimension\nModelling"]
        METRICS["Metric\nCalculations"]
    end
    subgraph Warehouse["Data Warehouse"]
        FACT["Fact Tables\n(Orders, Sessions)"]
        DIMS["Dimension Tables\n(Products, Customers)"]
        AGG["Aggregate Tables\n(Daily, Weekly)"]
    end
    subgraph Serve["Serving Layer"]
        DASH["Dashboards"]
        REPORT["Scheduled Reports"]
        API["Reporting API"]
    end
    SHOP --> EXT
    WOO --> EXT
    GA --> EXT
    ADS --> EXT
    EXT --> RAW
    RAW --> STAGE
    STAGE --> DIM
    DIM --> METRICS
    METRICS --> FACT
    METRICS --> DIMS
    METRICS --> AGG
    FACT --> DASH
    AGG --> REPORT
    DIMS --> API
```

# What You Will Need

```bash
pip install pandas sqlalchemy requests prefect jinja2
```
- A database (PostgreSQL recommended, SQLite for prototyping)
- API credentials for your ecommerce platforms
# Step 1: Design the Ingestion Layer
Each platform gets its own extractor. They all output data in a standardised raw format.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import pandas as pd
import requests


@dataclass
class ExtractionResult:
    """Standardised output from any platform extractor."""
    platform: str
    entity: str
    records: pd.DataFrame
    extracted_at: datetime
    is_incremental: bool
    watermark: str  # Last processed ID or timestamp


class PlatformExtractor(ABC):
    """Base class for ecommerce platform extractors."""

    @abstractmethod
    def extract_orders(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract orders, optionally since a given timestamp."""

    @abstractmethod
    def extract_products(self) -> ExtractionResult:
        """Extract the full product catalogue."""

    @abstractmethod
    def extract_customers(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract customer data."""


class ShopifyExtractor(PlatformExtractor):
    """Extract data from Shopify Admin API."""

    def __init__(self, store_url: str, api_token: str):
        """Initialise with Shopify credentials."""
        self.base_url = f"https://{store_url}/admin/api/2026-01"
        self.headers = {"X-Shopify-Access-Token": api_token}
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def _fetch_paginated(self, endpoint: str, params: dict) -> list:
        """Fetch all pages from a Shopify API endpoint."""
        url = f"{self.base_url}/{endpoint}"
        all_records = []
        while url:
            resp = self.session.get(url, params=params, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            # The payload is wrapped in a single key, e.g. {"orders": [...]}
            key = list(data.keys())[0]
            all_records.extend(data[key])
            # requests parses the Link header; the "next" URL already carries
            # the cursor and filters, so no params are sent on later pages.
            url = resp.links.get("next", {}).get("url")
            params = {}
        return all_records

    def extract_orders(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract orders from Shopify."""
        params = {"limit": 250, "status": "any"}
        if since:
            params["updated_at_min"] = since.isoformat()
        raw = self._fetch_paginated("orders.json", params)
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} orders from Shopify")
        return ExtractionResult(
            platform="shopify",
            entity="orders",
            records=df,
            extracted_at=datetime.now(),
            is_incremental=since is not None,
            watermark=df["updated_at"].max() if len(df) > 0 else "",
        )

    def extract_products(self) -> ExtractionResult:
        """Extract the complete product catalogue."""
        raw = self._fetch_paginated("products.json", {"limit": 250})
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} products from Shopify")
        return ExtractionResult(
            platform="shopify",
            entity="products",
            records=df,
            extracted_at=datetime.now(),
            is_incremental=False,
            watermark="",
        )

    def extract_customers(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract customer records."""
        params = {"limit": 250}
        if since:
            params["updated_at_min"] = since.isoformat()
        raw = self._fetch_paginated("customers.json", params)
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} customers from Shopify")
        return ExtractionResult(
            platform="shopify",
            entity="customers",
            records=df,
            extracted_at=datetime.now(),
            is_incremental=since is not None,
            watermark=df["updated_at"].max() if len(df) > 0 else "",
        )
```
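The architecture diagram places a raw data store between the extractors and staging, but the extractor returns its batch only in memory. One way to land each batch before transforming it is to write the untouched API records as JSON Lines. This is a minimal sketch, not part of the original pipeline: the function names `write_raw_batch` and `read_raw_batch` and the `<root>/<platform>/<entity>/<date>/` layout are assumptions chosen for illustration.

```python
import json
from datetime import datetime
from pathlib import Path


def write_raw_batch(root: Path, platform: str, entity: str,
                    records: list, extracted_at: datetime) -> Path:
    """Write one batch to <root>/<platform>/<entity>/<YYYY-MM-DD>/<HHMMSS>.jsonl."""
    folder = root / platform / entity / extracted_at.strftime("%Y-%m-%d")
    folder.mkdir(parents=True, exist_ok=True)
    path = folder / f"{extracted_at.strftime('%H%M%S')}.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for record in records:
            # One JSON document per line; default=str covers values such as
            # datetimes that json cannot serialise natively.
            fh.write(json.dumps(record, default=str) + "\n")
    return path


def read_raw_batch(path: Path) -> list:
    """Load a batch back, for example to re-run staging after a logic change."""
    with path.open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]
```

Keeping the raw payload means a change to the staging rules can be replayed over history without calling the platform APIs again. In an extractor, the list returned by `_fetch_paginated` would be passed as `records`.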
# Step 2: Build the Staging Layer
The staging layer cleans and normalises raw data from different platforms into a consistent schema.
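```python
def _shopify_refund_total(refunds) -> float:
    """Sum successful refund transactions on one order.

    Assumes the REST Admin API shape, where each refund carries a
    "transactions" list with "kind", "status", and "amount" fields.
    """
    if not isinstance(refunds, list):
        return 0.0
    return sum(
        float(txn.get("amount", 0))
        for refund in refunds
        for txn in refund.get("transactions", [])
        if txn.get("kind") == "refund" and txn.get("status") == "success"
    )


class StagingTransformer:
    """Transform raw platform data into standardised staging tables."""

    def stage_orders(self, result: ExtractionResult) -> pd.DataFrame:
        """Normalise orders from any platform into a standard schema."""
        df = result.records.copy()
        if result.platform == "shopify":
            staged = pd.DataFrame({
                "platform": "shopify",
                "order_id": df["id"].astype(str),
                "order_number": df["order_number"],
                "created_at": pd.to_datetime(df["created_at"]),
                "updated_at": pd.to_datetime(df["updated_at"]),
                "status": df["financial_status"],
                "currency": df["currency"],
                "subtotal": pd.to_numeric(df["subtotal_price"], errors="coerce"),
                "tax": pd.to_numeric(df["total_tax"], errors="coerce"),
                "shipping": pd.to_numeric(df.get("total_shipping_price_set.shop_money.amount", 0), errors="coerce"),
                "total": pd.to_numeric(df["total_price"], errors="coerce"),
                "discount": pd.to_numeric(df["total_discounts"], errors="coerce"),
                # "refunds" is a list of refund objects, not a number, so it
                # has to be summed per order rather than cast with to_numeric.
                "refunded": df["refunds"].apply(_shopify_refund_total) if "refunds" in df.columns else 0.0,
                "customer_email": df.get("email", ""),
                # Number of line items on the order (not total quantity)
                "item_count": df["line_items"].apply(len) if "line_items" in df.columns else 0,
            })
        else:
            raise ValueError(f"Unsupported platform: {result.platform}")

        # Calculate net revenue once, here, so every report uses the same definition
        staged["net_revenue"] = staged["total"] - staged["refunded"]
        print(f"Staged {len(staged)} orders from {result.platform}")
        return staged
```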
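The staging code only handles Shopify and raises for every other platform. To illustrate what a second branch might look like, here is a hedged sketch that maps a single WooCommerce order payload onto the same staging columns. The function name `normalise_woocommerce_order` is hypothetical, and the field names (`date_created_gmt`, `shipping_total`, `discount_total`, `billing.email`, negative refund totals) reflect my understanding of the WooCommerce REST API v3 order object; verify them against your store's responses before relying on them.

```python
def normalise_woocommerce_order(order: dict) -> dict:
    """Map one WooCommerce order payload onto the staging schema (sketch)."""
    line_items = order.get("line_items", [])
    # Assumption: each refund entry carries a negative string total, e.g. "-5.00".
    refunded = sum(abs(float(r.get("total", 0))) for r in order.get("refunds", []))
    total = float(order.get("total", 0))
    return {
        "platform": "woocommerce",
        "order_id": str(order["id"]),
        "order_number": order.get("number"),
        "created_at": order.get("date_created_gmt"),
        "updated_at": order.get("date_modified_gmt"),
        "status": order.get("status"),
        "currency": order.get("currency"),
        # No order-level subtotal is assumed, so sum the line-item subtotals.
        "subtotal": sum(float(i.get("subtotal", 0)) for i in line_items),
        "tax": float(order.get("total_tax", 0)),
        "shipping": float(order.get("shipping_total", 0)),
        "total": total,
        "discount": float(order.get("discount_total", 0)),
        "refunded": refunded,
        "customer_email": (order.get("billing") or {}).get("email", ""),
        "item_count": len(line_items),
        # Same definition as the Shopify branch: total minus refunds.
        "net_revenue": total - refunded,
    }
```

Status values differ between platforms (a Shopify `financial_status` is not the same vocabulary as a WooCommerce `status`), so a real staging layer would also need a mapping to a shared set of statuses before the two sources can be compared.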
# Step 3: Design the Dimension Model
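The warehouse uses a star schema: fact tables hold the measurable events (orders and order items), and dimension tables hold the attributes you slice them by (date, product, customer). Because every report joins the same facts to the same dimensions, the numbers stay consistent.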
```mermaid
flowchart LR
    subgraph Dimensions
        D_DATE["dim_date\n─────────\ndate_key\nday_of_week\nweek_number\nmonth\nquarter\nyear\nis_weekend"]
        D_PRODUCT["dim_product\n─────────\nproduct_key\ntitle\ncategory\nprice\nplatform"]
        D_CUSTOMER["dim_customer\n─────────\ncustomer_key\nemail\nfirst_order\norder_count\ntotal_spent\nsegment"]
    end
    subgraph Facts
        F_ORDER["fact_orders\n─────────\norder_key\ndate_key\ncustomer_key\nrevenue\ntax\nshipping\nnet_revenue\nitem_count"]
        F_ITEM["fact_order_items\n─────────\nitem_key\norder_key\nproduct_key\nquantity\nprice\ndiscount"]
    end
    D_DATE --> F_ORDER
    D_CUSTOMER --> F_ORDER
    D_PRODUCT --> F_ITEM
    F_ORDER --> F_ITEM
```

```python
import pandas as pd
from sqlalchemy import create_engine, text


class WarehouseBuilder:
    """Build and maintain the ecommerce data warehouse."""

    def __init__(self, db_url: str):
        """Initialise with database connection."""
        self.engine = create_engine(db_url)

    def build_date_dimension(self, start_year: int = 2024, end_year: int = 2027):
        """Generate a date dimension table."""
        dates = pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31")
        dim = pd.DataFrame({
            "date_key": dates.strftime("%Y%m%d").astype(int),
            "date": dates,
            "day_of_week": dates.day_name(),
            "day_number": dates.dayofweek,
            # ISO week number; to_numpy() drops the date index so all columns align by position
            "week_number": dates.isocalendar().week.astype(int).to_numpy(),
            "month": dates.month,
            "month_name": dates.month_name(),
            "quarter": dates.quarter,
            "year": dates.year,
            "is_weekend": dates.dayofweek >= 5,
        })
        dim.to_sql("dim_date", self.engine, if_exists="replace", index=False)
        print(f"Date dimension built: {len(dim)} rows")

    def build_customer_dimension(self, staged_orders: pd.DataFrame):
        """Build customer dimension from order history.

        Pass the full order history: the table is replaced on every call,
        so a partial batch would drop customers who are not in it.
        """
        customers = staged_orders.groupby("customer_email").agg(
            first_order=("created_at", "min"),
            last_order=("created_at", "max"),
            order_count=("order_id", "nunique"),
            total_spent=("net_revenue", "sum"),
            avg_order_value=("net_revenue", "mean"),
        ).reset_index()
        # Segment by value. include_lowest=True puts a total of exactly 0 in
        # "low"; negative totals (refunds exceeding sales) remain unsegmented.
        customers["segment"] = pd.cut(
            customers["total_spent"],
            bins=[0, 50, 200, 1000, float("inf")],
            labels=["low", "medium", "high", "vip"],
            include_lowest=True,
        )
        # Keys are regenerated on each rebuild, so they are not stable across runs
        customers["customer_key"] = range(1, len(customers) + 1)
        customers.to_sql("dim_customer", self.engine, if_exists="replace", index=False)
        print(f"Customer dimension built: {len(customers)} rows")
        return customers
```
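To show what the star schema buys you at query time, here is a small self-contained sketch using SQLite from the standard library. The table and column names follow the diagram, but the `fact_orders` rows, the trimmed-down column lists, and the function name `revenue_by_segment_and_weekend` are invented for illustration; loading `fact_orders` is not covered by the code in this step.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, day_of_week TEXT, is_weekend INTEGER);
    CREATE TABLE dim_customer (customer_key INTEGER PRIMARY KEY, segment TEXT);
    CREATE TABLE fact_orders (
        order_key INTEGER PRIMARY KEY,
        date_key INTEGER REFERENCES dim_date(date_key),
        customer_key INTEGER REFERENCES dim_customer(customer_key),
        net_revenue REAL
    );
    -- Made-up sample rows
    INSERT INTO dim_date VALUES (20250103, 'Friday', 0), (20250104, 'Saturday', 1);
    INSERT INTO dim_customer VALUES (1, 'low'), (2, 'vip');
    INSERT INTO fact_orders VALUES
        (1, 20250103, 1, 40.0),
        (2, 20250104, 2, 1200.0),
        (3, 20250104, 1, 25.0);
""")


def revenue_by_segment_and_weekend(conn):
    """Join the fact table to two dimensions and aggregate."""
    return conn.execute("""
        SELECT c.segment, d.is_weekend, SUM(f.net_revenue) AS revenue, COUNT(*) AS orders
        FROM fact_orders f
        JOIN dim_date d ON d.date_key = f.date_key
        JOIN dim_customer c ON c.customer_key = f.customer_key
        GROUP BY c.segment, d.is_weekend
        ORDER BY c.segment, d.is_weekend
    """).fetchall()


for row in revenue_by_segment_and_weekend(conn):
    print(row)
```

Any new cut of the data (by month, by product category, by platform) becomes another join and `GROUP BY` over the same fact rows, so the revenue definition never has to be rewritten per report.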
# Step 4: Build Aggregate Tables
Pre-computed aggregates make dashboards fast. Rebuild them daily.
```python
import pandas as pd
from sqlalchemy import text


class AggregateBuilder:
    """Build pre-computed aggregate tables for reporting."""

    def __init__(self, engine):
        """Initialise with database engine."""
        self.engine = engine

    def daily_summary(self):
        """Build daily summary aggregate.

        Requires agg_daily_summary to exist with a unique constraint on
        (date, platform); ON CONFLICT uses it as the upsert target.
        """
        query = """
            INSERT INTO agg_daily_summary (
                date, platform, orders, revenue, avg_order_value,
                unique_customers, items_sold, total_discounts
            )
            SELECT
                date(created_at) as date,
                platform,
                COUNT(DISTINCT order_id) as orders,
                SUM(net_revenue) as revenue,
                AVG(net_revenue) as avg_order_value,
                COUNT(DISTINCT customer_email) as unique_customers,
                SUM(item_count) as items_sold,
                SUM(discount) as total_discounts
            FROM staged_orders
            GROUP BY date(created_at), platform
            ON CONFLICT (date, platform) DO UPDATE SET
                orders = EXCLUDED.orders,
                revenue = EXCLUDED.revenue,
                avg_order_value = EXCLUDED.avg_order_value,
                unique_customers = EXCLUDED.unique_customers,
                items_sold = EXCLUDED.items_sold,
                total_discounts = EXCLUDED.total_discounts
        """
        with self.engine.connect() as conn:
            conn.execute(text(query))
            conn.commit()
        print("Daily summary aggregate rebuilt")

    def weekly_trends(self):
        """Build weekly trend comparisons (date_trunc is PostgreSQL syntax)."""
        query = """
            SELECT
                date_trunc('week', date) as week_start,
                platform,
                SUM(revenue) as revenue,
                SUM(orders) as orders,
                -- Weekly AOV from totals; averaging the daily averages would
                -- weight a quiet day the same as a busy one
                SUM(revenue) / NULLIF(SUM(orders), 0) as aov,
                -- Sum of daily uniques: a customer who orders on two days counts twice
                SUM(unique_customers) as customers,
                LAG(SUM(revenue)) OVER (PARTITION BY platform ORDER BY date_trunc('week', date)) as prev_revenue,
                LAG(SUM(orders)) OVER (PARTITION BY platform ORDER BY date_trunc('week', date)) as prev_orders
            FROM agg_daily_summary
            GROUP BY date_trunc('week', date), platform
        """
        df = pd.read_sql(query, self.engine)
        df["revenue_change_pct"] = ((df["revenue"] - df["prev_revenue"]) / df["prev_revenue"] * 100).round(1)
        df["order_change_pct"] = ((df["orders"] - df["prev_orders"]) / df["prev_orders"] * 100).round(1)
        df.to_sql("agg_weekly_trends", self.engine, if_exists="replace", index=False)
        print(f"Weekly trends built: {len(df)} rows")
```
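The daily summary relies on `ON CONFLICT (date, platform)`, which only works if `agg_daily_summary` already exists with a unique constraint on those two columns. The article does not show that table definition, so the sketch below is one plausible shape, cut down to four columns and run against in-memory SQLite so it can be tried without a server. The helper name `rebuild_daily_summary` is hypothetical. SQLite needs version 3.24 or later for upserts, plus the `WHERE true` noted in the query; PostgreSQL does not need that clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE staged_orders (order_id TEXT, platform TEXT, created_at TEXT, net_revenue REAL);
    CREATE TABLE agg_daily_summary (
        date TEXT NOT NULL,
        platform TEXT NOT NULL,
        orders INTEGER,
        revenue REAL,
        PRIMARY KEY (date, platform)  -- the conflict target for the upsert
    );
""")

UPSERT = """
    INSERT INTO agg_daily_summary (date, platform, orders, revenue)
    SELECT date(created_at), platform, COUNT(DISTINCT order_id), SUM(net_revenue)
    FROM staged_orders
    WHERE true  -- SQLite wants a WHERE clause before ON CONFLICT in INSERT ... SELECT
    GROUP BY date(created_at), platform
    ON CONFLICT (date, platform) DO UPDATE SET
        orders = excluded.orders,
        revenue = excluded.revenue
"""


def rebuild_daily_summary(conn):
    """Re-run the upsert and return the current aggregate rows."""
    conn.execute(UPSERT)
    conn.commit()
    return conn.execute(
        "SELECT date, platform, orders, revenue FROM agg_daily_summary ORDER BY date, platform"
    ).fetchall()
```

Running the rebuild twice over the same staged rows leaves one row per day and platform, which is what makes the daily job safe to re-run after a failure.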
# Step 5: Orchestrate the Pipeline
Wire everything together with Prefect for scheduling and monitoring.
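```python
from datetime import datetime, timedelta

import pandas as pd
from prefect import flow
from sqlalchemy import bindparam, create_engine, inspect, text


@flow(name="ecommerce-data-pipeline")
def run_pipeline(db_url: str, shopify_url: str, shopify_token: str):
    """Full ecommerce data pipeline: extract → stage → warehouse → aggregate."""
    # Extract
    extractor = ShopifyExtractor(shopify_url, shopify_token)
    yesterday = datetime.now() - timedelta(days=1)
    orders = extractor.extract_orders(since=yesterday)
    products = extractor.extract_products()  # extracted, but not loaded in this walkthrough
    if orders.records.empty:
        print("No new or updated orders; nothing to load")
        return

    # Stage
    stager = StagingTransformer()
    staged = stager.stage_orders(orders)

    # Load staged data. Orders are pulled by updated_at, so an order edited on
    # several days arrives more than once; remove the old copy before appending.
    engine = create_engine(db_url)
    if inspect(engine).has_table("staged_orders"):
        delete_stale = text(
            "DELETE FROM staged_orders WHERE platform = :platform AND order_id IN :ids"
        ).bindparams(bindparam("ids", expanding=True))
        with engine.begin() as conn:
            conn.execute(delete_stale, {"platform": orders.platform, "ids": staged["order_id"].tolist()})
    staged.to_sql("staged_orders", engine, if_exists="append", index=False)

    # Build dimensions from the full order history, not just this batch
    warehouse = WarehouseBuilder(db_url)
    all_orders = pd.read_sql("SELECT * FROM staged_orders", engine)
    warehouse.build_customer_dimension(all_orders)

    # Build aggregates
    agg = AggregateBuilder(engine)
    agg.daily_summary()
    agg.weekly_trends()
    print(f"Pipeline complete: {len(staged)} orders processed")


# Schedule
if __name__ == "__main__":
    # Scheduled runs also need values for db_url, shopify_url, and
    # shopify_token; serve() accepts them through its parameters argument.
    run_pipeline.serve(
        name="daily-ecommerce-pipeline",
        schedules=[{"cron": "0 5 * * *", "timezone": "Europe/London"}],
    )
```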
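The flow computes its `since` value as "24 hours ago", so a missed or late run leaves a gap, while each `ExtractionResult` already carries a `watermark` that is never stored. A common alternative is to persist the watermark and resume from it. The following is a minimal sketch under stated assumptions: the `pipeline_watermarks` table and the three function names are hypothetical, it uses SQLite for portability, and it compares timestamps as strings, which is only valid when they share one ISO 8601 format and UTC offset.

```python
import sqlite3
from typing import Optional


def init_watermarks(conn: sqlite3.Connection) -> None:
    """Create the watermark table if it does not exist yet."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_watermarks (
            platform TEXT NOT NULL,
            entity TEXT NOT NULL,
            watermark TEXT NOT NULL,
            PRIMARY KEY (platform, entity)
        )
    """)


def get_watermark(conn: sqlite3.Connection, platform: str, entity: str) -> Optional[str]:
    """Return the stored watermark, or None before the first successful load."""
    row = conn.execute(
        "SELECT watermark FROM pipeline_watermarks WHERE platform = ? AND entity = ?",
        (platform, entity),
    ).fetchone()
    return row[0] if row else None


def set_watermark(conn: sqlite3.Connection, platform: str, entity: str, watermark: str) -> None:
    """Advance the watermark; empty batches and older values are ignored."""
    if not watermark:
        return  # an empty extraction reports "", so keep the previous value
    conn.execute(
        """
        INSERT INTO pipeline_watermarks (platform, entity, watermark)
        VALUES (?, ?, ?)
        ON CONFLICT (platform, entity) DO UPDATE SET watermark = excluded.watermark
        WHERE excluded.watermark > pipeline_watermarks.watermark
        """,
        (platform, entity, watermark),
    )
    conn.commit()
```

In the flow, `since` would come from `get_watermark(...)` (parsed with `datetime.fromisoformat`, with a full load when it returns `None`), and `set_watermark(conn, orders.platform, orders.entity, orders.watermark)` would be called only after the staged rows are safely written, so a failed run is retried from the same point.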
# What This Replaces
| Ad-Hoc Approach | Pipeline Architecture |
|---|---|
| Separate scripts per report | Single pipeline, multiple outputs |
| Each script defines "revenue" differently | One staging layer with consistent business logic |
| No historical tracking | Warehouse with full history |
| Manual data pulls each morning | Automated daily ingestion |
| Spreadsheet-based comparisons | Pre-computed aggregates with trends |
| Platform-specific dashboards | Unified cross-platform reporting |
# Next Steps
For building the API extraction layer in detail, see Shopify Reporting API: Pull Sales, Inventory, and Customer Data. For creating dashboards from the warehouse data, see How to Build Dashboards Without BI Tools.
For making this pipeline fault-tolerant, see Build Self-Healing Data Pipelines. For the unified API layer across platforms, see Ecommerce Reporting API: Automate Store Data.
For data quality checks between stages, see Build a Data Quality Framework in Python.
Data analytics services include ecommerce data pipeline design, warehouse architecture, and reporting automation.
Get in touch to discuss building a data pipeline for your ecommerce business.