Testing Data Pipelines: A Practical Guide with pytest

· 10 min read · Data & Dashboards

Write tests that catch broken pipelines before they produce wrong reports — with pytest fixtures, DataFrame assertions, mock APIs, and CI integration.


You built a data pipeline. It runs every morning. It worked for three weeks. Then someone changed a column name in the source file and the pipeline produced a report with half the rows missing. Nobody noticed for five days.

Testing prevents this. Not manual “run it and eyeball the output” testing — automated tests that catch broken transformations, missing columns, and wrong aggregations before they reach production.

This guide covers how to test data pipelines with pytest. By the end, you will have unit tests, integration tests, fixture patterns, and CI configuration that you can apply to any Python pipeline.

Who This Is For

  • Data engineers whose pipelines have no tests and have been burned by silent failures
  • Developers who test application code but are unsure how to test data transformations
  • Vibe coders who built something that works but want confidence it keeps working as they change it
  • Teams adopting CI/CD for data workflows and needing a testing foundation

Basic Python is enough. If you have written a function and run it successfully, you can learn to test it. No prior pytest experience required — the guide starts from your first test.

The Testing Architecture

Unit tests verify individual functions. Integration tests verify that stages work together. End-to-end tests run the full pipeline on sample data. All three run automatically on every code change.

What You Will Need

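pip install pytest pandas openpyxl requests pytest-cov
  • pytest: test framework
  • pandas: data processing (the code you are testing)
  • openpyxl: Excel engine used by pd.read_excel
  • requests: HTTP client used by the API extraction example in Step 4
  • pytest-cov: test coverage reporting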

Step 1: Structure Your Pipeline for Testing

The key to testable pipelines: separate logic from I/O. Every transformation should be a pure function that takes data in and returns data out.

# pipeline/transform.py

import pandas as pd

def standardise_columns(df):
    """Normalise column names to lowercase with underscores."""
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df

def clean_revenue(df, column="revenue"):
    """Convert revenue column to numeric, coerce errors to NaN."""
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df

def remove_duplicates(df, key_columns):
    """Remove duplicate rows based on key columns."""
    before = len(df)
    df = df.drop_duplicates(subset=key_columns, keep="last")
    removed = before - len(df)
    return df, removed

def calculate_summary(df, group_by, value_column):
    """Aggregate data by group with standard metrics."""
    return (
        df.groupby(group_by)
        .agg(
            total=(value_column, "sum"),
            count=(value_column, "count"),
            average=(value_column, "mean"),
        )
        .reset_index()
        .round(2)
    )

def validate_schema(df, required_columns):
    """Check that all required columns exist."""
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return True

Each function is pure — no file reads, no API calls, no side effects. This makes them trivially testable.
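The purity claim can itself be checked by a test. A minimal sketch, assuming you want one reusable helper for every transform: it copies the input, calls the function, and uses pandas' assert_frame_equal to confirm the input was not changed in place. The names assert_does_not_mutate and bad_standardise are hypothetical, for illustration only.

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def assert_does_not_mutate(func, df, *args, **kwargs):
    """Call func(df, ...) and fail if df was modified in place (hypothetical helper)."""
    before = df.copy(deep=True)
    func(df, *args, **kwargs)
    # Raises AssertionError describing the difference if df changed
    assert_frame_equal(df, before)


def standardise_columns(df):
    """Same as pipeline/transform.py: works on a copy."""
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df


def bad_standardise(df):
    """Hypothetical buggy version that renames the caller's columns."""
    df.columns = df.columns.str.lower()
    return df


# Passes: the caller's DataFrame is untouched
assert_does_not_mutate(standardise_columns, pd.DataFrame({"First Name": [1]}))

# Fails: the helper catches the in-place rename
try:
    assert_does_not_mutate(bad_standardise, pd.DataFrame({"First Name": [1]}))
except AssertionError:
    print("bad_standardise mutated its input")
```

Extra arguments pass straight through, so the same helper covers transforms with parameters, for example assert_does_not_mutate(remove_duplicates, df, ["id"]).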

Step 2: Write Unit Tests

Basic DataFrame Tests

# tests/test_transform.py

import pytest
import pandas as pd
import numpy as np
from pipeline.transform import (
    standardise_columns,
    clean_revenue,
    remove_duplicates,
    calculate_summary,
    validate_schema,
)


class TestStandardiseColumns:
    def test_lowercases_columns(self):
        df = pd.DataFrame({"First Name": [1], "LAST NAME": [2]})
        result = standardise_columns(df)
        assert list(result.columns) == ["first_name", "last_name"]

    def test_strips_whitespace(self):
        df = pd.DataFrame({" name ": [1], "  age  ": [2]})
        result = standardise_columns(df)
        assert list(result.columns) == ["name", "age"]

    def test_does_not_modify_original(self):
        df = pd.DataFrame({"Name": [1]})
        standardise_columns(df)
        assert list(df.columns) == ["Name"]


class TestCleanRevenue:
    def test_converts_strings_to_numbers(self):
        df = pd.DataFrame({"revenue": ["100.50", "200", "300.75"]})
        result = clean_revenue(df)
        assert result["revenue"].dtype == np.float64
        assert result["revenue"].tolist() == [100.50, 200.0, 300.75]

    def test_coerces_invalid_to_nan(self):
        df = pd.DataFrame({"revenue": ["100", "N/A", "not a number"]})
        result = clean_revenue(df)
        assert result["revenue"].iloc[0] == 100.0
        assert pd.isna(result["revenue"].iloc[1])
        assert pd.isna(result["revenue"].iloc[2])

    def test_handles_empty_dataframe(self):
        df = pd.DataFrame({"revenue": pd.Series([], dtype=str)})
        result = clean_revenue(df)
        assert len(result) == 0


class TestRemoveDuplicates:
    def test_removes_exact_duplicates(self):
        df = pd.DataFrame({
            "id": [1, 2, 2, 3],
            "value": [10, 20, 25, 30],
        })
        result, removed = remove_duplicates(df, ["id"])
        assert len(result) == 3
        assert removed == 1

    def test_keeps_last_duplicate(self):
        df = pd.DataFrame({
            "id": [1, 1],
            "value": ["old", "new"],
        })
        result, _ = remove_duplicates(df, ["id"])
        assert result.iloc[0]["value"] == "new"

    def test_no_duplicates_returns_same(self):
        df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        result, removed = remove_duplicates(df, ["id"])
        assert len(result) == 3
        assert removed == 0
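Checking NaN values one position at a time gets verbose, and a plain == comparison treats NaN as unequal to NaN. pandas ships comparison helpers that handle this. A minimal sketch comparing the whole cleaned column at once; clean_revenue is copied from pipeline/transform.py so the snippet runs standalone, and the test name is hypothetical.

```python
import numpy as np
import pandas as pd
from pandas.testing import assert_series_equal


def clean_revenue(df, column="revenue"):
    """Copy of the pipeline/transform.py version."""
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df


def test_coerces_invalid_to_nan_whole_column():
    df = pd.DataFrame({"revenue": ["100", "N/A", "not a number"]})
    result = clean_revenue(df)

    expected = pd.Series([100.0, np.nan, np.nan], name="revenue")
    # NaN in the same position counts as equal; dtype, name and index are checked too
    assert_series_equal(result["revenue"], expected)
```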

Testing Aggregations

class TestCalculateSummary:
    def test_groups_and_aggregates(self):
        df = pd.DataFrame({
            "region": ["North", "North", "South"],
            "revenue": [100, 200, 150],
        })
        result = calculate_summary(df, "region", "revenue")

        north = result[result["region"] == "North"].iloc[0]
        assert north["total"] == 300
        assert north["count"] == 2
        assert north["average"] == 150.0

    def test_single_group(self):
        df = pd.DataFrame({
            "region": ["North", "North"],
            "revenue": [100, 200],
        })
        result = calculate_summary(df, "region", "revenue")
        assert len(result) == 1
        assert result.iloc[0]["total"] == 300

    def test_rounds_to_two_decimals(self):
        df = pd.DataFrame({
            "region": ["A", "A", "A"],
            "revenue": [10, 20, 31],  # mean is 20.333..., so rounding is exercised
        })
        result = calculate_summary(df, "region", "revenue")
        assert result.iloc[0]["average"] == 20.33


class TestValidateSchema:
    def test_passes_with_all_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"], "value": [100]})
        assert validate_schema(df, ["id", "name", "value"]) is True

    def test_fails_with_missing_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"]})
        with pytest.raises(ValueError, match="Missing required columns"):
            validate_schema(df, ["id", "name", "value"])

    def test_passes_with_extra_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"], "extra": [True]})
        assert validate_schema(df, ["id", "name"]) is True
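For checks on a whole DataFrame, pandas.testing.assert_frame_equal compares values, dtypes, column names and the index in one call, and reports the first difference on failure. The index is the usual trap: drop_duplicates keeps the original row labels, so a result can hold exactly the right rows and still fail the comparison. A sketch, with remove_duplicates copied from pipeline/transform.py and a hypothetical test name:

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def remove_duplicates(df, key_columns):
    """Copy of the pipeline/transform.py version."""
    before = len(df)
    df = df.drop_duplicates(subset=key_columns, keep="last")
    removed = before - len(df)
    return df, removed


def test_remove_duplicates_full_frame():
    df = pd.DataFrame({"id": [1, 2, 2, 3], "value": [10, 20, 25, 30]})
    result, removed = remove_duplicates(df, ["id"])

    expected = pd.DataFrame({"id": [1, 2, 3], "value": [10, 25, 30]})
    # The surviving rows keep labels 0, 2, 3; reset to 0, 1, 2 before comparing
    assert_frame_equal(result.reset_index(drop=True), expected)
    assert removed == 1
```

If the exact integer width is not part of the contract, passing check_dtype=False to assert_frame_equal relaxes the dtype check.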

Step 3: Test Fixtures for Reusable Data

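Create fixtures that provide consistent test data across all tests. pytest discovers fixtures defined in tests/conftest.py automatically, so any test in the tests/ directory can use one by naming it as a function argument, with no import: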

# tests/conftest.py

import pytest
import pandas as pd


@pytest.fixture
def sample_orders():
    """Realistic order data for testing."""
    return pd.DataFrame({
        "order_id": [1001, 1002, 1003, 1004, 1005],
        "date": pd.date_range("2026-03-01", periods=5, freq="D"),
        "customer_id": [101, 102, 101, 103, 102],
        "product": ["Widget", "Gadget", "Widget", "Doohickey", "Gadget"],
        "region": ["North", "South", "North", "East", "South"],
        "revenue": [150.00, 250.00, 175.00, 320.00, 280.00],
        "quantity": [2, 1, 3, 1, 2],
    })


@pytest.fixture
def messy_orders():
    """Orders with common real-world data quality issues."""
    return pd.DataFrame({
        "Order ID": [1001, 1002, 1002, 1003, 1004],
        " Date ": ["2026-03-01", "2026-03-02", "2026-03-02", "invalid", "2026-03-04"],
        "Customer_ID": [101, 102, 102, 103, 104],
        "REVENUE": ["150.00", "N/A", "250.00", "300", "not_a_number"],
        "Region": ["north", " South ", "SOUTH", "East", ""],
    })


@pytest.fixture
def empty_dataframe():
    """Empty DataFrame with expected schema."""
    return pd.DataFrame(columns=["order_id", "date", "customer_id", "revenue", "region"])


@pytest.fixture
def large_orders():
    """Large dataset for performance testing."""
    n = 10000
    return pd.DataFrame({
        "order_id": range(n),
        "date": pd.date_range("2025-01-01", periods=n, freq="h"),
        "customer_id": [i % 100 for i in range(n)],
        "revenue": [round(50 + (i * 7.3) % 500, 2) for i in range(n)],
        "region": ["North", "South", "East", "West"] * (n // 4),
    })
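Fixed fixtures work well until many tests need slight variations of the same data. A common pytest pattern for that is a factory fixture: the fixture returns a function, and each test builds exactly the rows it needs. A sketch, assuming a hypothetical make_orders builder; the trailing comment shows how it could be exposed from conftest.py.

```python
import pandas as pd


def make_orders(n=3, **overrides):
    """Build an orders DataFrame with sensible defaults (hypothetical helper).

    Any keyword argument replaces the matching column; its length must equal n.
    """
    regions = ["North", "South", "East"]
    data = {
        "order_id": list(range(1001, 1001 + n)),
        "customer_id": [101 + i % 3 for i in range(n)],
        "region": [regions[i % len(regions)] for i in range(n)],
        "revenue": [100.0 * (i + 1) for i in range(n)],
    }
    data.update(overrides)
    return pd.DataFrame(data)


# In tests/conftest.py:
#
# @pytest.fixture
# def order_factory():
#     return make_orders
#
# A test then requests order_factory and calls it:
#
# def test_single_region(order_factory):
#     df = order_factory(n=4, region=["North"] * 4)
#     ...
```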

Using Fixtures in Tests

# tests/test_pipeline_integration.py

import pytest
from pipeline.transform import (
    standardise_columns,
    clean_revenue,
    remove_duplicates,
    calculate_summary,
)

# Mark every test in this module so `pytest -m integration` selects them
pytestmark = pytest.mark.integration


def test_full_transform_chain(messy_orders):
    """Test the complete transformation pipeline on messy data."""
    # Step 1: Standardise
    df = standardise_columns(messy_orders)
    assert "order_id" in df.columns
    assert "revenue" in df.columns

    # Step 2: Clean revenue ("N/A" and "not_a_number" become NaN)
    df = clean_revenue(df)
    assert df["revenue"].notna().sum() == 3

    # Step 3: Deduplicate (order 1002 appears twice; the last copy is kept)
    df, removed = remove_duplicates(df, ["order_id"])
    assert removed == 1

    # Step 4: Summarise the rows with valid revenue
    df_valid = df.dropna(subset=["revenue"])
    result = calculate_summary(df_valid, "region", "revenue")
    assert "total" in result.columns
    assert result["total"].sum() == 700.0  # 150 + 250 + 300


def test_handles_empty_input(empty_dataframe):
    """Pipeline should handle empty DataFrames without crashing."""
    df = standardise_columns(empty_dataframe)
    assert len(df) == 0
    assert len(df.columns) == 5


@pytest.mark.slow
def test_large_dataset_performance(large_orders):
    """Pipeline should handle 10k rows without issues."""
    df = standardise_columns(large_orders)
    df = clean_revenue(df)
    df, removed = remove_duplicates(df, ["order_id"])
    result = calculate_summary(df, "region", "revenue")

    assert removed == 0  # order_id is unique in this fixture
    assert len(result) == 4  # 4 regions
    assert result["count"].sum() == 10000
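The integration tests above still call the transforms directly. An end-to-end test goes one level further: write a small sample file, run the whole pipeline including its file I/O, and check the report it writes. The guide does not define an entry point, so run_pipeline below is a hypothetical orchestrator built from trimmed copies of the Step 1 transforms, included so the snippet runs standalone. The test takes a tmp_path argument, which pytest fills with a fresh temporary directory; any pathlib.Path works when calling it by hand.

```python
from pathlib import Path

import pandas as pd


# Trimmed copies of the Step 1 transforms so this snippet runs standalone
def standardise_columns(df):
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df


def clean_revenue(df, column="revenue"):
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df


def remove_duplicates(df, key_columns):
    before = len(df)
    df = df.drop_duplicates(subset=key_columns, keep="last")
    return df, before - len(df)


def calculate_summary(df, group_by, value_column):
    return (
        df.groupby(group_by)
        .agg(
            total=(value_column, "sum"),
            count=(value_column, "count"),
            average=(value_column, "mean"),
        )
        .reset_index()
        .round(2)
    )


def run_pipeline(input_path, output_path):
    """Hypothetical entry point: file I/O at the edges, pure transforms between."""
    df = pd.read_csv(input_path)
    df = clean_revenue(standardise_columns(df))
    df, _ = remove_duplicates(df, ["order_id"])
    summary = calculate_summary(df.dropna(subset=["revenue"]), "region", "revenue")
    summary.to_csv(output_path, index=False)


def test_end_to_end(tmp_path: Path):
    source = tmp_path / "orders.csv"
    source.write_text(
        "Order ID,Region,Revenue\n"
        "1,North,100\n"
        "2,North,N/A\n"
        "2,North,50\n"
        "3,South,200\n"
    )
    report = tmp_path / "report.csv"

    run_pipeline(source, report)

    result = pd.read_csv(report)
    assert result["region"].tolist() == ["North", "South"]
    assert result["total"].tolist() == [150.0, 200.0]
    assert result["count"].tolist() == [2, 1]
```

Keeping the sample file tiny means every expected number can be worked out by hand: North is 100 + 50 once the duplicate order 2 resolves to its last row.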

Step 4: Mock External Dependencies

APIs, databases, and file systems should not be called during unit tests. Mock them.

# tests/test_extract.py

import pytest
import pandas as pd
from unittest.mock import patch, MagicMock


# The extraction function we are testing
def fetch_orders_from_api(base_url, api_key):
    """Fetch orders from REST API."""
    import requests
    response = requests.get(
        f"{base_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    return pd.DataFrame(response.json()["results"])


class TestFetchOrders:
    @patch("requests.get")
    def test_successful_fetch(self, mock_get):
        """Test successful API response."""
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [
                {"id": 1, "amount": 100, "date": "2026-03-01"},
                {"id": 2, "amount": 200, "date": "2026-03-02"},
            ]
        }
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response

        result = fetch_orders_from_api("https://api.example.com", "test-key")

        assert len(result) == 2
        assert "amount" in result.columns
        mock_get.assert_called_once()

    @patch("requests.get")
    def test_api_timeout(self, mock_get):
        """Test handling of API timeout."""
        import requests
        mock_get.side_effect = requests.exceptions.Timeout("Connection timed out")

        with pytest.raises(requests.exceptions.Timeout):
            fetch_orders_from_api("https://api.example.com", "test-key")

    @patch("requests.get")
    def test_empty_response(self, mock_get):
        """Test handling of empty API response."""
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": []}
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response

        result = fetch_orders_from_api("https://api.example.com", "test-key")
        assert len(result) == 0
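patch("requests.get") works here because the function looks up get on the requests module at call time. patch always has to target the name the code under test actually looks up: if a hypothetical pipeline/extract.py did `from requests import get`, the target would become pipeline.extract.get. A different design, a swap from the approach above rather than a fix, avoids patching altogether: pass the HTTP function in as a parameter so tests can hand over a fake. In this sketch fetch_orders, FakeResponse and the get parameter are hypothetical; production code would pass requests.get. It also covers an HTTP error status, which the tests above do not exercise.

```python
import pandas as pd


def fetch_orders(base_url, api_key, get):
    """Fetch orders; `get` is any callable shaped like requests.get (hypothetical design)."""
    response = get(
        f"{base_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    return pd.DataFrame(response.json()["results"])


class FakeResponse:
    """Minimal stand-in with the two methods fetch_orders uses."""

    def __init__(self, payload, error=None):
        self.payload = payload
        self.error = error

    def raise_for_status(self):
        if self.error is not None:
            raise self.error

    def json(self):
        return self.payload


def test_fetch_uses_orders_endpoint():
    calls = []

    def fake_get(url, **kwargs):
        calls.append((url, kwargs))
        return FakeResponse({"results": [{"id": 1, "amount": 100}]})

    result = fetch_orders("https://api.example.com", "test-key", get=fake_get)

    assert len(result) == 1
    assert calls[0][0] == "https://api.example.com/orders"
    assert calls[0][1]["headers"]["Authorization"] == "Bearer test-key"


def test_fetch_propagates_http_error():
    def fake_get(url, **kwargs):
        # A real requests response would raise requests.HTTPError here
        return FakeResponse({}, error=RuntimeError("500 Server Error"))

    try:
        fetch_orders("https://api.example.com", "test-key", get=fake_get)
    except RuntimeError:
        return
    raise AssertionError("expected the HTTP error to propagate")
```

Under pytest, the try/except in the second test would normally be written as `with pytest.raises(RuntimeError):`.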

Mocking File System

# tests/test_file_extract.py

import pytest
import pandas as pd
from unittest.mock import patch
import os


def load_source_file(filepath):
    """Load data from Excel or CSV based on extension."""
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Source file not found: {filepath}")

    if filepath.endswith(".xlsx"):
        return pd.read_excel(filepath)
    elif filepath.endswith(".csv"):
        return pd.read_csv(filepath)
    else:
        raise ValueError(f"Unsupported file type: {filepath}")


class TestLoadSourceFile:
    @patch("os.path.exists", return_value=False)
    def test_missing_file_raises_error(self, mock_exists):
        with pytest.raises(FileNotFoundError, match="Source file not found"):
            load_source_file("data/missing.xlsx")

    @patch("pandas.read_excel")
    @patch("os.path.exists", return_value=True)
    def test_loads_excel(self, mock_exists, mock_read):
        mock_read.return_value = pd.DataFrame({"id": [1, 2]})
        result = load_source_file("data/orders.xlsx")
        assert len(result) == 2

    def test_unsupported_format(self, tmp_path):
        bad_file = tmp_path / "data.json"
        bad_file.write_text("{}")
        with pytest.raises(ValueError, match="Unsupported file type"):
            load_source_file(str(bad_file))
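Mocking os.path.exists and pandas.read_excel checks the branching, but not that a real file parses. For small text formats like CSV, pytest's built-in tmp_path fixture is often simpler than mocking: write a real file and load it. A sketch covering the CSV branch, which the tests above leave untested; load_source_file is copied from above so the snippet runs on its own, and the test name is hypothetical.

```python
import os
from pathlib import Path

import pandas as pd


def load_source_file(filepath):
    """Copy of the function above."""
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Source file not found: {filepath}")
    if filepath.endswith(".xlsx"):
        return pd.read_excel(filepath)
    elif filepath.endswith(".csv"):
        return pd.read_csv(filepath)
    else:
        raise ValueError(f"Unsupported file type: {filepath}")


def test_loads_csv(tmp_path: Path):
    csv_file = tmp_path / "orders.csv"
    csv_file.write_text("id,revenue\n1,100.5\n2,200\n")

    # load_source_file checks the extension with str.endswith, so pass a str
    result = load_source_file(str(csv_file))

    assert list(result.columns) == ["id", "revenue"]
    assert result["revenue"].tolist() == [100.5, 200.0]
```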

Step 5: Snapshot Testing for Report Output

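Verify that your pipeline output doesn’t change unexpectedly. The first run records a snapshot of the report (columns, dtypes, shape and the first five rows) in tests/snapshots/ and skips; every later run compares the current output against it. Commit the snapshot file. When an output change is intentional, delete the file and rerun to record the new version.

# tests/test_report_output.py

import json
import os

import pandas as pd
import pytest

from pipeline.transform import calculate_summary

# Store snapshots next to this test file so the path works from any working directory
SNAPSHOT_DIR = os.path.join(os.path.dirname(__file__), "snapshots")


def make_snapshot(df):
    """Summarise a DataFrame as a JSON-compatible dict."""
    snapshot = {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "shape": list(df.shape),
        "sample": df.head(5).to_dict(orient="records"),
    }
    # Round-trip through JSON so it compares equal to a snapshot loaded from disk
    return json.loads(json.dumps(snapshot, default=str))


def save_snapshot(name, df):
    """Save a DataFrame snapshot for future comparison."""
    os.makedirs(SNAPSHOT_DIR, exist_ok=True)
    path = os.path.join(SNAPSHOT_DIR, f"{name}.json")
    with open(path, "w") as f:
        json.dump(make_snapshot(df), f, indent=2)


def load_snapshot(name):
    """Load a saved snapshot, or return None if none exists yet."""
    path = os.path.join(SNAPSHOT_DIR, f"{name}.json")
    if not os.path.exists(path):
        return None
    with open(path, "r") as f:
        return json.load(f)


def test_report_output_matches_snapshot(sample_orders):
    """Verify report output matches the last approved snapshot."""
    result = calculate_summary(sample_orders, "region", "revenue")

    # Check structure
    assert list(result.columns) == ["region", "total", "count", "average"]
    assert len(result) > 0

    # Check types
    assert pd.api.types.is_float_dtype(result["total"])
    assert pd.api.types.is_integer_dtype(result["count"])

    # Check totals reconcile with the input
    assert result["total"].sum() == pytest.approx(sample_orders["revenue"].sum())
    assert result["count"].sum() == len(sample_orders)

    # Compare against the stored snapshot; record it on the first run
    expected = load_snapshot("region_summary")
    if expected is None:
        save_snapshot("region_summary", result)
        pytest.skip("Snapshot created: review and commit tests/snapshots/region_summary.json")
    assert make_snapshot(result) == expected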

Step 6: Test Configuration

pytest.ini

[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    integration: marks integration tests
addopts = -v --tb=short

Running Tests

# Run all tests
pytest

# Run with coverage report
pytest --cov=pipeline --cov-report=term-missing

# Run only unit tests (fast)
pytest tests/test_transform.py

# Run only integration tests
pytest -m integration

# Skip slow tests
pytest -m "not slow"

Sample Coverage Output

---------- coverage: platform linux, python 3.11 ----------
Name                        Stmts   Miss  Cover   Missing
---------------------------------------------------------
pipeline/transform.py          45      2    96%   72-73
pipeline/extract.py            38      5    87%   45-49
pipeline/load.py               22      3    86%   31-33
---------------------------------------------------------
TOTAL                         105     10    90%

Step 7: CI Integration

GitHub Actions

# .github/workflows/test-pipeline.yml

name: Test Pipeline
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run tests and enforce 80% coverage
        run: pytest --cov=pipeline --cov-report=xml --cov-fail-under=80

Common Pipeline Test Patterns

| What to test | How to test it | Why it matters |
|---|---|---|
| Column names after transform | assert list(df.columns) == expected | Schema changes break downstream |
| Row count after filtering | assert len(result) == expected_count | Silent row drops produce wrong totals |
| Aggregation totals | assert result["total"].sum() == df["value"].sum() | Groupby errors lose data |
| Null handling | assert result["col"].isna().sum() == 0 | Nulls propagate through calculations |
| Date parsing | assert result["date"].dtype == "datetime64[ns]" | String dates break time-series logic |
| Duplicate removal | assert result["id"].is_unique | Duplicates inflate counts |
| Edge cases | Test with empty DataFrame, single row, all nulls | Prevents crashes on unusual inputs |
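Several rows of this table can be bundled into one reusable check that runs against any pipeline output. A minimal sketch, assuming a hypothetical check_report helper and illustrative column names; it collects every failed rule and raises a single AssertionError naming them all.

```python
import pandas as pd


def check_report(df, expected_columns, id_column, date_column, not_null):
    """Run common output checks and raise one AssertionError listing every failure."""
    # Later checks index into these columns, so a schema mismatch stops here
    if list(df.columns) != expected_columns:
        raise AssertionError(f"columns {list(df.columns)} != {expected_columns}")

    problems = []
    if not df[id_column].is_unique:
        problems.append(f"duplicate values in {id_column!r}")
    if not pd.api.types.is_datetime64_any_dtype(df[date_column]):
        problems.append(f"{date_column!r} has dtype {df[date_column].dtype}, not datetime")
    for col in not_null:
        nulls = int(df[col].isna().sum())
        if nulls:
            problems.append(f"{nulls} null(s) in {col!r}")

    if problems:
        raise AssertionError("; ".join(problems))


good = pd.DataFrame({
    "order_id": [1, 2],
    "date": pd.to_datetime(["2026-03-01", "2026-03-02"]),
    "revenue": [150.0, 250.0],
})
check_report(good, ["order_id", "date", "revenue"], "order_id", "date", ["revenue"])

bad = good.assign(order_id=[1, 1], revenue=[150.0, None])
try:
    check_report(bad, ["order_id", "date", "revenue"], "order_id", "date", ["revenue"])
except AssertionError as exc:
    print(exc)  # duplicate values in 'order_id'; 1 null(s) in 'revenue'
```

Raising AssertionError explicitly, rather than using bare assert statements, keeps the checks active even if Python runs with -O, so the same helper can guard a production run as well as a test.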

What This Replaces

| Manual process | Automated test equivalent |
|---|---|
| “Run it and check the numbers look right” | Assertion on exact expected values |
| “It worked last time” | CI runs tests on every change |
| “The report looks weird, what changed?” | Snapshot comparison catches drift |
| “Did that refactor break anything?” | Full test suite runs in seconds |
| “Works on my machine” | CI tests in a clean environment |

Next Steps

Start with unit tests for your transformation functions — they catch the most common bugs with the least effort. Add integration tests for stage-to-stage handoffs. Add CI once you have 10+ tests.

The return on investment is immediate: the first time a test catches a broken column rename or a bad aggregation before it reaches production, the testing setup has paid for itself.

For building the pipelines these tests protect, see How to Design Data Pipelines for Reliable Reporting. For making those pipelines recover from failures automatically, see How to Build Self-Healing Data Pipelines.

Data analytics services include building tested, reliable reporting systems with full CI/CD integration.

Get in touch to discuss adding tests to your data pipelines.

