Testing Data Pipelines: A Practical Guide with pytest
Write tests that catch broken pipelines before they produce wrong reports — with pytest fixtures, DataFrame assertions, mock APIs, and CI integration.

You built a data pipeline. It runs every morning. It worked for three weeks. Then someone changed a column name in the source file and the pipeline produced a report with half the rows missing. Nobody noticed for five days.
Testing prevents this. Not manual "run it and eyeball the output" testing — automated tests that catch broken transformations, missing columns, and wrong aggregations before they reach production.
This guide covers how to test data pipelines with pytest. By the end, you will have unit tests, integration tests, fixture patterns, and CI configuration that you can apply to any Python pipeline.
# The Testing Architecture
```mermaid
flowchart TD
    U["Unit Tests\n(each function)"] --> I["Integration Tests\n(stage-to-stage)"]
    I --> E["End-to-End Tests\n(full pipeline)"]
    U --> CI["CI Pipeline\n(run on every push)"]
    I --> CI
    E --> CI
    CI -->|Pass| D[Deploy / Schedule]
    CI -->|Fail| B[Block & Alert]
```
Unit tests verify individual functions. Integration tests verify stages work together. End-to-end tests run the full pipeline on sample data. All three run automatically on every code change.
# What You Will Need
```shell
pip install pytest pandas openpyxl pytest-cov
```
- pytest — test framework
- pandas — data processing (the code you are testing)
- openpyxl — Excel engine used by pandas.read_excel
- pytest-cov — test coverage reporting
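A layout like the following keeps the pure pipeline code importable from the tests (directory and file names here are one common convention, not a requirement):

```text
project/
├── pipeline/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   └── load.py
├── tests/
│   ├── conftest.py
│   ├── test_transform.py
│   └── test_pipeline_integration.py
├── pytest.ini
└── requirements.txt
```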
# Step 1: Structure Your Pipeline for Testing
The key to testable pipelines: separate logic from I/O. Every transformation should be a pure function that takes data in and returns data out.
```python
# pipeline/transform.py
import pandas as pd


def standardise_columns(df):
    """Normalise column names to lowercase with underscores."""
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df


def clean_revenue(df, column="revenue"):
    """Convert revenue column to numeric, coercing errors to NaN."""
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df


def remove_duplicates(df, key_columns):
    """Remove duplicate rows based on key columns, keeping the last."""
    before = len(df)
    df = df.drop_duplicates(subset=key_columns, keep="last")
    removed = before - len(df)
    return df, removed


def calculate_summary(df, group_by, value_column):
    """Aggregate data by group with standard metrics."""
    return (
        df.groupby(group_by)
        .agg(
            total=(value_column, "sum"),
            count=(value_column, "count"),
            average=(value_column, "mean"),
        )
        .reset_index()
        .round(2)
    )


def validate_schema(df, required_columns):
    """Check that all required columns exist."""
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return True
```
Each function is pure — no file reads, no API calls, no side effects. This makes them trivially testable.
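Purity pays off immediately: the whole transform stage composes into one function, and file I/O stays in a thin wrapper that tests never need to touch. A sketch (run_pipeline and the paths are illustrative; the two helpers are abridged copies of the module above):

```python
# pipeline/run.py (illustrative): all I/O at the edges, pure logic in the middle
import pandas as pd


def standardise_columns(df):
    # Abridged copy of the transform module's function
    df = df.copy()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
    return df


def clean_revenue(df, column="revenue"):
    # Abridged copy: coerce revenue strings to numeric, errors become NaN
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df


def transform(df):
    """Pure composition: DataFrame in, DataFrame out, no I/O."""
    return clean_revenue(standardise_columns(df))


def run_pipeline(input_path, output_path):
    """The only impure code: read, transform, write."""
    df = pd.read_csv(input_path)
    result = transform(df)
    result.to_csv(output_path, index=False)
```

Unit tests exercise `transform` directly on in-memory DataFrames; only one end-to-end test needs real files.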
# Step 2: Write Unit Tests
# Basic DataFrame Tests
```python
# tests/test_transform.py
import pytest
import pandas as pd
import numpy as np

from pipeline.transform import (
    standardise_columns,
    clean_revenue,
    remove_duplicates,
    calculate_summary,
    validate_schema,
)


class TestStandardiseColumns:
    def test_lowercases_columns(self):
        df = pd.DataFrame({"First Name": [1], "LAST NAME": [2]})
        result = standardise_columns(df)
        assert list(result.columns) == ["first_name", "last_name"]

    def test_strips_whitespace(self):
        df = pd.DataFrame({" name ": [1], " age ": [2]})
        result = standardise_columns(df)
        assert list(result.columns) == ["name", "age"]

    def test_does_not_modify_original(self):
        df = pd.DataFrame({"Name": [1]})
        standardise_columns(df)
        assert list(df.columns) == ["Name"]


class TestCleanRevenue:
    def test_converts_strings_to_numbers(self):
        df = pd.DataFrame({"revenue": ["100.50", "200", "300.75"]})
        result = clean_revenue(df)
        assert result["revenue"].dtype == np.float64
        assert result["revenue"].tolist() == [100.50, 200.0, 300.75]

    def test_coerces_invalid_to_nan(self):
        df = pd.DataFrame({"revenue": ["100", "N/A", "not a number"]})
        result = clean_revenue(df)
        assert result["revenue"].iloc[0] == 100.0
        assert pd.isna(result["revenue"].iloc[1])
        assert pd.isna(result["revenue"].iloc[2])

    def test_handles_empty_dataframe(self):
        df = pd.DataFrame({"revenue": pd.Series([], dtype=str)})
        result = clean_revenue(df)
        assert len(result) == 0


class TestRemoveDuplicates:
    def test_removes_exact_duplicates(self):
        df = pd.DataFrame({
            "id": [1, 2, 2, 3],
            "value": [10, 20, 25, 30],
        })
        result, removed = remove_duplicates(df, ["id"])
        assert len(result) == 3
        assert removed == 1

    def test_keeps_last_duplicate(self):
        df = pd.DataFrame({
            "id": [1, 1],
            "value": ["old", "new"],
        })
        result, _ = remove_duplicates(df, ["id"])
        assert result.iloc[0]["value"] == "new"

    def test_no_duplicates_returns_same(self):
        df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        result, removed = remove_duplicates(df, ["id"])
        assert len(result) == 3
        assert removed == 0
```
# Testing Aggregations
```python
class TestCalculateSummary:
    def test_groups_and_aggregates(self):
        df = pd.DataFrame({
            "region": ["North", "North", "South"],
            "revenue": [100, 200, 150],
        })
        result = calculate_summary(df, "region", "revenue")
        north = result[result["region"] == "North"].iloc[0]
        assert north["total"] == 300
        assert north["count"] == 2
        assert north["average"] == 150.0

    def test_single_group(self):
        df = pd.DataFrame({
            "region": ["North", "North"],
            "revenue": [100, 200],
        })
        result = calculate_summary(df, "region", "revenue")
        assert len(result) == 1
        assert result.iloc[0]["total"] == 300

    def test_rounds_to_two_decimals(self):
        df = pd.DataFrame({
            "region": ["A", "A", "A"],
            "revenue": [10, 20, 33],
        })
        result = calculate_summary(df, "region", "revenue")
        assert result.iloc[0]["average"] == 21.0


class TestValidateSchema:
    def test_passes_with_all_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"], "value": [100]})
        assert validate_schema(df, ["id", "name", "value"]) is True

    def test_fails_with_missing_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"]})
        with pytest.raises(ValueError, match="Missing required columns"):
            validate_schema(df, ["id", "name", "value"])

    def test_passes_with_extra_columns(self):
        df = pd.DataFrame({"id": [1], "name": ["test"], "extra": [True]})
        assert validate_schema(df, ["id", "name"]) is True
```
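Repetitive input/expected pairs compress well with pytest.mark.parametrize. A sketch against clean_revenue (redefined inline here so the snippet stands alone):

```python
import pandas as pd
import pytest


def clean_revenue(df, column="revenue"):
    # Same logic as pipeline/transform.py, repeated for a self-contained example
    df = df.copy()
    df[column] = pd.to_numeric(df[column], errors="coerce")
    return df


@pytest.mark.parametrize(
    "raw, expected",
    [
        ("100.50", 100.50),
        ("200", 200.0),
        ("N/A", None),   # coerced to NaN
        ("", None),      # empty string also becomes NaN
    ],
)
def test_clean_revenue_cases(raw, expected):
    result = clean_revenue(pd.DataFrame({"revenue": [raw]}))
    value = result["revenue"].iloc[0]
    if expected is None:
        assert pd.isna(value)
    else:
        assert value == expected
```

Each tuple becomes its own test case in the report, so a single failing input is immediately visible.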
# Step 3: Test Fixtures for Reusable Data
Create fixtures that provide consistent test data across all tests:
```python
# tests/conftest.py
import pytest
import pandas as pd


@pytest.fixture
def sample_orders():
    """Realistic order data for testing."""
    return pd.DataFrame({
        "order_id": [1001, 1002, 1003, 1004, 1005],
        "date": pd.date_range("2026-03-01", periods=5, freq="D"),
        "customer_id": [101, 102, 101, 103, 102],
        "product": ["Widget", "Gadget", "Widget", "Doohickey", "Gadget"],
        "region": ["North", "South", "North", "East", "South"],
        "revenue": [150.00, 250.00, 175.00, 320.00, 280.00],
        "quantity": [2, 1, 3, 1, 2],
    })


@pytest.fixture
def messy_orders():
    """Orders with common real-world data quality issues."""
    return pd.DataFrame({
        "Order ID": [1001, 1002, 1002, 1003, 1004],
        " Date ": ["2026-03-01", "2026-03-02", "2026-03-02", "invalid", "2026-03-04"],
        "Customer_ID": [101, 102, 102, 103, 104],
        "REVENUE": ["150.00", "N/A", "250.00", "300", "not_a_number"],
        "Region": ["north", " South ", "SOUTH", "East", ""],
    })


@pytest.fixture
def empty_dataframe():
    """Empty DataFrame with the expected schema."""
    return pd.DataFrame(columns=["order_id", "date", "customer_id", "revenue", "region"])


@pytest.fixture
def large_orders():
    """Large dataset for performance testing."""
    n = 10000
    return pd.DataFrame({
        "order_id": range(n),
        "date": pd.date_range("2025-01-01", periods=n, freq="h"),
        "customer_id": [i % 100 for i in range(n)],
        "revenue": [round(50 + (i * 7.3) % 500, 2) for i in range(n)],
        "region": ["North", "South", "East", "West"] * (n // 4),
    })
```
# Using Fixtures in Tests
```python
# tests/test_pipeline_integration.py
from pipeline.transform import (
    standardise_columns,
    clean_revenue,
    remove_duplicates,
    calculate_summary,
)


def test_full_transform_chain(messy_orders):
    """Test the complete transformation pipeline on messy data."""
    # Step 1: Standardise
    df = standardise_columns(messy_orders)
    assert "order_id" in df.columns
    assert "revenue" in df.columns

    # Step 2: Clean revenue
    df = clean_revenue(df)
    valid_revenue = df["revenue"].dropna()
    assert len(valid_revenue) >= 2  # Some should survive

    # Step 3: Deduplicate
    df, removed = remove_duplicates(df, ["order_id"])
    assert removed >= 1  # Order 1002 is duplicated

    # Step 4: Summarise
    df_valid = df.dropna(subset=["revenue"])
    if len(df_valid) > 0:
        result = calculate_summary(df_valid, "region", "revenue")
        assert "total" in result.columns
        assert result["total"].sum() > 0


def test_handles_empty_input(empty_dataframe):
    """Pipeline should handle empty DataFrames without crashing."""
    df = standardise_columns(empty_dataframe)
    assert len(df) == 0
    assert len(df.columns) == 5


def test_large_dataset_performance(large_orders):
    """Pipeline should handle 10k rows without issues."""
    df = standardise_columns(large_orders)
    df = clean_revenue(df)
    df, removed = remove_duplicates(df, ["order_id"])
    result = calculate_summary(df, "region", "revenue")
    assert len(result) == 4  # 4 regions
    assert result["count"].sum() == 10000
```
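For whole-DataFrame comparisons, pandas ships its own assertion helper, which produces a far more readable diff than a plain `==` check:

```python
import pandas as pd
from pandas.testing import assert_frame_equal

expected = pd.DataFrame({"region": ["North", "South"], "total": [325.0, 530.0]})
actual = pd.DataFrame({"region": ["North", "South"], "total": [325.0, 530.000001]})

# Raises AssertionError with a column-by-column diff on mismatch.
# With check_exact=False (the default for floats), values are compared
# with a tolerance, so tiny float noise does not fail the test.
assert_frame_equal(actual, expected, check_exact=False, atol=1e-3)
```

This is especially useful in integration tests, where comparing the full summary table in one call replaces a dozen individual cell assertions.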
# Step 4: Mock External Dependencies
APIs, databases, and file systems should not be called during unit tests. Mock them.
```python
# tests/test_extract.py
import pytest
import pandas as pd
from unittest.mock import patch, MagicMock


# The extraction function we are testing
def fetch_orders_from_api(base_url, api_key):
    """Fetch orders from a REST API."""
    import requests

    response = requests.get(
        f"{base_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    return pd.DataFrame(response.json()["results"])


class TestFetchOrders:
    @patch("requests.get")
    def test_successful_fetch(self, mock_get):
        """Test a successful API response."""
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [
                {"id": 1, "amount": 100, "date": "2026-03-01"},
                {"id": 2, "amount": 200, "date": "2026-03-02"},
            ]
        }
        mock_get.return_value = mock_response
        result = fetch_orders_from_api("https://api.example.com", "test-key")
        assert len(result) == 2
        assert "amount" in result.columns
        mock_get.assert_called_once()

    @patch("requests.get")
    def test_api_timeout(self, mock_get):
        """Test handling of an API timeout."""
        import requests

        mock_get.side_effect = requests.exceptions.Timeout("Connection timed out")
        with pytest.raises(requests.exceptions.Timeout):
            fetch_orders_from_api("https://api.example.com", "test-key")

    @patch("requests.get")
    def test_empty_response(self, mock_get):
        """Test handling of an empty API response."""
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": []}
        mock_get.return_value = mock_response
        result = fetch_orders_from_api("https://api.example.com", "test-key")
        assert len(result) == 0
```
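An alternative to patch is dependency injection: accept the HTTP client as a parameter. This variant is hypothetical (the fetch_orders_from_api above does not take a session), but it shows how injection turns the test into a plain function call with a hand-rolled fake, with no mocking library at all:

```python
import pandas as pd


def fetch_orders(session, base_url, api_key):
    """Same extraction logic, but the client is injected instead of imported."""
    response = session.get(
        f"{base_url}/orders",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    return pd.DataFrame(response.json()["results"])


class FakeResponse:
    """Minimal stand-in for a requests.Response."""

    def __init__(self, payload):
        self._payload = payload

    def raise_for_status(self):
        pass  # pretend the status was 200

    def json(self):
        return self._payload


class FakeSession:
    """Minimal stand-in for a requests.Session."""

    def __init__(self, payload):
        self._payload = payload

    def get(self, url, **kwargs):
        return FakeResponse(self._payload)


def test_fetch_orders_with_fake_session():
    session = FakeSession({"results": [{"id": 1, "amount": 100}]})
    result = fetch_orders(session, "https://api.example.com", "test-key")
    assert len(result) == 1
    assert result.iloc[0]["amount"] == 100
```

In production you would pass a real `requests.Session`; the function body never knows the difference.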
# Mocking File System
```python
# tests/test_file_extract.py
import os

import pytest
import pandas as pd
from unittest.mock import patch


def load_source_file(filepath):
    """Load data from Excel or CSV based on extension."""
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Source file not found: {filepath}")
    if filepath.endswith(".xlsx"):
        return pd.read_excel(filepath)
    elif filepath.endswith(".csv"):
        return pd.read_csv(filepath)
    else:
        raise ValueError(f"Unsupported file type: {filepath}")


class TestLoadSourceFile:
    @patch("os.path.exists", return_value=False)
    def test_missing_file_raises_error(self, mock_exists):
        with pytest.raises(FileNotFoundError, match="Source file not found"):
            load_source_file("data/missing.xlsx")

    @patch("pandas.read_excel")
    @patch("os.path.exists", return_value=True)
    def test_loads_excel(self, mock_exists, mock_read):
        mock_read.return_value = pd.DataFrame({"id": [1, 2]})
        result = load_source_file("data/orders.xlsx")
        assert len(result) == 2

    def test_unsupported_format(self, tmp_path):
        bad_file = tmp_path / "data.json"
        bad_file.write_text("{}")
        with pytest.raises(ValueError, match="Unsupported file type"):
            load_source_file(str(bad_file))
```
# Step 5: Snapshot Testing for Report Output
Verify that your pipeline output doesn't change unexpectedly. The helpers below persist a structural snapshot (columns, dtypes, shape, sample rows); the test then pins the report's structure and totals:
```python
# tests/test_report_output.py
import json
import os

import pandas as pd

SNAPSHOT_DIR = "tests/snapshots"


def save_snapshot(name, df):
    """Save a DataFrame snapshot for comparison."""
    os.makedirs(SNAPSHOT_DIR, exist_ok=True)
    path = os.path.join(SNAPSHOT_DIR, f"{name}.json")
    snapshot = {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "shape": list(df.shape),
        "sample": df.head(5).to_dict(orient="records"),
    }
    with open(path, "w") as f:
        json.dump(snapshot, f, indent=2, default=str)


def load_snapshot(name):
    """Load a saved snapshot, or None if no baseline exists yet."""
    path = os.path.join(SNAPSHOT_DIR, f"{name}.json")
    if not os.path.exists(path):
        return None
    with open(path, "r") as f:
        return json.load(f)


def test_report_output_matches_snapshot(sample_orders):
    """Verify report output matches the expected structure."""
    from pipeline.transform import calculate_summary

    result = calculate_summary(sample_orders, "region", "revenue")

    # Check structure
    assert list(result.columns) == ["region", "total", "count", "average"]
    assert len(result) > 0

    # Check types
    assert result["total"].dtype in [float, "float64"]
    assert result["count"].dtype in [int, "int64"]

    # Check values are reasonable
    assert result["total"].sum() == sample_orders["revenue"].sum()
    assert result["count"].sum() == len(sample_orders)
```
# Step 6: Test Configuration
```ini
# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    integration: marks integration tests
addopts = -v --tb=short
```
# Running Tests
```shell
# Run all tests
pytest

# Run with coverage report
pytest --cov=pipeline --cov-report=term-missing

# Run only unit tests (fast)
pytest tests/test_transform.py

# Run only integration tests
pytest -m integration

# Skip slow tests
pytest -m "not slow"
```
# Sample Coverage Output
```text
---------- coverage: platform linux, python 3.11 ----------
Name                    Stmts   Miss  Cover   Missing
-----------------------------------------------------
pipeline/transform.py      45      2    96%   72-73
pipeline/extract.py        38      5    87%   45-49
pipeline/load.py           22      3    86%   31-33
-----------------------------------------------------
TOTAL                     105     10    90%
```
# Step 7: CI Integration
# GitHub Actions
```yaml
# .github/workflows/test-pipeline.yml
name: Test Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: pytest --cov=pipeline --cov-report=xml
      - name: Check coverage threshold
        run: |
          coverage=$(python -c "import xml.etree.ElementTree as ET; print(float(ET.parse('coverage.xml').getroot().attrib['line-rate'])*100)")
          echo "Coverage: ${coverage}%"
          python -c "assert $coverage >= 80, f'Coverage {$coverage}% below 80% threshold'"
```
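If the XML parsing above feels heavy, pytest-cov can enforce the same threshold directly:

```shell
pytest --cov=pipeline --cov-fail-under=80
```

With `--cov-fail-under`, pytest exits non-zero when coverage drops below the threshold, so the CI step fails with no extra scripting.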
# Common Pipeline Test Patterns
| What to test | How to test it | Why it matters |
|---|---|---|
| Column names after transform | `assert list(df.columns) == expected` | Schema changes break downstream |
| Row count after filtering | `assert len(result) == expected_count` | Silent row drops produce wrong totals |
| Aggregation totals | `assert result["total"].sum() == df["value"].sum()` | Groupby errors lose data |
| Null handling | `assert result["col"].isna().sum() == 0` | Nulls propagate through calculations |
| Date parsing | `assert result["date"].dtype == "datetime64[ns]"` | String dates break time-series logic |
| Duplicate removal | `assert result["id"].is_unique` | Duplicates inflate counts |
| Edge cases | Test with empty DataFrame, single row, all nulls | Prevents crashes on unusual inputs |
# What This Replaces
| Manual process | Automated test equivalent |
|---|---|
| "Run it and check the numbers look right" | Assertion on exact expected values |
| "It worked last time" | CI runs tests on every change |
| "The report looks weird, what changed?" | Snapshot comparison catches drift |
| "Did that refactor break anything?" | Full test suite runs in seconds |
| "Works on my machine" | CI tests in a clean environment |
# Next Steps
Start with unit tests for your transformation functions — they catch the most common bugs with the least effort. Add integration tests for stage-to-stage handoffs. Add CI once you have 10+ tests.
The return on investment is immediate: the first time a test catches a broken column rename or a bad aggregation before it reaches production, the testing setup has paid for itself.
For building the pipelines these tests protect, see How to Design Data Pipelines for Reliable Reporting. For making those pipelines recover from failures automatically, see How to Build Self-Healing Data Pipelines.
Data analytics services include building tested, reliable reporting systems with full CI/CD integration.
Get in touch to discuss adding tests to your data pipelines.