Ecommerce Data Pipeline: Reporting Architecture That Scales
Design a data pipeline architecture for ecommerce reporting that handles multi-platform ingestion, incremental loads, dimension modelling, and automated report generation — from raw API data to business-ready dashboards.
Ecommerce reporting starts simple. Pull yesterday’s orders from Shopify, dump them in a spreadsheet, email it. Then someone asks for inventory data. Then returns. Then customer lifetime value. Then multi-channel comparisons. Within months, you have twelve scripts, four spreadsheets, and nobody trusts the numbers because each report calculates revenue differently.
The fix is architecture: a pipeline that ingests raw data from every platform into a single warehouse, transforms it through consistent business logic, and produces reports that everyone trusts because they all use the same source of truth.
Who This Is For
- Ecommerce operators running stores across Shopify, WooCommerce, or Amazon who need unified reporting
- Data engineers designing their first ecommerce analytics pipeline
- Vibe coders building a data platform for an ecommerce client and wanting a proven architecture
- CTOs evaluating whether to build or buy their analytics infrastructure
Familiarity with Python and SQL is helpful. The architecture patterns apply regardless of your specific tech stack.
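The Full Architecture
Data flows through five layers. One extractor per platform pulls raw records from each API. A staging layer applies one set of business rules to every platform's data. Staged orders load into a star-schema warehouse. Aggregate tables pre-compute the metrics that reports and dashboards read. Prefect schedules and monitors the whole run.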
What You Will Need
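```shell
# psycopg2-binary is the driver SQLAlchemy uses for postgresql:// connection URLs
pip install pandas sqlalchemy requests prefect jinja2 psycopg2-binary
```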
- A database (PostgreSQL recommended; SQLite works for prototyping extraction and staging, but the aggregate SQL in Step 4 relies on PostgreSQL's `date_trunc`)
- API credentials for your ecommerce platforms
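API tokens and database passwords should stay out of source code once the pipeline runs unattended. The sketch below reads them once and reports every missing variable in a single error. It assumes the hypothetical environment variable names `DATABASE_URL`, `SHOPIFY_STORE_URL` and `SHOPIFY_API_TOKEN`; rename them to suit your deployment.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class PipelineConfig:
    """Connection settings for the pipeline (field names are illustrative)."""
    db_url: str
    shopify_store_url: str
    shopify_api_token: str


# Hypothetical environment variable names; adapt to your deployment
REQUIRED_VARS = {
    "db_url": "DATABASE_URL",
    "shopify_store_url": "SHOPIFY_STORE_URL",
    "shopify_api_token": "SHOPIFY_API_TOKEN",
}


def load_config(env: Optional[Mapping[str, str]] = None) -> PipelineConfig:
    """Read settings from the environment, listing every missing variable at once."""
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return PipelineConfig(**{field: env[var] for field, var in REQUIRED_VARS.items()})
```

Calling `load_config()` at the start of a run fails fast, before any API request is made.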
Step 1: Design the Ingestion Layer
Each platform gets its own extractor. They all output data in a standardised raw format.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
import requests


@dataclass
class ExtractionResult:
    """Standardised output from any platform extractor."""
    platform: str
    entity: str
    records: pd.DataFrame
    extracted_at: datetime
    is_incremental: bool
    watermark: str  # Last processed ID or timestamp


class PlatformExtractor(ABC):
    """Base class for ecommerce platform extractors."""

    @abstractmethod
    def extract_orders(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract orders, optionally only those updated since a given timestamp."""

    @abstractmethod
    def extract_products(self) -> ExtractionResult:
        """Extract the full product catalogue."""

    @abstractmethod
    def extract_customers(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract customer data, optionally only records updated since a timestamp."""


def _latest_update(df: pd.DataFrame) -> str:
    """Newest updated_at as a UTC ISO string (compares instants, not raw strings)."""
    if df.empty or "updated_at" not in df.columns:
        return ""
    return pd.to_datetime(df["updated_at"], utc=True).max().isoformat()


class ShopifyExtractor(PlatformExtractor):
    """Extract data from the Shopify Admin REST API."""

    def __init__(self, store_url: str, api_token: str):
        """Initialise with Shopify credentials."""
        self.base_url = f"https://{store_url}/admin/api/2026-01"
        self.session = requests.Session()
        self.session.headers.update({"X-Shopify-Access-Token": api_token})

    def _fetch_paginated(self, endpoint: str, params: dict) -> list:
        """Fetch all pages from a Shopify endpoint using Link-header pagination."""
        url = f"{self.base_url}/{endpoint}"
        all_records = []
        while url:
            resp = self.session.get(url, params=params, timeout=30)
            resp.raise_for_status()
            # Responses wrap records in one top-level key, e.g. {"orders": [...]}
            all_records.extend(next(iter(resp.json().values())))
            # The "next" URL already carries limit and page_info, so drop the original params
            url, params = None, {}
            for part in resp.headers.get("Link", "").split(","):
                if 'rel="next"' in part:
                    url = part.split("<")[1].split(">")[0]
        return all_records

    def extract_orders(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract orders of any status, optionally only those updated since `since`."""
        params = {"limit": 250, "status": "any"}
        if since:
            params["updated_at_min"] = since.isoformat()
        df = pd.json_normalize(self._fetch_paginated("orders.json", params))
        print(f"Extracted {len(df)} orders from Shopify")
        return ExtractionResult(
            platform="shopify",
            entity="orders",
            records=df,
            extracted_at=datetime.now(timezone.utc),
            is_incremental=since is not None,
            watermark=_latest_update(df),
        )

    def extract_products(self) -> ExtractionResult:
        """Extract the complete product catalogue (always a full load)."""
        df = pd.json_normalize(self._fetch_paginated("products.json", {"limit": 250}))
        print(f"Extracted {len(df)} products from Shopify")
        return ExtractionResult(
            platform="shopify",
            entity="products",
            records=df,
            extracted_at=datetime.now(timezone.utc),
            is_incremental=False,
            watermark="",
        )

    def extract_customers(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract customer records, optionally only those updated since `since`."""
        params = {"limit": 250}
        if since:
            params["updated_at_min"] = since.isoformat()
        df = pd.json_normalize(self._fetch_paginated("customers.json", params))
        print(f"Extracted {len(df)} customers from Shopify")
        return ExtractionResult(
            platform="shopify",
            entity="customers",
            records=df,
            extracted_at=datetime.now(timezone.utc),
            is_incremental=since is not None,
            watermark=_latest_update(df),
        )
```
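The pagination loop depends on reading the `rel="next"` URL from Shopify's `Link` response header. That header can also carry a `rel="previous"` link. Moving the parsing into a small pure function lets you unit-test it without network calls. This is a standard-library sketch, and `next_page_url` is a hypothetical helper name:

```python
import re
from typing import Optional

# One entry of a Link header, e.g.  <https://...>; rel="next"
_LINK_ENTRY = re.compile(r'<([^>]+)>\s*;\s*rel="([^"]+)"')


def next_page_url(link_header: Optional[str]) -> Optional[str]:
    """Return the URL tagged rel="next" in a Link header, or None on the last page."""
    if not link_header:
        return None
    for url, rel in _LINK_ENTRY.findall(link_header):
        if rel == "next":
            return url
    return None
```

Inside `_fetch_paginated`, the hand-written split loop could then become `url = next_page_url(resp.headers.get("Link"))`.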
Step 2: Build the Staging Layer
The staging layer cleans and normalises raw data from different platforms into a consistent schema.
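```python
import pandas as pd


def _refund_total(refunds) -> float:
    """Sum successful refund transactions on one Shopify order (0 if none)."""
    # Assumes amounts live in refunds[].transactions[] with kind "refund", status "success"
    if not isinstance(refunds, list):
        return 0.0
    return sum(
        float(txn.get("amount", 0))
        for refund in refunds
        for txn in refund.get("transactions", [])
        if txn.get("kind") == "refund" and txn.get("status") == "success"
    )


def _units(line_items) -> int:
    """Units sold on one order: the sum of line-item quantities, not the number of lines."""
    if not isinstance(line_items, list):
        return 0
    return sum(int(item.get("quantity", 0)) for item in line_items)


class StagingTransformer:
    """Transform raw platform data into standardised staging tables."""

    def stage_orders(self, result: ExtractionResult) -> pd.DataFrame:
        """Normalise orders from any platform into a standard schema."""
        if result.platform != "shopify":
            raise ValueError(f"Unsupported platform: {result.platform}")
        df = result.records.copy()
        if df.empty:
            return pd.DataFrame()

        def col(name, default=None):
            # A field is absent entirely when no order in the batch has it
            return df[name] if name in df.columns else pd.Series(default, index=df.index)

        staged = pd.DataFrame({
            "platform": "shopify",
            "order_id": df["id"].astype(str),
            "order_number": df["order_number"],
            "created_at": pd.to_datetime(df["created_at"], utc=True),
            "updated_at": pd.to_datetime(df["updated_at"], utc=True),
            "status": df["financial_status"],
            "currency": df["currency"],
            "subtotal": pd.to_numeric(df["subtotal_price"], errors="coerce"),
            "tax": pd.to_numeric(df["total_tax"], errors="coerce"),
            "shipping": pd.to_numeric(col("total_shipping_price_set.shop_money.amount", 0), errors="coerce"),
            "total": pd.to_numeric(df["total_price"], errors="coerce"),
            "discount": pd.to_numeric(df["total_discounts"], errors="coerce"),
            "refunded": col("refunds").apply(_refund_total),  # refunds is a list of objects
            "customer_email": col("email", ""),
            "item_count": col("line_items").apply(_units),
        })
        # One definition of net revenue, shared by every downstream table
        staged["net_revenue"] = staged["total"] - staged["refunded"]
        print(f"Staged {len(staged)} orders from {result.platform}")
        return staged
```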
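Each run appends whatever changed in the last day. An order that is paid on Monday and refunded on Wednesday therefore lands in `staged_orders` twice. Anything that sums revenue should first keep only the latest version of each order. Here is a pandas sketch of that rule; `latest_order_versions` is a hypothetical helper name:

```python
import pandas as pd


def latest_order_versions(staged: pd.DataFrame) -> pd.DataFrame:
    """Keep the most recently updated row for each (platform, order_id)."""
    return (
        staged.sort_values("updated_at")
        .drop_duplicates(subset=["platform", "order_id"], keep="last")
        .sort_values(["platform", "order_id"])
        .reset_index(drop=True)
    )
```

The key includes `platform` because order IDs are only unique within one store.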
Step 3: Design the Dimension Model
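A star schema keeps measurable events, such as orders, in fact tables. The context used to slice them, such as dates and customers, goes in dimension tables. Every report joins to the same dimensions, so every report groups and filters the same way. The builder below creates the date and customer dimensions.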
```python
import pandas as pd
from sqlalchemy import create_engine, text


class WarehouseBuilder:
    """Build and maintain the ecommerce data warehouse."""

    def __init__(self, db_url: str):
        """Initialise with database connection."""
        self.engine = create_engine(db_url)

    def build_date_dimension(self, start_year: int = 2024, end_year: int = 2027):
        """Generate a date dimension table with one row per calendar day."""
        dates = pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31")
        dim = pd.DataFrame({
            "date_key": dates.strftime("%Y%m%d").astype(int),  # e.g. 20260105
            "date": dates,
            "day_of_week": dates.day_name(),
            "day_number": dates.dayofweek,  # Monday = 0
            "week_number": dates.isocalendar().week.astype(int).to_numpy(),  # ISO week
            "month": dates.month,
            "month_name": dates.month_name(),
            "quarter": dates.quarter,
            "year": dates.year,
            "is_weekend": dates.dayofweek >= 5,
        })
        dim.to_sql("dim_date", self.engine, if_exists="replace", index=False)
        print(f"Date dimension built: {len(dim)} rows")

    def build_customer_dimension(self, staged_orders: pd.DataFrame) -> pd.DataFrame:
        """Build the customer dimension from the full staged order history."""
        # Keep the latest version of each order so re-extracted orders are not double-counted
        orders = staged_orders.sort_values("updated_at").drop_duplicates(
            ["platform", "order_id"], keep="last"
        )
        # Guest checkouts without an email cannot be tracked as customers
        orders = orders[orders["customer_email"].fillna("") != ""]
        customers = orders.groupby("customer_email").agg(
            first_order=("created_at", "min"),
            last_order=("created_at", "max"),
            order_count=("order_id", "nunique"),
            total_spent=("net_revenue", "sum"),
            avg_order_value=("net_revenue", "mean"),
        ).reset_index()
        # Segment by lifetime net spend; the -inf edge keeps zero or net-negative spend in "low"
        customers["segment"] = pd.cut(
            customers["total_spent"],
            bins=[float("-inf"), 50, 200, 1000, float("inf")],
            labels=["low", "medium", "high", "vip"],
        ).astype(str)
        # Surrogate keys are renumbered on every full rebuild
        customers["customer_key"] = range(1, len(customers) + 1)
        customers.to_sql("dim_customer", self.engine, if_exists="replace", index=False)
        print(f"Customer dimension built: {len(customers)} rows")
        return customers
```
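The dimensions only pay off once a fact table points at them. Below is a minimal sketch of an order-grain fact table. It assumes the staged-order columns from Step 2 and a customer dimension with `customer_email` and `customer_key` columns, as built above. `build_fact_orders` is a hypothetical name, and the column selection is illustrative:

```python
import pandas as pd


def build_fact_orders(staged: pd.DataFrame, dim_customer: pd.DataFrame) -> pd.DataFrame:
    """Order-grain fact table keyed to dim_date and dim_customer."""
    fact = staged.merge(
        dim_customer[["customer_email", "customer_key"]],
        on="customer_email",
        how="left",              # keep orders with no matching customer (e.g. guests)
        validate="many_to_one",  # raise if an email maps to more than one customer
    )
    # Unmatched orders get a nullable integer key instead of a float NaN
    fact["customer_key"] = fact["customer_key"].astype("Int64")
    # Same YYYYMMDD integer format as dim_date.date_key
    fact["date_key"] = fact["created_at"].dt.strftime("%Y%m%d").astype(int)
    return fact[[
        "platform", "order_id", "date_key", "customer_key",
        "total", "discount", "refunded", "net_revenue", "item_count",
    ]]
```

`date_key` follows whatever time zone `created_at` is stored in. If reporting days should follow local time, convert `created_at` first.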
Step 4: Build Aggregate Tables
Pre-computed aggregates make dashboards fast. Rebuild them daily.
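```python
import pandas as pd
from sqlalchemy import text


class AggregateBuilder:
    """Build pre-computed aggregate tables for reporting (PostgreSQL dialect)."""

    def __init__(self, engine):
        """Initialise with a SQLAlchemy engine."""
        self.engine = engine

    def daily_summary(self):
        """Upsert one row per (date, platform) into agg_daily_summary."""
        create = """
        CREATE TABLE IF NOT EXISTS agg_daily_summary (
            date DATE NOT NULL,
            platform TEXT NOT NULL,
            orders BIGINT,
            revenue NUMERIC,
            avg_order_value NUMERIC,
            unique_customers BIGINT,
            items_sold BIGINT,
            total_discounts NUMERIC,
            PRIMARY KEY (date, platform)  -- ON CONFLICT needs this unique key
        )
        """
        upsert = """
        WITH latest AS (
            -- Incremental loads append an order again whenever it changes;
            -- keep only its most recent version
            SELECT DISTINCT ON (platform, order_id) *
            FROM staged_orders
            ORDER BY platform, order_id, updated_at DESC
        )
        INSERT INTO agg_daily_summary (date, platform, orders, revenue, avg_order_value,
                                       unique_customers, items_sold, total_discounts)
        SELECT
            CAST(created_at AS DATE),
            platform,
            COUNT(DISTINCT order_id),
            SUM(net_revenue),
            AVG(net_revenue),
            COUNT(DISTINCT customer_email),
            SUM(item_count),
            SUM(discount)
        FROM latest
        GROUP BY CAST(created_at AS DATE), platform
        ON CONFLICT (date, platform) DO UPDATE SET
            orders = EXCLUDED.orders,
            revenue = EXCLUDED.revenue,
            avg_order_value = EXCLUDED.avg_order_value,
            unique_customers = EXCLUDED.unique_customers,
            items_sold = EXCLUDED.items_sold,
            total_discounts = EXCLUDED.total_discounts
        """
        with self.engine.begin() as conn:  # commits on success, rolls back on error
            conn.execute(text(create))
            conn.execute(text(upsert))
        print("Daily summary aggregate refreshed")

    def weekly_trends(self):
        """Build week-over-week comparisons per platform (weeks start on Monday)."""
        query = """
        SELECT
            date_trunc('week', date) AS week_start,
            platform,
            SUM(revenue) AS revenue,
            SUM(orders) AS orders,
            SUM(revenue) / NULLIF(SUM(orders), 0) AS aov,  -- not an average of daily AOVs
            -- Customer-days: a customer who orders on two days in one week counts twice
            SUM(unique_customers) AS customers,
            LAG(SUM(revenue)) OVER w AS prev_revenue,
            LAG(SUM(orders)) OVER w AS prev_orders
        FROM agg_daily_summary
        GROUP BY date_trunc('week', date), platform
        WINDOW w AS (PARTITION BY platform ORDER BY date_trunc('week', date))
        """
        df = pd.read_sql(query, self.engine)
        cols = ["revenue", "orders", "prev_revenue", "prev_orders"]
        df[cols] = df[cols].astype(float)  # NULL from LAG becomes NaN
        prev_rev = df["prev_revenue"].where(df["prev_revenue"] != 0)  # 0 -> NaN, not inf
        prev_ord = df["prev_orders"].where(df["prev_orders"] != 0)
        df["revenue_change_pct"] = ((df["revenue"] - prev_rev) / prev_rev * 100).round(1)
        df["order_change_pct"] = ((df["orders"] - prev_ord) / prev_ord * 100).round(1)
        df.to_sql("agg_weekly_trends", self.engine, if_exists="replace", index=False)
        print(f"Weekly trends built: {len(df)} platform-weeks")
```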
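`date_trunc` is a PostgreSQL function; SQLite has no equivalent. When prototyping on SQLite, you can run the same week-over-week logic in pandas on the daily summary. The sketch below assumes a DataFrame with the `date`, `platform`, `revenue` and `orders` columns of `agg_daily_summary`. Weeks start on Monday, matching `date_trunc('week', ...)`. The function name is hypothetical:

```python
import pandas as pd


def weekly_trends_pandas(daily: pd.DataFrame) -> pd.DataFrame:
    """Week-over-week revenue and order change per platform from daily rows."""
    df = daily.copy()
    df["date"] = pd.to_datetime(df["date"])
    # Monday of each row's week: the same boundary as PostgreSQL date_trunc('week', ...)
    df["week_start"] = df["date"] - pd.to_timedelta(df["date"].dt.dayofweek, unit="D")
    weekly = (
        df.groupby(["platform", "week_start"], as_index=False)[["revenue", "orders"]]
        .sum()
        .sort_values(["platform", "week_start"])
    )
    weekly["aov"] = weekly["revenue"] / weekly["orders"].where(weekly["orders"] != 0)
    for col, out in (("revenue", "revenue_change_pct"), ("orders", "order_change_pct")):
        prev = weekly.groupby("platform")[col].shift(1)  # previous week, same platform
        prev = prev.where(prev != 0)  # no previous week, or zero: NaN rather than inf
        weekly[out] = ((weekly[col] - prev) / prev * 100).round(1)
    return weekly.reset_index(drop=True)
```

Computing AOV from weekly totals, rather than averaging daily AOVs, keeps busy days from being under-weighted.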
Step 5: Orchestrate the Pipeline
Wire everything together with Prefect for scheduling and monitoring.
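```python
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import pandas as pd
from prefect import flow
from prefect.client.schemas.schedules import CronSchedule
from sqlalchemy import create_engine


@flow(name="ecommerce-data-pipeline")
def run_pipeline(
    db_url: Optional[str] = None,
    shopify_url: Optional[str] = None,
    shopify_token: Optional[str] = None,
):
    """Full ecommerce data pipeline: extract → stage → warehouse → aggregate."""
    # Fall back to environment variables (illustrative names) so secrets
    # are not stored as deployment parameters
    db_url = db_url or os.environ["DATABASE_URL"]
    shopify_url = shopify_url or os.environ["SHOPIFY_STORE_URL"]
    shopify_token = shopify_token or os.environ["SHOPIFY_API_TOKEN"]

    # Extract orders updated in the last 24 hours
    extractor = ShopifyExtractor(shopify_url, shopify_token)
    since = datetime.now(timezone.utc) - timedelta(days=1)
    orders = extractor.extract_orders(since=since)
    products = extractor.extract_products()  # full catalogue; product staging not shown

    # Stage
    staged = StagingTransformer().stage_orders(orders)
    if staged.empty:
        print("No new or updated orders; nothing to load")
        return

    # Load: staged_orders is an append-only history of order versions
    engine = create_engine(db_url)
    staged.to_sql("staged_orders", engine, if_exists="append", index=False)

    # Build dimensions from the full history, not just this run's batch
    warehouse = WarehouseBuilder(db_url)
    warehouse.build_customer_dimension(pd.read_sql("SELECT * FROM staged_orders", engine))

    # Build aggregates
    agg = AggregateBuilder(engine)
    agg.daily_summary()
    agg.weekly_trends()
    print(f"Pipeline complete: {len(staged)} orders processed")


if __name__ == "__main__":
    # Serve the flow and run it daily at 05:00 UK time
    run_pipeline.serve(
        name="daily-ecommerce-pipeline",
        schedules=[CronSchedule(cron="0 5 * * *", timezone="Europe/London")],
    )
```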
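If the start time is always "now minus 24 hours", a run that fails, or a schedule paused for a few days, leaves a gap. `ExtractionResult.watermark` already records the latest `updated_at` seen. Persisting it lets the next run start where the last successful one stopped. Below is a minimal sketch using the standard-library `sqlite3` module. The `etl_watermarks` table and both function names are hypothetical. The one-hour overlap is an arbitrary safety margin that assumes downstream steps tolerate re-extracted orders. It also assumes watermarks are ISO-8601 strings with a UTC offset.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS etl_watermarks (
    platform TEXT NOT NULL,
    entity TEXT NOT NULL,
    watermark TEXT NOT NULL,  -- ISO-8601 timestamp of the newest record loaded
    PRIMARY KEY (platform, entity)
)
"""


def save_watermark(conn: sqlite3.Connection, platform: str, entity: str, watermark: str) -> None:
    """Record the newest timestamp loaded; call only after the load has succeeded."""
    if not watermark:  # empty batch: keep the previous watermark
        return
    conn.execute(SCHEMA)
    conn.execute(
        "INSERT INTO etl_watermarks (platform, entity, watermark) VALUES (?, ?, ?) "
        "ON CONFLICT (platform, entity) DO UPDATE SET watermark = excluded.watermark",
        (platform, entity, watermark),
    )
    conn.commit()


def load_since(
    conn: sqlite3.Connection,
    platform: str,
    entity: str,
    default_lookback: timedelta = timedelta(days=1),
    overlap: timedelta = timedelta(hours=1),
) -> datetime:
    """Start of the next extraction window: the last watermark minus an overlap."""
    conn.execute(SCHEMA)
    row = conn.execute(
        "SELECT watermark FROM etl_watermarks WHERE platform = ? AND entity = ?",
        (platform, entity),
    ).fetchone()
    if row is None:  # first run: fall back to a fixed lookback
        return datetime.now(timezone.utc) - default_lookback
    return datetime.fromisoformat(row[0]) - overlap
```

In the flow, `since = load_since(conn, "shopify", "orders")` would replace the fixed 24-hour window. `save_watermark(conn, "shopify", "orders", orders.watermark)` would run once the load has finished.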
What This Replaces
| Ad-Hoc Approach | Pipeline Architecture |
|---|---|
| Separate scripts per report | Single pipeline, multiple outputs |
| Each script defines “revenue” differently | One staging layer with consistent business logic |
| No historical tracking | Warehouse with full history |
| Manual data pulls each morning | Automated daily ingestion |
| Spreadsheet-based comparisons | Pre-computed aggregates with trends |
| Platform-specific dashboards | Unified cross-platform reporting |
Next Steps
For building the API extraction layer in detail, see Shopify Reporting API: Pull Sales, Inventory, and Customer Data. For creating dashboards from the warehouse data, see How to Build Dashboards Without BI Tools.
For making this pipeline fault-tolerant, see Build Self-Healing Data Pipelines. For the unified API layer across platforms, see Ecommerce Reporting API: Automate Store Data.
For data quality checks between stages, see Build a Data Quality Framework in Python.
Data analytics services include ecommerce data pipeline design, warehouse architecture, and reporting automation.
Get in touch to discuss building a data pipeline for your ecommerce business.