How to Schedule and Orchestrate Multi-Step Workflows with Prefect

· 9 min read · Automation

Move beyond cron jobs — use Prefect to orchestrate data pipelines with dependency management, automatic retries, parallel execution, and a monitoring dashboard.


Cron jobs work until they do not. You have five scripts that need to run in order. Script 3 depends on the output of scripts 1 and 2. Script 4 should retry twice if it fails. Script 5 should only run on weekdays. Managing this with cron means managing a web of dependencies in your head.

Workflow orchestrators solve this. They let you define task dependencies as code, handle retries, run tasks in parallel where possible, and give you a dashboard showing what ran, what failed, and why.

This guide uses Prefect, a Python-native orchestrator: flows and tasks are ordinary decorated Python functions that you can run directly while developing, with no separate scheduler to set up first. By the end, you will have a production workflow with dependencies, retries, scheduling, and monitoring.

Who This Is For

  • Data engineers managing multiple scripts with cron and losing track of which depends on which
  • Developers who want retry logic, parallel execution, and a visual dashboard without building it from scratch
  • Vibe coders whose multi-step automations have grown too complex to manage with simple schedulers
  • Teams that need visibility into what ran, what failed, and why — without digging through logs

Basic Python is the only prerequisite. If you have a script that runs on a schedule (or should), this guide shows you how to make it reliable and observable.

The Orchestration Architecture

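Tasks A and B (in this guide, extracting orders and extracting customers) have no dependency between them, so they can run in parallel. Task C (the transform) needs both results, so it waits for both. The entire flow runs on a schedule with automatic retries and monitoring.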
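Conceptually, those dependencies form a directed acyclic graph (DAG). The sketch below is not Prefect code: it uses Python's standard-library graphlib, with hypothetical task names A, B and C, to show how such a graph splits into "waves" of tasks that are free to run at the same time.

```python
from graphlib import TopologicalSorter

# Each key lists the tasks it depends on (hypothetical task names)
dependencies = {
    "A": set(),       # extract orders: no upstream tasks
    "B": set(),       # extract customers: no upstream tasks
    "C": {"A", "B"},  # transform: needs both extracts
}


def execution_waves(graph):
    """Group tasks into waves; tasks in the same wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose inputs are finished
        waves.append(ready)
        sorter.done(*ready)                 # mark them complete to unlock dependents
    return waves


print(execution_waves(dependencies))  # [['A', 'B'], ['C']]
```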

What You Will Need

pip install prefect pandas requests openpyxl
  • prefect: workflow orchestration framework
  • pandas: data processing
  • requests: HTTP client for API calls
  • openpyxl: Excel support for pandas (used by the optional Excel output in Step 6)

Cron vs Prefect: Why Switch?

Feature | cron | Prefect
--- | --- | ---
Dependencies between tasks | Manual (chain scripts) | Automatic (DAG)
Retry on failure | Custom wrapper scripts | Built-in decorator
Parallel execution | Background processes | Concurrent task runner
Monitoring | Parse log files | Web dashboard
Alerting on failure | Custom email scripts | Built-in notifications
Run history | Log files | Queryable database
Parameterised runs | Environment variables | Flow parameters
Local development | Same as production | Run flows directly

Step 1: Tasks — The Building Blocks

A Prefect task is a Python function decorated with @task. Each task is a single unit of work.

from prefect import task, flow
import pandas as pd
import requests
import os
import logging

logger = logging.getLogger("pipeline")


@task(retries=3, retry_delay_seconds=10, log_prints=True)
def extract_orders(api_url, api_key, days=30):
    """Fetch orders from API with automatic retry."""
    from datetime import datetime, timedelta

    params = {
        "since": (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d"),
    }
    response = requests.get(
        f"{api_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        params=params,
        timeout=30,
    )
    response.raise_for_status()

    data = response.json()["results"]
    df = pd.DataFrame(data)
    print(f"Extracted {len(df)} orders")
    return df


@task(retries=3, retry_delay_seconds=10, log_prints=True)
def extract_customers(api_url, api_key):
    """Fetch customer data from API."""
    response = requests.get(
        f"{api_url}/customers",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()

    data = response.json()["results"]
    df = pd.DataFrame(data)
    print(f"Extracted {len(df)} customers")
    return df


@task(log_prints=True)
def transform_data(orders_df, customers_df):
    """Clean and combine order and customer data."""
    # Standardise columns
    orders_df.columns = orders_df.columns.str.strip().str.lower().str.replace(" ", "_")
    customers_df.columns = customers_df.columns.str.strip().str.lower().str.replace(" ", "_")

    # Clean types
    orders_df["revenue"] = pd.to_numeric(orders_df["revenue"], errors="coerce")
    orders_df["created_at"] = pd.to_datetime(orders_df["created_at"], errors="coerce")

    # Deduplicate
    orders_df = orders_df.drop_duplicates(subset=["id"], keep="last")

    # Merge
    customers_subset = customers_df[["id", "name", "region"]].rename(
        columns={"id": "customer_id"}
    )
    combined = orders_df.merge(customers_subset, on="customer_id", how="left")

    print(f"Transformed: {len(combined)} rows, {len(combined.columns)} columns")
    return combined


@task(retries=2, retry_delay_seconds=5, log_prints=True)
def load_to_database(df, db_path, table_name):
    """Load DataFrame to SQLite database."""
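    import sqlite3
    from contextlib import closing

    # closing() releases the connection even if to_sql fails,
    # which matters because this task may be retried
    with closing(sqlite3.connect(db_path)) as conn:
        df.to_sql(table_name, conn, if_exists="replace", index=False)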
    print(f"Loaded {len(df)} rows to {db_path}:{table_name}")


@task(log_prints=True)
def generate_report(df):
    """Generate summary report from combined data."""
    from datetime import datetime

    by_region = (
        df.groupby("region")
        .agg(
            orders=("id", "count"),
            revenue=("revenue", "sum"),
            avg_order=("revenue", "mean"),
        )
        .reset_index()
        .round(2)
    )

    daily = (
        df.assign(date=df["created_at"].dt.date)
        .groupby("date")
        .agg(orders=("id", "count"), revenue=("revenue", "sum"))
        .reset_index()
    )

    print(f"Report: {len(by_region)} regions, {len(daily)} days")
    return {"by_region": by_region, "daily": daily}


@task(retries=2, retry_delay_seconds=30, log_prints=True)
def send_email_report(report, recipients):
    """Send report summary via email."""
    by_region = report["by_region"]
    total_revenue = by_region["revenue"].sum()
    total_orders = by_region["orders"].sum()

    subject = f"Daily Report: {total_orders} orders, ${total_revenue:,.2f} revenue"
    print(f"Email sent to {len(recipients)} recipients: {subject}")
    # In production: use smtplib or an email service API
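The send_email_report task above only prints a subject line. If you take the smtplib route mentioned in its comment, a minimal sketch might look like the following. The helper names, SMTP host, port, credentials and addresses are all assumptions to replace with your own; port 587 with STARTTLS is a common setup, not a universal one.

```python
import smtplib
from email.message import EmailMessage


def build_report_email(subject, body, sender, recipients):
    """Assemble a plain-text report email (all addresses are placeholders)."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    msg.set_content(body)
    return msg


def send_via_smtp(msg, host, port=587, username=None, password=None):
    """Send over SMTP with STARTTLS; host, port and credentials are assumptions."""
    with smtplib.SMTP(host, port, timeout=30) as smtp:
        smtp.starttls()  # upgrade the connection to TLS before logging in
        if username and password:
            smtp.login(username, password)
        smtp.send_message(msg)
```

Inside send_email_report you would build the message from the subject line and a text summary of by_region, then call send_via_smtp. Keep the SMTP credentials in environment variables rather than in code.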

Step 2: Flows — Wiring Tasks Together

A flow is a function decorated with @flow that calls your tasks. Dependencies follow the data: a task that receives another task's result runs after it.

@flow(name="daily-sales-report", log_prints=True)
def daily_sales_report(
    api_url: str = None,
    api_key: str = None,
    db_path: str = "reports.db",
    days: int = 30,
):
    """Complete daily sales reporting pipeline."""
    api_url = api_url or os.environ["API_URL"]
    api_key = api_key or os.environ["API_KEY"]

    # Extract: called directly like this, the two tasks run one after the other.
    # Use .submit() (see Step 4) to run them concurrently.
    orders = extract_orders(api_url, api_key, days=days)
    customers = extract_customers(api_url, api_key)

    # Transform — waits for both extracts to complete
    combined = transform_data(orders, customers)

    # Load
    load_to_database(combined, db_path, "daily_orders")

    # Report
    report = generate_report(combined)

    # Deliver
    send_email_report(report, recipients=["reports@example.com"])  # placeholder address

    print(f"Pipeline completed: {len(combined)} records processed")
    return {"status": "success", "rows": len(combined)}

Running a Flow

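# pipeline.py: run the flow directly during development
if __name__ == "__main__":
    result = daily_sales_report(days=7)
    print(result)

Then run the script from the command line. The flow reads API_URL and API_KEY from the environment unless you pass them as parameters.

python pipeline.py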

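Step 3: Scheduling

Calling .serve() on a flow creates a deployment with the given schedule and then keeps the process running to execute the scheduled runs, so run each example below as its own long-lived script. The cron examples use CronSchedule objects so that each schedule can carry a timezone.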

Cron-Style Scheduling

from prefect.client.schemas.schedules import CronSchedule

# Deploy with a cron schedule: runs weekdays at 6am Zurich time
daily_sales_report.serve(
    name="weekday-morning",
    schedules=[
        CronSchedule(cron="0 6 * * 1-5", timezone="Europe/Zurich")
    ],
    parameters={"days": 30, "db_path": "reports.db"},
)

Interval Scheduling

from datetime import timedelta

# Run every 4 hours
daily_sales_report.serve(
    name="every-4-hours",
    interval=timedelta(hours=4),
)

Multiple Schedules

Prefect supports multiple schedules on a single deployment:

from prefect.client.schemas.schedules import CronSchedule

daily_sales_report.serve(
    name="multi-schedule",
    schedules=[
        CronSchedule(cron="0 6 * * 1-5", timezone="Europe/London"),  # weekdays at 6am
        CronSchedule(cron="0 9 1 * *", timezone="Europe/London"),    # 9am on the 1st of each month
    ],
)
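If you are unsure what a cron expression will do, it helps to check it against concrete dates. The simplified matcher below is a sketch for reading expressions like the ones above, not Prefect's scheduler: it supports only *, single numbers, ranges and comma lists, and it assumes the datetime you pass is already in the schedule's timezone. Note that cron counts Sunday as day 0, whereas Python's weekday() counts Monday as 0.

```python
from datetime import datetime


def _field_matches(field, value):
    """Match one cron field: '*', a number, a range 'A-B', or a comma list of those."""
    if field == "*":
        return True
    for part in field.split(","):
        if "-" in part:
            low, high = map(int, part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr, when):
    """Return True if a 5-field cron expression fires at `when` (minute precision).

    Simplified sketch: no step values (*/15), no day or month names, no 7-as-Sunday.
    """
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (when.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    dom_ok = _field_matches(dom, when.day)
    dow_ok = _field_matches(dow, cron_dow)
    if dom != "*" and dow != "*":
        day_ok = dom_ok or dow_ok  # classic cron rule: either restricted day field may match
    else:
        day_ok = dom_ok and dow_ok
    return (
        _field_matches(minute, when.minute)
        and _field_matches(hour, when.hour)
        and _field_matches(month, when.month)
        and day_ok
    )


print(cron_matches("0 6 * * 1-5", datetime(2024, 1, 1, 6, 0)))  # True: 1 Jan 2024 was a Monday
print(cron_matches("0 6 * * 1-5", datetime(2024, 1, 6, 6, 0)))  # False: a Saturday
```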

Step 4: Parallel Execution

When tasks are independent, run them simultaneously:

import os

from prefect import flow, task
# Prefect 2 name; Prefect 3 calls this runner ThreadPoolTaskRunner.
# Either way it is the default, so task_runner= is optional here.
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def multi_source_pipeline():
    """Pipeline that extracts from multiple sources in parallel."""
    api_key = os.environ["API_KEY"]
    api_url = os.environ["API_URL"]

    # .submit() hands each task to the task runner and returns a future
    # immediately, so these three run in parallel.
    # extract_products and merge_all_sources are hypothetical tasks,
    # written the same way as the Step 1 tasks.
    orders_future = extract_orders.submit(api_url, api_key)
    customers_future = extract_customers.submit(api_url, api_key)
    products_future = extract_products.submit(api_url, api_key)

    # .result() waits for each task to finish and returns its value
    orders = orders_future.result()
    customers = customers_future.result()
    products = products_future.result()

    # Sequential from here
    combined = merge_all_sources(orders, customers, products)
    load_to_database(combined, "warehouse.db", "combined")

    print(f"Parallel pipeline complete: {len(combined)} rows")
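A thread-based task runner such as ConcurrentTaskRunner behaves much like Python's own concurrent.futures.ThreadPoolExecutor, which makes a handy mental model: submit() returns a future right away, and result() blocks until that piece of work is done. The stdlib sketch below uses a hypothetical fake_extract function that sleeps to simulate waiting on an API; because the waits overlap, three 0.2-second calls finish in roughly 0.2 seconds rather than 0.6.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_extract(name, seconds=0.2):
    """Hypothetical stand-in for an I/O-bound extract task."""
    time.sleep(seconds)  # simulates waiting on an API response
    return f"{name} rows"


def run_parallel(names):
    """Submit every extract at once, then collect results (like .submit() / .result())."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        # submit() returns a Future immediately; the work runs on pool threads
        futures = {name: pool.submit(fake_extract, name) for name in names}
        # result() blocks until that particular call has finished
        results = {name: future.result() for name, future in futures.items()}
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_parallel(["orders", "customers", "products"])
    print(results)
    print(f"took {elapsed:.2f}s")
```

Threads work well for I/O-bound tasks like API calls and database writes; CPU-heavy transforms gain little from them and may call for a different kind of runner.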

Step 5: Error Handling and Notifications

Task-Level Retry Configuration

@task(
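    retries=3,                         # Retry up to 3 times
    retry_delay_seconds=[10, 30, 60],  # Increasing delays: 10s, 30s, 60s
    # For computed exponential delays, use prefect.tasks.exponential_backoff instead,
    # e.g. retry_delay_seconds=exponential_backoff(backoff_factor=10)
    retry_jitter_factor=0.5,           # Add randomness to prevent a thundering herd
    log_prints=True,
)
def fetch_external_data(url, headers):
    """Fetch with increasing retry delays (10s, 30s, 60s) plus jitter."""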
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()
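To see what those three settings do together, here is a plain-Python sketch of the same retry policy. It is an illustration only: the uniform jitter (stretching each delay by up to jitter_factor times its base value) is an assumption and may not match Prefect's exact formula. The sleep and rng arguments exist so the behaviour can be checked without actually waiting.

```python
import random
import time


def call_with_retries(fn, retries=3, delays=(10, 30, 60), jitter_factor=0.5,
                      sleep=time.sleep, rng=random.random):
    """Call fn(); on an exception, wait and try again, up to `retries` extra attempts.

    delays[i] is the base wait before retry i+1 (the last value is reused if there
    are more retries than delays). Jitter stretches each wait by a random amount of
    up to jitter_factor * base. This uniform jitter is an illustrative assumption.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            base = delays[min(attempt, len(delays) - 1)]
            sleep(base * (1 + jitter_factor * rng()))
```

With Prefect you get this behaviour from the decorator arguments, plus a record of each retry in the run history.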

Flow-Level Error Handling

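import os

import requests
from prefect import flow, task

# Prefect also ships a SlackWebhook notification block
# (prefect.blocks.notifications) if you would rather store the webhook in Prefect;
# this example posts to the webhook URL directly with requests.


@flow(name="monitored-pipeline", log_prints=True)
def monitored_pipeline():
    """Pipeline with error handling and notifications."""
    try:
        api_url, api_key = os.environ["API_URL"], os.environ["API_KEY"]
        orders = extract_orders(api_url, api_key)
        customers = extract_customers(api_url, api_key)
        combined = transform_data(orders, customers)
        load_to_database(combined, "reports.db", "orders")
        report = generate_report(combined)

        notify_success(f"Pipeline complete: {len(combined)} rows processed")

    except Exception as e:
        # A task that exhausts its retries raises here: alert, then re-raise
        # so the flow run is still marked as failed
        notify_failure(f"Pipeline failed: {e}")
        raise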


@task
def notify_success(message):
    """Send success notification to Slack."""
    webhook_url = os.environ.get("SLACK_WEBHOOK")
    if webhook_url:
        requests.post(webhook_url, json={"text": f"[OK] {message}"}, timeout=10)


@task
def notify_failure(message):
    """Send failure notification to Slack."""
    webhook_url = os.environ.get("SLACK_WEBHOOK")
    if webhook_url:
        requests.post(webhook_url, json={"text": f"[FAIL] {message}"}, timeout=10)

Step 6: Parameterised Flows

Make pipelines flexible by accepting parameters:

@flow(name="flexible-report", log_prints=True)
def flexible_report(
    source: str = "api",        # reserved for other sources; unused in this example
    days: int = 30,
    region: str = None,
    output_format: str = "database",
):
    """Pipeline with configurable parameters."""
    api_key = os.environ["API_KEY"]
    api_url = os.environ["API_URL"]

    # Extract
    orders = extract_orders(api_url, api_key, days=days)
    customers = extract_customers(api_url, api_key)

    # Transform: region comes from the customer data, so filter after the merge
    combined = transform_data(orders, customers)

    # Optional: filter by region
    if region:
        combined = combined[combined["region"] == region]
        print(f"Filtered to {region}: {len(combined)} rows")

    # Load based on format parameter
    if output_format == "database":
        load_to_database(combined, "reports.db", "orders")
    elif output_format == "excel":
        # export_to_excel is a task you would define yourself, for example
        # wrapping combined.to_excel(path, index=False), which needs openpyxl
        export_to_excel(combined, f"report_{days}d.xlsx")

    return {"rows": len(combined), "region": region, "days": days}

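Running with Different Parameters

These commands assume the flow has been deployed under the name default (for example by running flexible_report.serve(name="default") in a script) and that the serving process is still running to pick up the runs.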

# Default: 30 days, all regions, database output
prefect deployment run "flexible-report/default"

# Custom: 7 days, North region, Excel output
prefect deployment run "flexible-report/default" \
    --param days=7 \
    --param region=North \
    --param output_format=excel
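On the command line every --param value starts out as text, and Prefect checks parameters against a schema built from the flow's type hints. The sketch below imitates that idea with the standard library only; flexible_report_params is a hypothetical stand-in with the same parameters as flexible_report, and the coercion handles only simple scalar types and Optional.

```python
import inspect
from typing import Optional, Union, get_args, get_origin, get_type_hints


def flexible_report_params(
    source: str = "api",
    days: int = 30,
    region: Optional[str] = None,
    output_format: str = "database",
):
    """Hypothetical stand-in with the same parameters as flexible_report."""


def _concrete_type(annotation):
    # Optional[str] is Union[str, None]: use the first non-None member
    if get_origin(annotation) is Union:
        return next(arg for arg in get_args(annotation) if arg is not type(None))
    return annotation


def parse_params(fn, pairs):
    """Turn ["days=7", "region=North"] into typed values merged with fn's defaults."""
    signature = inspect.signature(fn)
    hints = get_type_hints(fn)
    values = {name: p.default for name, p in signature.parameters.items()}
    for pair in pairs:
        key, sep, raw = pair.partition("=")
        if not sep or key not in values:
            raise ValueError(f"invalid parameter: {pair!r}")
        target = _concrete_type(hints.get(key, str))
        values[key] = target(raw)  # int("abc") raises ValueError, like a failed validation
    return values
```

Prefect does this validation for you; the sketch only shows why the type hints on flow parameters matter.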

Step 7: Sub-Flows for Complex Pipelines

Break large pipelines into composable sub-flows:

@flow(name="extract-all", log_prints=True)
def extract_all_sources(api_url, api_key):
    """Sub-flow: extract from all sources."""
    orders = extract_orders(api_url, api_key)
    customers = extract_customers(api_url, api_key)
    return {"orders": orders, "customers": customers}


@flow(name="transform-and-load", log_prints=True)
def transform_and_load(data, db_path):
    """Sub-flow: transform and load."""
    combined = transform_data(data["orders"], data["customers"])
    load_to_database(combined, db_path, "combined")
    return combined


@flow(name="master-pipeline", log_prints=True)
def master_pipeline():
    """Master flow that orchestrates sub-flows."""
    api_url = os.environ["API_URL"]
    api_key = os.environ["API_KEY"]

    # Sub-flow 1: Extract
    data = extract_all_sources(api_url, api_key)

    # Sub-flow 2: Transform and load
    combined = transform_and_load(data, "reports.db")

    # Report: plain tasks can be called alongside sub-flows
    report = generate_report(combined)
    send_email_report(report, ["reports@example.com"])  # placeholder address

Monitoring Dashboard

Prefect provides a built-in UI for monitoring:

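# Start the Prefect server (local)
prefect server start

# Open the dashboard
# http://localhost:4200

# In another terminal, point your flows at this server
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api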

What the Dashboard Shows

View | What you see
--- | ---
Flow runs | Every execution with status, duration, timestamps
Task runs | Individual task status within each flow
Logs | Full log output from every task
Schedules | Upcoming scheduled runs
Deployments | All deployed flows with parameters
Notifications | Configured alert channels

Migration from cron

Before (cron + scripts)

# crontab
0 6 * * 1-5 python extract_orders.py && python extract_customers.py && python transform.py && python load.py && python report.py

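Problems: every step runs sequentially, even the two independent extracts; nothing retries; there is no monitoring; and because && stops the chain at the first error, a transient failure in step 2 skips steps 3-5 for that day with no built-in alert.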

After (Prefect)

import os


@flow(name="daily-pipeline", log_prints=True)
def daily_pipeline():
    api_url = os.environ["API_URL"]
    api_key = os.environ["API_KEY"]
    orders = extract_orders.submit(api_url, api_key)                # parallel
    customers = extract_customers.submit(api_url, api_key)          # parallel
    combined = transform_data(orders.result(), customers.result())  # waits for both
    load_to_database(combined, "reports.db", "orders")              # retries
    report = generate_report(combined)
    send_email_report(report, ["reports@example.com"])              # retries; placeholder address

Improvements: parallel extraction, automatic retries, dependency management, monitoring dashboard, full log history.

What This Replaces

cron approach | Prefect equivalent
--- | ---
Chain scripts with && | Task dependencies (DAG)
Custom retry wrapper | @task(retries=3)
Sequential execution | ConcurrentTaskRunner
Parse log files | Built-in dashboard
Custom email on failure | Built-in notifications
Environment-variable parameters | Type-safe flow parameters
No run history | Queryable run database

Next Steps

Getting an import error with prefect.server.schemas.schedules? That module was removed in Prefect 3. See the dedicated Prefect IntervalSchedule Migration Guide for side-by-side code comparisons and the exact fix.

Start by converting your most complex cron pipeline to a Prefect flow. The biggest immediate win is the monitoring dashboard — seeing run history, task durations, and failure details without parsing log files.

For building the pipeline logic that Prefect orchestrates, see How to Design Data Pipelines for Reliable Reporting. For testing those pipelines before deploying them, see Testing Data Pipelines: A Practical Guide with pytest. For packaging orchestrated flows into containers, see Containerizing Your Python Pipelines with Docker. For adding alerting when pipelines succeed or fail, see How to Build a Notification System That Actually Gets Read.

