Prefect Reusable Task Blocks: Build Composable Workflow Components
Build reusable task blocks in Prefect that snap together into different workflows — shared extractors, validators, loaders, and notification tasks that eliminate duplication across pipelines.
Every Prefect workflow starts simple: extract data, transform it, load it somewhere. Then you build a second workflow that extracts data from a different source but loads it the same way. Then a third. By the fifth pipeline, you have five copies of the same database loader, three variations of the same email notification task, and no consistency between them.
The fix is reusable task blocks — self-contained units of work that snap together into any workflow. Build the extraction logic once. Build the validation once. Build the loader once. Then compose workflows by connecting these blocks like building bricks.
Who This Is For
- Data engineers maintaining multiple Prefect workflows with duplicated task logic
- Developers who want to build a shared task library for their team
- Vibe coders designing workflows that are modular enough to rearrange without rewriting
- Teams standardising how data moves through their pipelines
This guide assumes you know the basics of Prefect flows and tasks. If you are starting from scratch, see Schedule and Orchestrate Workflows with Prefect first.
The Composable Architecture
Every workflow is the same chain of blocks: extract, validate, transform, load, notify. The validation, transform, and notification blocks are shared; only the sources and destinations change.
What You Will Need
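```bash
pip install prefect pandas requests sqlalchemy openpyxl pyarrow
```
openpyxl is needed for the Excel reader and writer (the file-to-report flow exports Excel), and pyarrow for Parquet. Drop either if you do not use that format.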
Step 1: Build Extraction Blocks
Each extractor handles one source type. They all return the same output format: a pandas DataFrame.
```python
from prefect import task
import pandas as pd
import requests
from sqlalchemy import create_engine


@task(retries=3, retry_delay_seconds=30, name="extract-api")
def extract_from_api(url: str, headers: dict = None, params: dict = None) -> pd.DataFrame:
    """Extract data from a REST API endpoint."""
    # A timeout stops a hung connection from blocking the task indefinitely
    response = requests.get(url, headers=headers or {}, params=params or {}, timeout=30)
    response.raise_for_status()
    data = response.json()

    # Handle common API response patterns
    if isinstance(data, list):
        df = pd.DataFrame(data)
    elif isinstance(data, dict):
        # Look for a wrapper key that holds the records
        for key in ["results", "data", "items", "records"]:
            if key in data:
                df = pd.DataFrame(data[key])
                break
        else:
            # No wrapper key found: treat the object as a single record
            df = pd.DataFrame([data])
    else:
        raise ValueError(f"Unexpected response type: {type(data)}")

    print(f"Extracted {len(df)} rows from {url}")
    return df


@task(retries=2, retry_delay_seconds=10, name="extract-database")
def extract_from_database(connection_string: str, query: str) -> pd.DataFrame:
    """Extract data from a SQL database."""
    engine = create_engine(connection_string)
    try:
        df = pd.read_sql(query, engine)
    finally:
        # Release pooled connections once the query has finished
        engine.dispose()
    print(f"Extracted {len(df)} rows from database")
    return df


@task(name="extract-file")
def extract_from_file(filepath: str, file_type: str = "csv") -> pd.DataFrame:
    """Extract data from a local file."""
    readers = {
        "csv": pd.read_csv,
        "excel": pd.read_excel,
        "json": pd.read_json,
        "parquet": pd.read_parquet,
    }
    reader = readers.get(file_type)
    if not reader:
        raise ValueError(f"Unsupported file type: {file_type}")
    df = reader(filepath)
    print(f"Extracted {len(df)} rows from {filepath}")
    return df
```
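The response-unwrapping branch in `extract_from_api` is the part most likely to surprise you, so it can help to keep that logic in a plain function you can test without Prefect or the network. A minimal sketch, assuming the same wrapper keys as above; `unwrap_records` is a hypothetical helper, not part of Prefect or requests:

```python
from typing import Any

# Keys that commonly wrap a list of records, checked in the same order as extract_from_api
WRAPPER_KEYS = ("results", "data", "items", "records")


def unwrap_records(data: Any) -> Any:
    """Return the records from a parsed JSON response.

    Hypothetical helper mirroring the branching in extract_from_api,
    so it can be unit-tested without a network call or a Prefect run.
    """
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for key in WRAPPER_KEYS:
            if key in data:
                return data[key]
        # No wrapper key: the object itself is a single record
        return [data]
    raise ValueError(f"Unexpected response type: {type(data)}")


if __name__ == "__main__":
    print(unwrap_records({"results": [{"id": 1}, {"id": 2}]}))  # [{'id': 1}, {'id': 2}]
    print(unwrap_records({"id": 7}))  # [{'id': 7}]
```

Inside the task, `pd.DataFrame(unwrap_records(response.json()))` would replace the branching. Key order matters: a response containing both `data` and `results` resolves to `results`.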
Step 2: Build Validation Blocks
Validation catches bad data before it reaches your warehouse. These blocks raise on hard failures and print a warning for softer anomalies, such as a column with an unexpected type.
```python
@task(name="validate-schema")
def validate_schema(df: pd.DataFrame, required_columns: list[str], type_checks: dict = None) -> pd.DataFrame:
    """Validate DataFrame has required columns and correct types."""
    # Check required columns exist
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Check types if specified (a mismatch is a warning, not a failure)
    type_checks = type_checks or {}
    for col, expected_type in type_checks.items():
        if col in df.columns:
            actual_type = df[col].dtype
            if not pd.api.types.is_dtype_equal(actual_type, expected_type):
                print(f"Warning: {col} is {actual_type}, expected {expected_type}")

    print(f"Schema validation passed: {len(df)} rows, {len(df.columns)} columns")
    return df


@task(name="validate-completeness")
def validate_completeness(df: pd.DataFrame, max_null_pct: float = 0.1) -> pd.DataFrame:
    """Check that no column exceeds the null percentage threshold."""
    null_pcts = df.isnull().mean()
    violations = null_pcts[null_pcts > max_null_pct]
    if len(violations) > 0:
        for col, pct in violations.items():
            print(f"Warning: {col} is {pct:.1%} null (threshold: {max_null_pct:.1%})")
        raise ValueError(f"{len(violations)} columns exceed null threshold")
    print(f"Completeness check passed (max null: {null_pcts.max():.1%})")
    return df


@task(name="validate-freshness")
def validate_freshness(df: pd.DataFrame, date_column: str, max_age_hours: int = 48) -> pd.DataFrame:
    """Verify the data is not stale."""
    # Parse to UTC so timezone-aware and naive values compare safely
    # (naive timestamps are assumed to be UTC). The input df is left unmodified.
    timestamps = pd.to_datetime(df[date_column], utc=True)
    latest = timestamps.max()
    if pd.isna(latest):
        raise ValueError(f"No valid dates found in {date_column}")
    cutoff = pd.Timestamp.now(tz="UTC") - pd.Timedelta(hours=max_age_hours)
    if latest < cutoff:
        raise ValueError(f"Data is stale: latest record is {latest}, cutoff is {cutoff}")
    print(f"Freshness check passed: latest record {latest}")
    return df
```
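`validate_completeness` relies on `df.isnull().mean()` giving the fraction of nulls per column, because each `True` counts as 1 when averaged. A small worked sketch on made-up data, using a hypothetical `null_violations` helper with the same arithmetic, shows which columns would trip the default 10% threshold:

```python
import pandas as pd


def null_violations(df: pd.DataFrame, max_null_pct: float = 0.1) -> dict:
    """Return {column: null fraction} for columns strictly above the threshold.

    Hypothetical helper using the same arithmetic as validate_completeness.
    """
    null_pcts = df.isnull().mean()  # booleans average to the null fraction
    return {col: float(pct) for col, pct in null_pcts[null_pcts > max_null_pct].items()}


# Made-up sample: "email" is 1 of 4 null (25%), the other columns have no nulls
sample = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "email": ["a@x.com", None, "c@x.com", "d@x.com"],
    "amount": [10.0, 20.0, 30.0, 40.0],
})

print(null_violations(sample))                    # {'email': 0.25}
print(null_violations(sample, max_null_pct=0.3))  # {}
```

The comparison is strict, so a column sitting exactly at the threshold (25% null with `max_null_pct=0.25`) passes.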
Step 3: Build Transform Blocks
Transforms clean and reshape data. Each one does one thing and returns a DataFrame.
```python
@task(name="clean-strings")
def clean_string_columns(df: pd.DataFrame, columns: list[str] = None) -> pd.DataFrame:
    """Strip whitespace and lowercase string columns."""
    df = df.copy()  # avoid mutating the caller's DataFrame
    cols = columns or df.select_dtypes(include=["object"]).columns.tolist()
    cleaned = [col for col in cols if col in df.columns]
    for col in cleaned:
        # Only touch actual strings, so numbers or None in mixed columns survive
        df[col] = df[col].map(lambda v: v.strip().lower() if isinstance(v, str) else v)
    print(f"Cleaned {len(cleaned)} string columns")
    return df


@task(name="deduplicate")
def deduplicate(df: pd.DataFrame, subset: list[str] = None, keep: str = "last") -> pd.DataFrame:
    """Remove duplicate rows."""
    before = len(df)
    df = df.drop_duplicates(subset=subset, keep=keep)
    removed = before - len(df)
    print(f"Removed {removed} duplicates ({len(df)} rows remaining)")
    return df


@task(name="aggregate-metrics")
def aggregate_metrics(df: pd.DataFrame, group_by: list[str], metrics: dict) -> pd.DataFrame:
    """Aggregate data by specified dimensions and metrics."""
    result = df.groupby(group_by).agg(**metrics).reset_index()
    print(f"Aggregated to {len(result)} rows by {group_by}")
    return result
```
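The `metrics` argument of `aggregate_metrics` uses pandas named aggregation: each keyword becomes an output column, and each value is a `(source_column, function)` tuple. A small sketch on made-up order data, calling `groupby(...).agg(**metrics)` directly the way the task does:

```python
import pandas as pd

# Made-up order data for illustration
orders = pd.DataFrame({
    "region": ["north", "north", "south"],
    "id": [1, 2, 3],
    "amount": [100.0, 50.0, 80.0],
})

# Same shape as the metrics dict used in the file-to-report flow
metrics = {
    "total": ("amount", "sum"),
    "count": ("id", "count"),
    "average": ("amount", "mean"),
}

# Output columns: region, total, count, average (one row per region)
report = orders.groupby(["region"]).agg(**metrics).reset_index()
print(report)
```

Here `north` ends up with total 150.0, count 2 and average 75.0, while `south` has a single order of 80.0. `reset_index()` turns the group keys back into ordinary columns so loaders can write them.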
Step 4: Build Loader Blocks
Loaders write data to destinations. Each one handles connection management and error reporting.
```python
@task(retries=2, retry_delay_seconds=15, name="load-database")
def load_to_database(df: pd.DataFrame, connection_string: str, table_name: str, if_exists: str = "append") -> int:
    """Load DataFrame into a SQL database table."""
    engine = create_engine(connection_string)
    try:
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
    finally:
        # Release pooled connections whether or not the write succeeded
        engine.dispose()
    print(f"Loaded {len(df)} rows into {table_name}")
    return len(df)


@task(name="load-file")
def load_to_file(df: pd.DataFrame, filepath: str, file_type: str = "csv") -> str:
    """Export DataFrame to a file."""
    # Lambdas defer each write until the requested file type is chosen
    writers = {
        "csv": lambda: df.to_csv(filepath, index=False),
        "excel": lambda: df.to_excel(filepath, index=False),
        "json": lambda: df.to_json(filepath, orient="records", indent=2),
        "parquet": lambda: df.to_parquet(filepath, index=False),
    }
    writer = writers.get(file_type)
    if not writer:
        raise ValueError(f"Unsupported file type: {file_type}")
    writer()
    print(f"Exported {len(df)} rows to {filepath}")
    return filepath
```
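To see what `load_to_database` does without a real warehouse, you can point the same `to_sql` call at a throwaway SQLite file. A sketch assuming a temporary `sqlite:///` database; it runs the same load twice to show that `if_exists="append"` writes the rows again on every run:

```python
import os
import tempfile

import pandas as pd
from sqlalchemy import create_engine

batch = pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.0]})

with tempfile.TemporaryDirectory() as tmp:
    # A throwaway SQLite database stands in for the warehouse
    engine = create_engine(f"sqlite:///{os.path.join(tmp, 'demo.db')}")
    try:
        # Load the same batch twice, as a re-run of the flow would
        batch.to_sql("orders", engine, if_exists="append", index=False)
        batch.to_sql("orders", engine, if_exists="append", index=False)
        row_count = pd.read_sql("SELECT COUNT(*) AS n FROM orders", engine)["n"].item()
    finally:
        engine.dispose()  # release the file before the temp dir is deleted

print(row_count)  # 4: append does not deduplicate against existing rows
```

For full-refresh tables, `if_exists="replace"` drops and recreates the table instead. For incremental loads, note that the `deduplicate` block only sees the current batch, so duplicates across runs have to be handled at the destination.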
Step 5: Build Notification Blocks
Every pipeline should report its outcome. Build notification tasks that work across all workflows.
```python
import os


@task(name="notify-slack")
def notify_slack(message: str, channel: str = None, status: str = "info"):
    """Send a notification to Slack via an incoming webhook."""
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        print("Slack webhook not configured, skipping notification")
        return

    color_map = {"info": "#04d9ff", "success": "#22c55e", "error": "#ef4444", "warning": "#f59e0b"}
    payload = {
        "attachments": [{
            "color": color_map.get(status, "#04d9ff"),
            "text": message,
        }],
    }
    # Only send a channel override when one is given
    # (webhooks created by Slack apps ignore it and post to their configured channel)
    if channel:
        payload["channel"] = channel

    response = requests.post(webhook_url, json=payload, timeout=10)
    if response.ok:
        print(f"Slack notification sent: {status}")
    else:
        # A failed notification should not fail an otherwise successful pipeline
        print(f"Slack notification failed: HTTP {response.status_code}")


@task(name="build-summary")
def build_summary(pipeline_name: str, rows_processed: int, duration_seconds: float, errors: list = None) -> str:
    """Build a human-readable pipeline summary."""
    status = "completed" if not errors else "completed with errors"
    summary = (
        f"Pipeline: {pipeline_name}\n"
        f"Status: {status}\n"
        f"Rows processed: {rows_processed:,}\n"
        f"Duration: {duration_seconds:.1f}s"
    )
    if errors:
        summary += f"\nErrors ({len(errors)}):\n" + "\n".join(f"  - {e}" for e in errors)
    print(summary)
    return summary
```
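Separating payload construction from the HTTP call makes the notification block easy to test without a webhook. A sketch with a hypothetical `build_slack_payload` helper that mirrors the colour mapping in `notify_slack`, assuming the same `attachments` payload shape used above:

```python
from typing import Optional

# Same status-to-colour mapping as notify_slack
COLOR_MAP = {"info": "#04d9ff", "success": "#22c55e", "error": "#ef4444", "warning": "#f59e0b"}


def build_slack_payload(message: str, status: str = "info", channel: Optional[str] = None) -> dict:
    """Build the JSON body for a Slack incoming webhook (hypothetical helper)."""
    payload = {
        "attachments": [{
            # Unknown statuses fall back to the "info" colour
            "color": COLOR_MAP.get(status, COLOR_MAP["info"]),
            "text": message,
        }],
    }
    if channel:
        payload["channel"] = channel  # omitted entirely when no override is given
    return payload


print(build_slack_payload("Pipeline finished", status="success"))
```

With this split, `notify_slack` only needs to post `build_slack_payload(message, status, channel)` to the webhook, and the formatting rules can be covered by ordinary unit tests.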
Step 6: Compose Workflows from Blocks
Now assemble blocks into complete workflows. Each workflow reads like a recipe.
```python
from prefect import flow
import time


@flow(name="api-to-warehouse")
def api_to_warehouse_pipeline(api_url: str, db_connection: str, table_name: str):
    """Extract from API, validate, clean, and load to warehouse."""
    start = time.time()

    # Extract
    raw = extract_from_api(api_url)

    # Validate
    validated = validate_schema(raw, required_columns=["id", "created_at", "amount"])
    validated = validate_completeness(validated, max_null_pct=0.05)

    # Transform
    cleaned = clean_string_columns(validated)
    deduped = deduplicate(cleaned, subset=["id"])

    # Load
    rows = load_to_database(deduped, db_connection, table_name)

    # Notify
    duration = time.time() - start
    summary = build_summary("api-to-warehouse", rows, duration)
    notify_slack(summary, status="success")


@flow(name="file-to-report")
def file_to_report_pipeline(input_file: str, output_file: str, group_by: list[str]):
    """Extract from file, aggregate, and export report."""
    start = time.time()

    # Extract
    raw = extract_from_file(input_file, file_type="csv")

    # Validate
    validated = validate_completeness(raw)

    # Transform
    cleaned = clean_string_columns(validated)
    aggregated = aggregate_metrics(cleaned, group_by=group_by, metrics={
        "total": ("amount", "sum"),
        "count": ("id", "count"),
        "average": ("amount", "mean"),
    })

    # Load
    filepath = load_to_file(aggregated, output_file, file_type="excel")

    # Notify
    duration = time.time() - start
    summary = build_summary("file-to-report", len(aggregated), duration)
    notify_slack(summary, status="success")
```
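As written, both flows only notify on success: if a validation block raises, the flow fails before `notify_slack` runs. One way to make every run report its outcome is to wrap the steps, send an error notification, and re-raise. A sketch in plain Python with a hypothetical `run_with_notification` helper; inside a Prefect flow you would pass a closure that calls the task blocks and a `notify` that calls `notify_slack`. The stand-in functions at the bottom exist only to make the example runnable:

```python
import time
from typing import Callable


def run_with_notification(
    pipeline_name: str,
    steps: Callable[[], int],
    notify: Callable[[str, str], None],
) -> int:
    """Run pipeline steps, then notify on success or failure (hypothetical helper).

    `steps` returns the number of rows processed; `notify(message, status)`
    sends the outcome somewhere, such as a Slack webhook.
    """
    start = time.time()
    try:
        rows = steps()
    except Exception as exc:
        notify(f"Pipeline: {pipeline_name}\nStatus: failed\nError: {exc}", "error")
        raise  # re-raise so the run is still marked as failed
    duration = time.time() - start
    notify(
        f"Pipeline: {pipeline_name}\nStatus: completed\n"
        f"Rows processed: {rows:,}\nDuration: {duration:.1f}s",
        "success",
    )
    return rows


# Stand-ins so the sketch runs without Prefect or Slack
sent = []


def fake_notify(message: str, status: str) -> None:
    sent.append((status, message))


def failing_steps() -> int:
    raise ValueError("Missing required columns: {'amount'}")


run_with_notification("demo", lambda: 1200, fake_notify)
try:
    run_with_notification("demo", failing_steps, fake_notify)
except ValueError:
    pass

print([status for status, _ in sent])  # ['success', 'error']
```

Prefect flows also accept state hooks such as `on_failure` and `on_completion`, which can serve the same purpose if you prefer to keep the flow body free of try/except.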
Scheduling Composed Workflows
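Because the flow's parameters have no defaults, the deployment has to supply them; otherwise every scheduled run fails on missing parameters. The timezone belongs on the schedule object:

```python
from prefect.client.schemas.schedules import CronSchedule
if __name__ == "__main__":
    api_to_warehouse_pipeline.serve(
        name="daily-api-ingest",
        schedules=[CronSchedule(cron="0 6 * * *", timezone="Europe/London")],
        # Required flow parameters: placeholder values, replace with your own
        parameters={"api_url": "https://api.example.com/orders",
                    "db_connection": "postgresql://user:pass@host/warehouse", "table_name": "orders"},
    )
```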
What This Replaces
| Duplicated Pattern | Reusable Block |
|---|---|
| Copy-pasted API fetching logic in 5 workflows | Single extract_from_api task |
| Different validation logic per pipeline | Shared validate_schema + validate_completeness |
| Database loading code repeated everywhere | One load_to_database task with retry logic |
| Some pipelines notify, others do not | Standard notify_slack block on every flow |
| Inconsistent error handling | Validation blocks with clear raise-on-failure |
Next Steps
For scheduling these workflows with cron and interval patterns, see Schedule and Orchestrate Workflows with Prefect. If you are migrating from Prefect 2 schedules, see Prefect IntervalSchedule Migration Guide.
For adding data quality checks to your pipeline blocks, see Build a Data Quality Framework in Python. For testing these reusable blocks properly, see Testing Data Pipelines with Pytest.
For structuring logs within your task blocks, see Structured Logging for Python Data Pipelines.
Automation services include designing reusable pipeline architectures and Prefect workflow consulting.
Get in touch to discuss building a modular pipeline architecture.