Ecommerce Data Pipeline: Reporting Architecture That Scales


Design a data pipeline architecture for ecommerce reporting that handles multi-platform ingestion, incremental loads, dimension modelling, and automated report generation, taking raw API data through to business-ready dashboards.


Ecommerce reporting starts simple. Pull yesterday’s orders from Shopify, dump them in a spreadsheet, email it. Then someone asks for inventory data. Then returns. Then customer lifetime value. Then multi-channel comparisons. Within months, you have twelve scripts, four spreadsheets, and nobody trusts the numbers because each report calculates revenue differently.

The fix is architecture: a pipeline that ingests raw data from every platform into a single warehouse, transforms it through consistent business logic, and produces reports that everyone trusts because they all use the same source of truth.

Who This Is For

  • Ecommerce operators running stores across Shopify, WooCommerce, or Amazon who need unified reporting
  • Data engineers designing their first ecommerce analytics pipeline
  • Vibe coders building a data platform for an ecommerce client and wanting a proven architecture
  • CTOs evaluating whether to build or buy their analytics infrastructure

Familiarity with Python and SQL is helpful. The architecture patterns apply regardless of your specific tech stack.

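The Full Architecture

The pipeline runs in four stages, each covered by one of the steps below: extract raw data from each platform, stage it into a common schema, load it into warehouse dimensions, and build aggregates for reporting. A scheduler then runs the whole sequence daily.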

What You Will Need

pip install pandas sqlalchemy requests prefect jinja2
  • A database (PostgreSQL recommended, SQLite for prototyping)
  • API credentials for your ecommerce platforms
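
Credentials and the database URL are best kept out of the code. One way to do that, sketched here under assumptions, is to read them from environment variables and fail fast when one is missing. The `PipelineConfig` class, the `load_config` function, and the variable names `PIPELINE_DB_URL`, `SHOPIFY_STORE`, and `SHOPIFY_TOKEN` are all illustrative, not something the libraries above define.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineConfig:
    """Connection settings for one pipeline run (field names are illustrative)."""
    db_url: str
    shopify_store: str
    shopify_token: str


def load_config(env=None) -> PipelineConfig:
    """Build a PipelineConfig from environment variables, failing fast if any is missing."""
    env = os.environ if env is None else env
    required = ["PIPELINE_DB_URL", "SHOPIFY_STORE", "SHOPIFY_TOKEN"]  # hypothetical names
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return PipelineConfig(
        db_url=env["PIPELINE_DB_URL"],
        shopify_store=env["SHOPIFY_STORE"],
        shopify_token=env["SHOPIFY_TOKEN"],
    )
```

Passing a plain dict as `env` makes the loader easy to exercise in tests without touching the real environment.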

Step 1: Design the Ingestion Layer

Each platform gets its own extractor. They all output data in a standardised raw format.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
import requests

@dataclass
class ExtractionResult:
    """Standardised output from any platform extractor."""
    platform: str
    entity: str
    records: pd.DataFrame
    extracted_at: datetime
    is_incremental: bool
    watermark: str  # Last processed ID or timestamp

class PlatformExtractor(ABC):
    """Base class for ecommerce platform extractors."""

    @abstractmethod
    def extract_orders(self, since: datetime = None) -> ExtractionResult:
        """Extract orders, optionally since a given timestamp."""
        pass

    @abstractmethod
    def extract_products(self) -> ExtractionResult:
        """Extract the full product catalogue."""
        pass

    @abstractmethod
    def extract_customers(self, since: datetime = None) -> ExtractionResult:
        """Extract customer data."""
        pass


class ShopifyExtractor(PlatformExtractor):
    """Extract data from Shopify Admin API."""

    def __init__(self, store_url: str, api_token: str):
        """Initialise with Shopify credentials."""
        self.base_url = f"https://{store_url}/admin/api/2026-01"
        self.headers = {"X-Shopify-Access-Token": api_token}
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def _fetch_paginated(self, endpoint: str, params: dict) -> list:
        """Fetch all pages from a Shopify API endpoint."""
        url = f"{self.base_url}/{endpoint}"
        all_records = []
        while url:
            resp = self.session.get(url, params=params, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            # Each list response is wrapped in a single top-level key, e.g. {"orders": [...]}
            key = next(iter(data))
            all_records.extend(data[key])
            # The next-page URL already carries the cursor, so drop the original filters
            params = {}
            # requests parses the Link header into resp.links, keyed by rel
            url = resp.links.get("next", {}).get("url")
        return all_records

    def extract_orders(self, since: datetime = None) -> ExtractionResult:
        """Extract orders from Shopify."""
        params = {"limit": 250, "status": "any"}
        if since:
            params["updated_at_min"] = since.isoformat()

        raw = self._fetch_paginated("orders.json", params)
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} orders from Shopify")

        return ExtractionResult(
            platform="shopify",
            entity="orders",
            records=df,
            extracted_at=datetime.now(),
            is_incremental=since is not None,
            watermark=df["updated_at"].max() if len(df) > 0 else "",
        )

    def extract_products(self) -> ExtractionResult:
        """Extract the complete product catalogue."""
        raw = self._fetch_paginated("products.json", {"limit": 250})
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} products from Shopify")

        return ExtractionResult(
            platform="shopify",
            entity="products",
            records=df,
            extracted_at=datetime.now(),
            is_incremental=False,
            watermark="",
        )

    def extract_customers(self, since: datetime = None) -> ExtractionResult:
        """Extract customer records."""
        params = {"limit": 250}
        if since:
            params["updated_at_min"] = since.isoformat()

        raw = self._fetch_paginated("customers.json", params)
        df = pd.json_normalize(raw)
        print(f"Extracted {len(df)} customers from Shopify")

        return ExtractionResult(
            platform="shopify",
            entity="customers",
            records=df,
            extracted_at=datetime.now(),
            is_incremental=since is not None,
            watermark=df["updated_at"].max() if len(df) > 0 else "",
        )
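
Cursor pagination in `_fetch_paginated` hinges on finding the rel="next" URL in the `Link` response header. As a standalone illustration of that step, here is a minimal parser sketch. `next_page_url` is a hypothetical helper, not part of requests or of Shopify's tooling, and the URLs in the usage example are invented.

```python
import re
from typing import Optional

# Matches one "<url>; params" entry; params run up to the next "<" or end of string.
_LINK_ENTRY = re.compile(r'<([^>]*)>\s*;([^<]*)')
_REL_VALUE = re.compile(r'rel="?([^";,]+)"?')


def next_page_url(link_header: str) -> Optional[str]:
    """Return the URL marked rel="next" in a Link header, or None if there is none."""
    for url, params in _LINK_ENTRY.findall(link_header):
        rel = _REL_VALUE.search(params)
        if rel and "next" in rel.group(1).split():
            return url
    return None


header = (
    '<https://shop.example/admin/orders.json?page_info=abc&limit=250>; rel="previous", '
    '<https://shop.example/admin/orders.json?page_info=def&limit=250>; rel="next"'
)
following = next_page_url(header)   # URL of the page after this one
last_page = next_page_url('<https://shop.example/admin/orders.json?page_info=abc>; rel="previous"')
```

Matching on the angle brackets rather than splitting on commas keeps the parser working when a URL itself contains a comma. requests also exposes the parsed header as `resp.links`, which is usually the simpler choice inside the extractor.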

Step 2: Build the Staging Layer

The staging layer cleans and normalises raw data from different platforms into a consistent schema.

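class StagingTransformer:
    """Transform raw platform data into standardised staging tables."""

    @staticmethod
    def _refund_total(refunds) -> float:
        """Sum refunded amounts for one order.

        Assumes each Shopify refund object carries a "transactions" list
        whose entries have an "amount" string.
        """
        if not isinstance(refunds, list):
            return 0.0
        return sum(
            (float(txn.get("amount") or 0)
             for refund in refunds
             for txn in (refund.get("transactions") or [])),
            0.0,
        )

    def stage_orders(self, result: ExtractionResult) -> pd.DataFrame:
        """Normalise orders from any platform into a standard schema."""
        df = result.records.copy()

        if result.platform == "shopify":
            # json_normalize flattens nested objects into dotted column names
            shipping_col = "total_shipping_price_set.shop_money.amount"
            staged = pd.DataFrame({
                "platform": "shopify",
                "order_id": df["id"].astype(str),
                "order_number": df["order_number"],
                "created_at": pd.to_datetime(df["created_at"]),
                "updated_at": pd.to_datetime(df["updated_at"]),
                "status": df["financial_status"],
                "currency": df["currency"],
                "subtotal": pd.to_numeric(df["subtotal_price"], errors="coerce"),
                "tax": pd.to_numeric(df["total_tax"], errors="coerce"),
                "shipping": pd.to_numeric(df[shipping_col], errors="coerce") if shipping_col in df.columns else 0.0,
                "total": pd.to_numeric(df["total_price"], errors="coerce"),
                "discount": pd.to_numeric(df["total_discounts"], errors="coerce"),
                # "refunds" is a list of refund objects per order, so it has to be summed, not cast
                "refunded": df["refunds"].apply(self._refund_total) if "refunds" in df.columns else 0.0,
                "customer_email": df["email"] if "email" in df.columns else "",
                # Counts line items, not units: two units of one product count as 1
                "item_count": df["line_items"].apply(len) if "line_items" in df.columns else 0,
            })
        else:
            raise ValueError(f"Unsupported platform: {result.platform}")

        # Net revenue is the single revenue definition every downstream report uses
        staged["net_revenue"] = staged["total"] - staged["refunded"]

        print(f"Staged {len(staged)} orders from {result.platform}")
        return staged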
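
The point of the staging layer is that a second platform maps onto the same columns. As a rough sketch of what that could look like for WooCommerce, the function below maps one order payload onto the staging schema. The field names (`date_created_gmt`, `shipping_total`, `discount_total`, `billing.email`, refund entries with a negative `total`) are assumptions about the WooCommerce REST order payload and should be checked against your store's actual responses. `stage_woocommerce_order` is a hypothetical helper, and the subtotal is an approximation derived from the other totals.

```python
def _to_float(value) -> float:
    """Convert an API amount (usually a string) to float, treating bad values as 0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def stage_woocommerce_order(order: dict) -> dict:
    """Map one WooCommerce-style order payload onto the staging schema."""
    total = _to_float(order.get("total"))
    tax = _to_float(order.get("total_tax"))
    shipping = _to_float(order.get("shipping_total"))
    # Assumed: each refund entry reports its amount as a negative "total"
    refunded = sum(abs(_to_float(r.get("total"))) for r in order.get("refunds") or [])
    return {
        "platform": "woocommerce",
        "order_id": str(order["id"]),
        "order_number": order.get("number"),
        "created_at": order.get("date_created_gmt"),
        "updated_at": order.get("date_modified_gmt"),
        "status": order.get("status"),
        "currency": order.get("currency"),
        # Approximation: order total minus tax and shipping
        "subtotal": round(total - tax - shipping, 2),
        "tax": tax,
        "shipping": shipping,
        "total": total,
        "discount": _to_float(order.get("discount_total")),
        "refunded": refunded,
        "customer_email": (order.get("billing") or {}).get("email", ""),
        "item_count": len(order.get("line_items") or []),
        # Same net revenue definition as the Shopify branch
        "net_revenue": round(total - refunded, 2),
    }


sample_order = {
    "id": 77, "number": "77", "status": "completed", "currency": "GBP",
    "date_created_gmt": "2026-01-15T09:30:00", "date_modified_gmt": "2026-01-16T10:00:00",
    "total": "120.00", "total_tax": "20.00", "shipping_total": "10.00", "discount_total": "5.00",
    "billing": {"email": "a@example.com"},
    "line_items": [{"id": 1}, {"id": 2}],
    "refunds": [{"id": 9, "total": "-30.00"}],
}
row = stage_woocommerce_order(sample_order)
```

A list of such rows can be passed to `pd.DataFrame(...)` to get a frame with the same columns as the Shopify branch, so everything downstream of staging stays platform-agnostic.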

Step 3: Design the Dimension Model

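The warehouse uses a star schema: fact tables hold one row per event (such as an order), and dimension tables hold the attributes you slice by (such as date and customer). Every report joins the same facts to the same dimensions, which is what keeps the numbers consistent.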

from sqlalchemy import create_engine, text

class WarehouseBuilder:
    """Build and maintain the ecommerce data warehouse."""

    def __init__(self, db_url: str):
        """Initialise with database connection."""
        self.engine = create_engine(db_url)

    def build_date_dimension(self, start_year: int = 2024, end_year: int = 2027):
        """Generate a date dimension table."""
        dates = pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31")
        dim = pd.DataFrame({
            "date_key": dates.strftime("%Y%m%d").astype(int),
            "date": dates,
            "day_of_week": dates.day_name(),
            "day_number": dates.dayofweek,
            "week_number": dates.isocalendar().week.astype(int),
            "month": dates.month,
            "month_name": dates.month_name(),
            "quarter": dates.quarter,
            "year": dates.year,
            "is_weekend": dates.dayofweek >= 5,
        })
        dim.to_sql("dim_date", self.engine, if_exists="replace", index=False)
        print(f"Date dimension built: {len(dim)} rows")

    def build_customer_dimension(self, staged_orders: pd.DataFrame):
        """Build customer dimension from order history."""
        customers = staged_orders.groupby("customer_email").agg(
            first_order=("created_at", "min"),
            last_order=("created_at", "max"),
            order_count=("order_id", "nunique"),
            total_spent=("net_revenue", "sum"),
            avg_order_value=("net_revenue", "mean"),
        ).reset_index()

        # Segment by lifetime value; include_lowest puts customers with exactly 0 spend in "low"
        customers["segment"] = pd.cut(
            customers["total_spent"],
            bins=[0, 50, 200, 1000, float("inf")],
            labels=["low", "medium", "high", "vip"],
            include_lowest=True,
        )

        customers["customer_key"] = range(1, len(customers) + 1)
        customers.to_sql("dim_customer", self.engine, if_exists="replace", index=False)
        print(f"Customer dimension built: {len(customers)} rows")
        return customers
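
The builder above covers the dimensions; a star schema also needs a fact table that points at them. A minimal sketch of that step follows, using SQLite from the standard library so it runs as a prototype. The `fact_orders` table name, its column choice, and the `build_fact_orders` helper are assumptions, and the trimmed-down `staged_orders` and `dim_customer` tables exist only to make the example self-contained. The date expression is SQLite-specific; on PostgreSQL the equivalent would be `to_char(created_at, 'YYYYMMDD')::int`.

```python
import sqlite3

# One row per order, carrying keys into dim_date and dim_customer plus the measures.
FACT_ORDERS_SQL = """
CREATE TABLE fact_orders AS
SELECT
    o.order_id,
    o.platform,
    CAST(strftime('%Y%m%d', o.created_at) AS INTEGER) AS date_key,
    c.customer_key,
    o.net_revenue,
    o.discount,
    o.item_count
FROM staged_orders AS o
LEFT JOIN dim_customer AS c ON c.customer_email = o.customer_email
"""


def build_fact_orders(conn: sqlite3.Connection) -> int:
    """Rebuild fact_orders from staged_orders and dim_customer; return the row count."""
    conn.execute("DROP TABLE IF EXISTS fact_orders")
    conn.execute(FACT_ORDERS_SQL)
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]


# Tiny in-memory fixture standing in for the real staging and dimension tables
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE staged_orders (
    order_id TEXT, platform TEXT, created_at TEXT, customer_email TEXT,
    net_revenue REAL, discount REAL, item_count INTEGER
);
CREATE TABLE dim_customer (customer_key INTEGER, customer_email TEXT);
INSERT INTO staged_orders VALUES
    ('1001', 'shopify', '2026-01-15 09:30:00', 'a@example.com', 80.0, 5.0, 2),
    ('1002', 'shopify', '2026-01-16 14:00:00', 'b@example.com', 40.0, 0.0, 1);
INSERT INTO dim_customer VALUES (1, 'a@example.com'), (2, 'b@example.com');
""")
rows_built = build_fact_orders(conn)
fact_rows = conn.execute(
    "SELECT order_id, date_key, customer_key FROM fact_orders ORDER BY order_id"
).fetchall()
```

The `LEFT JOIN` keeps orders whose email has no matching customer row (guest checkouts, for example) with a NULL `customer_key` rather than silently dropping their revenue.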

Step 4: Build Aggregate Tables

Pre-computed aggregates make dashboards fast. Rebuild them daily.

class AggregateBuilder:
    """Build pre-computed aggregate tables for reporting."""

    def __init__(self, engine):
        """Initialise with database engine."""
        self.engine = engine

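    def daily_summary(self):
        """Upsert the daily summary aggregate.

        Requires agg_daily_summary to exist with a unique constraint on (date, platform).
        """
        query = """
            INSERT INTO agg_daily_summary
                (date, platform, orders, revenue, avg_order_value,
                 unique_customers, items_sold, total_discounts)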
            SELECT
                date(created_at) as date,
                platform,
                COUNT(DISTINCT order_id) as orders,
                SUM(net_revenue) as revenue,
                AVG(net_revenue) as avg_order_value,
                COUNT(DISTINCT customer_email) as unique_customers,
                SUM(item_count) as items_sold,
                SUM(discount) as total_discounts
            FROM staged_orders
            GROUP BY date(created_at), platform
            ON CONFLICT (date, platform) DO UPDATE SET
                orders = EXCLUDED.orders,
                revenue = EXCLUDED.revenue,
                avg_order_value = EXCLUDED.avg_order_value,
                unique_customers = EXCLUDED.unique_customers,
                items_sold = EXCLUDED.items_sold,
                total_discounts = EXCLUDED.total_discounts
        """
        with self.engine.connect() as conn:
            conn.execute(text(query))
            conn.commit()
        print("Daily summary aggregate rebuilt")

    def weekly_trends(self):
        """Build weekly trend comparisons."""
        query = """
            SELECT
                date_trunc('week', date) as week_start,
                platform,
                SUM(revenue) as revenue,
                SUM(orders) as orders,
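                -- Weekly AOV is revenue over orders; averaging daily averages would weight quiet days equally
                SUM(revenue) / NULLIF(SUM(orders), 0) as aov,
                -- Sum of daily uniques: a customer who orders on two days is counted twice
                SUM(unique_customers) as customers,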
                LAG(SUM(revenue)) OVER (PARTITION BY platform ORDER BY date_trunc('week', date)) as prev_revenue,
                LAG(SUM(orders)) OVER (PARTITION BY platform ORDER BY date_trunc('week', date)) as prev_orders
            FROM agg_daily_summary
            GROUP BY date_trunc('week', date), platform
        """
        df = pd.read_sql(query, self.engine)
        df["revenue_change_pct"] = ((df["revenue"] - df["prev_revenue"]) / df["prev_revenue"] * 100).round(1)
        df["order_change_pct"] = ((df["orders"] - df["prev_orders"]) / df["prev_orders"] * 100).round(1)
        df.to_sql("agg_weekly_trends", self.engine, if_exists="replace", index=False)
        print(f"Weekly trends built: {len(df)} weeks")
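
The `ON CONFLICT (date, platform)` clause in `daily_summary` only works if `agg_daily_summary` already exists with a uniqueness constraint on those two columns. A possible definition is sketched below, along with a small demonstration of the upsert behaviour on an in-memory SQLite database. The column types are assumptions chosen to be accepted by both SQLite and PostgreSQL, and the demo inserts literal values rather than running the full aggregate query.

```python
import sqlite3

AGG_DAILY_SUMMARY_DDL = """
CREATE TABLE IF NOT EXISTS agg_daily_summary (
    date DATE NOT NULL,
    platform TEXT NOT NULL,
    orders INTEGER,
    revenue NUMERIC(12, 2),
    avg_order_value NUMERIC(12, 2),
    unique_customers INTEGER,
    items_sold INTEGER,
    total_discounts NUMERIC(12, 2),
    PRIMARY KEY (date, platform)
)
"""

# Same upsert shape as daily_summary, reduced to two measures for the demo
UPSERT_SQL = """
INSERT INTO agg_daily_summary (date, platform, orders, revenue)
VALUES (?, ?, ?, ?)
ON CONFLICT (date, platform) DO UPDATE SET
    orders = EXCLUDED.orders,
    revenue = EXCLUDED.revenue
"""

conn = sqlite3.connect(":memory:")
conn.execute(AGG_DAILY_SUMMARY_DDL)

# First run of the day, then a rerun after late orders arrive
conn.execute(UPSERT_SQL, ("2026-01-15", "shopify", 10, 500.0))
conn.execute(UPSERT_SQL, ("2026-01-15", "shopify", 12, 640.0))
conn.commit()

summary_rows = conn.execute(
    "SELECT date, platform, orders, revenue FROM agg_daily_summary"
).fetchall()
```

Because the primary key covers `(date, platform)`, rerunning the aggregate for a day overwrites that day's row instead of adding a second one, which is what makes the daily rebuild safe to repeat.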

Step 5: Orchestrate the Pipeline

Wire everything together with Prefect for scheduling and monitoring.

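import os
from datetime import datetime, timedelta

import pandas as pd
from prefect import flow
from prefect.client.schemas.schedules import CronSchedule
from sqlalchemy import bindparam, create_engine, inspect, text

@flow(name="ecommerce-data-pipeline")
def run_pipeline(db_url: str, shopify_url: str, shopify_token: str):
    """Full ecommerce data pipeline: extract → stage → warehouse → aggregate."""

    # Extract everything created or updated in the last day
    extractor = ShopifyExtractor(shopify_url, shopify_token)
    yesterday = datetime.now() - timedelta(days=1)
    orders = extractor.extract_orders(since=yesterday)
    products = extractor.extract_products()  # not loaded yet; a product dimension would consume this
    if orders.records.empty:
        print("No new or updated orders; nothing to process")
        return

    # Stage
    stager = StagingTransformer()
    staged = stager.stage_orders(orders)

    # Load staged data. updated_at_min also returns orders loaded on earlier runs,
    # so delete those rows first; otherwise revenue is double-counted.
    engine = create_engine(db_url)
    with engine.begin() as conn:
        if inspect(conn).has_table("staged_orders"):
            conn.execute(
                text("DELETE FROM staged_orders WHERE platform = :platform AND order_id IN :ids")
                .bindparams(bindparam("ids", expanding=True)),
                {"platform": "shopify", "ids": staged["order_id"].tolist()},
            )
        staged.to_sql("staged_orders", conn, if_exists="append", index=False)

    # Build dimensions from the full order history, not just today's batch
    history = pd.read_sql("SELECT * FROM staged_orders", engine, parse_dates=["created_at"])
    warehouse = WarehouseBuilder(db_url)
    warehouse.build_date_dimension()
    warehouse.build_customer_dimension(history)

    # Build aggregates
    agg = AggregateBuilder(engine)
    agg.daily_summary()
    agg.weekly_trends()

    print(f"Pipeline complete: {len(staged)} orders processed")

# Schedule: run daily at 05:00 UK time. The environment variable names are
# placeholders, and the schedule syntax should be checked against your Prefect version.
if __name__ == "__main__":
    run_pipeline.serve(
        name="daily-ecommerce-pipeline",
        schedules=[CronSchedule(cron="0 5 * * *", timezone="Europe/London")],
        parameters={
            "db_url": os.environ["PIPELINE_DB_URL"],
            "shopify_url": os.environ["SHOPIFY_STORE"],
            "shopify_token": os.environ["SHOPIFY_TOKEN"],
        },
    )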
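
Each `ExtractionResult` carries a `watermark`, but a fixed "since yesterday" window never uses it, so a run that fails or is skipped leaves a gap. One option is to persist the watermark after each successful load and pass it as `since` on the next run. The sketch below shows the idea with SQLite from the standard library; the `WatermarkStore` class and the `pipeline_watermarks` table are hypothetical names, not part of Prefect or of the code above.

```python
import sqlite3
from typing import Optional


class WatermarkStore:
    """Persist the last successfully loaded watermark per (platform, entity)."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS pipeline_watermarks (
                platform TEXT NOT NULL,
                entity TEXT NOT NULL,
                watermark TEXT NOT NULL,
                PRIMARY KEY (platform, entity)
            )
            """
        )

    def get(self, platform: str, entity: str) -> Optional[str]:
        """Return the stored watermark, or None if this entity has never been loaded."""
        row = self.conn.execute(
            "SELECT watermark FROM pipeline_watermarks WHERE platform = ? AND entity = ?",
            (platform, entity),
        ).fetchone()
        return row[0] if row else None

    def set(self, platform: str, entity: str, watermark: str) -> None:
        """Store a new watermark; an empty value (no rows extracted) keeps the old one."""
        if not watermark:
            return
        self.conn.execute(
            """
            INSERT INTO pipeline_watermarks (platform, entity, watermark)
            VALUES (?, ?, ?)
            ON CONFLICT (platform, entity) DO UPDATE SET watermark = excluded.watermark
            """,
            (platform, entity, watermark),
        )
        self.conn.commit()


store = WatermarkStore(sqlite3.connect(":memory:"))
first_run = store.get("shopify", "orders")            # no watermark yet: do a full load
store.set("shopify", "orders", "2026-01-15T05:00:00+00:00")
store.set("shopify", "orders", "")                    # empty extraction leaves it untouched
next_since = store.get("shopify", "orders")
```

In the flow, the stored value would be parsed with `datetime.fromisoformat` and passed to `extract_orders(since=...)`, with `store.set(...)` called only after the staged load has committed.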

What This Replaces

Ad-Hoc Approach | Pipeline Architecture
Separate scripts per report | Single pipeline, multiple outputs
Each script defines “revenue” differently | One staging layer with consistent business logic
No historical tracking | Warehouse with full history
Manual data pulls each morning | Automated daily ingestion
Spreadsheet-based comparisons | Pre-computed aggregates with trends
Platform-specific dashboards | Unified cross-platform reporting

Next Steps

For building the API extraction layer in detail, see Shopify Reporting API: Pull Sales, Inventory, and Customer Data. For creating dashboards from the warehouse data, see How to Build Dashboards Without BI Tools.

For making this pipeline fault-tolerant, see Build Self-Healing Data Pipelines. For the unified API layer across platforms, see Ecommerce Reporting API: Automate Store Data.

For data quality checks between stages, see Build a Data Quality Framework in Python.

