How to Automate Data Workflows Using APIs and Python


Build automated data pipelines that fetch from APIs, clean and transform data, combine multiple sources, and export results — with scheduling for hands-free operation.

Most teams pull data from APIs the same way: log in, click export, open in Excel, filter, paste somewhere else. Every day.

That entire process — fetch, clean, combine, export — is a pipeline. And pipelines should run themselves.

This guide builds a complete API-to-output data pipeline in Python. By the end, you will have a working system that fetches data from multiple APIs, transforms it, and produces clean output on a schedule.

The Pipeline Architecture

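The pipeline moves through five stages: fetch, validate and clean, combine, transform and aggregate, and export. Every stage is a function. Every function does one thing. The pipeline is the composition.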
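
As a rough illustration of that idea, the sketch below chains stage functions with plain Python. The stage names (fetch, clean, summarise), the run_stages helper, and the hard-coded records are placeholders invented for this example, not the functions built later in the guide.

```python
from functools import reduce


def fetch():
    """Stand-in for an API call: returns hard-coded sample records."""
    return [{"id": 1, "amount": "10.5"}, {"id": 2, "amount": None}]


def clean(records):
    """Drop records without an amount and convert the rest to float."""
    return [
        {**r, "amount": float(r["amount"])}
        for r in records
        if r["amount"] is not None
    ]


def summarise(records):
    """Reduce the cleaned records to a single summary dict."""
    return {"orders": len(records), "revenue": sum(r["amount"] for r in records)}


def run_stages(source, *stages):
    """Call the source, then pass its output through each stage in order."""
    return reduce(lambda data, stage: stage(data), stages, source())


summary = run_stages(fetch, clean, summarise)
print(summary)
```

Because each stage only depends on its input, any one of them can be swapped or tested in isolation without touching the others.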

What You Will Need

pip install requests pandas openpyxl
  • requests — HTTP client for API calls
  • pandas — data manipulation and aggregation
  • openpyxl — Excel output with formatting

Step 1: Fetch Data from an API

Start with a single API endpoint. The pattern is always the same — make the request, validate the response, extract the data:

import requests
from datetime import datetime, timedelta

def fetch_api_data(base_url, endpoint, api_key, params=None):
    """Fetch data from a REST API with error handling."""
    url = f"{base_url}/{endpoint}"
    headers = {"Authorization": f"Bearer {api_key}"}

    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()

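    data = response.json()
    # Use .get() once so a response without a "results" key yields an
    # empty list instead of raising KeyError on the return line.
    results = data.get("results", [])
    print(f"Fetched {len(results)} records from {endpoint}")
    return results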

Handling Pagination

Most APIs return paginated results. Handle that automatically:

def fetch_all_pages(base_url, endpoint, api_key, params=None):
    """Fetch all pages from a paginated API."""
    all_results = []
    page = 1
    params = dict(params or {})  # copy so the caller's dict is not mutated

    while True:
        params["page"] = page
        response = requests.get(
            f"{base_url}/{endpoint}",
            headers={"Authorization": f"Bearer {api_key}"},
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        results = data.get("results", [])
        if not results:
            break

        all_results.extend(results)
        page += 1

        if page > data.get("total_pages", page):
            break

    print(f"Fetched {len(all_results)} total records from {endpoint}")
    return all_results

Step 2: Validate and Clean

Raw API data is rarely ready to use. Common issues:

  • Missing fields
  • Inconsistent date formats
  • Null values where you expect numbers
  • Nested JSON structures

import pandas as pd

def clean_api_data(raw_records):
    """Convert raw API records to a clean DataFrame."""
    df = pd.DataFrame(raw_records)

    # Flatten nested fields
    if "metadata" in df.columns:
        meta = pd.json_normalize(df["metadata"])
        df = pd.concat([df.drop(columns=["metadata"]), meta], axis=1)

    # Standardise dates
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Drop records with missing critical fields
    required = ["id", "created_at", "amount"]
    existing_required = [c for c in required if c in df.columns]
    df = df.dropna(subset=existing_required)

    # Remove duplicates
    if "id" in df.columns:
        df = df.drop_duplicates(subset=["id"], keep="last")

    return df
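
The function above does not yet address the "null values where you expect numbers" item from the list. One possible way to handle it, sketched here on invented sample records, is pd.to_numeric with errors="coerce". The amount column name follows the article; the data itself is made up for illustration.

```python
import pandas as pd

# Made-up records: amounts arrive as strings, None, junk text, or real numbers.
sample = pd.DataFrame(
    {"id": [1, 2, 3, 4], "amount": ["4500.00", None, "n/a", 2200]}
)

# errors="coerce" turns anything unparseable into NaN instead of raising.
sample["amount"] = pd.to_numeric(sample["amount"], errors="coerce")

# Rows without a usable amount can now be dropped like any other missing value.
valid = sample.dropna(subset=["amount"])
print(valid)
```

Running the coercion before the dropna step in clean_api_data would let the existing "required fields" check catch unparseable amounts as well.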

Step 3: Combine Multiple Sources

The real power is combining data from different APIs into a unified view:

def build_combined_dataset(api_key):
    """Fetch and combine data from multiple API endpoints."""
    base_url = "https://api.example.com/v1"

    # Source 1: Orders
    orders_raw = fetch_api_data(base_url, "orders", api_key, {
        "since": (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d"),
    })
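    # Rename id to order_id so it matches the column used in the Step 4 summaries
    orders = clean_api_data(orders_raw).rename(columns={"id": "order_id"})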

    # Source 2: Customers
    customers_raw = fetch_api_data(base_url, "customers", api_key)
    customers = clean_api_data(customers_raw)
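    # Rename name as well so it cannot be confused with product_name after merging
    customers = customers[["id", "name", "region", "segment"]].rename(
        columns={"id": "customer_id", "name": "customer_name"}
    )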

    # Source 3: Products
    products_raw = fetch_api_data(base_url, "products", api_key)
    products = clean_api_data(products_raw)
    products = products[["id", "name", "category"]].rename(
        columns={"id": "product_id", "name": "product_name"}
    )

    # Combine
    combined = orders.merge(customers, on="customer_id", how="left")
    combined = combined.merge(products, on="product_id", how="left")

    print(f"Combined dataset: {len(combined)} rows, {len(combined.columns)} columns")
    return combined

What This Creates

order_id  date        customer   region  product     category  amount
1001      2026-03-15  Acme Corp  North   Widget Pro  Hardware  4500.00
1002      2026-03-16  Beta Ltd   South   Data Suite  Software  2200.00

Three API calls. One unified dataset. Zero manual work.
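
A left merge keeps every order even when its customer or product is missing from the other source, so gaps can go unnoticed. The sketch below shows one way to surface them using the validate and indicator options of DataFrame.merge. The frames and their values are invented for this example.

```python
import pandas as pd

# Made-up frames: order 3 points at a customer that does not exist.
orders = pd.DataFrame(
    {"order_id": [1, 2, 3], "customer_id": [10, 11, 99], "amount": [5.0, 7.5, 2.0]}
)
customers = pd.DataFrame({"customer_id": [10, 11], "region": ["North", "South"]})

checked = orders.merge(
    customers,
    on="customer_id",
    how="left",
    validate="many_to_one",  # raises MergeError if customer_id repeats in customers
    indicator=True,          # adds a _merge column: "both" or "left_only"
)

# Orders whose customer_id found no match on the right-hand side.
unmatched = checked[checked["_merge"] == "left_only"]
print(f"{len(unmatched)} order(s) without a matching customer")
```

The validate check also guards against duplicate keys on the lookup side, which would otherwise multiply order rows and inflate revenue totals.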

Step 4: Transform and Aggregate

With clean, combined data, build the summaries your team actually needs:

def generate_summaries(df):
    """Generate multiple summary views from combined data."""

    # By region
    by_region = (
        df.groupby("region")
        .agg(
            orders=("order_id", "count"),
            revenue=("amount", "sum"),
            avg_order=("amount", "mean"),
            customers=("customer_id", "nunique"),
        )
        .reset_index()
        .sort_values("revenue", ascending=False)
    )

    # By category
    by_category = (
        df.groupby("category")
        .agg(
            orders=("order_id", "count"),
            revenue=("amount", "sum"),
            avg_order=("amount", "mean"),
        )
        .reset_index()
        .sort_values("revenue", ascending=False)
    )

    # Daily trend (assign returns a copy, so the caller's DataFrame is not modified)
    daily = (
        df.assign(date=df["created_at"].dt.date)
        .groupby("date")
        .agg(orders=("order_id", "count"), revenue=("amount", "sum"))
        .reset_index()
        .sort_values("date")
    )

    return {
        "by_region": by_region,
        "by_category": by_category,
        "daily_trend": daily,
    }

Step 5: Export Results

To Excel (Formatted)

from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment

def export_to_excel(summaries, output_path):
    """Export all summaries to a formatted Excel workbook."""
    wb = Workbook()
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")

    for idx, (name, df) in enumerate(summaries.items()):
        ws = wb.active if idx == 0 else wb.create_sheet()
        ws.title = name.replace("_", " ").title()

        # Headers
        for col, header in enumerate(df.columns, 1):
            cell = ws.cell(row=1, column=col, value=header.replace("_", " ").title())
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal="center")

        # Data
        for r, row in enumerate(df.itertuples(index=False), 2):
            for c, value in enumerate(row, 1):
                ws.cell(row=r, column=c, value=value)

        # Auto-fit columns
        for col in ws.columns:
            max_len = max(len(str(c.value or "")) for c in col)
            ws.column_dimensions[col[0].column_letter].width = max_len + 3

    wb.save(output_path)
    print(f"Excel report saved: {output_path}")

To a Database

import sqlite3

def export_to_database(df, db_path, table_name):
    """Append data to a SQLite database for historical tracking."""
    # assign returns a copy, so the caller's DataFrame does not gain a column
    df = df.assign(pipeline_run=datetime.now().isoformat())
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists="append", index=False)
    finally:
        conn.close()  # release the database file even if the insert fails
    print(f"Inserted {len(df)} rows into {table_name}")
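
Because every run appends, the same order can appear once per run, so queries against the history table usually need to group or filter on pipeline_run. The sketch below uses an in-memory database with a simplified, made-up schema (id, amount, pipeline_run) to show the idea; the real table will have whatever columns the combined dataset contains.

```python
import sqlite3

# In-memory database with made-up rows so the example runs on its own.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE api_orders (id INTEGER, amount REAL, pipeline_run TEXT)")
conn.executemany(
    "INSERT INTO api_orders VALUES (?, ?, ?)",
    [
        (1, 100.0, "2026-03-15T06:00:00"),
        (2, 50.0, "2026-03-15T06:00:00"),
        (1, 100.0, "2026-03-16T06:00:00"),
    ],
)

# ISO 8601 timestamps sort correctly as text, so ORDER BY works on the raw strings.
rows = conn.execute(
    """
    SELECT pipeline_run, COUNT(*) AS row_count, SUM(amount) AS revenue
    FROM api_orders
    GROUP BY pipeline_run
    ORDER BY pipeline_run
    """
).fetchall()
conn.close()

for run, row_count, revenue in rows:
    print(run, row_count, revenue)
```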

Step 6: Wire It Together

The complete pipeline — one function call runs everything:

import os

def run_pipeline():
    """Execute the full API-to-report pipeline."""
    api_key = os.environ["API_KEY"]
    timestamp = datetime.now().strftime("%Y%m%d")

    print(f"Pipeline started: {datetime.now().isoformat()}")

    # Fetch and combine
    combined = build_combined_dataset(api_key)

    # Summarise
    summaries = generate_summaries(combined)

    # Export
    export_to_excel(summaries, f"api_report_{timestamp}.xlsx")
    export_to_database(combined, "pipeline_data.db", "api_orders")

    print(f"Pipeline complete: {len(combined)} records processed")

if __name__ == "__main__":
    run_pipeline()

Step 7: Schedule It

Linux / macOS (cron)

# Run the API pipeline every day at 6:00 AM
0 6 * * * API_KEY=your-key /usr/bin/python3 /path/to/pipeline.py >> /var/log/pipeline.log 2>&1

Windows (Task Scheduler)

$action = New-ScheduledTaskAction -Execute "python" -Argument "C:\pipelines\pipeline.py"
$trigger = New-ScheduledTaskTrigger -Daily -At 6am
Register-ScheduledTask -Action $action -Trigger $trigger -TaskName "APIPipeline"

Error Handling for Production

Real pipelines need to handle failures gracefully:

import logging

logging.basicConfig(
    filename="pipeline.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

def run_pipeline_safe():
    """Pipeline with error handling and logging."""
    try:
        logging.info("Pipeline started")
        run_pipeline()
        logging.info("Pipeline completed successfully")
    except requests.exceptions.RequestException:
        # logging.exception records the traceback along with the message
        logging.exception("API request failed")
        raise  # re-raise so cron or Task Scheduler sees a non-zero exit
    except pd.errors.EmptyDataError:
        # Raised by pandas parsers such as pd.read_csv() on empty input
        logging.exception("No data returned")
        raise
    except Exception:
        logging.exception("Pipeline failed")
        raise

Common API Patterns

API type           Library             Notes
REST (JSON)        requests            Most common. Handle pagination and rate limits
GraphQL            requests with POST  Send query in request body
CSV export         requests + pandas   Download file, parse with pd.read_csv()
Database           sqlite3, psycopg2   Direct SQL queries
Webhook/streaming  flask               Receive data pushed to your endpoint

What This Replaces

Manual process                    Automated equivalent
Log into dashboard, click export  requests.get() with auth
Open CSV in Excel, filter rows    pd.DataFrame() + filtering
Copy between spreadsheets         pd.merge() on shared keys
Manually update tracker           df.to_sql() or df.to_excel()
Email report to stakeholders      smtplib attachment
Remember to do it tomorrow        cron or Task Scheduler
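
The email row is the only one the pipeline above does not cover. A minimal sketch with the standard library might look like the following. The function names, subject line, and the choice of SMTP over implicit TLS are assumptions for illustration; your mail provider may require different settings.

```python
from email.message import EmailMessage
from pathlib import Path
import smtplib


def build_report_email(sender, recipient, report_path):
    """Build an email with the Excel report attached (nothing is sent here)."""
    path = Path(report_path)
    msg = EmailMessage()
    msg["Subject"] = f"API report: {path.name}"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("The latest pipeline report is attached.")
    msg.add_attachment(
        path.read_bytes(),
        maintype="application",
        subtype="vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=path.name,
    )
    return msg


def send_email(msg, host, port, username, password):
    """Send the message over an implicit-TLS SMTP connection."""
    with smtplib.SMTP_SSL(host, port) as server:
        server.login(username, password)
        server.send_message(msg)
```

Keeping message construction separate from sending makes the first function easy to test without a mail server. As with the API key, the SMTP credentials are best read from environment variables.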

Next Steps

This pipeline handles the most common API automation scenario. Production systems typically add:

  • Rate limiting — respect API quotas with time.sleep() between requests
  • Retry logic — exponential backoff for transient failures
  • Data validation — schema checks before processing
  • Alerting — notifications when pipelines fail or data quality drops
  • Incremental loading — only fetch new records since last run
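
As one example from the list, retry logic with exponential backoff can be sketched as a small wrapper. The helper name call_with_retries and its parameters are invented for this illustration, and the flaky function stands in for a real API call.

```python
import time


def call_with_retries(func, max_attempts=4, base_delay=1.0,
                      retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on a listed exception wait base_delay * 2**attempt and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the failure
            sleep(base_delay * 2 ** attempt)


# Demo with a fake call that fails twice before succeeding.
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Record the delays instead of actually sleeping so the demo runs instantly.
delays = []
result = call_with_retries(flaky, retry_on=(ConnectionError,), sleep=delays.append)
print(result, delays)
```

In the pipeline, the wrapped call could be something like lambda: fetch_api_data(base_url, "orders", api_key), with retry_on limited to requests.exceptions.ConnectionError and requests.exceptions.Timeout so that authentication errors still fail immediately.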

If your team manually pulls data from APIs and pastes it into spreadsheets, automation services can replace the entire workflow. The pipeline above is a starting point — real systems handle edge cases, failures, and scaling.

For more on Excel-specific automation, see Automate Excel Reports with Python. For a broader overview of automation workflows, see Python Automation: Real Workflows. For securing the API keys and tokens your pipelines depend on, see Python Secrets Management for Automation Pipelines. For high-volume data fetching, async Python can significantly reduce total fetch time by making multiple requests concurrently.

Get in touch to discuss building automated data pipelines for your team.
