How to Build a CI/CD Pipeline for Data Workflows


Ship data pipeline changes with confidence using automated testing, linting, and deployment. Covers GitHub Actions, data validation gates, and rollback strategies.


Data pipelines break in production for the same reason application code does — untested changes pushed directly to the main branch. But unlike web applications, data pipelines have an extra failure mode: the data itself can change shape, volume, or quality without any code change at all.

A proper CI/CD pipeline for data workflows tests both the code and the data assumptions before anything reaches production. This guide builds that pipeline using GitHub Actions, pytest, and data validation gates.

# Who This Is For

  • Data engineers who deploy pipeline changes manually and have been burned by untested updates
  • Developers familiar with CI/CD for web apps but unsure how to apply it to data workflows
  • Vibe coders whose data scripts have grown into something that needs proper release discipline
  • Teams where multiple people contribute to the same pipeline codebase

You should know Python and Git basics. Familiarity with GitHub is helpful. No prior CI/CD experience needed — this guide builds the entire workflow step by step.

# Architecture Overview

```mermaid
flowchart LR
  D["Developer\nPush to branch"] --> PR["Pull Request"]
  PR --> L["Lint & Format\n(ruff, black)"]
  L --> T["Unit Tests\n(pytest)"]
  T --> I["Integration Tests\n(sample data)"]
  I --> V["Data Validation\n(schema checks)"]
  V --> M["Merge to main"]
  M --> S["Stage Deploy\n(dry run)"]
  S --> P["Production Deploy"]
  P --> H["Health Check\n& Monitoring"]
```

Every push triggers linting and unit tests. Merges to main run integration tests with sample data and deploy through a staging gate before production.

# What You Will Need

```bash
pip install pytest ruff great-expectations pydantic
```

  • pytest — test runner for pipeline logic
  • ruff — fast Python linter and formatter
  • great-expectations — data validation framework (optional here; the validation gate in Step 4 uses pydantic-based checks)
  • pydantic — schema validation for pipeline configs

# Step 1: Project Structure for Testable Pipelines

Organise your pipeline code so it is testable:

```text
data-pipeline/
├── .github/
│   └── workflows/
│       ├── ci.yml              # runs on every push/PR
│       └── deploy.yml          # runs on merge to main
├── pipelines/
│   ├── __init__.py
│   ├── extract.py              # extraction functions
│   ├── transform.py            # transformation logic
│   └── load.py                 # warehouse loading
├── tests/
│   ├── __init__.py
│   ├── conftest.py             # shared fixtures
│   ├── test_extract.py
│   ├── test_transform.py
│   └── test_load.py
├── fixtures/
│   └── sample_orders.csv       # test data
├── validations/
│   └── schema_checks.py        # data validation rules
├── pyproject.toml
├── requirements.txt            # pinned dependencies installed in CI
└── Makefile
```

# The Makefile

```makefile
.PHONY: lint test test-integration validate deploy-staging deploy-production

lint:
	ruff check pipelines/ tests/
	ruff format --check pipelines/ tests/

test:
	pytest tests/ -v --tb=short -m "not integration"

test-integration:
	pytest tests/ -v --tb=short -m integration

validate:
	python validations/schema_checks.py

deploy-staging:
	python -m pipelines.deploy --env staging --dry-run

deploy-production:
	python -m pipelines.deploy --env production
```

# Step 2: Unit Tests for Transform Logic

Test transformation functions in isolation — no databases, no APIs:

```python
# tests/test_transform.py
import pytest
import pandas as pd
from pipelines.transform import (
    clean_order_data,
    calculate_revenue_metrics,
    flag_anomalies,
)

@pytest.fixture
def sample_orders():
    """Create a DataFrame matching the expected extraction schema."""
    return pd.DataFrame({
        "order_id": ["ORD001", "ORD002", "ORD003", "ORD004"],
        "amount": [49.99, 150.00, -10.00, 3200.00],
        "currency": ["GBP", "GBP", "GBP", "GBP"],
        "status": ["completed", "completed", "refunded", "completed"],
        "created_at": pd.to_datetime([
            "2026-05-01", "2026-05-01", "2026-05-02", "2026-05-02"
        ]),
    })

def test_clean_removes_negative_amounts(sample_orders):
    cleaned = clean_order_data(sample_orders)
    assert (cleaned["amount"] >= 0).all()

def test_clean_preserves_valid_rows(sample_orders):
    cleaned = clean_order_data(sample_orders)
    assert len(cleaned) == 3  # one negative removed

def test_revenue_metrics_daily_aggregation(sample_orders):
    cleaned = clean_order_data(sample_orders)
    metrics = calculate_revenue_metrics(cleaned, period="daily")
    assert "date" in metrics.columns
    assert "total_revenue" in metrics.columns
    assert "order_count" in metrics.columns

def test_flag_anomalies_catches_high_value(sample_orders):
    flagged = flag_anomalies(sample_orders, threshold=1000.0)
    anomalies = flagged[flagged["is_anomaly"]]
    assert len(anomalies) == 1
    assert anomalies.iloc[0]["order_id"] == "ORD004"
```

# Tests for Edge Cases

```python
def test_clean_handles_empty_dataframe():
    empty = pd.DataFrame(columns=["order_id", "amount", "currency", "status", "created_at"])
    cleaned = clean_order_data(empty)
    assert len(cleaned) == 0
    assert list(cleaned.columns) == list(empty.columns)

def test_clean_handles_null_amounts():
    df = pd.DataFrame({
        "order_id": ["ORD001"],
        "amount": [None],
        "currency": ["GBP"],
        "status": ["completed"],
        "created_at": [pd.Timestamp("2026-05-01")],
    })
    cleaned = clean_order_data(df)
    assert len(cleaned) == 0  # null amounts dropped
```
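
For reference, here is a minimal sketch of what `pipelines/transform.py` could look like to satisfy these tests. The function names and signatures come from the tests above; the implementations are illustrative assumptions, not the canonical versions:

```python
# pipelines/transform.py (illustrative sketch, not the canonical implementation)
import pandas as pd

def clean_order_data(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with null or negative amounts."""
    cleaned = df.dropna(subset=["amount"])
    return cleaned[cleaned["amount"] >= 0].reset_index(drop=True)

def calculate_revenue_metrics(df: pd.DataFrame, period: str = "daily") -> pd.DataFrame:
    """Aggregate revenue and order counts per period."""
    freq = {"daily": "D", "weekly": "W", "monthly": "M"}[period]
    return (
        df.assign(date=df["created_at"].dt.to_period(freq).dt.start_time)
        .groupby("date", as_index=False)
        .agg(total_revenue=("amount", "sum"), order_count=("order_id", "count"))
    )

def flag_anomalies(df: pd.DataFrame, threshold: float = 1000.0) -> pd.DataFrame:
    """Mark orders above a fixed amount threshold as anomalies."""
    out = df.copy()
    out["is_anomaly"] = out["amount"] > threshold
    return out
```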

# Step 3: Integration Tests with Sample Data

Mark tests that need external resources and run them separately:

```python
# tests/test_extract.py
import pytest
from pipelines.extract import extract_orders_from_api

@pytest.mark.integration
def test_api_extraction_returns_expected_schema():
    """Test against the staging API with a date range known to have data."""
    df = extract_orders_from_api(
        start_date="2026-04-01",
        end_date="2026-04-02",
        api_url="https://staging-api.example.com",
    )

    expected_columns = {"order_id", "amount", "currency", "status", "created_at"}
    assert expected_columns.issubset(set(df.columns))
    assert len(df) > 0
    assert df["amount"].dtype in ["float64", "int64"]
```

```python
# tests/conftest.py
import pytest
import pandas as pd
from pathlib import Path

@pytest.fixture
def sample_data():
    """Load the shared test fixture."""
    fixture_path = Path(__file__).parent.parent / "fixtures" / "sample_orders.csv"
    return pd.read_csv(fixture_path, parse_dates=["created_at"])

def pytest_configure(config):
    """Register custom markers."""
    config.addinivalue_line(
        "markers",
        'integration: marks integration tests (deselect with -m "not integration")',
    )
```
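
Step 5's CI workflow also provisions a throwaway Postgres service for integration tests. A sketch of what `tests/test_load.py` might exercise against it, assuming SQLAlchemy (plus a Postgres driver such as psycopg2) is in `requirements.txt` and that `pipelines/load.py` exposes a `load_orders(df, engine)` helper writing to an `orders` table; none of these are shown elsewhere, so treat them as assumptions:

```python
# tests/test_load.py (illustrative sketch)
import os

import pytest
from sqlalchemy import create_engine, text

from pipelines.load import load_orders  # hypothetical helper that creates/replaces the orders table

@pytest.mark.integration
def test_load_orders_round_trip(sample_data):
    """Load the shared fixture into the test warehouse and count the rows."""
    engine = create_engine(os.environ["DATABASE_URL"])

    load_orders(sample_data, engine)

    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM orders")).scalar()

    assert count == len(sample_data)
```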

# Step 4: Data Validation Gates

Validate data schema and quality before loading to the warehouse:

```python
# validations/schema_checks.py
from datetime import datetime

import pandas as pd
from pydantic import BaseModel, field_validator

class OrderSchema(BaseModel):
    """Expected schema for orders after transformation."""
    order_id: str
    amount: float
    currency: str
    status: str
    created_at: datetime

    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v):
        if v < 0:
            raise ValueError(f"Amount must be >= 0, got {v}")
        return v

    @field_validator("currency")
    @classmethod
    def currency_must_be_valid(cls, v):
        valid = {"GBP", "USD", "EUR"}
        if v not in valid:
            raise ValueError(f"Currency must be one of {valid}, got {v}")
        return v

def validate_dataframe(df: pd.DataFrame, schema=OrderSchema) -> dict:
    """Validate every row in a DataFrame against the schema.

    Returns a dict with valid_count, invalid_count, and errors.
    """
    errors = []
    valid_count = 0

    for i, row in df.iterrows():
        try:
            schema(**row.to_dict())
            valid_count += 1
        except Exception as e:
            errors.append({"row": i, "error": str(e)})

    return {
        "valid_count": valid_count,
        "invalid_count": len(errors),
        "error_rate": len(errors) / max(len(df), 1),
        "errors": errors[:10],  # first 10 only
    }

def validation_gate(df: pd.DataFrame, max_error_rate=0.01) -> bool:
    """Pass/fail gate for data quality.

    Fails if more than 1% of rows are invalid.
    """
    results = validate_dataframe(df)
    passed = results["error_rate"] <= max_error_rate

    if not passed:
        print(f"FAIL — Validation: {results['error_rate']:.1%} error rate")
        print(f"   {results['invalid_count']} invalid rows out of {results['valid_count'] + results['invalid_count']}")
        for error in results["errors"]:
            print(f"   Row {error['row']}: {error['error']}")
    else:
        print(f"PASS — Validation: {results['error_rate']:.1%} error rate")

    return passed
```
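
The Makefile's `validate` target runs this module directly, so it needs an entry point. A minimal sketch appended to `validations/schema_checks.py`; the CSV path is a placeholder and would normally point at your pipeline's latest output:

```python
# Entry point for `make validate` (appended to validations/schema_checks.py)
import sys

if __name__ == "__main__":
    # Placeholder input: point this at the batch you want to gate.
    df = pd.read_csv("fixtures/sample_orders.csv", parse_dates=["created_at"])
    sys.exit(0 if validation_gate(df) else 1)
```

Exiting non-zero is what lets the Makefile target, and any CI step that calls it, fail the build when the gate fails.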

# Step 5: GitHub Actions CI Workflow

```yaml
# .github/workflows/ci.yml
name: Data Pipeline CI

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install ruff
      - run: ruff check pipelines/ tests/
      - run: ruff format --check pipelines/ tests/

  test:
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --tb=short -m "not integration"

  integration:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_DB: test_warehouse
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --tb=short -m integration
        env:
          DATABASE_URL: postgresql://test:test@localhost:5432/test_warehouse
```

# Step 6: Deployment Workflow

```yaml
# .github/workflows/deploy.yml
name: Deploy Data Pipeline

on:
  push:
    branches: [main]
    paths:
      - "pipelines/**"
      - "validations/**"
      - "requirements.txt"

jobs:
  deploy:
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - run: pip install -r requirements.txt

      # Run full test suite before deploying
      - name: Run tests
        run: pytest tests/ -v --tb=short

      # Dry run against staging
      - name: Staging dry run
        run: python -m pipelines.deploy --env staging --dry-run
        env:
          STAGING_DB_URL: ${{ secrets.STAGING_DB_URL }}

      # Deploy to production
      - name: Deploy to production
        run: python -m pipelines.deploy --env production
        env:
          PRODUCTION_DB_URL: ${{ secrets.PRODUCTION_DB_URL }}

      # Verify deployment
      - name: Health check
        run: python -m pipelines.health_check --env production
        env:
          PRODUCTION_DB_URL: ${{ secrets.PRODUCTION_DB_URL }}
```
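
The final step calls `pipelines/health_check.py`, which the checklist below treats as a data-freshness check but which this guide does not build out. A minimal sketch, assuming SQLAlchemy, an `orders` table with a `created_at` timestamp column, and a 24-hour freshness window (all assumptions you would adapt):

```python
# pipelines/health_check.py (illustrative sketch)
import argparse
import os
import sys
from datetime import datetime, timedelta, timezone

from sqlalchemy import create_engine, text

def check_freshness(db_url: str, max_lag_hours: int = 24) -> bool:
    """Fail if no new orders have landed within the expected window."""
    engine = create_engine(db_url)
    with engine.connect() as conn:
        latest = conn.execute(text("SELECT MAX(created_at) FROM orders")).scalar()

    if latest is None:
        print("FAIL: health check found no rows in the orders table")
        return False

    if latest.tzinfo is None:  # treat naive timestamps as UTC
        latest = latest.replace(tzinfo=timezone.utc)

    lag = datetime.now(timezone.utc) - latest
    if lag > timedelta(hours=max_lag_hours):
        print(f"FAIL: newest order is {lag} old")
        return False

    print(f"PASS: newest order is {lag} old")
    return True

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", default="production")
    args = parser.parse_args()

    url = os.environ["PRODUCTION_DB_URL" if args.env == "production" else "STAGING_DB_URL"]
    sys.exit(0 if check_freshness(url) else 1)
```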

# Step 7: Rollback Strategy

When a deployment causes data issues, you need a fast rollback:

```python
# pipelines/deploy.py
import argparse
import subprocess
from datetime import datetime, timezone

class PipelineDeployer:
    """Deploy pipeline changes with rollback capability."""

    def __init__(self, env="staging"):
        self.env = env
        self.deploy_log = []

    def deploy(self, dry_run=False):
        """Deploy the current pipeline version."""
        version = self._get_git_sha()
        timestamp = datetime.now(timezone.utc).isoformat()

        if dry_run:
            print(f"[DRY RUN] Would deploy {version} to {self.env}")
            self._run_validation()
            return

        print(f"Deploying {version} to {self.env} at {timestamp}")

        # Step 1: Validate data before migration
        if not self._run_validation():
            print("FAIL — Pre-deploy validation failed. Aborting.")
            return False

        # Step 2: Apply schema migrations (if any)
        self._apply_migrations()

        # Step 3: Run one cycle with new code
        self._run_smoke_test()

        # Step 4: Log the deployment
        self.deploy_log.append({
            "version": version,
            "env": self.env,
            "timestamp": timestamp,
            "status": "success",
        })

        print(f"OK — Deployed {version} to {self.env}")
        return True

    def rollback(self, target_version=None):
        """Rollback to the previous version."""
        if target_version is None:
            target_version = "HEAD~1"

        print(f"Rolling back to {target_version}")
        subprocess.run(
            ["git", "checkout", target_version, "--", "pipelines/"],
            check=True,
        )
        print(f"OK — Rolled back to {target_version}")

    def _get_git_sha(self):
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True, text=True,
        )
        return result.stdout.strip()

    def _run_validation(self):
        """Run data validation checks on recent data before deploying."""
        from validations.schema_checks import validation_gate  # noqa: F401
        # Simplified for this guide: a real deployment would load a recent
        # batch here and return validation_gate(recent_df).
        return True

    def _apply_migrations(self):
        print(f"Applying migrations to {self.env}...")

    def _run_smoke_test(self):
        print(f"Running smoke test on {self.env}...")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", default="staging")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--rollback", action="store_true")
    args = parser.parse_args()

    deployer = PipelineDeployer(env=args.env)

    if args.rollback:
        deployer.rollback()
    else:
        deployer.deploy(dry_run=args.dry_run)
```

# CI/CD Checklist

| Stage | What It Checks | Fails If |
| --- | --- | --- |
| Lint | Code style, import order, type hints | ruff reports errors |
| Unit tests | Transform logic, edge cases | Any assertion fails |
| Integration tests | API contracts, DB operations | Schema mismatch or connection errors |
| Data validation | Row-level schema, quality metrics | Error rate > 1% |
| Staging dry run | End-to-end with real schemas | Validation gate fails |
| Health check | Post-deploy data freshness | No new data within expected window |

# What This Replaces

| Manual Deployment | CI/CD Pipeline |
| --- | --- |
| SSH into server, git pull, hope it works | Automated test → stage → deploy → verify |
| "It works on my machine" | Tests run in identical CI environment |
| No rollback plan | One-command rollback to previous version |
| Data bugs found by users | Data validation gates catch issues pre-deploy |
| No code review enforcement | PR required, tests must pass |
| Deploy any time, break anything | Deploy only tested, validated changes |

# Next Steps

CI/CD is the backbone of reliable data operations, and you can build it incrementally.

Start with linting and unit tests. Add integration tests when you have staging infrastructure. The validation gate is the single highest-impact addition — it catches the data bugs that code tests miss.

Need help building CI/CD for your data workflows? Get in touch or explore our automation services.
