How to Schedule and Orchestrate Multi-Step Workflows with Prefect
Move beyond cron jobs — use Prefect to orchestrate data pipelines with dependency management, automatic retries, parallel execution, and a monitoring dashboard.
Cron jobs work until they do not. You have five scripts that need to run in order. Script 3 depends on the output of scripts 1 and 2. Script 4 should retry twice if it fails. Script 5 should only run on weekdays. Managing this with cron means managing a web of dependencies in your head.
Workflow orchestrators solve this. They let you define task dependencies as code, handle retries, run tasks in parallel where possible, and give you a dashboard showing what ran, what failed, and why.
This guide uses Prefect, a Python-native orchestrator whose flows and tasks are ordinary decorated Python functions, so you can run them locally without standing up a separate scheduler service. By the end, you will have a production workflow with dependencies, retries, scheduling, and monitoring.
Who This Is For
- Data engineers managing multiple scripts with cron and losing track of which depends on which
- Developers who want retry logic, parallel execution, and a visual dashboard without building it from scratch
- Vibe coders whose multi-step automations have grown too complex to manage with simple schedulers
- Teams that need visibility into what ran, what failed, and why — without digging through logs
Basic Python is the only prerequisite. If you have a script that runs on a schedule (or should), this guide shows you how to make it reliable and observable.
The Orchestration Architecture
In a typical flow, two extract tasks (A and B) run in parallel because neither depends on the other, and a transform task (C) waits for both. The entire flow runs on a schedule with automatic retries and monitoring.
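To make the A/B/C dependency concrete, here is a stdlib-only sketch of the same shape using concurrent.futures. It is not Prefect code: the task functions and sleep durations are made up to stand in for API calls. It only illustrates the scheduling rule an orchestrator automates for you: start independent work together, and block the dependent step until both of its inputs exist.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def task_a():
    """Hypothetical extract step; the sleep stands in for a slow API call."""
    time.sleep(0.3)
    return [1, 2, 3]


def task_b():
    """Second hypothetical extract step with no dependency on task_a."""
    time.sleep(0.3)
    return [10, 20, 30]


def task_c(a, b):
    """Depends on both upstream results, so it can only start after A and B."""
    return [x + y for x, y in zip(a, b)]


def run_flow():
    with ThreadPoolExecutor(max_workers=2) as pool:
        future_a = pool.submit(task_a)  # A and B start at the same time
        future_b = pool.submit(task_b)
        # .result() blocks until each upstream task has finished
        return task_c(future_a.result(), future_b.result())


if __name__ == "__main__":
    start = time.perf_counter()
    print(run_flow())  # [11, 22, 33]
    # Roughly 0.3s rather than 0.6s, because A and B overlap
    print(f"took {time.perf_counter() - start:.2f}s")
```

Running A and B sequentially would take the sum of their durations; overlapping them costs only the longer of the two, which is the saving Prefect gives you when independent tasks are submitted concurrently.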
What You Will Need
pip install prefect pandas requests openpyxl
- prefect — workflow orchestration framework
- pandas — data processing
- requests — HTTP client for API calls
- openpyxl (Excel writer that pandas uses for the optional .xlsx export in Step 6)
Cron vs Prefect: Why Switch?
| Feature | cron | Prefect |
|---|---|---|
| Dependencies between tasks | Manual (chain scripts) | Automatic (DAG) |
| Retry on failure | Custom wrapper scripts | Built-in decorator |
| Parallel execution | Background processes | Concurrent task runner |
| Monitoring | Parse log files | Web dashboard |
| Alerting on failure | Custom email scripts | Built-in notifications |
| Run history | Log files | Queryable database |
| Parameterised runs | Environment variables | Flow parameters |
| Local development | Same as production | Run flows directly |
Step 1: Tasks — The Building Blocks
A Prefect task is a Python function decorated with @task. Each task is a single unit of work.
from datetime import datetime, timedelta
import os
import sqlite3

import pandas as pd
import requests
from prefect import flow, task


@task(retries=3, retry_delay_seconds=10, log_prints=True)
def extract_orders(api_url, api_key, days=30):
    """Fetch orders from the API; Prefect retries up to 3 times on failure."""
    params = {
        "since": (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d"),
    }
    response = requests.get(
        f"{api_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        params=params,
        timeout=30,
    )
    response.raise_for_status()  # HTTP errors raise, which triggers a retry
    data = response.json()["results"]
    df = pd.DataFrame(data)
    print(f"Extracted {len(df)} orders")
    return df


@task(retries=3, retry_delay_seconds=10, log_prints=True)
def extract_customers(api_url, api_key):
    """Fetch customer data from the API."""
    response = requests.get(
        f"{api_url}/customers",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()["results"]
    df = pd.DataFrame(data)
    print(f"Extracted {len(df)} customers")
    return df


@task(log_prints=True)
def transform_data(orders_df, customers_df):
    """Clean and combine order and customer data."""
    # Work on copies so upstream task results are not mutated
    orders_df = orders_df.copy()
    customers_df = customers_df.copy()
    # Standardise column names: trim, lowercase, spaces to underscores
    orders_df.columns = orders_df.columns.str.strip().str.lower().str.replace(" ", "_")
    customers_df.columns = customers_df.columns.str.strip().str.lower().str.replace(" ", "_")
    # Clean types; unparseable values become NaN/NaT instead of raising
    orders_df["revenue"] = pd.to_numeric(orders_df["revenue"], errors="coerce")
    orders_df["created_at"] = pd.to_datetime(orders_df["created_at"], errors="coerce")
    # Deduplicate, keeping the last occurrence of each order id
    orders_df = orders_df.drop_duplicates(subset=["id"], keep="last")
    # Merge customer attributes onto orders (orders need a customer_id column)
    customers_subset = customers_df[["id", "name", "region"]].rename(
        columns={"id": "customer_id"}
    )
    combined = orders_df.merge(customers_subset, on="customer_id", how="left")
    print(f"Transformed: {len(combined)} rows, {len(combined.columns)} columns")
    return combined


@task(retries=2, retry_delay_seconds=5, log_prints=True)
def load_to_database(df, db_path, table_name):
    """Load a DataFrame into SQLite, replacing the table on each run."""
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists="replace", index=False)
    finally:
        conn.close()  # release the file handle even if the write fails
    print(f"Loaded {len(df)} rows to {db_path}:{table_name}")


@task(log_prints=True)
def generate_report(df):
    """Generate summary report from combined data."""
    by_region = (
        df.groupby("region")
        .agg(
            orders=("id", "count"),
            revenue=("revenue", "sum"),
            avg_order=("revenue", "mean"),
        )
        .reset_index()
        .round(2)
    )
    daily = (
        df.assign(date=df["created_at"].dt.date)
        .groupby("date")
        .agg(orders=("id", "count"), revenue=("revenue", "sum"))
        .reset_index()
    )
    print(f"Report: {len(by_region)} regions, {len(daily)} days")
    return {"by_region": by_region, "daily": daily}


@task(retries=2, retry_delay_seconds=30, log_prints=True)
def send_email_report(report, recipients):
    """Send report summary via email (stubbed here: it only prints the subject)."""
    by_region = report["by_region"]
    total_revenue = by_region["revenue"].sum()
    total_orders = by_region["orders"].sum()
    subject = f"Daily Report: {total_orders} orders, ${total_revenue:,.2f} revenue"
    # In production: send the message with smtplib or an email service API here
    print(f"Would email {len(recipients)} recipient(s): {subject}")
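If you are wondering what `retries=3, retry_delay_seconds=10` buys you, the hypothetical `simple_retry` decorator below is a stripped-down, stdlib-only approximation of that control flow: call the function, and on an exception wait and try again until the retry budget runs out, then re-raise. With `retries=3` that means up to four calls in total. Prefect's real implementation also records each attempt's state and logs and shows retries in the dashboard; this sketch only shows the loop.

```python
import functools
import time


def simple_retry(retries=3, retry_delay_seconds=10):
    """Hypothetical, simplified stand-in for @task(retries=..., retry_delay_seconds=...)."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt >= retries:
                        raise  # retry budget spent: surface the original error
                    attempt += 1
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"n": 0}


@simple_retry(retries=3, retry_delay_seconds=0)  # 0s delay keeps the demo fast
def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary API failure")
    return "orders"


print(flaky_extract(), calls["n"])  # orders 3
```

The first two calls fail and the third succeeds, so the caller never sees the transient error. That is the behaviour `extract_orders` gets from its decorator, without any wrapper code of your own.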
Step 2: Flows — Wiring Tasks Together
A flow defines the execution order and dependencies between tasks.
from typing import Optional


@flow(name="daily-sales-report", log_prints=True)
def daily_sales_report(
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
    db_path: str = "reports.db",
    days: int = 30,
):
    """Complete daily sales reporting pipeline."""
    # Fall back to environment variables when no parameter is passed
    api_url = api_url or os.environ["API_URL"]
    api_key = api_key or os.environ["API_KEY"]
    # Extract: .submit() starts both tasks concurrently (no dependency between them);
    # calling the tasks directly would run them one after the other
    orders = extract_orders.submit(api_url, api_key, days=days)
    customers = extract_customers.submit(api_url, api_key)
    # Transform: Prefect resolves the futures, so this waits for both extracts
    combined = transform_data(orders, customers)
    # Load
    load_to_database(combined, db_path, "daily_orders")
    # Report
    report = generate_report(combined)
    # Deliver
    send_email_report(report, recipients=["[email protected]"])
    print(f"Pipeline completed: {len(combined)} records processed")
    return {"status": "success", "rows": len(combined)}
Running a Flow
# Run directly (development)
if __name__ == "__main__":
    result = daily_sales_report(days=7)
    print(result)
# From the command line
python pipeline.py
Step 3: Scheduling
Cron-Style Scheduling
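from prefect.client.schemas.schedules import CronSchedule

# Deploy with a cron schedule: runs weekdays at 06:00 Zurich time.
# serve() blocks and keeps polling for scheduled runs until you stop it.
daily_sales_report.serve(
    name="weekday-morning",
    schedules=[
        CronSchedule(cron="0 6 * * 1-5", timezone="Europe/Zurich")
    ],
    parameters={"days": 30, "db_path": "reports.db"},
)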
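The expression `0 6 * * 1-5` reads as minute 0, hour 6, any day of the month, any month, Monday to Friday. The helper below is a hypothetical, stdlib-only sketch that computes the next matching time for exactly this one pattern. It is not a general cron parser and not how Prefect evaluates schedules, but it is a quick way to sanity-check what a schedule does around weekends.

```python
from datetime import datetime, timedelta, timezone


def next_weekday_run(now, hour=6):
    """Next Monday-Friday occurrence of hour:00 strictly after `now`.

    Mirrors the cron expression "0 {hour} * * 1-5" for timezone-aware datetimes.
    """
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has already passed
    # weekday(): Monday=0 ... Sunday=6, so 5 and 6 are the weekend
    while candidate.weekday() >= 5:
        candidate += timedelta(days=1)
    return candidate


# Friday 8 March 2024, 07:00: the next run skips the weekend
friday_7am = datetime(2024, 3, 8, 7, 0, tzinfo=timezone.utc)
print(next_weekday_run(friday_7am))  # 2024-03-11 06:00:00+00:00
```

To check the Zurich schedule specifically, build `now` with `tzinfo=zoneinfo.ZoneInfo("Europe/Zurich")` (this needs the system time zone database or the tzdata package). Adding a `timedelta` to such a datetime keeps the 06:00 wall-clock time across daylight-saving changes, which is also why the schedule above names a timezone instead of assuming UTC.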
Interval Scheduling
from datetime import timedelta

# Run every 4 hours (serve() also accepts the interval as a number of seconds, e.g. 14400)
daily_sales_report.serve(
    name="every-4-hours",
    interval=timedelta(hours=4),
)
Multiple Schedules
Prefect supports multiple schedules on a single deployment:
from prefect.client.schemas.schedules import CronSchedule

daily_sales_report.serve(
    name="multi-schedule",
    schedules=[
        CronSchedule(cron="0 6 * * 1-5", timezone="Europe/London"),  # weekdays at 06:00
        CronSchedule(cron="0 9 1 * *", timezone="Europe/London"),  # 09:00 on the 1st of each month
    ],
)
Step 4: Parallel Execution
When tasks are independent, run them simultaneously:
import os

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

# extract_products and merge_all_sources are hypothetical tasks written like the
# Step 1 tasks; define them before running this flow.


@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def multi_source_pipeline():
    """Pipeline that extracts from multiple sources in parallel."""
    api_key = os.environ["API_KEY"]
    api_url = os.environ["API_URL"]
    # .submit() hands each task to the task runner, so all three start concurrently
    orders_future = extract_orders.submit(api_url, api_key)
    customers_future = extract_customers.submit(api_url, api_key)
    products_future = extract_products.submit(api_url, api_key)
    # Wait for all three to complete and collect their results
    orders = orders_future.result()
    customers = customers_future.result()
    products = products_future.result()
    # Sequential from here
    combined = merge_all_sources(orders, customers, products)
    load_to_database(combined, "warehouse.db", "combined")
    print(f"Parallel pipeline complete: {len(combined)} rows")
Step 5: Error Handling and Notifications
Task-Level Retry Configuration
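# For true exponential backoff, prefect.tasks.exponential_backoff(backoff_factor=10)
# can be passed as retry_delay_seconds instead of a fixed list.
@task(
    retries=3,  # retry up to 3 times
    retry_delay_seconds=[10, 30, 60],  # one delay per retry: 10s, 30s, then 60s
    retry_jitter_factor=0.5,  # randomise delays so many clients don't retry in lockstep (thundering herd)
    log_prints=True,
)
def fetch_external_data(url, headers):
    """Fetch JSON, waiting longer before each successive retry."""
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()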
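The three delays above are a hand-picked, increasing schedule rather than strict exponential growth. The stdlib-only sketch below (function names are hypothetical) shows both ideas side by side: a doubling backoff schedule, and jitter that spreads each delay within plus or minus a factor of its base value. Prefect's own jitter calculation differs in detail, so treat this as an illustration of the technique, not of `retry_jitter_factor`'s exact formula.

```python
import random


def exponential_delays(base_seconds, retries):
    """Doubling backoff: base, 2*base, 4*base, ... one entry per retry."""
    return [base_seconds * 2 ** attempt for attempt in range(retries)]


def with_jitter(delays, jitter_factor, rng=None):
    """Scale each delay by a random factor in [1 - jitter_factor, 1 + jitter_factor]."""
    if rng is None:
        rng = random.Random()
    return [d * rng.uniform(1 - jitter_factor, 1 + jitter_factor) for d in delays]


print(exponential_delays(10, 3))  # [10, 20, 40]
# Seeded so the spread is reproducible; each value stays within +/-50% of its base
print(with_jitter([10, 30, 60], 0.5, rng=random.Random(42)))
```

Jitter matters when many pipelines hit the same API: without it, every client that failed at the same moment retries at the same moment too, and the API gets hammered again in lockstep.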
Flow-Level Error Handling
import os

import requests
from prefect import flow, task

# Prefect also ships a SlackWebhook notification block (prefect.blocks.notifications);
# this example posts to the webhook URL with requests directly instead.


@task
def notify_success(message):
    """Send success notification to Slack (skipped if SLACK_WEBHOOK is unset)."""
    webhook_url = os.environ.get("SLACK_WEBHOOK")
    if webhook_url:
        requests.post(webhook_url, json={"text": f"[OK] {message}"}, timeout=10)


@task
def notify_failure(message):
    """Send failure notification to Slack (skipped if SLACK_WEBHOOK is unset)."""
    webhook_url = os.environ.get("SLACK_WEBHOOK")
    if webhook_url:
        requests.post(webhook_url, json={"text": f"[FAIL] {message}"}, timeout=10)


@flow(name="monitored-pipeline", log_prints=True)
def monitored_pipeline():
    """Pipeline with error handling and notifications."""
    api_url = os.environ["API_URL"]
    api_key = os.environ["API_KEY"]
    try:
        orders = extract_orders(api_url, api_key)
        customers = extract_customers(api_url, api_key)
        combined = transform_data(orders, customers)
        load_to_database(combined, "reports.db", "orders")
        generate_report(combined)
        notify_success(f"Pipeline complete: {len(combined)} rows processed")
    except Exception as e:
        # Reached only after a task has exhausted its own retries
        notify_failure(f"Pipeline failed: {e}")
        raise  # re-raise so Prefect still marks the flow run as Failed
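The try/except/raise pattern generalises to any callable. Below is a hypothetical, framework-free helper that reports success or failure through two callbacks and then re-raises, so the caller (in Prefect's case, the flow run state) still sees the failure. The callbacks here just append to a list; in the flow above they would be the Slack notification tasks.

```python
def run_with_notifications(fn, on_success, on_failure):
    """Run fn(); report the outcome via callbacks, then return or re-raise."""
    try:
        result = fn()
    except Exception as exc:
        on_failure(f"Pipeline failed: {exc}")
        raise  # swallowing the error here would hide the failure from the scheduler
    # Called outside the try block, so a broken success notifier is not misreported
    # as a pipeline failure
    on_success(f"Pipeline complete: {result} rows processed")
    return result


sent = []
run_with_notifications(lambda: 120, sent.append, sent.append)


def broken_pipeline():
    raise RuntimeError("API returned 503")


try:
    run_with_notifications(broken_pipeline, sent.append, sent.append)
except RuntimeError:
    pass  # the error still propagates after the failure message is recorded

print(sent)
# ['Pipeline complete: 120 rows processed', 'Pipeline failed: API returned 503']
```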
Step 6: Parameterised Flows
Make pipelines flexible by accepting parameters:
from typing import Optional


@flow(name="flexible-report", log_prints=True)
def flexible_report(
    source: str = "api",
    days: int = 30,
    region: Optional[str] = None,
    output_format: str = "database",
):
    """Pipeline with configurable parameters."""
    api_key = os.environ["API_KEY"]
    api_url = os.environ["API_URL"]
    # Extract
    orders = extract_orders(api_url, api_key, days=days)
    customers = extract_customers(api_url, api_key)
    # Transform first: region comes from the customer data, so it exists only after the merge
    combined = transform_data(orders, customers)
    # Optional: filter by region
    if region:
        combined = combined[combined["region"] == region]
        print(f"Filtered to {region}: {len(combined)} rows")
    # Load based on format parameter
    if output_format == "database":
        load_to_database(combined, "reports.db", "orders")
    elif output_format == "excel":
        # export_to_excel is a hypothetical task, e.g. wrapping df.to_excel (uses openpyxl)
        export_to_excel(combined, f"report_{days}d.xlsx")
    return {"rows": len(combined), "region": region, "days": days}
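Flow parameters arrive from the CLI or UI as text, and Prefect uses the type hints on the flow signature to validate and convert them (its validation is built on pydantic). The hypothetical `coerce_params` helper below is a much simpler, stdlib-only illustration of the same idea using `inspect.signature`. It converts only plain classes such as `int`, `str` and `float` (note that `bool("false")` would be `True`, one reason real validation is more careful) and leaves anything else, such as `Optional[str]`, as text.

```python
import inspect
from typing import Optional


def coerce_params(func, raw_pairs):
    """Convert CLI-style "key=value" strings into kwargs typed by func's annotations."""
    params = inspect.signature(func).parameters
    kwargs = {}
    for pair in raw_pairs:
        key, _, value = pair.partition("=")
        if key not in params:
            raise ValueError(f"Unknown parameter: {key}")
        annotation = params[key].annotation
        if annotation is not inspect.Parameter.empty and isinstance(annotation, type):
            kwargs[key] = annotation(value)  # e.g. int("7") -> 7
        else:
            kwargs[key] = value  # unannotated or non-class hints stay as text
    return kwargs


def flexible_report(source: str = "api", days: int = 30,
                    region: Optional[str] = None, output_format: str = "database"):
    """Stand-in with the same signature shape as the flow above (no Prefect involved)."""
    return {"days": days, "region": region, "output_format": output_format}


params = coerce_params(flexible_report, ["days=7", "region=North", "output_format=excel"])
print(flexible_report(**params))
# {'days': 7, 'region': 'North', 'output_format': 'excel'}
```

Parameters not mentioned on the command line simply keep their defaults, which is why the plain `prefect deployment run` call below produces the 30-day, all-regions, database run.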
Running with Different Parameters
# Assumes a deployment named "default", e.g. created with flexible_report.serve(name="default")
# Default: 30 days, all regions, database output
prefect deployment run "flexible-report/default"
# Custom: 7 days, North region, Excel output
prefect deployment run "flexible-report/default" \
--param days=7 \
--param region=North \
--param output_format=excel
Step 7: Sub-Flows for Complex Pipelines
Break large pipelines into composable sub-flows:
@flow(name="extract-all", log_prints=True)
def extract_all_sources(api_url, api_key):
    """Sub-flow: extract from all sources."""
    orders = extract_orders(api_url, api_key)
    customers = extract_customers(api_url, api_key)
    return {"orders": orders, "customers": customers}


@flow(name="transform-and-load", log_prints=True)
def transform_and_load(data, db_path):
    """Sub-flow: transform and load."""
    combined = transform_data(data["orders"], data["customers"])
    load_to_database(combined, db_path, "combined")
    return combined


@flow(name="master-pipeline", log_prints=True)
def master_pipeline():
    """Parent flow; each sub-flow call shows up as its own child flow run."""
    api_url = os.environ["API_URL"]
    api_key = os.environ["API_KEY"]
    # Sub-flow 1: Extract
    data = extract_all_sources(api_url, api_key)
    # Sub-flow 2: Transform and load
    combined = transform_and_load(data, "reports.db")
    # Reporting: plain tasks called directly from the parent flow
    report = generate_report(combined)
    send_email_report(report, ["[email protected]"])
Monitoring Dashboard
Prefect provides a built-in UI for monitoring:
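# Start the Prefect server (local)
prefect server start
# In another terminal, point your flows at that server
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
# Open the dashboard
# http://localhost:4200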
What the Dashboard Shows
| View | What you see |
|---|---|
| Flow runs | Every execution with status, duration, timestamps |
| Task runs | Individual task status within each flow |
| Logs | Full log output from every task |
| Schedules | Upcoming scheduled runs |
| Deployments | All deployed flows with parameters |
| Notifications | Configured alert channels |
Migration from cron
Before (cron + scripts)
# crontab
0 6 * * 1-5 python extract_orders.py && python extract_customers.py && python transform.py && python load.py && python report.py
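Problems: the two independent extracts run one after the other, nothing retries, and there is no monitoring. When a step fails, && silently skips every later step, so the report simply never arrives and nobody is alerted.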
After (Prefect)
@flow(name="daily-pipeline", log_prints=True)
def daily_pipeline():
    api_url = os.environ["API_URL"]
    api_key = os.environ["API_KEY"]
    orders = extract_orders.submit(api_url, api_key)  # parallel
    customers = extract_customers.submit(api_url, api_key)  # parallel
    combined = transform_data(orders.result(), customers.result())  # waits for both
    load_to_database(combined, "reports.db", "orders")  # retries (retries=2)
    report = generate_report(combined)
    send_email_report(report, ["[email protected]"])  # retries (retries=2)
Improvements: parallel extraction, automatic retries, dependency management, monitoring dashboard, full log history.
What This Replaces
| cron approach | Prefect equivalent |
|---|---|
| Chain scripts with && | Task dependencies (DAG) |
| Custom retry wrapper | @task(retries=3) |
| Sequential execution | ConcurrentTaskRunner |
| Parse log files | Built-in dashboard |
| Custom email on failure | Built-in notifications |
| Environment-variable parameters | Type-safe flow parameters |
| No run history | Queryable run database |
Next Steps
Getting an import error with prefect.server.schemas.schedules? That module was removed in Prefect 3. See the dedicated Prefect IntervalSchedule Migration Guide for side-by-side code comparisons and the exact fix.
Start by converting your most complex cron pipeline to a Prefect flow. The biggest immediate win is the monitoring dashboard — seeing run history, task durations, and failure details without parsing log files.
For building the pipeline logic that Prefect orchestrates, see How to Design Data Pipelines for Reliable Reporting. For testing those pipelines before deploying them, see Testing Data Pipelines: A Practical Guide with pytest. For packaging orchestrated flows into containers, see Containerizing Your Python Pipelines with Docker. For adding alerting when pipelines succeed or fail, see How to Build a Notification System That Actually Gets Read.
Automation services include designing and deploying orchestrated pipeline systems that replace fragile cron setups.
Get in touch to discuss orchestrating your data workflows.