Prefect Reusable Task Blocks: Build Composable Workflow Components


Build reusable task blocks in Prefect that snap together into different workflows — shared extractors, validators, loaders, and notification tasks that eliminate duplication across pipelines.


Every Prefect workflow starts simple: extract data, transform it, load it somewhere. Then you build a second workflow that extracts data from a different source but loads it the same way. Then a third. By the fifth pipeline, you have five copies of the same database loader, three variations of the same email notification task, and no consistency between them.

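The fix is reusable task blocks: self-contained units of work that snap together into any workflow. Build the extraction logic once. Build the validation once. Build the loader once. Then compose workflows by connecting these blocks like building bricks. ("Block" here is an informal name for a reusable Prefect task. It is unrelated to Prefect's Block class, which stores configuration such as credentials.)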

Who This Is For

  • Data engineers maintaining multiple Prefect workflows with duplicated task logic
  • Developers who want to build a shared task library for their team
  • Vibe coders designing workflows that are modular enough to rearrange without rewriting
  • Teams standardising how data moves through their pipelines

This guide assumes you know the basics of Prefect flows and tasks. If you are starting from scratch, see Schedule and Orchestrate Workflows with Prefect first.

The Composable Architecture

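Every workflow follows the same shape: extract, validate, transform, load, notify. The source and destination change from pipeline to pipeline, but the validation, transform, and notification blocks stay the same.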
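Stripped of Prefect, the whole pattern rests on one contract: every block takes a DataFrame and returns a DataFrame. Below is a minimal, framework-free sketch of that idea. run_chain, drop_missing_ids and dedupe_on_id are hypothetical names used only for illustration. They are not tasks from this article's library.

```python
import pandas as pd


def drop_missing_ids(df: pd.DataFrame) -> pd.DataFrame:
    """Block: remove rows without an id."""
    return df.dropna(subset=["id"])


def dedupe_on_id(df: pd.DataFrame) -> pd.DataFrame:
    """Block: keep the last row for each id."""
    return df.drop_duplicates(subset=["id"], keep="last")


def run_chain(df: pd.DataFrame, *steps) -> pd.DataFrame:
    """Feed the output of each block into the next one."""
    for step in steps:
        df = step(df)
    return df


# Two different "sources", same blocks, no changes to the blocks themselves
source_a = pd.DataFrame({"id": [1, 2, 2, None], "amount": [10, 20, 25, 5]})
source_b = pd.DataFrame({"id": [7, 7], "amount": [1, 2]})

result_a = run_chain(source_a, drop_missing_ids, dedupe_on_id)  # ids 1 and 2 remain
result_b = run_chain(source_b, drop_missing_ids, dedupe_on_id)  # one row for id 7
```

In the Prefect flows later in this guide, the flow body plays the role of run_chain by passing each task's output into the next task.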

What You Will Need

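pip install prefect pandas requests sqlalchemy openpyxl pyarrow

openpyxl is needed for the Excel reader and writer, and pyarrow for Parquet. Databases other than SQLite also need their own driver (for example psycopg2 for PostgreSQL).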

Step 1: Build Extraction Blocks

Each extractor handles one source type. They all return the same output format: a pandas DataFrame.

from prefect import task
import pandas as pd
import requests
from sqlalchemy import create_engine

@task(retries=3, retry_delay_seconds=30, name="extract-api")
def extract_from_api(url: str, headers: dict = None, params: dict = None) -> pd.DataFrame:
    """Extract data from a REST API endpoint."""
    # Without a timeout, a hung connection can block the task indefinitely
    response = requests.get(url, headers=headers or {}, params=params or {}, timeout=30)
    response.raise_for_status()
    data = response.json()

    # Handle common API response patterns
    if isinstance(data, list):
        df = pd.DataFrame(data)
    elif isinstance(data, dict):
        # Look for a results key
        for key in ["results", "data", "items", "records"]:
            if key in data:
                df = pd.DataFrame(data[key])
                break
        else:
            df = pd.DataFrame([data])
    else:
        raise ValueError(f"Unexpected response type: {type(data)}")

    print(f"Extracted {len(df)} rows from {url}")
    return df


@task(retries=2, retry_delay_seconds=10, name="extract-database")
def extract_from_database(connection_string: str, query: str) -> pd.DataFrame:
    """Extract data from a SQL database."""
    engine = create_engine(connection_string)
    try:
        df = pd.read_sql(query, engine)
    finally:
        engine.dispose()  # close pooled connections; each call creates its own engine
    print(f"Extracted {len(df)} rows from database")
    return df


@task(name="extract-file")
def extract_from_file(filepath: str, file_type: str = "csv") -> pd.DataFrame:
    """Extract data from a local file."""
    readers = {
        "csv": pd.read_csv,
        "excel": pd.read_excel,
        "json": pd.read_json,
        "parquet": pd.read_parquet,
    }
    reader = readers.get(file_type)
    if not reader:
        raise ValueError(f"Unsupported file type: {file_type}")

    df = reader(filepath)
    print(f"Extracted {len(df)} rows from {filepath}")
    return df
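The payload-unwrapping branch is the part of extract_from_api that most needs unit tests, and it does not need a network call. One option is to move that branch into a plain helper (payload_to_frame is a hypothetical name) and have the task call it on response.json(). The sketch below follows the same logic as the task above, so the task's behaviour would not change.

```python
import pandas as pd

# Envelope keys checked in order, mirroring the lookup in extract_from_api
ENVELOPE_KEYS = ("results", "data", "items", "records")


def payload_to_frame(data) -> pd.DataFrame:
    """Turn an already-decoded JSON payload into a DataFrame (no HTTP involved)."""
    if isinstance(data, list):
        return pd.DataFrame(data)
    if isinstance(data, dict):
        for key in ENVELOPE_KEYS:
            if key in data:
                return pd.DataFrame(data[key])
        # No envelope key found: treat the whole object as a single record
        return pd.DataFrame([data])
    raise ValueError(f"Unexpected response type: {type(data)}")


# A typical envelope: only the records under "results" become rows
page = {"results": [{"id": 1}, {"id": 2}], "next": None}
frame = payload_to_frame(page)  # 2 rows, a single "id" column
```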

Step 2: Build Validation Blocks

Validation catches bad data before it reaches your warehouse. These blocks raise on failure and log warnings on anomalies.

@task(name="validate-schema")
def validate_schema(df: pd.DataFrame, required_columns: list[str], type_checks: dict = None) -> pd.DataFrame:
    """Validate DataFrame has required columns and correct types."""
    # Check required columns exist
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Check types if specified
    type_checks = type_checks or {}
    for col, expected_type in type_checks.items():
        if col in df.columns:
            actual_type = df[col].dtype
            if not pd.api.types.is_dtype_equal(actual_type, expected_type):
                print(f"Warning: {col} is {actual_type}, expected {expected_type}")

    print(f"Schema validation passed: {len(df)} rows, {len(df.columns)} columns")
    return df


@task(name="validate-completeness")
def validate_completeness(df: pd.DataFrame, max_null_pct: float = 0.1) -> pd.DataFrame:
    """Check that no column exceeds the null percentage threshold."""
    null_pcts = df.isnull().mean()
    violations = null_pcts[null_pcts > max_null_pct]

    if len(violations) > 0:
        for col, pct in violations.items():
            print(f"Warning: {col} is {pct:.1%} null (threshold: {max_null_pct:.1%})")
        raise ValueError(f"{len(violations)} columns exceed null threshold")

    print(f"Completeness check passed (max null: {null_pcts.max():.1%})")
    return df


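@task(name="validate-freshness")
def validate_freshness(df: pd.DataFrame, date_column: str, max_age_hours: int = 48) -> pd.DataFrame:
    """Verify the data is not stale."""
    # Work on a copy so the caller's DataFrame is not modified in place
    df = df.copy()
    df[date_column] = pd.to_datetime(df[date_column])
    latest = df[date_column].max()
    if pd.isna(latest):
        raise ValueError(f"No valid dates in column {date_column}")

    # Build the cutoff in the column's timezone (naive if the column is naive);
    # comparing tz-aware and naive timestamps raises a TypeError
    cutoff = pd.Timestamp.now(tz=latest.tz) - pd.Timedelta(hours=max_age_hours)

    if latest < cutoff:
        raise ValueError(f"Data is stale: latest record is {latest}, cutoff is {cutoff}")

    print(f"Freshness check passed: latest record {latest}")
    return df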
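validate_completeness rests on one pandas idiom. isnull() turns each cell into True or False, and mean() over booleans gives the fraction of True values per column. Here is a worked example on made-up data (the column names are illustrative), showing what the default threshold of 0.1 flags:

```python
import pandas as pd

# Made-up data: one missing email out of four rows
df = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "email": ["a@example.com", None, "c@example.com", "d@example.com"],
    "amount": [10.0, 20.0, 30.0, 40.0],
})

# isnull() gives True/False per cell; mean() of booleans is the fraction of True,
# so this is the share of missing values in each column
null_pcts = df.isnull().mean()  # id -> 0.0, email -> 0.25, amount -> 0.0

max_null_pct = 0.1  # the task's default threshold
violations = null_pcts[null_pcts > max_null_pct]  # only "email" exceeds it
```

One missing email in four rows makes that column 25% null, which is above the 10% threshold, so validate_completeness would raise for this frame.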

Step 3: Build Transform Blocks

Transforms clean and reshape data. Each one does one thing and returns a DataFrame.

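@task(name="clean-strings")
def clean_string_columns(df: pd.DataFrame, columns: list[str] = None) -> pd.DataFrame:
    """Strip whitespace from and lowercase string columns."""
    df = df.copy()  # leave the caller's DataFrame untouched
    cols = columns or df.select_dtypes(include=["object", "string"]).columns.tolist()
    cols = [col for col in cols if col in df.columns]
    for col in cols:
        # Only transform real strings; .str methods would turn numbers in mixed columns into NaN
        df[col] = df[col].map(lambda v: v.strip().lower() if isinstance(v, str) else v)
    print(f"Cleaned {len(cols)} string columns")
    return df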


@task(name="deduplicate")
def deduplicate(df: pd.DataFrame, subset: list[str] = None, keep: str = "last") -> pd.DataFrame:
    """Remove duplicate rows."""
    before = len(df)
    df = df.drop_duplicates(subset=subset, keep=keep)
    removed = before - len(df)
    print(f"Removed {removed} duplicates ({len(df)} rows remaining)")
    return df


@task(name="aggregate-metrics")
def aggregate_metrics(df: pd.DataFrame, group_by: list[str], metrics: dict) -> pd.DataFrame:
    """Aggregate data by specified dimensions and metrics."""
    result = df.groupby(group_by).agg(**metrics).reset_index()
    print(f"Aggregated to {len(result)} rows by {group_by}")
    return result
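aggregate_metrics relies on pandas named aggregation. Each key in metrics becomes an output column, and each value is an (input column, aggregation function) pair. Here is the same metrics dictionary the file-to-report flow uses, applied to a small made-up frame:

```python
import pandas as pd

sales = pd.DataFrame({
    "region": ["north", "north", "south"],
    "id": [1, 2, 3],
    "amount": [100.0, 50.0, 80.0],
})

# output_column=(input_column, aggregation): the shape aggregate_metrics expects,
# since it unpacks this dict straight into groupby(...).agg(**metrics)
metrics = {
    "total": ("amount", "sum"),
    "count": ("id", "count"),
    "average": ("amount", "mean"),
}

report = sales.groupby(["region"]).agg(**metrics).reset_index()
# north: total 150.0, count 2, average 75.0
# south: total 80.0, count 1, average 80.0
```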

Step 4: Build Loader Blocks

Loaders write data to destinations. Each one handles connection management and error reporting.

@task(retries=2, retry_delay_seconds=15, name="load-database")
def load_to_database(df: pd.DataFrame, connection_string: str, table_name: str, if_exists: str = "append") -> int:
    """Load DataFrame into a SQL database table."""
    engine = create_engine(connection_string)
    try:
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
    finally:
        engine.dispose()  # close pooled connections once the load finishes
    print(f"Loaded {len(df)} rows into {table_name}")
    return len(df)


@task(name="load-file")
def load_to_file(df: pd.DataFrame, filepath: str, file_type: str = "csv") -> str:
    """Export DataFrame to a file."""
    writers = {
        "csv": lambda: df.to_csv(filepath, index=False),
        "excel": lambda: df.to_excel(filepath, index=False),
        "json": lambda: df.to_json(filepath, orient="records", indent=2),
        "parquet": lambda: df.to_parquet(filepath, index=False),
    }
    writer = writers.get(file_type)
    if not writer:
        raise ValueError(f"Unsupported file type: {file_type}")

    writer()
    print(f"Exported {len(df)} rows to {filepath}")
    return filepath
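load_to_database defaults to if_exists="append", which matters when a flow is re-run after a failure: appending the same batch twice stores it twice. The sketch below shows the difference on a throwaway in-memory SQLite database. It calls pandas' to_sql directly with a sqlite3 connection (which to_sql accepts) rather than going through the task, and the table and column names are made up.

```python
import sqlite3

import pandas as pd

batch = pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.0]})

# In-memory SQLite database: nothing is written to disk
conn = sqlite3.connect(":memory:")


def row_count(table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


# Simulate the same load running twice, e.g. re-running a flow from the start
batch.to_sql("orders", conn, if_exists="append", index=False)
batch.to_sql("orders", conn, if_exists="append", index=False)
after_append = row_count("orders")  # the second run added the rows again

# "replace" drops and recreates the table, so only one copy is left
batch.to_sql("orders", conn, if_exists="replace", index=False)
after_replace = row_count("orders")

conn.close()
```

"replace" avoids the duplicates but discards whatever else the table held, so it suits full snapshots rather than incremental loads.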

Step 5: Build Notification Blocks

Every pipeline should report its outcome. Build notification tasks that work across all workflows.

import os

@task(name="notify-slack")
def notify_slack(message: str, channel: str = None, status: str = "info"):
    """Send a notification to Slack via an incoming webhook."""
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        print("Slack webhook not configured, skipping notification")
        return

    color_map = {"info": "#04d9ff", "success": "#22c55e", "error": "#ef4444", "warning": "#f59e0b"}

    payload = {
        "attachments": [{
            "color": color_map.get(status, "#04d9ff"),
            "text": message,
        }],
    }
    # Only send a channel override when one is given; Slack app webhooks post
    # to the channel chosen when the webhook was created and may ignore it
    if channel:
        payload["channel"] = channel

    response = requests.post(webhook_url, json=payload, timeout=10)
    if response.ok:
        print(f"Slack notification sent: {status}")
    else:
        print(f"Slack notification failed: HTTP {response.status_code}")


@task(name="notify-summary")
def build_summary(pipeline_name: str, rows_processed: int, duration_seconds: float, errors: list = None) -> str:
    """Build a human-readable pipeline summary."""
    status = "completed" if not errors else "completed with errors"
    summary = f"""Pipeline: {pipeline_name}
Status: {status}
Rows processed: {rows_processed:,}
Duration: {duration_seconds:.1f}s"""

    if errors:
        summary += f"\nErrors ({len(errors)}):\n" + "\n".join(f"  - {e}" for e in errors)

    print(summary)
    return summary
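build_summary accepts an errors list, but neither flow in the next step fills it. The validation tasks raise, so a failed check stops the run. For checks that should be reported rather than fatal, one option is to run them as plain functions and collect their messages. The sketch below assumes each check raises ValueError on failure. run_soft_checks, has_amount_column and no_negative_ids are hypothetical helpers, not part of the task library above. To reuse an existing validator this way, note that a Prefect task's undecorated function is available as .fn (for example validate_completeness.fn).

```python
import pandas as pd


def run_soft_checks(df: pd.DataFrame, checks) -> list[str]:
    """Run each check and collect ValueError messages instead of stopping."""
    errors = []
    for check in checks:
        try:
            check(df)
        except ValueError as exc:
            errors.append(f"{getattr(check, '__name__', 'check')}: {exc}")
    return errors


def has_amount_column(df: pd.DataFrame) -> None:
    if "amount" not in df.columns:
        raise ValueError("missing column 'amount'")


def no_negative_ids(df: pd.DataFrame) -> None:
    if (df["id"] < 0).any():
        raise ValueError("negative ids found")


frame = pd.DataFrame({"id": [1, -2, 3]})
errors = run_soft_checks(frame, [has_amount_column, no_negative_ids])
# Both checks fail here, so errors holds two messages, in check order
```

Passing the list as build_summary(..., errors=errors) changes the status line to "completed with errors" and lists each message.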

Step 6: Compose Workflows from Blocks

Now assemble blocks into complete workflows. Each workflow reads like a recipe.

from prefect import flow
import time

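# log_prints=True sends print() output from this flow and its tasks to Prefect's logs
@flow(name="api-to-warehouse", log_prints=True)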
def api_to_warehouse_pipeline(api_url: str, db_connection: str, table_name: str):
    """Extract from API, validate, clean, and load to warehouse."""
    start = time.time()

    # Extract
    raw = extract_from_api(api_url)

    # Validate
    validated = validate_schema(raw, required_columns=["id", "created_at", "amount"])
    validated = validate_completeness(validated, max_null_pct=0.05)

    # Transform
    cleaned = clean_string_columns(validated)
    deduped = deduplicate(cleaned, subset=["id"])

    # Load
    rows = load_to_database(deduped, db_connection, table_name)

    # Notify
    duration = time.time() - start
    summary = build_summary("api-to-warehouse", rows, duration)
    notify_slack(summary, status="success")


@flow(name="file-to-report", log_prints=True)
def file_to_report_pipeline(input_file: str, output_file: str, group_by: list[str]):
    """Extract from file, aggregate, and export report."""
    start = time.time()

    # Extract
    raw = extract_from_file(input_file, file_type="csv")

    # Validate
    validated = validate_completeness(raw)

    # Transform
    cleaned = clean_string_columns(validated)
    aggregated = aggregate_metrics(cleaned, group_by=group_by, metrics={
        "total": ("amount", "sum"),
        "count": ("id", "count"),
        "average": ("amount", "mean"),
    })

    # Load
    filepath = load_to_file(aggregated, output_file, file_type="excel")

    # Notify
    duration = time.time() - start
    summary = build_summary("file-to-report", len(aggregated), duration)
    notify_slack(summary, status="success")
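Both flows call notify_slack only on success. If a validation task raises, the flow stops before the notification line runs, so failures go unreported. One framework-agnostic fix is to wrap the pipeline body in try/except, send an error notification, and then re-raise so the run is still marked as failed. The sketch below is plain Python. run_with_notification and fake_notify are hypothetical, and inside a flow, notify would be notify_slack. As a more built-in alternative, Prefect also offers state-change hooks (such as an on_failure list passed to @flow), which are worth checking for your Prefect version.

```python
import time


def run_with_notification(pipeline_name, body, notify):
    """Run body() and report the outcome through notify(message, status=...).

    Hypothetical helper: body returns the number of rows processed, and notify
    has the same call shape as notify_slack.
    """
    start = time.perf_counter()
    try:
        rows = body()
    except Exception as exc:
        notify(f"Pipeline: {pipeline_name}\nStatus: failed\nError: {exc}", status="error")
        raise  # re-raise so the run is still recorded as failed
    duration = time.perf_counter() - start
    notify(
        f"Pipeline: {pipeline_name}\nStatus: completed\nRows processed: {rows:,}\nDuration: {duration:.1f}s",
        status="success",
    )
    return rows


# Usage with a stand-in notifier that records what would have been sent
sent = []


def fake_notify(message, status="info"):
    sent.append((status, message))


def failing_body():
    raise ValueError("2 columns exceed null threshold")


run_with_notification("demo", lambda: 1200, fake_notify)
try:
    run_with_notification("demo", failing_body, fake_notify)
except ValueError:
    pass  # the original error still propagates after the alert is sent
```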

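Scheduling Composed Workflows

Serve a composed flow like any other flow. api_to_warehouse_pipeline has no parameter defaults, so the deployment has to supply them; otherwise the scheduled runs fail parameter validation:

from prefect.client.schemas.schedules import CronSchedule

if __name__ == "__main__":
    api_to_warehouse_pipeline.serve(
        name="daily-api-ingest",
        schedules=[CronSchedule(cron="0 6 * * *", timezone="Europe/London")],
        # Placeholder values for the flow's required parameters: replace with your own
        parameters={"api_url": "https://api.example.com/orders",
                    "db_connection": "sqlite:///warehouse.db", "table_name": "orders"},
    )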

What This Replaces

Duplicated pattern | Reusable block
--- | ---
Copy-pasted API fetching logic in 5 workflows | Single extract_from_api task
Different validation logic per pipeline | Shared validate_schema + validate_completeness
Database loading code repeated everywhere | One load_to_database task with retry logic
Some pipelines notify, others do not | Standard notify_slack block on every flow
Inconsistent error handling | Validation blocks with clear raise-on-failure

Next Steps

For scheduling these workflows with cron and interval patterns, see Schedule and Orchestrate Workflows with Prefect. If you are migrating from Prefect 2 schedules, see Prefect IntervalSchedule Migration Guide.

For adding data quality checks to your pipeline blocks, see Build a Data Quality Framework in Python. For testing these reusable blocks properly, see Testing Data Pipelines with Pytest.

For structuring logs within your task blocks, see Structured Logging for Python Data Pipelines.

Automation services include designing reusable pipeline architectures and Prefect workflow consulting.

Get in touch to discuss building a modular pipeline architecture.
