Ecommerce Data Pipeline: Reporting Architecture That Scales

7 min read · Data & Dashboards

Design a data pipeline architecture for ecommerce reporting that handles multi-platform ingestion, incremental loads, dimension modelling, and automated report generation — from raw API data to business-ready dashboards.


Ecommerce reporting starts simple. Pull yesterday's orders from Shopify, dump them in a spreadsheet, email it. Then someone asks for inventory data. Then returns. Then customer lifetime value. Then multi-channel comparisons. Within months, you have twelve scripts, four spreadsheets, and nobody trusts the numbers because each report calculates revenue differently.

The fix is architecture: a pipeline that ingests raw data from every platform into a single warehouse, transforms it through consistent business logic, and produces reports that everyone trusts because they all use the same source of truth.

# Who This Is For

  • Ecommerce operators running stores across Shopify, WooCommerce, or Amazon who need unified reporting
  • Data engineers designing their first ecommerce analytics pipeline
  • Vibe coders who are building a data platform for an ecommerce client and want a proven architecture
  • CTOs evaluating whether to build or buy their analytics infrastructure

Familiarity with Python and SQL is helpful. The architecture patterns apply regardless of your specific tech stack.

# The Full Architecture

```mermaid
flowchart TD
  subgraph Sources["Data Sources"]
    SHOP[Shopify API]
    WOO[WooCommerce API]
    GA[Google Analytics]
    ADS[Ad Platforms]
  end
  subgraph Ingest["Ingestion Layer"]
    EXT["Platform\nExtractors"]
    RAW["Raw Data\nStore"]
  end
  subgraph Transform["Transformation Layer"]
    STAGE["Staging\n(Clean & Validate)"]
    DIM["Dimension\nModelling"]
    METRICS["Metric\nCalculations"]
  end
  subgraph Warehouse["Data Warehouse"]
    FACT["Fact Tables\n(Orders, Sessions)"]
    DIMS["Dimension Tables\n(Products, Customers)"]
    AGG["Aggregate Tables\n(Daily, Weekly)"]
  end
  subgraph Serve["Serving Layer"]
    DASH["Dashboards"]
    REPORT["Scheduled Reports"]
    API["Reporting API"]
  end
  SHOP --> EXT
  WOO --> EXT
  GA --> EXT
  ADS --> EXT
  EXT --> RAW
  RAW --> STAGE
  STAGE --> DIM
  DIM --> METRICS
  METRICS --> FACT
  METRICS --> DIMS
  METRICS --> AGG
  FACT --> DASH
  AGG --> REPORT
  DIMS --> API
```

# What You Will Need

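```bash
pip install pandas sqlalchemy requests prefect jinja2 psycopg2-binary
```
  • A database: PostgreSQL recommended (psycopg2 is the driver SQLAlchemy uses for it by default, and the aggregate SQL in Step 4 uses PostgreSQL syntax); SQLite is fine for prototyping extraction and staging
  • API credentials for your ecommerce platforms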
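Both databases are reached through a SQLAlchemy URL, which the pipeline later receives as `db_url`. A minimal sketch, assuming the URL is read from an environment variable (the name `DATABASE_URL` is a placeholder, not something the tools require) with a local SQLite file as the prototyping fallback:

```python
import os

# Fallback for local prototyping; in production DATABASE_URL would hold
# something like postgresql+psycopg2://user:password@host:5432/ecommerce
DEFAULT_SQLITE_URL = "sqlite:///ecommerce.db"


def get_db_url(env_var: str = "DATABASE_URL") -> str:
    """Return the warehouse URL from the environment, or a local SQLite file."""
    url = os.environ.get(env_var, "").strip()
    return url or DEFAULT_SQLITE_URL


def is_postgres(url: str) -> bool:
    """True for any PostgreSQL dialect URL (postgresql://, postgresql+psycopg2://, ...)."""
    dialect = url.split(":", 1)[0]        # e.g. "postgresql+psycopg2"
    return dialect.split("+", 1)[0] == "postgresql"
```

Checking `is_postgres(get_db_url())` before the aggregate step is one way to fail fast instead of sending PostgreSQL-only SQL to a SQLite prototype.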

# Step 1: Design the Ingestion Layer

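Each platform gets its own extractor, and every extractor returns the same ExtractionResult envelope: platform, entity, the raw records as a DataFrame, the extraction time, and a watermark for the next incremental run.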

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
import requests


@dataclass
class ExtractionResult:
    """Standardised output from any platform extractor."""
    platform: str
    entity: str
    records: pd.DataFrame
    extracted_at: datetime
    is_incremental: bool
    watermark: str  # Last processed ID or timestamp


class PlatformExtractor(ABC):
    """Base class for ecommerce platform extractors."""

    @abstractmethod
    def extract_orders(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract orders, optionally only those updated since a given timestamp."""

    @abstractmethod
    def extract_products(self) -> ExtractionResult:
        """Extract the full product catalogue."""

    @abstractmethod
    def extract_customers(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract customer data, optionally only records updated since a timestamp."""


class ShopifyExtractor(PlatformExtractor):
    """Extract data from the Shopify Admin REST API."""

    def __init__(self, store_url: str, api_token: str):
        """Initialise with Shopify credentials (store_url like 'my-store.myshopify.com')."""
        self.base_url = f"https://{store_url}/admin/api/2026-01"
        self.session = requests.Session()
        self.session.headers.update({"X-Shopify-Access-Token": api_token})

    def _fetch_paginated(self, endpoint: str, resource_key: str, params: dict) -> list:
        """Fetch all pages of an endpoint by following the Link rel="next" header."""
        url = f"{self.base_url}/{endpoint}"
        all_records = []
        while url:
            resp = self.session.get(url, params=params, timeout=30)
            resp.raise_for_status()
            # Read the named resource ("orders", "products", ...) instead of
            # relying on the order of keys in the JSON response
            all_records.extend(resp.json()[resource_key])
            # requests parses the Link header into resp.links. The next URL already
            # carries page_info and limit, and Shopify rejects other filters
            # alongside page_info, so the original params are dropped.
            url = resp.links.get("next", {}).get("url")
            params = None
        return all_records

    def extract_orders(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract orders from Shopify. Pass a timezone-aware `since`."""
        params = {"limit": 250, "status": "any"}
        if since:
            params["updated_at_min"] = since.isoformat()

        raw = self._fetch_paginated("orders.json", "orders", params)
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} orders from Shopify")

        return ExtractionResult(
            platform="shopify",
            entity="orders",
            records=df,
            extracted_at=datetime.now(timezone.utc),
            is_incremental=since is not None,
            watermark=df["updated_at"].max() if len(df) > 0 else "",
        )

    def extract_products(self) -> ExtractionResult:
        """Extract the complete product catalogue."""
        raw = self._fetch_paginated("products.json", "products", {"limit": 250})
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} products from Shopify")

        return ExtractionResult(
            platform="shopify",
            entity="products",
            records=df,
            extracted_at=datetime.now(timezone.utc),
            is_incremental=False,
            watermark="",
        )

    def extract_customers(self, since: Optional[datetime] = None) -> ExtractionResult:
        """Extract customer records. Pass a timezone-aware `since`."""
        params = {"limit": 250}
        if since:
            params["updated_at_min"] = since.isoformat()

        raw = self._fetch_paginated("customers.json", "customers", params)
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} customers from Shopify")

        return ExtractionResult(
            platform="shopify",
            entity="customers",
            records=df,
            extracted_at=datetime.now(timezone.utc),
            is_incremental=since is not None,
            watermark=df["updated_at"].max() if len(df) > 0 else "",
        )
```
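The architecture diagram puts a Raw Data Store between extraction and staging, but the extractor code goes straight from API responses to DataFrames. A minimal sketch of that landing step, assuming a SQLite table named `raw_records` (a hypothetical name) that keeps each record as untouched JSON next to its platform, entity, and extraction time, so staging can be re-run later without calling the APIs again:

```python
import json
import sqlite3
from datetime import datetime, timezone


def land_raw_records(conn: sqlite3.Connection, platform: str, entity: str,
                     records: list[dict]) -> int:
    """Append raw API records as JSON; returns the number of rows written."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS raw_records (
               platform TEXT NOT NULL,
               entity TEXT NOT NULL,
               extracted_at TEXT NOT NULL,
               payload TEXT NOT NULL
           )"""
    )
    extracted_at = datetime.now(timezone.utc).isoformat()
    rows = [(platform, entity, extracted_at, json.dumps(r)) for r in records]
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO raw_records (platform, entity, extracted_at, payload) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
    return len(rows)


def load_raw_records(conn: sqlite3.Connection, platform: str, entity: str) -> list[dict]:
    """Read landed payloads back in insertion order, ready for re-staging."""
    cur = conn.execute(
        "SELECT payload FROM raw_records WHERE platform = ? AND entity = ? "
        "ORDER BY extracted_at, rowid",
        (platform, entity),
    )
    return [json.loads(payload) for (payload,) in cur]
```

In the Shopify extractor, the list returned by `_fetch_paginated` is what would be landed here, before `pd.json_normalize` flattens it.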

# Step 2: Build the Staging Layer

The staging layer cleans and normalises raw data from different platforms into a consistent schema.

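```python
import pandas as pd


class StagingTransformer:
    """Transform raw platform data into standardised staging tables."""

    @staticmethod
    def _shopify_refund_total(refunds) -> float:
        """Sum successful refund transactions; Shopify's `refunds` is a list of objects."""
        if not isinstance(refunds, list):
            return 0.0
        return sum(
            float(txn.get("amount", 0))
            for refund in refunds
            for txn in refund.get("transactions", [])
            if txn.get("kind") == "refund" and txn.get("status") == "success"
        )

    @staticmethod
    def _units(line_items) -> int:
        """Units sold on an order (sum of quantities), not the number of line items."""
        if not isinstance(line_items, list):
            return 0
        return sum(int(item.get("quantity", 0)) for item in line_items)

    def stage_orders(self, result: ExtractionResult) -> pd.DataFrame:
        """Normalise orders from any platform into a standard schema."""
        df = result.records.copy()
        if df.empty:  # nothing new since the last run
            print(f"Staged 0 orders from {result.platform}")
            return pd.DataFrame()

        if result.platform == "shopify":
            staged = pd.DataFrame({
                "platform": "shopify",
                "order_id": df["id"].astype(str),
                "order_number": df["order_number"],
                # Parse offsets such as -05:00 and store everything in UTC
                "created_at": pd.to_datetime(df["created_at"], utc=True),
                "updated_at": pd.to_datetime(df["updated_at"], utc=True),
                "status": df["financial_status"],
                "currency": df["currency"],
                "subtotal": pd.to_numeric(df["subtotal_price"], errors="coerce"),
                "tax": pd.to_numeric(df["total_tax"], errors="coerce"),
                "shipping": pd.to_numeric(df.get("total_shipping_price_set.shop_money.amount", 0), errors="coerce"),
                "total": pd.to_numeric(df["total_price"], errors="coerce"),
                "discount": pd.to_numeric(df["total_discounts"], errors="coerce"),
                "refunded": df["refunds"].apply(self._shopify_refund_total) if "refunds" in df.columns else 0.0,
                "customer_email": df.get("email", ""),
                "item_count": df["line_items"].apply(self._units) if "line_items" in df.columns else 0,
            })
        else:
            raise ValueError(f"Unsupported platform: {result.platform}")

        # Business rule: net revenue = order total (incl. tax and shipping) minus refunds
        staged["net_revenue"] = staged["total"] - staged["refunded"]

        print(f"Staged {len(staged)} orders from {result.platform}")
        return staged
```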
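The `else` branch is where a second platform plugs in. A sketch of a WooCommerce mapping into the same staged columns, written as a plain function over one order dict. The field names (`number`, `date_created_gmt`, `date_modified_gmt`, `total`, `total_tax`, `shipping_total`, `discount_total`, `billing.email`, `line_items`, and `refunds` whose `total` is a negative string) follow the WooCommerce REST API order object as I understand it; treat them as assumptions and check them against your store's responses. The WooCommerce `status` is passed through unmapped, which is a simplification: it does not mean the same thing as Shopify's `financial_status`.

```python
from datetime import datetime, timezone


def _money(value) -> float:
    """WooCommerce sends amounts as strings; treat missing or blank values as 0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def stage_woocommerce_order(order: dict) -> dict:
    """Map one WooCommerce REST order dict onto the staged_orders columns."""
    line_items = order.get("line_items") or []
    # Refund totals arrive as negative strings such as "-10.00"
    refunded = sum(abs(_money(r.get("total"))) for r in order.get("refunds") or [])
    total = _money(order.get("total"))
    return {
        "platform": "woocommerce",
        "order_id": str(order["id"]),
        "order_number": order.get("number"),
        # *_gmt timestamps carry no offset, so mark them as UTC explicitly
        "created_at": datetime.fromisoformat(order["date_created_gmt"]).replace(tzinfo=timezone.utc),
        "updated_at": datetime.fromisoformat(order["date_modified_gmt"]).replace(tzinfo=timezone.utc),
        "status": order.get("status"),  # passed through unmapped
        "currency": order.get("currency"),
        # Line-item totals are after discounts, before tax and shipping
        "subtotal": sum(_money(i.get("total")) for i in line_items),
        "tax": _money(order.get("total_tax")),
        "shipping": _money(order.get("shipping_total")),
        "total": total,
        "discount": _money(order.get("discount_total")),
        "refunded": refunded,
        "customer_email": (order.get("billing") or {}).get("email", ""),
        "item_count": sum(int(i.get("quantity", 0)) for i in line_items),
        "net_revenue": total - refunded,  # same rule as the Shopify branch
    }
```

`pd.DataFrame([stage_woocommerce_order(o) for o in raw_orders])` then has the same columns as the Shopify output, so both platforms can be concatenated before loading.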

# Step 3: Design the Dimension Model

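A star schema keeps measurable events (orders, order items) in fact tables and descriptive attributes (dates, products, customers) in dimension tables that the facts reference by key. Because every report joins the same tables on the same keys, every report calculates metrics the same way. The code below builds the date and customer dimensions.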

```mermaid
flowchart LR
  subgraph Dimensions
    D_DATE["dim_date\n─────────\ndate_key\nday_of_week\nweek_number\nmonth\nquarter\nyear\nis_weekend"]
    D_PRODUCT["dim_product\n─────────\nproduct_key\ntitle\ncategory\nprice\nplatform"]
    D_CUSTOMER["dim_customer\n─────────\ncustomer_key\nemail\nfirst_order\norder_count\ntotal_spent\nsegment"]
  end
  subgraph Facts
    F_ORDER["fact_orders\n─────────\norder_key\ndate_key\ncustomer_key\nrevenue\ntax\nshipping\nnet_revenue\nitem_count"]
    F_ITEM["fact_order_items\n─────────\nitem_key\norder_key\nproduct_key\nquantity\nprice\ndiscount"]
  end
  D_DATE --> F_ORDER
  D_CUSTOMER --> F_ORDER
  D_PRODUCT --> F_ITEM
  F_ORDER --> F_ITEM
```
```python
import pandas as pd
from sqlalchemy import create_engine, text


class WarehouseBuilder:
    """Build and maintain the ecommerce data warehouse."""

    def __init__(self, db_url: str):
        """Initialise with database connection."""
        self.engine = create_engine(db_url)

    def build_date_dimension(self, start_year: int = 2024, end_year: int = 2027):
        """Generate a date dimension table (one row per calendar day)."""
        dates = pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31", freq="D")
        dim = pd.DataFrame({
            "date_key": dates.strftime("%Y%m%d").astype(int),  # e.g. 20260110
            "date": dates,
            "day_of_week": dates.day_name(),
            "day_number": dates.dayofweek,  # Monday = 0
            # ISO week number as a plain array, so no index alignment is involved
            "week_number": dates.isocalendar().week.to_numpy().astype(int),
            "month": dates.month,
            "month_name": dates.month_name(),
            "quarter": dates.quarter,
            "year": dates.year,
            "is_weekend": dates.dayofweek >= 5,
        })
        dim.to_sql("dim_date", self.engine, if_exists="replace", index=False)
        print(f"Date dimension built: {len(dim)} rows")

    def build_customer_dimension(self, staged_orders: pd.DataFrame):
        """Build the customer dimension from the full, deduplicated order history."""
        # groupby drops orders whose email is missing (NaN/None)
        customers = staged_orders.groupby("customer_email").agg(
            first_order=("created_at", "min"),
            last_order=("created_at", "max"),
            order_count=("order_id", "nunique"),
            total_spent=("net_revenue", "sum"),
            avg_order_value=("net_revenue", "mean"),
        ).reset_index()

        # Segment by lifetime value. pd.cut bins are right-inclusive, so a lower
        # bound of -inf keeps zero and net-negative customers in "low" instead of NaN.
        customers["segment"] = pd.cut(
            customers["total_spent"],
            bins=[float("-inf"), 50, 200, 1000, float("inf")],
            labels=["low", "medium", "high", "vip"],
        ).astype(str)

        # Surrogate keys are regenerated on every rebuild, so rebuild facts afterwards
        customers["customer_key"] = range(1, len(customers) + 1)
        customers.to_sql("dim_customer", self.engine, if_exists="replace", index=False)
        print(f"Customer dimension built: {len(customers)} rows")
        return customers
```
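The diagram's `fact_orders` table is not built by the class above. A minimal sketch of that step in SQLite, assuming `staged_orders` and `dim_customer` already exist with the columns used in this article: each order's `date_key` is derived from `created_at` in the same `YYYYMMDD` integer form as `dim_date`, and its `customer_key` comes from a join on email. The `order_key` format and the choice of `total` as `revenue` are illustrative assumptions. `strftime` is SQLite-specific; in PostgreSQL the equivalent would be `to_char(created_at, 'YYYYMMDD')::int`.

```python
import sqlite3

FACT_ORDERS_SQL = """
DROP TABLE IF EXISTS fact_orders;
CREATE TABLE fact_orders AS
SELECT
    s.platform || ':' || s.order_id                   AS order_key,
    CAST(strftime('%Y%m%d', s.created_at) AS INTEGER) AS date_key,
    c.customer_key,                                   -- NULL when the order has no email
    s.total                                           AS revenue,
    s.tax,
    s.shipping,
    s.net_revenue,
    s.item_count
FROM staged_orders AS s
LEFT JOIN dim_customer AS c ON c.customer_email = s.customer_email;
"""


def build_fact_orders(conn: sqlite3.Connection) -> int:
    """Rebuild fact_orders from staged_orders and dim_customer; returns the row count."""
    conn.executescript(FACT_ORDERS_SQL)
    return conn.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]
```

Rebuilding the whole table (rather than appending) keeps it consistent with `dim_customer`, whose surrogate keys change on every rebuild.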

# Step 4: Build Aggregate Tables

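Pre-computed aggregates make dashboards fast because they read a handful of summary rows instead of scanning every order. Rebuild them daily, after the staging load.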

```python
import pandas as pd
from sqlalchemy import text


class AggregateBuilder:
    """Build pre-computed aggregate tables for reporting (PostgreSQL syntax)."""

    def __init__(self, engine):
        """Initialise with database engine."""
        self.engine = engine

    def daily_summary(self):
        """Upsert one row per (date, platform) into agg_daily_summary."""
        create = """
            CREATE TABLE IF NOT EXISTS agg_daily_summary (
                date DATE NOT NULL,
                platform TEXT NOT NULL,
                orders INTEGER,
                revenue DOUBLE PRECISION,
                avg_order_value DOUBLE PRECISION,
                unique_customers INTEGER,
                items_sold INTEGER,
                total_discounts DOUBLE PRECISION,
                PRIMARY KEY (date, platform)  -- ON CONFLICT needs this constraint
            )
        """
        upsert = """
            INSERT INTO agg_daily_summary (date, platform, orders, revenue, avg_order_value,
                                           unique_customers, items_sold, total_discounts)
            SELECT
                date(created_at) AS date,  -- uses the session time zone for timestamptz
                platform,
                COUNT(DISTINCT order_id) AS orders,
                SUM(net_revenue) AS revenue,
                AVG(net_revenue) AS avg_order_value,
                COUNT(DISTINCT customer_email) AS unique_customers,
                SUM(item_count) AS items_sold,
                SUM(discount) AS total_discounts
            FROM staged_orders
            GROUP BY date(created_at), platform
            ON CONFLICT (date, platform) DO UPDATE SET
                orders = EXCLUDED.orders,
                revenue = EXCLUDED.revenue,
                avg_order_value = EXCLUDED.avg_order_value,
                unique_customers = EXCLUDED.unique_customers,
                items_sold = EXCLUDED.items_sold,
                total_discounts = EXCLUDED.total_discounts
        """
        # begin() commits on success and rolls back if either statement fails
        with self.engine.begin() as conn:
            conn.execute(text(create))
            conn.execute(text(upsert))
        print("Daily summary aggregate rebuilt")

    def weekly_trends(self):
        """Build week-over-week trend comparisons per platform."""
        query = """
            SELECT
                date_trunc('week', date) AS week_start,
                platform,
                SUM(revenue) AS revenue,
                SUM(orders) AS orders,
                -- Weekly AOV, not an average of daily averages
                SUM(revenue) / NULLIF(SUM(orders), 0) AS aov,
                -- Daily uniques summed: a repeat buyer counts once per day they ordered
                SUM(unique_customers) AS customer_days,
                LAG(SUM(revenue)) OVER w AS prev_revenue,
                LAG(SUM(orders)) OVER w AS prev_orders
            FROM agg_daily_summary
            GROUP BY date_trunc('week', date), platform
            WINDOW w AS (PARTITION BY platform ORDER BY date_trunc('week', date))
        """
        df = pd.read_sql(query, self.engine)
        df["revenue_change_pct"] = ((df["revenue"] - df["prev_revenue"]) / df["prev_revenue"] * 100).round(1)
        df["order_change_pct"] = ((df["orders"] - df["prev_orders"]) / df["prev_orders"] * 100).round(1)
        df.to_sql("agg_weekly_trends", self.engine, if_exists="replace", index=False)
        print(f"Weekly trends built: {len(df)} rows (one per week and platform)")
```
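The `ON CONFLICT` upsert above is PostgreSQL syntax and needs a primary key or unique constraint on `(date, platform)`. When prototyping on SQLite (upserts need SQLite 3.24 or later), the same idempotent pattern works; SQLite's documentation recommends giving the `SELECT` a `WHERE` clause, even `WHERE true`, when it feeds an upsert, to avoid a parsing ambiguity. A minimal sketch with a trimmed column list:

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS agg_daily_summary (
    date TEXT NOT NULL,
    platform TEXT NOT NULL,
    orders INTEGER,
    revenue REAL,
    PRIMARY KEY (date, platform)
)
"""

UPSERT = """
INSERT INTO agg_daily_summary (date, platform, orders, revenue)
SELECT date(created_at), platform, COUNT(DISTINCT order_id), SUM(net_revenue)
FROM staged_orders
WHERE true  -- SQLite docs: give the SELECT a WHERE clause when it feeds an upsert
GROUP BY date(created_at), platform
ON CONFLICT (date, platform) DO UPDATE SET
    orders = excluded.orders,
    revenue = excluded.revenue
"""


def refresh_daily_summary(conn: sqlite3.Connection) -> None:
    """Insert new (date, platform) rows and overwrite existing ones in place."""
    with conn:  # commits on success, rolls back on error
        conn.execute(SCHEMA)
        conn.execute(UPSERT)
```

Because conflicting rows are overwritten rather than duplicated, running the refresh twice in a row leaves the table unchanged.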

# Step 5: Orchestrate the Pipeline

Wire everything together with Prefect for scheduling and monitoring.

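```python
import os
from datetime import datetime, timedelta, timezone

import pandas as pd
from prefect import flow, task
from prefect.client.schemas.schedules import CronSchedule
from sqlalchemy import create_engine


@task(retries=3, retry_delay_seconds=60)
def extract_orders(shopify_url: str, shopify_token: str, since: datetime) -> ExtractionResult:
    """Pull orders from Shopify; Prefect retries transient API failures."""
    return ShopifyExtractor(shopify_url, shopify_token).extract_orders(since=since)


@flow(name="ecommerce-data-pipeline")
def run_pipeline(db_url: str, shopify_url: str, shopify_token: str):
    """Full ecommerce data pipeline: extract → stage → warehouse → aggregate."""
    engine = create_engine(db_url)

    # Extract orders updated in the last 24 hours. A timezone-aware timestamp gives
    # Shopify an explicit UTC offset. Products (extractor.extract_products()) would
    # feed dim_product, which this walkthrough does not build.
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    orders = extract_orders(shopify_url, shopify_token, yesterday)

    # Stage and append this batch
    staged = StagingTransformer().stage_orders(orders)
    if not staged.empty:
        staged.to_sql("staged_orders", engine, if_exists="append", index=False)

    # Incremental extracts re-fetch orders that changed, so keep only the latest
    # version of each order. Rewriting the table is fine at small volumes;
    # switch to a SQL upsert as history grows.
    history = pd.read_sql("SELECT * FROM staged_orders", engine)
    history = history.sort_values("updated_at").drop_duplicates(["platform", "order_id"], keep="last")
    history.to_sql("staged_orders", engine, if_exists="replace", index=False)

    # Build dimensions from the full history, not just this run's batch
    WarehouseBuilder(db_url).build_customer_dimension(history)

    # Build aggregates
    agg = AggregateBuilder(engine)
    agg.daily_summary()
    agg.weekly_trends()

    print(f"Pipeline complete: {len(staged)} orders processed")


if __name__ == "__main__":
    # Scheduled runs need values for the flow's required parameters. The
    # environment variable names are placeholders; a Prefect Secret block is a
    # safer home for the API token than a deployment parameter.
    run_pipeline.serve(
        name="daily-ecommerce-pipeline",
        schedules=[CronSchedule(cron="0 5 * * *", timezone="Europe/London")],
        parameters={
            "db_url": os.environ["DATABASE_URL"],
            "shopify_url": os.environ["SHOPIFY_STORE_URL"],
            "shopify_token": os.environ["SHOPIFY_ACCESS_TOKEN"],
        },
    )
```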
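The flow re-reads a fixed 24-hour window, so if it fails for two days in a row, orders updated in between are never extracted. A common alternative for incremental loads is to persist the `watermark` each extraction returns and start the next run from it, minus a small overlap for updates near the boundary; the re-fetched orders then have to be deduplicated by order ID before aggregation. A minimal sketch, assuming a hypothetical `pipeline_watermarks` table kept in a separate SQLite file for simplicity, with a 30-day backfill on the first run:

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def _ensure_table(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) watermark table on first use."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS pipeline_watermarks (
               platform TEXT NOT NULL,
               entity TEXT NOT NULL,
               watermark TEXT NOT NULL,
               PRIMARY KEY (platform, entity)
           )"""
    )


def get_since(conn: sqlite3.Connection, platform: str, entity: str,
              overlap: timedelta = timedelta(hours=1),
              first_run_lookback: timedelta = timedelta(days=30)) -> datetime:
    """Return the timezone-aware start point for the next incremental extraction."""
    _ensure_table(conn)
    row = conn.execute(
        "SELECT watermark FROM pipeline_watermarks WHERE platform = ? AND entity = ?",
        (platform, entity),
    ).fetchone()
    if row is None:
        # First run: backfill a fixed window instead of only the last day
        return datetime.now(timezone.utc) - first_run_lookback
    # Step back a little so updates near the boundary are re-read, not missed
    return datetime.fromisoformat(row[0]) - overlap


def save_watermark(conn: sqlite3.Connection, platform: str, entity: str,
                   watermark: str) -> None:
    """Record the newest updated_at seen; call only after the load has succeeded."""
    if not watermark:  # empty extraction: keep the previous watermark
        return
    _ensure_table(conn)
    with conn:
        conn.execute(
            """INSERT INTO pipeline_watermarks (platform, entity, watermark)
               VALUES (?, ?, ?)
               ON CONFLICT (platform, entity) DO UPDATE SET watermark = excluded.watermark""",
            (platform, entity, watermark),
        )
```

In the flow, `get_since(...)` would replace the fixed `yesterday` calculation, and `save_watermark(..., orders.watermark)` would run only after the aggregates succeed, so a failed run is retried from the same point.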

# What This Replaces

| Ad-Hoc Approach | Pipeline Architecture |
| --- | --- |
| Separate scripts per report | Single pipeline, multiple outputs |
| Each script defines "revenue" differently | One staging layer with consistent business logic |
| No historical tracking | Warehouse with full history |
| Manual data pulls each morning | Automated daily ingestion |
| Spreadsheet-based comparisons | Pre-computed aggregates with trends |
| Platform-specific dashboards | Unified cross-platform reporting |

# Next Steps

For building the API extraction layer in detail, see Shopify Reporting API: Pull Sales, Inventory, and Customer Data. For creating dashboards from the warehouse data, see How to Build Dashboards Without BI Tools.

For making this pipeline fault-tolerant, see Build Self-Healing Data Pipelines. For the unified API layer across platforms, see Ecommerce Reporting API: Automate Store Data.

For data quality checks between stages, see Build a Data Quality Framework in Python.

Data analytics services include ecommerce data pipeline design, warehouse architecture, and reporting automation.

Get in touch to discuss building a data pipeline for your ecommerce business.
