How to Build a CI/CD Pipeline for Data Workflows
Ship data pipeline changes with confidence using automated testing, linting, and deployment. Covers GitHub Actions, data validation gates, and rollback strategies.
Data pipelines break in production for the same reason application code does — untested changes pushed directly to the main branch. But unlike web applications, data pipelines have an extra failure mode: the data itself can change shape, volume, or quality without any code change at all.
A proper CI/CD pipeline for data workflows tests both the code and the data assumptions before anything reaches production. This guide builds that pipeline using GitHub Actions, pytest, and data validation gates.
# Who This Is For
- Data engineers who deploy pipeline changes manually and have been burned by untested updates
- Developers familiar with CI/CD for web apps but unsure how to apply it to data workflows
- Vibe coders whose data scripts have grown into something that needs proper release discipline
- Teams where multiple people contribute to the same pipeline codebase
You should know Python and Git basics. Familiarity with GitHub is helpful. No prior CI/CD experience needed — this guide builds the entire workflow step by step.
# Architecture Overview
```mermaid
flowchart LR
    D["Developer\nPush to branch"] --> PR["Pull Request"]
    PR --> L["Lint & Format\n(ruff)"]
    L --> T["Unit Tests\n(pytest)"]
    T --> I["Integration Tests\n(sample data)"]
    I --> V["Data Validation\n(schema checks)"]
    V --> M["Merge to main"]
    M --> S["Stage Deploy\n(dry run)"]
    S --> P["Production Deploy"]
    P --> H["Health Check\n& Monitoring"]
```
Every push triggers linting and unit tests. Merges to main run integration tests with sample data and deploy through a staging gate before production.
# What You Will Need
```bash
pip install pytest ruff great-expectations pydantic pandas
```
- pytest — test runner for pipeline logic
- ruff — fast Python linter and formatter
- great-expectations — data validation framework (an optional extension of the hand-rolled gate in Step 4)
- pydantic — row-level schema validation used in the validation gate
- pandas — DataFrame operations used throughout
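The CI workflows below install dependencies from a requirements.txt, so pin these tools there so local and CI environments match. A sketch with illustrative version numbers; pin whatever versions you actually test against:

```text
# requirements.txt (version numbers are illustrative)
pandas==2.2.2
pytest==8.2.0
ruff==0.4.4
great-expectations==0.18.12
pydantic==2.7.1
```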
# Step 1: Project Structure for Testable Pipelines
Organise your pipeline code so it is testable:
```text
data-pipeline/
├── .github/
│   └── workflows/
│       ├── ci.yml               # runs on every push/PR
│       └── deploy.yml           # runs on merge to main
├── pipelines/
│   ├── __init__.py
│   ├── extract.py               # extraction functions
│   ├── transform.py             # transformation logic
│   └── load.py                  # warehouse loading
├── tests/
│   ├── __init__.py
│   ├── conftest.py              # shared fixtures
│   ├── test_extract.py
│   ├── test_transform.py
│   └── test_load.py
├── fixtures/
│   └── sample_orders.csv        # test data
├── validations/
│   └── schema_checks.py         # data validation rules
├── pyproject.toml
├── requirements.txt
└── Makefile
```
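The tree includes a pyproject.toml that the guide never shows. A minimal sketch that configures ruff and registers the integration marker in one place might look like this; the line length and target version are assumptions, and if you declare the marker here, the pytest_configure hook in conftest.py below becomes optional:

```toml
# pyproject.toml: minimal sketch, adjust to your project
[tool.ruff]
line-length = 100
target-version = "py312"

[tool.pytest.ini_options]
testpaths = ["tests"]
markers = [
    'integration: marks integration tests (deselect with -m "not integration")',
]
```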
# The Makefile
```makefile
.PHONY: lint test test-integration validate deploy-staging deploy-production

lint:
	ruff check pipelines/ tests/
	ruff format --check pipelines/ tests/

test:
	pytest tests/ -v --tb=short -m "not integration"

test-integration:
	pytest tests/ -v --tb=short -m integration

validate:
	python validations/schema_checks.py

deploy-staging:
	python -m pipelines.deploy --env staging --dry-run

deploy-production:
	python -m pipelines.deploy --env production
```
# Step 2: Unit Tests for Transform Logic
Test transformation functions in isolation — no databases, no APIs:
```python
# tests/test_transform.py
import pandas as pd
import pytest

from pipelines.transform import (
    clean_order_data,
    calculate_revenue_metrics,
    flag_anomalies,
)


@pytest.fixture
def sample_orders():
    """Create a DataFrame matching the expected extraction schema."""
    return pd.DataFrame({
        "order_id": ["ORD001", "ORD002", "ORD003", "ORD004"],
        "amount": [49.99, 150.00, -10.00, 3200.00],
        "currency": ["GBP", "GBP", "GBP", "GBP"],
        "status": ["completed", "completed", "refunded", "completed"],
        "created_at": pd.to_datetime([
            "2026-05-01", "2026-05-01", "2026-05-02", "2026-05-02"
        ]),
    })


def test_clean_removes_negative_amounts(sample_orders):
    cleaned = clean_order_data(sample_orders)
    assert (cleaned["amount"] >= 0).all()


def test_clean_preserves_valid_rows(sample_orders):
    cleaned = clean_order_data(sample_orders)
    assert len(cleaned) == 3  # one negative removed


def test_revenue_metrics_daily_aggregation(sample_orders):
    cleaned = clean_order_data(sample_orders)
    metrics = calculate_revenue_metrics(cleaned, period="daily")
    assert "date" in metrics.columns
    assert "total_revenue" in metrics.columns
    assert "order_count" in metrics.columns


def test_flag_anomalies_catches_high_value(sample_orders):
    flagged = flag_anomalies(sample_orders, threshold=1000.0)
    anomalies = flagged[flagged["is_anomaly"]]
    assert len(anomalies) == 1
    assert anomalies.iloc[0]["order_id"] == "ORD004"


# Tests for Edge Cases

def test_clean_handles_empty_dataframe():
    empty = pd.DataFrame(columns=["order_id", "amount", "currency", "status", "created_at"])
    cleaned = clean_order_data(empty)
    assert len(cleaned) == 0
    assert list(cleaned.columns) == list(empty.columns)


def test_clean_handles_null_amounts():
    df = pd.DataFrame({
        "order_id": ["ORD001"],
        "amount": [None],
        "currency": ["GBP"],
        "status": ["completed"],
        "created_at": [pd.Timestamp("2026-05-01")],
    })
    cleaned = clean_order_data(df)
    assert len(cleaned) == 0  # null amounts dropped
```
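These tests pin down the behaviour of three functions the guide assumes but never shows. A minimal sketch of pipelines/transform.py that satisfies them; your real transformations will be more involved:

```python
# pipelines/transform.py: illustrative sketch that passes the tests above
import pandas as pd


def clean_order_data(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with null or negative amounts."""
    cleaned = df.dropna(subset=["amount"])
    return cleaned[cleaned["amount"] >= 0].reset_index(drop=True)


def calculate_revenue_metrics(df: pd.DataFrame, period: str = "daily") -> pd.DataFrame:
    """Aggregate revenue and order counts per period."""
    freq = {"daily": "D", "weekly": "W", "monthly": "MS"}[period]
    return (
        df.groupby(pd.Grouper(key="created_at", freq=freq))
        .agg(total_revenue=("amount", "sum"), order_count=("order_id", "count"))
        .reset_index()
        .rename(columns={"created_at": "date"})
    )


def flag_anomalies(df: pd.DataFrame, threshold: float = 1000.0) -> pd.DataFrame:
    """Mark orders whose amount exceeds the threshold."""
    out = df.copy()
    out["is_anomaly"] = out["amount"] > threshold
    return out
```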
# Step 3: Integration Tests with Sample Data
Mark tests that need external resources and run them separately:
```python
# tests/test_extract.py
import pytest

from pipelines.extract import extract_orders_from_api


@pytest.mark.integration
def test_api_extraction_returns_expected_schema():
    """Test against the staging API with a date range known to have data."""
    df = extract_orders_from_api(
        start_date="2026-04-01",
        end_date="2026-04-02",
        api_url="https://staging-api.example.com",
    )
    expected_columns = {"order_id", "amount", "currency", "status", "created_at"}
    assert expected_columns.issubset(set(df.columns))
    assert len(df) > 0
    assert df["amount"].dtype in ["float64", "int64"]
```
```python
# tests/conftest.py
from pathlib import Path

import pandas as pd
import pytest


@pytest.fixture
def sample_data():
    """Load the shared test fixture."""
    fixture_path = Path(__file__).parent.parent / "fixtures" / "sample_orders.csv"
    return pd.read_csv(fixture_path, parse_dates=["created_at"])


def pytest_configure(config):
    """Register custom markers."""
    config.addinivalue_line(
        "markers",
        'integration: marks integration tests (deselect with -m "not integration")',
    )
```
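With the marker registered, the day-to-day commands split cleanly:

```bash
# Fast local loop: unit tests only
pytest tests/ -m "not integration"

# Integration tests, when you have staging access
pytest tests/ -m integration
```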
# Step 4: Data Validation Gates
Validate data schema and quality before loading to the warehouse:
```python
# validations/schema_checks.py
from datetime import datetime

import pandas as pd
from pydantic import BaseModel, field_validator


class OrderSchema(BaseModel):
    """Expected schema for orders after transformation."""

    order_id: str
    amount: float
    currency: str
    status: str
    created_at: datetime  # pd.Timestamp subclasses datetime, so it validates

    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v):
        if v < 0:
            raise ValueError(f"Amount must be >= 0, got {v}")
        return v

    @field_validator("currency")
    @classmethod
    def currency_must_be_valid(cls, v):
        valid = {"GBP", "USD", "EUR"}
        if v not in valid:
            raise ValueError(f"Currency must be one of {valid}, got {v}")
        return v


def validate_dataframe(df: pd.DataFrame, schema=OrderSchema) -> dict:
    """Validate every row in a DataFrame against the schema.

    Returns a dict with valid_count, invalid_count, error_rate, and errors.
    """
    errors = []
    valid_count = 0
    for i, row in df.iterrows():
        try:
            schema(**row.to_dict())
            valid_count += 1
        except Exception as e:
            errors.append({"row": i, "error": str(e)})
    return {
        "valid_count": valid_count,
        "invalid_count": len(errors),
        "error_rate": len(errors) / max(len(df), 1),
        "errors": errors[:10],  # first 10 only
    }


def validation_gate(df: pd.DataFrame, max_error_rate=0.01) -> bool:
    """Pass/fail gate for data quality.

    Fails if more than 1% of rows are invalid.
    """
    results = validate_dataframe(df)
    passed = results["error_rate"] <= max_error_rate
    if not passed:
        print(f"FAIL — Validation: {results['error_rate']:.1%} error rate")
        print(f"  {results['invalid_count']} invalid rows out of "
              f"{results['valid_count'] + results['invalid_count']}")
        for error in results["errors"]:
            print(f"  Row {error['row']}: {error['error']}")
    else:
        print(f"PASS — Validation: {results['error_rate']:.1%} error rate")
    return passed
```
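The Makefile runs this module as a script (python validations/schema_checks.py), so it needs an entry point. A sketch that validates the test fixture and exits non-zero on failure so CI can gate on it; in a real run you would validate a sample of the batch about to be loaded:

```python
# Entry point for `make validate`. Validates the test fixture here;
# point it at a sample of the batch you are about to load in production.
if __name__ == "__main__":
    import sys

    df = pd.read_csv("fixtures/sample_orders.csv", parse_dates=["created_at"])
    sys.exit(0 if validation_gate(df) else 1)
```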
# Step 5: GitHub Actions CI Workflow
```yaml
# .github/workflows/ci.yml
name: Data Pipeline CI

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install ruff
      - run: ruff check pipelines/ tests/
      - run: ruff format --check pipelines/ tests/

  test:
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --tb=short -m "not integration"

  integration:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_DB: test_warehouse
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --tb=short -m integration
        env:
          DATABASE_URL: postgresql://test:test@localhost:5432/test_warehouse
```
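One optional tweak: actions/setup-python can cache pip downloads between runs, keyed on requirements.txt, which speeds up all three jobs:

```yaml
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"  # restores the pip cache keyed on requirements.txt
```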
# Step 6: Deployment Workflow
```yaml
# .github/workflows/deploy.yml
name: Deploy Data Pipeline

on:
  push:
    branches: [main]
    paths:
      - "pipelines/**"
      - "validations/**"
      - "requirements.txt"

jobs:
  deploy:
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt

      # Run full test suite before deploying
      - name: Run tests
        run: pytest tests/ -v --tb=short

      # Dry run against staging
      - name: Staging dry run
        run: python -m pipelines.deploy --env staging --dry-run
        env:
          STAGING_DB_URL: ${{ secrets.STAGING_DB_URL }}

      # Deploy to production
      - name: Deploy to production
        run: python -m pipelines.deploy --env production
        env:
          PRODUCTION_DB_URL: ${{ secrets.PRODUCTION_DB_URL }}

      # Verify deployment
      - name: Health check
        run: python -m pipelines.health_check --env production
        env:
          PRODUCTION_DB_URL: ${{ secrets.PRODUCTION_DB_URL }}
```
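The final step calls python -m pipelines.health_check, which the guide does not define. A minimal freshness check might look like the sketch below. The psycopg2 dependency, the orders table, and the created_at column are all assumptions; it also expects created_at to be a timezone-aware timestamp:

```python
# pipelines/health_check.py: illustrative sketch, not part of the guide's core code.
# Assumes psycopg2 is installed and orders.created_at is timestamptz.
import argparse
import os
import sys
from datetime import datetime, timedelta, timezone

import psycopg2


def check_freshness(db_url: str, max_age_hours: int = 24) -> bool:
    """Fail if no new orders arrived within the expected window."""
    with psycopg2.connect(db_url) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT max(created_at) FROM orders")
            (latest,) = cur.fetchone()
    if latest is None:
        print("FAIL: orders table is empty")
        return False
    age = datetime.now(timezone.utc) - latest
    ok = age <= timedelta(hours=max_age_hours)
    print(f"{'PASS' if ok else 'FAIL'}: latest order is {age} old")
    return ok


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", default="staging")
    args = parser.parse_args()
    var = "PRODUCTION_DB_URL" if args.env == "production" else "STAGING_DB_URL"
    sys.exit(0 if check_freshness(os.environ[var]) else 1)
```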
# Step 7: Rollback Strategy
When a deployment causes data issues, you need a fast rollback:
```python
# pipelines/deploy.py
import argparse
import subprocess
from datetime import datetime, timezone


class PipelineDeployer:
    """Deploy pipeline changes with rollback capability."""

    def __init__(self, env="staging"):
        self.env = env
        self.deploy_log = []

    def deploy(self, dry_run=False):
        """Deploy the current pipeline version."""
        version = self._get_git_sha()
        timestamp = datetime.now(timezone.utc).isoformat()

        if dry_run:
            print(f"[DRY RUN] Would deploy {version} to {self.env}")
            self._run_validation()
            return True

        print(f"Deploying {version} to {self.env} at {timestamp}")

        # Step 1: Validate data before migration
        if not self._run_validation():
            print("FAIL — Pre-deploy validation failed. Aborting.")
            return False

        # Step 2: Apply schema migrations (if any)
        self._apply_migrations()

        # Step 3: Run one cycle with new code
        self._run_smoke_test()

        # Step 4: Log the deployment
        self.deploy_log.append({
            "version": version,
            "env": self.env,
            "timestamp": timestamp,
            "status": "success",
        })
        print(f"OK — Deployed {version} to {self.env}")
        return True

    def rollback(self, target_version=None):
        """Roll back to the previous version."""
        if target_version is None:
            target_version = "HEAD~1"
        print(f"Rolling back to {target_version}")
        subprocess.run(
            ["git", "checkout", target_version, "--", "pipelines/"],
            check=True,
        )
        print(f"OK — Rolled back to {target_version}")

    def _get_git_sha(self):
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True, text=True, check=True,
        )
        return result.stdout.strip()

    def _run_validation(self):
        """Run data validation checks."""
        from validations.schema_checks import validation_gate  # noqa: F401
        # Load recent data and pass it through validation_gate here
        return True  # simplified

    def _apply_migrations(self):
        print(f"Applying migrations to {self.env}...")

    def _run_smoke_test(self):
        print(f"Running smoke test on {self.env}...")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", default="staging")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--rollback", action="store_true")
    args = parser.parse_args()

    deployer = PipelineDeployer(env=args.env)
    if args.rollback:
        deployer.rollback()
    else:
        deployer.deploy(dry_run=args.dry_run)
```
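To make rollback a one-click operation rather than a local git command, you could wrap it in a manually triggered workflow. A sketch using GitHub's workflow_dispatch trigger and the deploy module above:

```yaml
# .github/workflows/rollback.yml (illustrative)
name: Rollback Data Pipeline

on:
  workflow_dispatch:

jobs:
  rollback:
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # full history, so older refs can be checked out
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: python -m pipelines.deploy --env production --rollback
        env:
          PRODUCTION_DB_URL: ${{ secrets.PRODUCTION_DB_URL }}
```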
# CI/CD Checklist
| Stage | What It Checks | Fails If |
|---|---|---|
| Lint | Code style, import order, type hints | ruff reports errors |
| Unit tests | Transform logic, edge cases | Any assertion fails |
| Integration tests | API contracts, DB operations | Schema mismatch or connection errors |
| Data validation | Row-level schema, quality metrics | Error rate > 1% |
| Staging dry run | End-to-end with real schemas | Validation gate fails |
| Health check | Post-deploy data freshness | No new data within expected window |
# What This Replaces
| Manual Deployment | CI/CD Pipeline |
|---|---|
| SSH into server, git pull, hope it works | Automated test → stage → deploy → verify |
| "It works on my machine" | Tests run in identical CI environment |
| No rollback plan | One-command rollback to previous version |
| Data bugs found by users | Data validation gates catch issues pre-deploy |
| No code review enforcement | PR required, tests must pass |
| Deploy any time, break anything | Deploy only tested, validated changes |
# Next Steps
CI/CD is the backbone of reliable data operations. Extend it by:
- Adding pytest fixtures and mock APIs for more thorough test coverage
- Containerising your pipelines for reproducible CI environments
- Setting up structured logging so you can debug failed deployments
- Building data quality frameworks that feed into your validation gates
Start with linting and unit tests. Add integration tests when you have staging infrastructure. The validation gate is the single highest-impact addition — it catches the data bugs that code tests miss.
Need help building CI/CD for your data workflows? Get in touch or explore our automation services.