Ecommerce Reporting API: How to Automate Store Data Collection and Analysis
Build a unified ecommerce reporting API layer that pulls data from Shopify, WooCommerce, and other platforms — normalise metrics, automate collection, and feed dashboards without manual exports.
Every ecommerce platform has an API. Most teams still export CSV files manually.
The gap between available data and how teams actually access it is enormous. Shopify, WooCommerce, BigCommerce — they all expose orders, products, customers, and inventory through REST or GraphQL APIs. Yet the reporting workflow in most organisations is still someone downloading a spreadsheet every Monday morning.
This guide builds a unified reporting API layer: one Python module that connects to any ecommerce platform, normalises the data into a consistent schema, and feeds it into dashboards and reports automatically.
# Who This Is For
- Ecommerce managers who want real-time access to sales and inventory data without logging into multiple admin panels
- Data engineers building reporting pipelines across multiple storefronts or platforms
- Vibe coders who want a clean abstraction over messy ecommerce APIs
- Agencies managing multiple client stores who need a single data model for all of them
Basic Python and REST API familiarity is enough. If you have used requests to call an API, you can follow this guide.
# The Reporting Architecture
flowchart TD
subgraph Sources["Ecommerce Platforms"]
S1[Shopify API]
S2[WooCommerce API]
S3[BigCommerce API]
end
subgraph Adapter["Normalisation Layer"]
A[Platform Adapter]
end
subgraph Output["Reporting Layer"]
DB[(Data Warehouse)]
D[Dashboard]
R[Scheduled Reports]
end
S1 --> A
S2 --> A
S3 --> A
A --> DB
DB --> D
DB --> R
Each platform adapter handles authentication and pagination. The normalisation layer maps platform-specific fields to a shared schema. Downstream consumers never know which platform the data came from.
# What You Will Need
pip install requests pandas sqlalchemy
- API credentials for your ecommerce platform (Shopify Admin API token, WooCommerce REST API key, etc.)
- A database for historical storage (SQLite for development, PostgreSQL for production)
# Step 1: Define the Shared Data Schema
The key insight: every ecommerce platform has the same core entities. Orders have line items, products have variants, customers have contact details. The field names differ but the structure is identical.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class NormalisedOrder:
"""Platform-agnostic order representation."""
order_id: str
platform: str
created_at: datetime
customer_email: str
total: float
currency: str
line_items: list = field(default_factory=list)
status: str = "unknown"
source: Optional[str] = None
def to_dict(self):
"""Convert to dictionary for DataFrame creation."""
return {
"order_id": self.order_id,
"platform": self.platform,
"created_at": self.created_at.isoformat(),
"customer_email": self.customer_email,
"total": self.total,
"currency": self.currency,
"status": self.status,
"item_count": len(self.line_items),
"source": self.source,
}
@dataclass
class NormalisedProduct:
"""Platform-agnostic product representation."""
product_id: str
platform: str
title: str
price: float
inventory_quantity: int
status: str = "active"
category: Optional[str] = None
This schema is the contract. Every platform adapter must produce these objects regardless of what the source API returns.
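Before writing any adapter, it is worth sanity-checking the contract itself. A minimal round trip from a normalised order to the flat dictionary the reporting layer consumes (the dataclass is repeated so the snippet runs on its own; the order values are invented):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class NormalisedOrder:
    """Platform-agnostic order representation."""
    order_id: str
    platform: str
    created_at: datetime
    customer_email: str
    total: float
    currency: str
    line_items: list = field(default_factory=list)
    status: str = "unknown"
    source: Optional[str] = None

    def to_dict(self):
        """Convert to dictionary for DataFrame creation."""
        return {
            "order_id": self.order_id,
            "platform": self.platform,
            "created_at": self.created_at.isoformat(),
            "customer_email": self.customer_email,
            "total": self.total,
            "currency": self.currency,
            "status": self.status,
            "item_count": len(self.line_items),
            "source": self.source,
        }

# An invented order, as an adapter would produce it
order = NormalisedOrder(
    order_id="1001",
    platform="shopify",
    created_at=datetime(2026, 5, 1, 12, 0, tzinfo=timezone.utc),
    customer_email="customer@example.com",
    total=68.97,
    currency="GBP",
    line_items=["T-shirt", "Mug"],
)
row = order.to_dict()
print(row["item_count"], row["created_at"])  # 2 2026-05-01T12:00:00+00:00
```

Note that `to_dict` deliberately flattens `line_items` to a count: the reporting layer works on order-level rows, not line-item detail.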
# Step 2: Build Platform Adapters
Each adapter inherits from a base class and implements the platform-specific logic. The base class enforces the interface.
import requests
from abc import ABC, abstractmethod
class EcommerceAdapter(ABC):
"""Base adapter for ecommerce platform APIs."""
def __init__(self, config: dict):
"""Initialise with platform-specific configuration."""
self.config = config
self.session = requests.Session()
@abstractmethod
def fetch_orders(self, since_date: datetime) -> list[NormalisedOrder]:
"""Fetch orders created after the given date."""
pass
@abstractmethod
def fetch_products(self) -> list[NormalisedProduct]:
"""Fetch all active products."""
pass
def _paginate(self, url: str, params: dict | None = None) -> list:
"""Handle API pagination. Override for platform-specific patterns."""
results = []
params = params or {}
while url:
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
results.extend(self._extract_items(data))
url = self._next_page_url(data, response)
params = {}
return results
@abstractmethod
def _extract_items(self, response_data: dict) -> list:
"""Extract the item list from a response payload."""
pass
@abstractmethod
def _next_page_url(self, response_data: dict, response) -> str | None:
"""Extract the next page URL from pagination metadata."""
pass
# Shopify Adapter
class ShopifyAdapter(EcommerceAdapter):
"""Adapter for the Shopify Admin REST API."""
def __init__(self, shop_url: str, api_token: str, api_version: str = "2026-01"):
"""Set up Shopify API connection."""
config = {
"base_url": f"https://{shop_url}/admin/api/{api_version}",
"shop_url": shop_url,
}
super().__init__(config)
self.session.headers["X-Shopify-Access-Token"] = api_token
def fetch_orders(self, since_date: datetime) -> list[NormalisedOrder]:
"""Fetch orders from Shopify since the given date."""
url = f"{self.config['base_url']}/orders.json"
params = {
"created_at_min": since_date.isoformat(),
"status": "any",
"limit": 250,
}
raw_orders = self._paginate(url, params)
print(f"Fetched {len(raw_orders)} orders from Shopify")
return [self._normalise_order(o) for o in raw_orders]
def _normalise_order(self, raw: dict) -> NormalisedOrder:
"""Map Shopify order fields to normalised schema."""
return NormalisedOrder(
order_id=str(raw["id"]),
platform="shopify",
created_at=datetime.fromisoformat(raw["created_at"].replace("Z", "+00:00")),
customer_email=raw.get("email", ""),
total=float(raw["total_price"]),
currency=raw["currency"],
line_items=[item["title"] for item in raw.get("line_items", [])],
status=raw.get("financial_status", "unknown"),
source=raw.get("source_name", None),
)
def fetch_products(self) -> list[NormalisedProduct]:
"""Fetch all products from Shopify."""
url = f"{self.config['base_url']}/products.json"
raw_products = self._paginate(url, {"limit": 250})
print(f"Fetched {len(raw_products)} products from Shopify")
return [self._normalise_product(p) for p in raw_products]
def _normalise_product(self, raw: dict) -> NormalisedProduct:
    """Map Shopify product fields to normalised schema."""
    # Guard against products with an empty variants list, which would
    # otherwise raise IndexError on [0]
    variants = raw.get("variants") or [{}]
    variant = variants[0]
    return NormalisedProduct(
        product_id=str(raw["id"]),
        platform="shopify",
        title=raw["title"],
        price=float(variant.get("price") or 0),
        inventory_quantity=variant.get("inventory_quantity") or 0,
        status=raw.get("status", "active"),
        category=raw.get("product_type") or None,
    )
def _extract_items(self, response_data: dict) -> list:
"""Extract items from Shopify response."""
key = "orders" if "orders" in response_data else "products"
return response_data.get(key, [])
def _next_page_url(self, response_data: dict, response) -> str | None:
"""Extract next page URL from Shopify Link header."""
link_header = response.headers.get("Link", "")
if 'rel="next"' in link_header:
for part in link_header.split(","):
if 'rel="next"' in part:
return part.split("<")[1].split(">")[0]
return None
# WooCommerce Adapter
class WooCommerceAdapter(EcommerceAdapter):
"""Adapter for the WooCommerce REST API."""
def __init__(self, store_url: str, consumer_key: str, consumer_secret: str):
"""Set up WooCommerce API connection."""
config = {"base_url": f"{store_url}/wp-json/wc/v3"}
super().__init__(config)
self.session.auth = (consumer_key, consumer_secret)
def fetch_orders(self, since_date: datetime) -> list[NormalisedOrder]:
"""Fetch orders from WooCommerce since the given date."""
url = f"{self.config['base_url']}/orders"
params = {
"after": since_date.isoformat(),
"per_page": 100,
"page": 1,
}
raw_orders = self._paginate(url, params)
print(f"Fetched {len(raw_orders)} orders from WooCommerce")
return [self._normalise_order(o) for o in raw_orders]
def _normalise_order(self, raw: dict) -> NormalisedOrder:
"""Map WooCommerce order fields to normalised schema."""
return NormalisedOrder(
order_id=str(raw["id"]),
platform="woocommerce",
created_at=datetime.fromisoformat(raw["date_created"]),
customer_email=raw.get("billing", {}).get("email", ""),
total=float(raw["total"]),
currency=raw["currency"],
line_items=[item["name"] for item in raw.get("line_items", [])],
status=raw.get("status", "unknown"),
)
def fetch_products(self) -> list[NormalisedProduct]:
"""Fetch all products from WooCommerce."""
url = f"{self.config['base_url']}/products"
raw_products = self._paginate(url, {"per_page": 100, "page": 1})
print(f"Fetched {len(raw_products)} products from WooCommerce")
return [self._normalise_product(p) for p in raw_products]
def _normalise_product(self, raw: dict) -> NormalisedProduct:
"""Map WooCommerce product fields to normalised schema."""
return NormalisedProduct(
product_id=str(raw["id"]),
platform="woocommerce",
title=raw["name"],
price=float(raw.get("price", 0) or 0),
inventory_quantity=raw.get("stock_quantity", 0) or 0,
status=raw.get("status", "active"),
category=raw.get("categories", [{}])[0].get("name") if raw.get("categories") else None,
)
def _extract_items(self, response_data) -> list:
"""WooCommerce returns a list directly."""
return response_data if isinstance(response_data, list) else []
def _next_page_url(self, response_data, response) -> str | None:
    """WooCommerce signals pagination via the X-WP-TotalPages header."""
    # Parse the query string properly rather than doing a naive string
    # replace: "page=1" is also a substring of "per_page=100", so
    # url.replace("page=1", "page=2") would corrupt the per_page parameter.
    from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
    total_pages = int(response.headers.get("X-WP-TotalPages", 1))
    parts = urlsplit(response.request.url)
    query = parse_qs(parts.query)
    current_page = int(query.get("page", ["1"])[0])
    if current_page >= total_pages:
        return None
    query["page"] = [str(current_page + 1)]
    return urlunsplit(parts._replace(query=urlencode(query, doseq=True)))
# Step 3: Build the Aggregation Layer
Raw orders are useful for audit trails. For reporting, you need aggregations: daily revenue, conversion rates, top products.
import pandas as pd
class ReportingAggregator:
"""Aggregate normalised ecommerce data into reporting metrics."""
def __init__(self, orders: list[NormalisedOrder]):
    """Initialise with a list of normalised orders."""
    if not orders:
        raise ValueError("No orders to aggregate")
    self.df = pd.DataFrame([o.to_dict() for o in orders])
    self.df["created_at"] = pd.to_datetime(self.df["created_at"])
    self.df["date"] = self.df["created_at"].dt.date
    print(f"Loaded {len(self.df)} orders for aggregation")
def daily_revenue(self) -> pd.DataFrame:
"""Calculate daily revenue totals."""
return (
self.df.groupby("date")
.agg(
orders=("order_id", "count"),
revenue=("total", "sum"),
avg_order_value=("total", "mean"),
)
.round(2)
.reset_index()
)
def platform_breakdown(self) -> pd.DataFrame:
"""Revenue breakdown by platform."""
return (
self.df.groupby("platform")
.agg(
orders=("order_id", "count"),
revenue=("total", "sum"),
avg_order_value=("total", "mean"),
)
.round(2)
.reset_index()
)
def top_sources(self, n: int = 10) -> pd.DataFrame:
"""Top traffic sources by order count."""
return (
self.df[self.df["source"].notna()]
.groupby("source")
.agg(orders=("order_id", "count"), revenue=("total", "sum"))
.sort_values("orders", ascending=False)
.head(n)
.reset_index()
)
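The groupby logic behind daily_revenue can be verified standalone against a handful of synthetic rows before wiring it to live API data. The values here are invented:

```python
import pandas as pd

# Three invented orders across two days
orders = pd.DataFrame([
    {"order_id": "1", "date": "2026-05-01", "total": 50.0},
    {"order_id": "2", "date": "2026-05-01", "total": 30.0},
    {"order_id": "3", "date": "2026-05-02", "total": 70.0},
])

# Same named-aggregation pattern as ReportingAggregator.daily_revenue
daily = (
    orders.groupby("date")
    .agg(
        orders=("order_id", "count"),
        revenue=("total", "sum"),
        avg_order_value=("total", "mean"),
    )
    .round(2)
    .reset_index()
)
print(daily)
```

For 2026-05-01 this yields 2 orders, 80.00 revenue, and a 40.00 average order value; named aggregation keeps the output columns explicit, so the same frame drops straight into a snapshot table.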
# Sample Output
Daily Revenue Report
─────────────────────
Date │ Orders │ Revenue │ AOV
2026-05-01 │ 47 │ £3,241.50 │ £68.97
2026-05-02 │ 52 │ £3,890.00 │ £74.81
2026-05-03 │ 39 │ £2,710.25 │ £69.49
2026-05-04 │ 61 │ £4,502.75 │ £73.82
# Step 4: Store Historical Data
Aggregated snapshots are useful for trends. Store each run in a database so you can track week-over-week changes.
from sqlalchemy import create_engine
class DataWarehouse:
"""Store normalised ecommerce data for historical analysis."""
def __init__(self, connection_string: str = "sqlite:///ecommerce_warehouse.db"):
"""Connect to the data warehouse."""
self.engine = create_engine(connection_string)
print(f"Connected to warehouse: {connection_string}")
def store_orders(self, orders: list[NormalisedOrder]):
"""Append new orders to the warehouse."""
df = pd.DataFrame([o.to_dict() for o in orders])
df.to_sql("orders", self.engine, if_exists="append", index=False)
print(f"Stored {len(df)} orders in warehouse")
def store_daily_snapshot(self, metrics: pd.DataFrame, snapshot_date: str):
    """Store a daily metrics snapshot."""
    # Copy first so the caller's DataFrame is not mutated
    snapshot = metrics.copy()
    snapshot["snapshot_date"] = snapshot_date
    snapshot.to_sql("daily_snapshots", self.engine, if_exists="append", index=False)
    print(f"Stored daily snapshot for {snapshot_date}")
def get_trend(self, days: int = 30) -> pd.DataFrame:
    """Get revenue trend for the last N days. Note: date('now', ...) is SQLite syntax; adjust for PostgreSQL."""
query = f"""
SELECT date, SUM(revenue) as revenue, SUM(orders) as orders
FROM daily_snapshots
WHERE date >= date('now', '-{days} days')
GROUP BY date
ORDER BY date
"""
return pd.read_sql(query, self.engine)
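The snapshot-and-trend pattern can be checked without any API credentials or SQLAlchemy setup. This sketch uses the standard library's sqlite3 directly, with invented rows and fixed dates, to show how appended snapshots roll up into a trend:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_snapshots (date TEXT, orders INTEGER, revenue REAL, snapshot_date TEXT)"
)
# One snapshot run covering two days (values invented)
rows = [
    ("2026-05-01", 47, 3241.50, "2026-05-04"),
    ("2026-05-02", 52, 3890.00, "2026-05-04"),
]
conn.executemany("INSERT INTO daily_snapshots VALUES (?, ?, ?, ?)", rows)

# Same shape as the warehouse trend query, minus the date filter
trend = conn.execute(
    "SELECT date, SUM(revenue) AS revenue, SUM(orders) AS orders "
    "FROM daily_snapshots GROUP BY date ORDER BY date"
).fetchall()
print(trend)
```

One caveat worth designing for: if successive pipeline runs snapshot overlapping date windows, the same date appears in multiple snapshot rows and SUM will double-count it, so either filter on the latest snapshot_date per date or snapshot non-overlapping windows.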
# Step 5: Wire It All Together
The final piece connects adapters, aggregation, and storage into a single reporting pipeline.
from datetime import datetime, timedelta
def run_reporting_pipeline(adapters: list[EcommerceAdapter], warehouse: DataWarehouse):
"""Execute the full reporting pipeline across all platforms."""
since = datetime.now() - timedelta(days=7)
all_orders = []
for adapter in adapters:
orders = adapter.fetch_orders(since)
all_orders.extend(orders)
print(f"\nTotal orders across all platforms: {len(all_orders)}")
warehouse.store_orders(all_orders)
aggregator = ReportingAggregator(all_orders)
daily = aggregator.daily_revenue()
warehouse.store_daily_snapshot(daily, datetime.now().strftime("%Y-%m-%d"))
platform = aggregator.platform_breakdown()
sources = aggregator.top_sources()
print("\n--- Platform Breakdown ---")
print(platform.to_string(index=False))
print("\n--- Top Sources ---")
print(sources.to_string(index=False))
return {"daily": daily, "platform": platform, "sources": sources}
# Usage
if __name__ == "__main__":
adapters = [
ShopifyAdapter("my-store.myshopify.com", "shpat_xxxx"),
WooCommerceAdapter("https://mysite.com", "ck_xxxx", "cs_xxxx"),
]
warehouse = DataWarehouse()
results = run_reporting_pipeline(adapters, warehouse)
# What This Replaces
| Manual Process | Automated Equivalent |
|---|---|
| Log into Shopify admin, export CSV | API pulls data automatically |
| Log into WooCommerce, download orders | Same normalised schema, zero manual work |
| Copy data into master spreadsheet | Warehouse stores all data with history |
| Manually calculate daily revenue | Aggregation runs in seconds |
| Email spreadsheet to team | Dashboard updates in real time |
| Compare last week manually | Historical snapshots enable trend queries |
# Next Steps
For automating the Excel report output from this pipeline, see Automate Excel Reports with Python. To build a visual dashboard on top of the warehouse data, see How to Build a Data Dashboard Without Manual Excel Work.
If you are running a Shopify store specifically, see How to Automate Shopify Reports with Python for a deep dive on the Shopify API and Shopify Reporting API: Pull Sales and Inventory Data for advanced data extraction patterns.
For scheduling this pipeline to run automatically, see Schedule and Orchestrate Workflows with Prefect.
Ecommerce optimisation services include building custom reporting APIs and data pipelines tailored to your store.
Get in touch to discuss automating your ecommerce reporting.