How to Automate Data Workflows Using APIs and Python
Build automated data pipelines that fetch from APIs, clean and transform data, combine multiple sources, and export results — with scheduling for hands-free operation.
Most teams pull data from APIs the same way: log in, click export, open in Excel, filter, paste somewhere else. Every day.
That entire process — fetch, clean, combine, export — is a pipeline. And pipelines should run themselves.
This guide builds a complete API-to-output data pipeline in Python. By the end, you will have a working system that fetches data from multiple APIs, transforms it, and produces clean output on a schedule.
# The Pipeline Architecture
```text
API 1 (JSON) ──┐
               ├── Fetch ── Validate ── Clean ── Combine ── Transform ── Output
API 2 (JSON) ──┘                                                           │
                                                                    ┌──────┴──────┐
                                                                  Excel        Database
                                                                    └──────┬──────┘
                                                                        Schedule
                                                               (cron / Task Scheduler)
```
Every stage is a function. Every function does one thing. The pipeline is the composition.
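That composition can be sketched in a few lines; the stage bodies here are placeholders for the real implementations built below:

```python
# Each stage is a function; the pipeline is their composition.
# These bodies are toy stand-ins for the real fetch/clean/transform steps.
def fetch():
    return [{"id": 1, "amount": 100}, {"id": 2, "amount": None}]

def clean(records):
    # Drop records missing a critical field
    return [r for r in records if r["amount"] is not None]

def transform(records):
    # Aggregate the cleaned records
    return sum(r["amount"] for r in records)

def run():
    return transform(clean(fetch()))

print(run())  # → 100
```

Swapping any stage (a new API, a different output format) leaves the rest of the pipeline untouched.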
# What You Will Need
```shell
pip install requests pandas openpyxl
```

- `requests` — HTTP client for API calls
- `pandas` — data manipulation and aggregation
- `openpyxl` — Excel output with formatting
# Step 1: Fetch Data from an API
Start with a single API endpoint. The pattern is always the same — make the request, validate the response, extract the data:
```python
import requests
from datetime import datetime, timedelta

def fetch_api_data(base_url, endpoint, api_key, params=None):
    """Fetch data from a REST API with error handling."""
    url = f"{base_url}/{endpoint}"
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    print(f"Fetched {len(data.get('results', []))} records from {endpoint}")
    return data.get("results", [])
```
## Handling Pagination
Most APIs return paginated results. Handle that automatically:
```python
def fetch_all_pages(base_url, endpoint, api_key, params=None):
    """Fetch all pages from a paginated API."""
    all_results = []
    page = 1
    params = params or {}
    while True:
        params["page"] = page
        response = requests.get(
            f"{base_url}/{endpoint}",
            headers={"Authorization": f"Bearer {api_key}"},
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        results = data.get("results", [])
        if not results:
            break
        all_results.extend(results)
        page += 1
        if page > data.get("total_pages", page):
            break
    print(f"Fetched {len(all_results)} total records from {endpoint}")
    return all_results
```
# Step 2: Validate and Clean
Raw API data is rarely ready to use. Common issues:
- Missing fields
- Inconsistent date formats
- Null values where you expect numbers
- Nested JSON structures
```python
import pandas as pd

def clean_api_data(raw_records):
    """Convert raw API records to a clean DataFrame."""
    df = pd.DataFrame(raw_records)

    # Flatten nested fields
    if "metadata" in df.columns:
        meta = pd.json_normalize(df["metadata"])
        df = pd.concat([df.drop(columns=["metadata"]), meta], axis=1)

    # Standardise dates
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Drop records with missing critical fields
    required = ["id", "created_at", "amount"]
    existing_required = [c for c in required if c in df.columns]
    df = df.dropna(subset=existing_required)

    # Remove duplicates
    if "id" in df.columns:
        df = df.drop_duplicates(subset=["id"], keep="last")

    return df
```
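To see those steps in action, here is the same cleaning logic applied inline to a few fabricated records (the field names mirror the examples above):

```python
import pandas as pd

raw = [
    {"id": 1, "created_at": "2026-03-15", "amount": 100.0,
     "metadata": {"channel": "web"}},
    {"id": 2, "created_at": "not-a-date", "amount": 50.0,
     "metadata": {"channel": "app"}},   # bad date -> NaT -> dropped
    {"id": 1, "created_at": "2026-03-15", "amount": 120.0,
     "metadata": {"channel": "web"}},   # duplicate id -> last kept
]

df = pd.DataFrame(raw)
meta = pd.json_normalize(df["metadata"].tolist())  # flatten nested dicts
df = pd.concat([df.drop(columns=["metadata"]), meta], axis=1)
df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
df = df.dropna(subset=["id", "created_at", "amount"])
df = df.drop_duplicates(subset=["id"], keep="last")

print(df[["id", "amount", "channel"]])
```

Three messy records in, one clean record out: the unparseable date is coerced to `NaT` and dropped, and the duplicate `id` keeps only the latest row.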
# Step 3: Combine Multiple Sources
The real power is combining data from different APIs into a unified view:
```python
def build_combined_dataset(api_key):
    """Fetch and combine data from multiple API endpoints."""
    base_url = "https://api.example.com/v1"

    # Source 1: Orders
    orders_raw = fetch_api_data(base_url, "orders", api_key, {
        "since": (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d"),
    })
    orders = clean_api_data(orders_raw)

    # Source 2: Customers
    customers_raw = fetch_api_data(base_url, "customers", api_key)
    customers = clean_api_data(customers_raw)
    customers = customers[["id", "name", "region", "segment"]].rename(
        columns={"id": "customer_id"}
    )

    # Source 3: Products
    products_raw = fetch_api_data(base_url, "products", api_key)
    products = clean_api_data(products_raw)
    products = products[["id", "name", "category"]].rename(
        columns={"id": "product_id", "name": "product_name"}
    )

    # Combine
    combined = orders.merge(customers, on="customer_id", how="left")
    combined = combined.merge(products, on="product_id", how="left")
    print(f"Combined dataset: {len(combined)} rows, {len(combined.columns)} columns")
    return combined
```
## What This Creates
| order_id | date | customer | region | product | category | amount |
|---|---|---|---|---|---|---|
| 1001 | 2026-03-15 | Acme Corp | North | Widget Pro | Hardware | 4500.00 |
| 1002 | 2026-03-16 | Beta Ltd | South | Data Suite | Software | 2200.00 |
Three API calls. One unified dataset. Zero manual work.
# Step 4: Transform and Aggregate
With clean, combined data, build the summaries your team actually needs:
```python
def generate_summaries(df):
    """Generate multiple summary views from combined data."""
    df = df.copy()  # work on a copy so the caller's DataFrame is untouched

    # By region
    by_region = (
        df.groupby("region")
        .agg(
            orders=("order_id", "count"),
            revenue=("amount", "sum"),
            avg_order=("amount", "mean"),
            customers=("customer_id", "nunique"),
        )
        .reset_index()
        .sort_values("revenue", ascending=False)
    )

    # By category
    by_category = (
        df.groupby("category")
        .agg(
            orders=("order_id", "count"),
            revenue=("amount", "sum"),
            avg_order=("amount", "mean"),
        )
        .reset_index()
        .sort_values("revenue", ascending=False)
    )

    # Daily trend
    df["date"] = df["created_at"].dt.date
    daily = (
        df.groupby("date")
        .agg(orders=("order_id", "count"), revenue=("amount", "sum"))
        .reset_index()
        .sort_values("date")
    )

    return {
        "by_region": by_region,
        "by_category": by_category,
        "daily_trend": daily,
    }
```
# Step 5: Export Results
## To Excel (Formatted)
```python
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment

def export_to_excel(summaries, output_path):
    """Export all summaries to a formatted Excel workbook."""
    wb = Workbook()
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")

    for idx, (name, df) in enumerate(summaries.items()):
        ws = wb.active if idx == 0 else wb.create_sheet()
        ws.title = name.replace("_", " ").title()

        # Headers
        for col, header in enumerate(df.columns, 1):
            cell = ws.cell(row=1, column=col, value=header.replace("_", " ").title())
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal="center")

        # Data
        for r, row in enumerate(df.itertuples(index=False), 2):
            for c, value in enumerate(row, 1):
                ws.cell(row=r, column=c, value=value)

        # Auto-fit columns
        for col in ws.columns:
            max_len = max(len(str(c.value or "")) for c in col)
            ws.column_dimensions[col[0].column_letter].width = max_len + 3

    wb.save(output_path)
    print(f"Excel report saved: {output_path}")
```
## To a Database
```python
import sqlite3

def export_to_database(df, db_path, table_name):
    """Append data to a SQLite database for historical tracking."""
    conn = sqlite3.connect(db_path)
    df["pipeline_run"] = datetime.now().isoformat()
    df.to_sql(table_name, conn, if_exists="append", index=False)
    conn.close()
    print(f"Inserted {len(df)} rows into {table_name}")
```
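Because each run appends rows tagged with `pipeline_run`, the database becomes queryable history. A minimal sketch using an in-memory database (table and column names match the example above):

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # use "pipeline_data.db" in practice

# Simulate two daily runs appending to the same table
for run_day in ["2026-03-15", "2026-03-16"]:
    df = pd.DataFrame({"order_id": [1, 2], "amount": [100.0, 200.0]})
    df["pipeline_run"] = run_day
    df.to_sql("api_orders", conn, if_exists="append", index=False)

# Revenue per run, straight from SQL
history = pd.read_sql(
    "SELECT pipeline_run, SUM(amount) AS revenue FROM api_orders "
    "GROUP BY pipeline_run ORDER BY pipeline_run",
    conn,
)
print(history)
conn.close()
```

This is what makes append mode worth the disk space: trends across runs come free from a `GROUP BY`.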
# Step 6: Wire It Together
The complete pipeline — one function call runs everything:
```python
import os

def run_pipeline():
    """Execute the full API-to-report pipeline."""
    api_key = os.environ["API_KEY"]
    timestamp = datetime.now().strftime("%Y%m%d")
    print(f"Pipeline started: {datetime.now().isoformat()}")

    # Fetch and combine
    combined = build_combined_dataset(api_key)

    # Summarise
    summaries = generate_summaries(combined)

    # Export
    export_to_excel(summaries, f"api_report_{timestamp}.xlsx")
    export_to_database(combined, "pipeline_data.db", "api_orders")

    print(f"Pipeline complete: {len(combined)} records processed")

if __name__ == "__main__":
    run_pipeline()
```
# Step 7: Schedule It
## Linux / macOS (cron)
```shell
# Run the API pipeline every day at 6:00 AM
0 6 * * * API_KEY=your-key /usr/bin/python3 /path/to/pipeline.py >> /var/log/pipeline.log 2>&1
```
## Windows (Task Scheduler)
```powershell
$action  = New-ScheduledTaskAction -Execute "python" -Argument "C:\pipelines\pipeline.py"
$trigger = New-ScheduledTaskTrigger -Daily -At 6am
Register-ScheduledTask -Action $action -Trigger $trigger -TaskName "APIPipeline"
```
# Error Handling for Production
Real pipelines need to handle failures gracefully:
```python
import logging

logging.basicConfig(
    filename="pipeline.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

def run_pipeline_safe():
    """Pipeline with error handling and logging."""
    try:
        logging.info("Pipeline started")
        run_pipeline()
        logging.info("Pipeline completed successfully")
    except requests.exceptions.RequestException as e:
        logging.error(f"API request failed: {e}")
    except pd.errors.EmptyDataError as e:
        logging.error(f"No data returned: {e}")
    except Exception as e:
        logging.error(f"Pipeline failed: {e}")
        raise
```
# Common API Patterns
| API type | Library | Notes |
|---|---|---|
| REST (JSON) | `requests` | Most common. Handle pagination and rate limits |
| GraphQL | `requests` with POST | Send query in request body |
| CSV export | `requests` + `pandas` | Download file, parse with `pd.read_csv()` |
| Database | `sqlite3`, `psycopg2` | Direct SQL queries |
| Webhook/streaming | `flask` | Receive data pushed to your endpoint |
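To illustrate the GraphQL row: the query travels as JSON in a POST body rather than in the URL. This is a sketch only; the endpoint, query, and schema below are hypothetical and must be replaced with your API's own:

```python
import requests

# Hypothetical endpoint and schema -- adjust to your API
GRAPHQL_URL = "https://api.example.com/graphql"
QUERY = """
query RecentOrders($days: Int!) {
  orders(sinceDays: $days) { id amount customer { name } }
}
"""
payload = {"query": QUERY, "variables": {"days": 30}}

def fetch_graphql(api_key):
    """POST the query; GraphQL reports failures in an 'errors' key, not status codes."""
    response = requests.post(
        GRAPHQL_URL,
        json=payload,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    body = response.json()
    if body.get("errors"):
        raise RuntimeError(body["errors"])
    return body["data"]
```

Note the extra `errors` check: GraphQL servers often return HTTP 200 even when the query fails, so `raise_for_status()` alone is not enough.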
# What This Replaces
| Manual process | Automated equivalent |
|---|---|
| Log into dashboard, click export | `requests.get()` with auth |
| Open CSV in Excel, filter rows | `pd.DataFrame()` + filtering |
| Copy between spreadsheets | `pd.merge()` on shared keys |
| Manually update tracker | `df.to_sql()` or `df.to_excel()` |
| Email report to stakeholders | `smtplib` attachment |
| Remember to do it tomorrow | cron or Task Scheduler |
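The `smtplib` row can be sketched with the standard library alone; the sender address and SMTP host below are placeholders you would replace with your own mail settings:

```python
import smtplib
from email.message import EmailMessage

def build_report_email(attachment_bytes, filename, recipients):
    """Build a message with the report attached; addresses are placeholders."""
    msg = EmailMessage()
    msg["Subject"] = "Daily API pipeline report"
    msg["From"] = "pipeline@example.com"
    msg["To"] = ", ".join(recipients)
    msg.set_content("The latest report is attached.")
    msg.add_attachment(
        attachment_bytes,
        maintype="application",
        subtype="vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # .xlsx
        filename=filename,
    )
    return msg

def send_report(msg, smtp_host="localhost"):
    # For an authenticated server, use SMTP_SSL or starttls() + login() here
    with smtplib.SMTP(smtp_host) as server:
        server.send_message(msg)
```

Separating build from send keeps the message construction testable without a mail server.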
# Next Steps
This pipeline handles the most common API automation scenario. Production systems typically add:
- Rate limiting — respect API quotas with `time.sleep()` between requests
- Retry logic — exponential backoff for transient failures
- Data validation — schema checks before processing
- Alerting — notifications when pipelines fail or data quality drops
- Incremental loading — only fetch new records since last run
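The retry item can be sketched as a small wrapper. The attempt count and delays here are arbitrary choices, and in production you would catch `requests.exceptions.RequestException` rather than bare `Exception`:

```python
import time

def with_retries(func, attempts=4, base_delay=1.0):
    """Call func(); on failure wait base_delay * 2**attempt, then retry."""
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
            time.sleep(delay)

# Demo: a call that fails twice, then succeeds on the third try
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

result = with_retries(flaky, base_delay=0.01)
print(result)  # → ok
```

Wrapping each fetch call (`with_retries(lambda: fetch_api_data(...))`) is usually enough to ride out transient network blips without rerunning the whole pipeline.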
If your team manually pulls data from APIs and pastes it into spreadsheets, automation services can replace the entire workflow. The pipeline above is a starting point — real systems handle edge cases, failures, and scaling.
For more on Excel-specific automation, see Automate Excel Reports with Python. For a broader overview of automation workflows, see Python Automation: Real Workflows.
Get in touch to discuss building automated data pipelines for your team.