Ecommerce Reporting API: How to Automate Store Data Collection and Analysis


Build a unified ecommerce reporting API layer that pulls data from Shopify, WooCommerce, and other platforms — normalise metrics, automate collection, and feed dashboards without manual exports.


Every ecommerce platform has an API. Most teams still export CSV files manually.

The gap between available data and how teams actually access it is enormous. Shopify, WooCommerce, BigCommerce — they all expose orders, products, customers, and inventory through REST or GraphQL APIs. Yet the reporting workflow in most organisations is still someone downloading a spreadsheet every Monday morning.

This guide builds a unified reporting API layer: one Python module that connects to any ecommerce platform, normalises the data into a consistent schema, and feeds it into dashboards and reports automatically.

# Who This Is For

  • Ecommerce managers who want real-time access to sales and inventory data without logging into multiple admin panels
  • Data engineers building reporting pipelines across multiple storefronts or platforms
  • Vibe coders who want a clean abstraction over messy ecommerce APIs
  • Agencies managing multiple client stores who need a single data model for all of them

Basic Python and REST API familiarity is enough. If you have used requests to call an API, you can follow this guide.

# The Reporting Architecture

mermaid
flowchart TD
  subgraph Sources["Ecommerce Platforms"]
    S1[Shopify API]
    S2[WooCommerce API]
    S3[BigCommerce API]
  end
  subgraph Adapter["Normalisation Layer"]
    A[Platform Adapter]
  end
  subgraph Output["Reporting Layer"]
    DB[(Data Warehouse)]
    D[Dashboard]
    R[Scheduled Reports]
  end
  S1 --> A
  S2 --> A
  S3 --> A
  A --> DB
  DB --> D
  DB --> R

Each platform adapter handles authentication and pagination. The normalisation layer maps platform-specific fields to a shared schema. Downstream consumers never know which platform the data came from.

# What You Will Need

bash
pip install requests pandas sqlalchemy

  • API credentials for your ecommerce platform (Shopify Admin API token, WooCommerce REST API key, etc.)
  • A database for historical storage (SQLite for development, PostgreSQL for production)

# Step 1: Define the Shared Data Schema

The key insight: every ecommerce platform has the same core entities. Orders have line items, products have variants, customers have contact details. The field names differ but the structure is identical.

python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class NormalisedOrder:
    """Platform-agnostic order representation."""

    order_id: str
    platform: str
    created_at: datetime
    customer_email: str
    total: float
    currency: str
    line_items: list = field(default_factory=list)
    status: str = "unknown"
    source: Optional[str] = None

    def to_dict(self):
        """Convert to dictionary for DataFrame creation."""
        return {
            "order_id": self.order_id,
            "platform": self.platform,
            "created_at": self.created_at.isoformat(),
            "customer_email": self.customer_email,
            "total": self.total,
            "currency": self.currency,
            "status": self.status,
            "item_count": len(self.line_items),
            "source": self.source,
        }


@dataclass
class NormalisedProduct:
    """Platform-agnostic product representation."""

    product_id: str
    platform: str
    title: str
    price: float
    inventory_quantity: int
    status: str = "active"
    category: Optional[str] = None

This schema is the contract. Every platform adapter must produce these objects regardless of what the source API returns.
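
To see why the contract matters, here is a toy illustration using plain dicts: two platforms name the order total differently, but the normalised output shape never changes. The payloads and the `normalise` helper are simplified stand-ins for the adapters built below, not real API responses.

```python
# Simplified payloads: the same order as two platforms might return it.
shopify_raw = {"id": 1001, "total_price": "49.90", "currency": "GBP"}
woo_raw = {"id": 77, "total": "49.90", "currency": "GBP"}

# Per-platform field mapping: the only part that varies.
TOTAL_FIELD = {"shopify": "total_price", "woocommerce": "total"}

def normalise(raw: dict, platform: str) -> dict:
    """Map a platform payload onto the shared shape."""
    return {
        "order_id": str(raw["id"]),
        "platform": platform,
        "total": float(raw[TOTAL_FIELD[platform]]),
        "currency": raw["currency"],
    }

a = normalise(shopify_raw, "shopify")
b = normalise(woo_raw, "woocommerce")
print(a["total"] == b["total"])  # True: different field names, one schema
```

Downstream code only ever sees the normalised shape, which is what lets the aggregation layer stay platform-agnostic.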

# Step 2: Build Platform Adapters

Each adapter inherits from a base class and implements the platform-specific logic. The base class enforces the interface.

python
import requests
from abc import ABC, abstractmethod

class EcommerceAdapter(ABC):
    """Base adapter for ecommerce platform APIs."""

    def __init__(self, config: dict):
        """Initialise with platform-specific configuration."""
        self.config = config
        self.session = requests.Session()

    @abstractmethod
    def fetch_orders(self, since_date: datetime) -> list[NormalisedOrder]:
        """Fetch orders created after the given date."""
        pass

    @abstractmethod
    def fetch_products(self) -> list[NormalisedProduct]:
        """Fetch all active products."""
        pass

    def _paginate(self, url: str, params: dict | None = None) -> list:
        """Handle API pagination. Override for platform-specific patterns."""
        results = []
        params = params or {}
        while url:
            response = self.session.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            results.extend(self._extract_items(data))
            url = self._next_page_url(data, response)
            params = {}
        return results

    @abstractmethod
    def _extract_items(self, response_data: dict) -> list:
        """Extract the item list from a response payload."""
        pass

    @abstractmethod
    def _next_page_url(self, response_data: dict, response) -> str | None:
        """Extract the next page URL from pagination metadata."""
        pass
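
The `_paginate` loop is a template method: subclasses supply only the extraction and next-page hooks. A minimal sketch shows the control flow without any HTTP — the `Paginator` and `FakeAdapter` names are made up for this demo, and the in-memory `pages` dict stands in for API responses.

```python
# The pagination control flow, demonstrated without HTTP.
class Paginator:
    def paginate(self, page_key):
        """Walk pages until the next-page hook returns None."""
        results, key = [], page_key
        while key is not None:
            data = self.get(key)              # stands in for session.get(url)
            results.extend(self.extract_items(data))
            key = self.next_page(data)
        return results

class FakeAdapter(Paginator):
    # Two fake "pages"; the second has no next page.
    pages = {
        "p1": {"items": [1, 2], "next": "p2"},
        "p2": {"items": [3], "next": None},
    }
    def get(self, key): return self.pages[key]
    def extract_items(self, data): return data["items"]
    def next_page(self, data): return data["next"]

print(FakeAdapter().paginate("p1"))  # [1, 2, 3]
```

The real adapters below follow the same pattern, except the "next page" lives in an HTTP header (Shopify) or is derived from a page counter (WooCommerce).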

# Shopify Adapter

python
class ShopifyAdapter(EcommerceAdapter):
    """Adapter for the Shopify Admin REST API."""

    def __init__(self, shop_url: str, api_token: str, api_version: str = "2026-01"):
        """Set up Shopify API connection."""
        config = {
            "base_url": f"https://{shop_url}/admin/api/{api_version}",
            "shop_url": shop_url,
        }
        super().__init__(config)
        self.session.headers["X-Shopify-Access-Token"] = api_token

    def fetch_orders(self, since_date: datetime) -> list[NormalisedOrder]:
        """Fetch orders from Shopify since the given date."""
        url = f"{self.config['base_url']}/orders.json"
        params = {
            "created_at_min": since_date.isoformat(),
            "status": "any",
            "limit": 250,
        }
        raw_orders = self._paginate(url, params)
        print(f"Fetched {len(raw_orders)} orders from Shopify")
        return [self._normalise_order(o) for o in raw_orders]

    def _normalise_order(self, raw: dict) -> NormalisedOrder:
        """Map Shopify order fields to normalised schema."""
        return NormalisedOrder(
            order_id=str(raw["id"]),
            platform="shopify",
            created_at=datetime.fromisoformat(raw["created_at"].replace("Z", "+00:00")),
            customer_email=raw.get("email", ""),
            total=float(raw["total_price"]),
            currency=raw["currency"],
            line_items=[item["title"] for item in raw.get("line_items", [])],
            status=raw.get("financial_status", "unknown"),
            source=raw.get("source_name", None),
        )

    def fetch_products(self) -> list[NormalisedProduct]:
        """Fetch all products from Shopify."""
        url = f"{self.config['base_url']}/products.json"
        raw_products = self._paginate(url, {"limit": 250})
        print(f"Fetched {len(raw_products)} products from Shopify")
        return [self._normalise_product(p) for p in raw_products]

    def _normalise_product(self, raw: dict) -> NormalisedProduct:
        """Map Shopify product fields to normalised schema."""
        variant = raw.get("variants", [{}])[0]
        return NormalisedProduct(
            product_id=str(raw["id"]),
            platform="shopify",
            title=raw["title"],
            price=float(variant.get("price", 0)),
            inventory_quantity=variant.get("inventory_quantity", 0),
            status=raw.get("status", "active"),
            category=raw.get("product_type", None),
        )

    def _extract_items(self, response_data: dict) -> list:
        """Extract items from Shopify response."""
        key = "orders" if "orders" in response_data else "products"
        return response_data.get(key, [])

    def _next_page_url(self, response_data: dict, response) -> str | None:
        """Extract next page URL from Shopify Link header."""
        link_header = response.headers.get("Link", "")
        if 'rel="next"' in link_header:
            for part in link_header.split(","):
                if 'rel="next"' in part:
                    return part.split("<")[1].split(">")[0]
        return None

# WooCommerce Adapter

python
class WooCommerceAdapter(EcommerceAdapter):
    """Adapter for the WooCommerce REST API."""

    def __init__(self, store_url: str, consumer_key: str, consumer_secret: str):
        """Set up WooCommerce API connection."""
        config = {"base_url": f"{store_url}/wp-json/wc/v3"}
        super().__init__(config)
        self.session.auth = (consumer_key, consumer_secret)

    def fetch_orders(self, since_date: datetime) -> list[NormalisedOrder]:
        """Fetch orders from WooCommerce since the given date."""
        url = f"{self.config['base_url']}/orders"
        params = {
            "after": since_date.isoformat(),
            "per_page": 100,
            "page": 1,
        }
        raw_orders = self._paginate(url, params)
        print(f"Fetched {len(raw_orders)} orders from WooCommerce")
        return [self._normalise_order(o) for o in raw_orders]

    def _normalise_order(self, raw: dict) -> NormalisedOrder:
        """Map WooCommerce order fields to normalised schema."""
        return NormalisedOrder(
            order_id=str(raw["id"]),
            platform="woocommerce",
            created_at=datetime.fromisoformat(raw["date_created"]),
            customer_email=raw.get("billing", {}).get("email", ""),
            total=float(raw["total"]),
            currency=raw["currency"],
            line_items=[item["name"] for item in raw.get("line_items", [])],
            status=raw.get("status", "unknown"),
        )

    def fetch_products(self) -> list[NormalisedProduct]:
        """Fetch all products from WooCommerce."""
        url = f"{self.config['base_url']}/products"
        raw_products = self._paginate(url, {"per_page": 100, "page": 1})
        print(f"Fetched {len(raw_products)} products from WooCommerce")
        return [self._normalise_product(p) for p in raw_products]

    def _normalise_product(self, raw: dict) -> NormalisedProduct:
        """Map WooCommerce product fields to normalised schema."""
        return NormalisedProduct(
            product_id=str(raw["id"]),
            platform="woocommerce",
            title=raw["name"],
            price=float(raw.get("price", 0) or 0),
            inventory_quantity=raw.get("stock_quantity", 0) or 0,
            status=raw.get("status", "active"),
            category=raw.get("categories", [{}])[0].get("name") if raw.get("categories") else None,
        )

    def _extract_items(self, response_data) -> list:
        """WooCommerce returns a list directly."""
        return response_data if isinstance(response_data, list) else []

    def _next_page_url(self, response_data, response) -> str | None:
        """WooCommerce reports the page count in the X-WP-TotalPages header."""
        # Parse the query string properly: a naive string replace on "page="
        # would also match the "per_page" parameter and corrupt the URL.
        from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

        total_pages = int(response.headers.get("X-WP-TotalPages", 1))
        parsed = urlparse(response.request.url)
        query = parse_qs(parsed.query)
        current_page = int(query.get("page", ["1"])[0])
        if current_page >= total_pages:
            return None
        query["page"] = [str(current_page + 1)]
        return urlunparse(parsed._replace(query=urlencode(query, doseq=True)))

# Step 3: Build the Aggregation Layer

Raw orders are useful for audit trails. For reporting, you need aggregations: daily revenue, conversion rates, top products.

python
import pandas as pd

class ReportingAggregator:
    """Aggregate normalised ecommerce data into reporting metrics."""

    def __init__(self, orders: list[NormalisedOrder]):
        """Initialise with a list of normalised orders."""
        if not orders:
            raise ValueError("No orders to aggregate")
        self.df = pd.DataFrame([o.to_dict() for o in orders])
        self.df["created_at"] = pd.to_datetime(self.df["created_at"])
        self.df["date"] = self.df["created_at"].dt.date
        print(f"Loaded {len(self.df)} orders for aggregation")

    def daily_revenue(self) -> pd.DataFrame:
        """Calculate daily revenue totals."""
        return (
            self.df.groupby("date")
            .agg(
                orders=("order_id", "count"),
                revenue=("total", "sum"),
                avg_order_value=("total", "mean"),
            )
            .round(2)
            .reset_index()
        )

    def platform_breakdown(self) -> pd.DataFrame:
        """Revenue breakdown by platform."""
        return (
            self.df.groupby("platform")
            .agg(
                orders=("order_id", "count"),
                revenue=("total", "sum"),
                avg_order_value=("total", "mean"),
            )
            .round(2)
            .reset_index()
        )

    def top_sources(self, n: int = 10) -> pd.DataFrame:
        """Top traffic sources by order count."""
        return (
            self.df[self.df["source"].notna()]
            .groupby("source")
            .agg(orders=("order_id", "count"), revenue=("total", "sum"))
            .sort_values("orders", ascending=False)
            .head(n)
            .reset_index()
        )

# Sample Output

text
Daily Revenue Report
─────────────────────
Date        │ Orders │ Revenue   │ AOV
2026-05-01  │ 47     │ £3,241.50 │ £68.97
2026-05-02  │ 52     │ £3,890.00 │ £74.81
2026-05-03  │ 39     │ £2,710.25 │ £69.49
2026-05-04  │ 61     │ £4,502.75 │ £73.82
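
The daily aggregation can be smoke-tested without any adapters by feeding a DataFrame of synthetic rows through the same groupby — the rows below are invented for illustration, and the pipeline mirrors `ReportingAggregator.daily_revenue`.

```python
import pandas as pd

# Three synthetic orders across two days.
rows = [
    {"order_id": "1", "date": "2026-05-01", "total": 40.0},
    {"order_id": "2", "date": "2026-05-01", "total": 60.0},
    {"order_id": "3", "date": "2026-05-02", "total": 25.0},
]
df = pd.DataFrame(rows)

# Same named-aggregation pattern as daily_revenue().
daily = (
    df.groupby("date")
    .agg(
        orders=("order_id", "count"),
        revenue=("total", "sum"),
        avg_order_value=("total", "mean"),
    )
    .round(2)
    .reset_index()
)
print(daily)
```

Named aggregation (`orders=("order_id", "count")`) keeps the output columns readable without a separate rename step.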

# Step 4: Store Historical Data

Aggregated snapshots are useful for trends. Store each run in a database so you can track week-over-week changes.

python
from sqlalchemy import create_engine

class DataWarehouse:
    """Store normalised ecommerce data for historical analysis."""

    def __init__(self, connection_string: str = "sqlite:///ecommerce_warehouse.db"):
        """Connect to the data warehouse."""
        self.engine = create_engine(connection_string)
        print(f"Connected to warehouse: {connection_string}")

    def store_orders(self, orders: list[NormalisedOrder]):
        """Append new orders to the warehouse."""
        df = pd.DataFrame([o.to_dict() for o in orders])
        df.to_sql("orders", self.engine, if_exists="append", index=False)
        print(f"Stored {len(df)} orders in warehouse")

    def store_daily_snapshot(self, metrics: pd.DataFrame, snapshot_date: str):
        """Store a daily metrics snapshot."""
        snapshot = metrics.copy()  # avoid mutating the caller's DataFrame
        snapshot["snapshot_date"] = snapshot_date
        snapshot.to_sql("daily_snapshots", self.engine, if_exists="append", index=False)
        print(f"Stored daily snapshot for {snapshot_date}")

    def get_trend(self, days: int = 30) -> pd.DataFrame:
        """Get revenue trend for the last N days."""
        query = f"""
            SELECT date, SUM(revenue) as revenue, SUM(orders) as orders
            FROM daily_snapshots
            WHERE date >= date('now', '-{days} days')
            GROUP BY date
            ORDER BY date
        """
        return pd.read_sql(query, self.engine)
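
One caveat with append-only storage: the pipeline re-fetches the last seven days on every run, so the same orders land in the table repeatedly. A minimal sketch of one way to keep inserts idempotent — drop already-stored IDs before appending. The `store_orders_idempotent` helper is hypothetical, using an in-memory SQLite database and the same `orders` table and `order_id` column as the warehouse above.

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///:memory:")

def store_orders_idempotent(df: pd.DataFrame, engine) -> int:
    """Append only rows whose order_id is not already stored."""
    try:
        existing = pd.read_sql("SELECT order_id FROM orders", engine)["order_id"]
    except Exception:  # table does not exist yet on the first run
        existing = pd.Series([], dtype=str)
    new = df[~df["order_id"].isin(existing)]
    new.to_sql("orders", engine, if_exists="append", index=False)
    return len(new)

batch = pd.DataFrame([
    {"order_id": "a1", "total": 10.0},
    {"order_id": "a2", "total": 20.0},
])
print(store_orders_idempotent(batch, engine))  # first run stores both rows
print(store_orders_idempotent(batch, engine))  # re-run stores nothing
```

For PostgreSQL in production, an `INSERT ... ON CONFLICT DO NOTHING` on a unique `(platform, order_id)` index is the more robust version of the same idea.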

# Step 5: Wire It All Together

The final piece connects adapters, aggregation, and storage into a single reporting pipeline.

python
from datetime import datetime, timedelta

def run_reporting_pipeline(adapters: list[EcommerceAdapter], warehouse: DataWarehouse):
    """Execute the full reporting pipeline across all platforms."""
    since = datetime.now() - timedelta(days=7)
    all_orders = []

    for adapter in adapters:
        orders = adapter.fetch_orders(since)
        all_orders.extend(orders)

    print(f"\nTotal orders across all platforms: {len(all_orders)}")

    warehouse.store_orders(all_orders)

    aggregator = ReportingAggregator(all_orders)
    daily = aggregator.daily_revenue()
    warehouse.store_daily_snapshot(daily, datetime.now().strftime("%Y-%m-%d"))

    platform = aggregator.platform_breakdown()
    sources = aggregator.top_sources()

    print("\n--- Platform Breakdown ---")
    print(platform.to_string(index=False))
    print("\n--- Top Sources ---")
    print(sources.to_string(index=False))

    return {"daily": daily, "platform": platform, "sources": sources}


# Usage
if __name__ == "__main__":
    adapters = [
        ShopifyAdapter("my-store.myshopify.com", "shpat_xxxx"),
        WooCommerceAdapter("https://mysite.com", "ck_xxxx", "cs_xxxx"),
    ]
    warehouse = DataWarehouse()
    results = run_reporting_pipeline(adapters, warehouse)

# What This Replaces

| Manual Process | Automated Equivalent |
| --- | --- |
| Log into Shopify admin, export CSV | API pulls data automatically |
| Log into WooCommerce, download orders | Same normalised schema, zero manual work |
| Copy data into master spreadsheet | Warehouse stores all data with history |
| Manually calculate daily revenue | Aggregation runs in seconds |
| Email spreadsheet to team | Dashboard updates in real time |
| Compare last week manually | Historical snapshots enable trend queries |

# Next Steps

For automating the Excel report output from this pipeline, see Automate Excel Reports with Python. To build a visual dashboard on top of the warehouse data, see How to Build a Data Dashboard Without Manual Excel Work.

If you are running a Shopify store specifically, see How to Automate Shopify Reports with Python for a deep dive on the Shopify API and Shopify Reporting API: Pull Sales and Inventory Data for advanced data extraction patterns.

For scheduling this pipeline to run automatically, see Schedule and Orchestrate Workflows with Prefect.

Ecommerce optimisation services include building custom reporting APIs and data pipelines tailored to your store.

Get in touch to discuss automating your ecommerce reporting.
