Testing Data Pipelines: A Practical Guide with pytest

9 min read · Data & Dashboards

Write tests that catch broken pipelines before they produce wrong reports — with pytest fixtures, DataFrame assertions, mock APIs, and CI integration.

You built a data pipeline. It runs every morning. It worked for three weeks. Then someone changed a column name in the source file and the pipeline produced a report with half the rows missing. Nobody noticed for five days.

Testing prevents this. Not manual "run it and eyeball the output" testing — automated tests that catch broken transformations, missing columns, and wrong aggregations before they reach production.

This guide covers how to test data pipelines with pytest. By the end, you will have unit tests, integration tests, fixture patterns, and CI configuration that you can apply to any Python pipeline.

# The Testing Architecture

mermaid
flowchart TD
  U["Unit Tests\n(each function)"] --> I["Integration Tests\n(stage-to-stage)"]
  I --> E["End-to-End Tests\n(full pipeline)"]
  U --> CI["CI Pipeline\n(run on every push)"]
  I --> CI
  E --> CI
  CI -->|Pass| D[Deploy / Schedule]
  CI -->|Fail| B[Block & Alert]

Unit tests verify individual functions. Integration tests verify stages work together. End-to-end tests run the full pipeline on sample data. All three run automatically on every code change.

# What You Will Need

bash
pip install pytest pandas openpyxl pytest-cov

  • pytest — test framework
  • pandas — data processing (the code you are testing)
  • openpyxl — Excel file support for pandas (required by read_excel for .xlsx files)
  • pytest-cov — test coverage reporting

# Step 1: Structure Your Pipeline for Testing

The key to testable pipelines: separate logic from I/O. Every transformation should be a pure function that takes data in and returns data out.

python
# pipeline/transform.py

import pandas as pd
import numpy as np

def standardise_columns(df):
    """Normalise column names to lowercase with underscores."""
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df

def clean_revenue(df, column="revenue"):
    """Convert revenue column to numeric, coerce errors to NaN."""
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df

def remove_duplicates(df, key_columns):
    """Remove duplicate rows based on key columns."""
    before = len(df)
    df = df.drop_duplicates(subset=key_columns, keep="last")
    removed = before - len(df)
    return df, removed

def calculate_summary(df, group_by, value_column):
    """Aggregate data by group with standard metrics."""
    return (
        df.groupby(group_by)
        .agg(
            total=(value_column, "sum"),
            count=(value_column, "count"),
            average=(value_column, "mean"),
        )
        .reset_index()
        .round(2)
    )

def validate_schema(df, required_columns):
    """Check that all required columns exist."""
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return True

Each function is pure — no file reads, no API calls, no side effects. This makes them trivially testable.
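To make the payoff concrete, here is a minimal sketch of how these pure steps compose, with all file I/O pushed into a thin outer shell. The wrapper names `run_transforms` and `main` are ours, not part of the pipeline above; two transforms are repeated so the snippet runs standalone.

```python
import pandas as pd

# Copies of two transforms from pipeline/transform.py so this sketch is self-contained.
def standardise_columns(df):
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df

def clean_revenue(df, column="revenue"):
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df

def run_transforms(df):
    """Pure composition -- no I/O, so a test can call it with any DataFrame."""
    return clean_revenue(standardise_columns(df))

def main(in_path, out_path):
    """Thin I/O shell: the only place that touches the file system."""
    run_transforms(pd.read_csv(in_path)).to_csv(out_path, index=False)

# No files needed to exercise the logic:
df = pd.DataFrame({"Order ID": [1, 2], "REVENUE": ["100.5", "oops"]})
out = run_transforms(df)
```

Tests target `run_transforms` and the individual steps; only one or two end-to-end tests ever need `main` and real files.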

# Step 2: Write Unit Tests

# Basic DataFrame Tests

python
# tests/test_transform.py

import pytest
import pandas as pd
import numpy as np
from pipeline.transform import (
    standardise_columns,
    clean_revenue,
    remove_duplicates,
    calculate_summary,
    validate_schema,
)


class TestStandardiseColumns:
    def test_lowercases_columns(self):
        df = pd.DataFrame({"First Name": [1], "LAST NAME": [2]})
        result = standardise_columns(df)
        assert list(result.columns) == ["first_name", "last_name"]

    def test_strips_whitespace(self):
        df = pd.DataFrame({" name ": [1], "  age  ": [2]})
        result = standardise_columns(df)
        assert list(result.columns) == ["name", "age"]

    def test_does_not_modify_original(self):
        df = pd.DataFrame({"Name": [1]})
        standardise_columns(df)
        assert list(df.columns) == ["Name"]


class TestCleanRevenue:
    def test_converts_strings_to_numbers(self):
        df = pd.DataFrame({"revenue": ["100.50", "200", "300.75"]})
        result = clean_revenue(df)
        assert result["revenue"].dtype == np.float64
        assert result["revenue"].tolist() == [100.50, 200.0, 300.75]

    def test_coerces_invalid_to_nan(self):
        df = pd.DataFrame({"revenue": ["100", "N/A", "not a number"]})
        result = clean_revenue(df)
        assert result["revenue"].iloc[0] == 100.0
        assert pd.isna(result["revenue"].iloc[1])
        assert pd.isna(result["revenue"].iloc[2])

    def test_handles_empty_dataframe(self):
        df = pd.DataFrame({"revenue": pd.Series([], dtype=str)})
        result = clean_revenue(df)
        assert len(result) == 0


class TestRemoveDuplicates:
    def test_removes_exact_duplicates(self):
        df = pd.DataFrame({
            "id": [1, 2, 2, 3],
            "value": [10, 20, 25, 30],
        })
        result, removed = remove_duplicates(df, ["id"])
        assert len(result) == 3
        assert removed == 1

    def test_keeps_last_duplicate(self):
        df = pd.DataFrame({
            "id": [1, 1],
            "value": ["old", "new"],
        })
        result, _ = remove_duplicates(df, ["id"])
        assert result.iloc[0]["value"] == "new"

    def test_no_duplicates_returns_same(self):
        df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        result, removed = remove_duplicates(df, ["id"])
        assert len(result) == 3
        assert removed == 0

# Testing Aggregations

python
class TestCalculateSummary:
    def test_groups_and_aggregates(self):
        df = pd.DataFrame({
            "region": ["North", "North", "South"],
            "revenue": [100, 200, 150],
        })
        result = calculate_summary(df, "region", "revenue")

        north = result[result["region"] == "North"].iloc[0]
        assert north["total"] == 300
        assert north["count"] == 2
        assert north["average"] == 150.0

    def test_single_group(self):
        df = pd.DataFrame({
            "region": ["North", "North"],
            "revenue": [100, 200],
        })
        result = calculate_summary(df, "region", "revenue")
        assert len(result) == 1
        assert result.iloc[0]["total"] == 300

    def test_rounds_to_two_decimals(self):
        df = pd.DataFrame({
            "region": ["A", "A", "A"],
            "revenue": [10, 20, 34],
        })
        result = calculate_summary(df, "region", "revenue")
        # 64 / 3 = 21.333..., rounded to 2 decimal places
        assert result.iloc[0]["average"] == 21.33


class TestValidateSchema:
    def test_passes_with_all_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"], "value": [100]})
        assert validate_schema(df, ["id", "name", "value"]) is True

    def test_fails_with_missing_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"]})
        with pytest.raises(ValueError, match="Missing required columns"):
            validate_schema(df, ["id", "name", "value"])

    def test_passes_with_extra_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"], "extra": [True]})
        assert validate_schema(df, ["id", "name"]) is True
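One pattern worth adding to the classes above: `pytest.mark.parametrize` runs a single test body over many input cases, which suits table-driven data checks. A sketch, with `clean_revenue` repeated so it stands alone:

```python
import pytest
import pandas as pd

# clean_revenue repeated from pipeline/transform.py so the example is self-contained.
def clean_revenue(df, column="revenue"):
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df

@pytest.mark.parametrize("raw, valid_count", [
    (["100", "200.5"], 2),   # all parseable
    (["100", "N/A"], 1),     # one coerced to NaN
    (["x", "", "--"], 0),    # nothing parseable
])
def test_clean_revenue_valid_counts(raw, valid_count):
    result = clean_revenue(pd.DataFrame({"revenue": raw}))
    assert int(result["revenue"].notna().sum()) == valid_count
```

Each tuple becomes its own test in the report, so a failure pinpoints the exact case. For floating-point aggregates, prefer `pytest.approx` over exact equality, e.g. `assert avg == pytest.approx(150.0)`.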

# Step 3: Test Fixtures for Reusable Data

Create fixtures that provide consistent test data across all tests:

python
# tests/conftest.py

import pytest
import pandas as pd
from datetime import datetime, timedelta


@pytest.fixture
def sample_orders():
    """Realistic order data for testing."""
    return pd.DataFrame({
        "order_id": [1001, 1002, 1003, 1004, 1005],
        "date": pd.date_range("2026-03-01", periods=5, freq="D"),
        "customer_id": [101, 102, 101, 103, 102],
        "product": ["Widget", "Gadget", "Widget", "Doohickey", "Gadget"],
        "region": ["North", "South", "North", "East", "South"],
        "revenue": [150.00, 250.00, 175.00, 320.00, 280.00],
        "quantity": [2, 1, 3, 1, 2],
    })


@pytest.fixture
def messy_orders():
    """Orders with common real-world data quality issues."""
    return pd.DataFrame({
        "Order ID": [1001, 1002, 1002, 1003, 1004],
        " Date ": ["2026-03-01", "2026-03-02", "2026-03-02", "invalid", "2026-03-04"],
        "Customer_ID": [101, 102, 102, 103, 104],
        "REVENUE": ["150.00", "N/A", "250.00", "300", "not_a_number"],
        "Region": ["north", " South ", "SOUTH", "East", ""],
    })


@pytest.fixture
def empty_dataframe():
    """Empty DataFrame with expected schema."""
    return pd.DataFrame(columns=["order_id", "date", "customer_id", "revenue", "region"])


@pytest.fixture
def large_orders():
    """Large dataset for performance testing."""
    n = 10000
    return pd.DataFrame({
        "order_id": range(n),
        "date": pd.date_range("2025-01-01", periods=n, freq="h"),
        "customer_id": [i % 100 for i in range(n)],
        "revenue": [round(50 + (i * 7.3) % 500, 2) for i in range(n)],
        "region": ["North", "South", "East", "West"] * (n // 4),
    })
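Fixtures can also request other fixtures, so derived datasets live in one place instead of being rebuilt in every test. A sketch (the plain `_make_orders` factory is our convention, not a pytest requirement; it keeps the data usable outside the fixture machinery too):

```python
import pytest
import pandas as pd

def _make_orders():
    # Plain factory: callable from scripts and REPLs as well as fixtures.
    return pd.DataFrame({
        "order_id": [1, 2, 3],
        "region": ["North", "North", "South"],
        "revenue": [100.0, 200.0, 150.0],
    })

@pytest.fixture
def orders():
    return _make_orders()

@pytest.fixture
def region_totals(orders):
    # A fixture may depend on another fixture; pytest resolves the chain.
    return orders.groupby("region")["revenue"].sum()

# The same derivation, outside pytest:
totals = _make_orders().groupby("region")["revenue"].sum()
```

A test that takes `region_totals` as an argument gets the aggregated Series directly, with `orders` built behind the scenes.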

# Using Fixtures in Tests

python
# tests/test_pipeline_integration.py

import pandas as pd
from pipeline.transform import (
    standardise_columns,
    clean_revenue,
    remove_duplicates,
    calculate_summary,
)


def test_full_transform_chain(messy_orders):
    """Test the complete transformation pipeline on messy data."""
    # Step 1: Standardise
    df = standardise_columns(messy_orders)
    assert "order_id" in df.columns
    assert "revenue" in df.columns

    # Step 2: Clean revenue
    df = clean_revenue(df)
    valid_revenue = df["revenue"].dropna()
    assert len(valid_revenue) >= 2  # Some should survive

    # Step 3: Deduplicate
    df, removed = remove_duplicates(df, ["order_id"])
    assert removed >= 1  # Order 1002 is duplicated

    # Step 4: Summarise
    df_valid = df.dropna(subset=["revenue"])
    if len(df_valid) > 0:
        result = calculate_summary(df_valid, "region", "revenue")
        assert "total" in result.columns
        assert result["total"].sum() > 0


def test_handles_empty_input(empty_dataframe):
    """Pipeline should handle empty DataFrames without crashing."""
    df = standardise_columns(empty_dataframe)
    assert len(df) == 0
    assert len(df.columns) == 5


def test_large_dataset_performance(large_orders):
    """Pipeline should handle 10k rows without issues."""
    df = standardise_columns(large_orders)
    df = clean_revenue(df)
    df, removed = remove_duplicates(df, ["order_id"])
    result = calculate_summary(df, "region", "revenue")

    assert len(result) == 4  # 4 regions
    assert result["count"].sum() == 10000

# Step 4: Mock External Dependencies

APIs, databases, and file systems should not be called during unit tests. Mock them.

python
# tests/test_extract.py

import pytest
import pandas as pd
from unittest.mock import patch, MagicMock


# The extraction function we are testing
def fetch_orders_from_api(base_url, api_key):
    """Fetch orders from REST API."""
    import requests
    response = requests.get(
        f"{base_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    return pd.DataFrame(response.json()["results"])


class TestFetchOrders:
    @patch("requests.get")
    def test_successful_fetch(self, mock_get):
        """Test successful API response."""
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [
                {"id": 1, "amount": 100, "date": "2026-03-01"},
                {"id": 2, "amount": 200, "date": "2026-03-02"},
            ]
        }
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response

        result = fetch_orders_from_api("https://api.example.com", "test-key")

        assert len(result) == 2
        assert "amount" in result.columns
        mock_get.assert_called_once()

    @patch("requests.get")
    def test_api_timeout(self, mock_get):
        """Test handling of API timeout."""
        import requests
        mock_get.side_effect = requests.exceptions.Timeout("Connection timed out")

        with pytest.raises(requests.exceptions.Timeout):
            fetch_orders_from_api("https://api.example.com", "test-key")

    @patch("requests.get")
    def test_empty_response(self, mock_get):
        """Test handling of empty API response."""
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": []}
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response

        result = fetch_orders_from_api("https://api.example.com", "test-key")
        assert len(result) == 0

# Mocking File System

python
# tests/test_file_extract.py

import pytest
import pandas as pd
from unittest.mock import patch
import os


def load_source_file(filepath):
    """Load data from Excel or CSV based on extension."""
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Source file not found: {filepath}")

    if filepath.endswith(".xlsx"):
        return pd.read_excel(filepath)
    elif filepath.endswith(".csv"):
        return pd.read_csv(filepath)
    else:
        raise ValueError(f"Unsupported file type: {filepath}")


class TestLoadSourceFile:
    @patch("os.path.exists", return_value=False)
    def test_missing_file_raises_error(self, mock_exists):
        with pytest.raises(FileNotFoundError, match="Source file not found"):
            load_source_file("data/missing.xlsx")

    @patch("pandas.read_excel")
    @patch("os.path.exists", return_value=True)
    def test_loads_excel(self, mock_exists, mock_read):
        mock_read.return_value = pd.DataFrame({"id": [1, 2]})
        result = load_source_file("data/orders.xlsx")
        assert len(result) == 2

    def test_unsupported_format(self, tmp_path):
        bad_file = tmp_path / "data.json"
        bad_file.write_text("{}")
        with pytest.raises(ValueError, match="Unsupported file type"):
            load_source_file(str(bad_file))
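Mocks are not always necessary here. For formats pandas can write itself, pytest's built-in `tmp_path` fixture lets a test create a real file and exercise the loader end to end, as `test_unsupported_format` above already does. A sketch, with `load_source_file` repeated so it stands alone:

```python
import os
import pandas as pd

# load_source_file repeated from above so the example is self-contained.
def load_source_file(filepath):
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Source file not found: {filepath}")
    if filepath.endswith(".xlsx"):
        return pd.read_excel(filepath)
    elif filepath.endswith(".csv"):
        return pd.read_csv(filepath)
    raise ValueError(f"Unsupported file type: {filepath}")

def test_loads_csv_roundtrip(tmp_path):
    # Write a real CSV into pytest's per-test temporary directory, then load it.
    source = tmp_path / "orders.csv"
    pd.DataFrame({"id": [1, 2], "revenue": [100.0, 200.0]}).to_csv(source, index=False)
    result = load_source_file(str(source))
    assert list(result.columns) == ["id", "revenue"]
    assert len(result) == 2
```

Round-trip tests like this catch serialisation quirks (dtype changes, index columns) that mocks hide; reserve mocks for dependencies you genuinely cannot create locally.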

# Step 5: Snapshot Testing for Report Output

Verify that your pipeline output doesn't change unexpectedly:

python
# tests/test_report_output.py

import pytest
import pandas as pd
import json
import os

SNAPSHOT_DIR = "tests/snapshots"


def save_snapshot(name, df):
    """Save a DataFrame snapshot for comparison."""
    os.makedirs(SNAPSHOT_DIR, exist_ok=True)
    path = os.path.join(SNAPSHOT_DIR, f"{name}.json")
    snapshot = {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "shape": list(df.shape),
        "sample": df.head(5).to_dict(orient="records"),
    }
    with open(path, "w") as f:
        json.dump(snapshot, f, indent=2, default=str)


def load_snapshot(name):
    """Load a saved snapshot."""
    path = os.path.join(SNAPSHOT_DIR, f"{name}.json")
    if not os.path.exists(path):
        return None
    with open(path, "r") as f:
        return json.load(f)


def test_report_output_matches_snapshot(sample_orders):
    """Verify report output matches the saved snapshot (created on first run)."""
    from pipeline.transform import calculate_summary

    result = calculate_summary(sample_orders, "region", "revenue")

    # Values should reconcile with the input regardless of the snapshot
    assert result["total"].sum() == sample_orders["revenue"].sum()
    assert result["count"].sum() == len(sample_orders)

    expected = load_snapshot("region_summary")
    if expected is None:
        save_snapshot("region_summary", result)
        pytest.skip("Snapshot created; re-run to compare")

    # Structure must not drift from the recorded snapshot
    assert list(result.columns) == expected["columns"]
    assert list(result.shape) == expected["shape"]
    assert {col: str(dtype) for col, dtype in result.dtypes.items()} == expected["dtypes"]

# Step 6: Test Configuration

# pytest.ini

ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    integration: marks integration tests
addopts = -v --tb=short
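The `slow` and `integration` markers registered above do nothing until tests opt in with a decorator. A sketch of tagging (the test body here is a lightweight stand-in for a genuinely expensive test):

```python
import pytest
import pandas as pd

@pytest.mark.slow
def test_ten_thousand_row_aggregation():
    # Stand-in for an expensive test; deselect with: pytest -m "not slow"
    df = pd.DataFrame({
        "region": ["North", "South"] * 5_000,
        "revenue": [10.0] * 10_000,
    })
    totals = df.groupby("region")["revenue"].sum()
    assert totals["North"] == 50_000.0
```

Unregistered markers trigger a warning under the config above, which catches typos like `@pytest.mark.integratoin` before they silently exclude tests.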

# Running Tests

bash
# Run all tests
pytest

# Run with coverage report
pytest --cov=pipeline --cov-report=term-missing

# Run only unit tests (fast)
pytest tests/test_transform.py

# Run only integration tests
pytest -m integration

# Skip slow tests
pytest -m "not slow"

# Sample Coverage Output

text
---------- coverage: platform linux, python 3.11 ----------
Name                        Stmts   Miss  Cover   Missing
---------------------------------------------------------
pipeline/transform.py          45      2    96%   72-73
pipeline/extract.py            38      5    87%   45-49
pipeline/load.py               22      3    86%   31-33
---------------------------------------------------------
TOTAL                         105     10    90%

# Step 7: CI Integration

# GitHub Actions

yaml
# .github/workflows/test-pipeline.yml

name: Test Pipeline
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run tests and enforce coverage
        # --cov-fail-under makes the job fail if line coverage drops below 80%
        run: pytest --cov=pipeline --cov-report=xml --cov-fail-under=80

# Common Pipeline Test Patterns

| What to test | How to test it | Why it matters |
|---|---|---|
| Column names after transform | `assert list(df.columns) == expected` | Schema changes break downstream |
| Row count after filtering | `assert len(result) == expected_count` | Silent row drops produce wrong totals |
| Aggregation totals | `assert result["total"].sum() == df["value"].sum()` | Groupby errors lose data |
| Null handling | `assert result["col"].isna().sum() == 0` | Nulls propagate through calculations |
| Date parsing | `assert result["date"].dtype == "datetime64[ns]"` | String dates break time-series logic |
| Duplicate removal | `assert result["id"].is_unique` | Duplicates inflate counts |
| Edge cases | Test with empty DataFrame, single row, all nulls | Prevents crashes on unusual inputs |
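Several of these patterns can be bundled into one reusable contract check that every run's output must pass. A sketch (`check_summary_contract` is our name, not part of any library; the column list assumes the summary shape from earlier steps):

```python
import pandas as pd

def check_summary_contract(df):
    """Assert the structural invariants from the table above on a summary frame."""
    assert list(df.columns) == ["region", "total", "count", "average"]  # schema
    assert df["region"].is_unique                                       # no duplicate groups
    assert df["total"].isna().sum() == 0                                # no null totals
    assert (df["count"] > 0).all()                                      # every group has rows
    return True

summary = pd.DataFrame({
    "region": ["North", "South"],
    "total": [300.0, 150.0],
    "count": [2, 1],
    "average": [150.0, 150.0],
})
ok = check_summary_contract(summary)
```

Calling one contract function from unit tests, integration tests, and even the production pipeline itself keeps the invariants in a single place.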

# What This Replaces

| Manual process | Automated test equivalent |
|---|---|
| "Run it and check the numbers look right" | Assertions on exact expected values |
| "It worked last time" | CI runs tests on every change |
| "The report looks weird, what changed?" | Snapshot comparison catches drift |
| "Did that refactor break anything?" | Full test suite runs in seconds |
| "Works on my machine" | CI tests in a clean environment |

# Next Steps

Start with unit tests for your transformation functions — they catch the most common bugs with the least effort. Add integration tests for stage-to-stage handoffs. Add CI once you have 10+ tests.

The return on investment is immediate: the first time a test catches a broken column rename or a bad aggregation before it reaches production, the testing setup has paid for itself.

For building the pipelines these tests protect, see How to Design Data Pipelines for Reliable Reporting. For making those pipelines recover from failures automatically, see How to Build Self-Healing Data Pipelines.

My data analytics services include building tested, reliable reporting systems with full CI/CD integration.

Get in touch to discuss adding tests to your data pipelines.
