Building a Lightweight Data Quality Framework from Scratch

Build a reusable data quality framework that scores datasets across completeness, accuracy, consistency, and timeliness — with trend tracking and automated alerting on quality degradation.

Clean data is not a one-time task. Data quality degrades constantly — sources change formats, new edge cases appear, upstream systems introduce bugs. A pipeline that produced perfect output last month can silently deliver garbage today.

The fix is not more cleaning code. It is a quality framework — a reusable system that scores your data against defined standards, tracks quality over time, and alerts when something degrades.

This guide builds a complete data quality framework in Python. By the end, you will have validators, scoring, trend tracking, and alerting that you can plug into any existing pipeline.

Who This Is For

  • Data engineers who have pipelines running but no systematic way to catch bad data
  • Analysts who have been burned by reports built on silently corrupted data
  • Vibe coders who automated a data flow and now need guardrails to keep it trustworthy
  • Team leads who want confidence that dashboards and reports reflect reality

You need basic Python knowledge. The framework uses pandas for data handling — if you can read a CSV into a DataFrame, you can follow this guide.

The Quality Framework Architecture

Data goes in. Quality scores come out. Scores are tracked over time. Degradation triggers alerts. Every pipeline run produces a quality report alongside its business output.

What You Will Need

pip install pandas numpy

No specialist libraries needed. The framework is built from scratch — portable and dependency-light.

The Four Dimensions of Data Quality

| Dimension | What it measures | Example check |
| --- | --- | --- |
| Completeness | Missing values, null rates | "Revenue column is 98% filled" |
| Accuracy | Values within expected ranges | "No negative revenue values" |
| Consistency | Format uniformity, referential integrity | "All dates are in YYYY-MM-DD" |
| Timeliness | Data freshness, recency | "Most recent record is from today" |
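
To make the four dimensions concrete, here is a minimal sketch that computes one score per dimension with plain pandas. The DataFrame, its column names, and the fixed reference time are invented for illustration; the framework built in the following steps wraps the same ideas in reusable rules.

```python
import pandas as pd

# Hypothetical sample data, invented for illustration only.
df = pd.DataFrame({
    "revenue": [120.0, None, 75.5, -10.0],
    "order_date": ["2024-03-01", "2024-03-02", "03/03/2024", "2024-03-04"],
})

# Completeness: share of non-null values.
completeness = df["revenue"].notna().mean()

# Accuracy: share of non-null values that are not negative.
revenue = df["revenue"].dropna()
accuracy = (revenue >= 0).mean()

# Consistency: share of dates written as YYYY-MM-DD.
consistency = df["order_date"].str.match(r"^\d{4}-\d{2}-\d{2}$").mean()

# Timeliness: is the newest parseable date within 24 hours of a reference time?
# A fixed reference time keeps the example reproducible; a real check
# would compare against the current time instead.
reference_time = pd.Timestamp("2024-03-04 12:00")
dates = pd.to_datetime(df["order_date"], format="%Y-%m-%d", errors="coerce")
timeliness = 1.0 if reference_time - dates.max() <= pd.Timedelta(hours=24) else 0.0

print(f"completeness={completeness:.2f} accuracy={accuracy:.2f} "
      f"consistency={consistency:.2f} timeliness={timeliness:.1f}")
```

Each value lands between 0 and 1, which is the convention the rules in Step 1 follow.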

Step 1: Define Quality Rules

Each rule is a function that checks one aspect of quality and returns a score between 0 and 1.

import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable
import logging

logger = logging.getLogger("quality")


@dataclass
class QualityRule:
    """A single data quality check."""

    name: str
    dimension: str          # completeness, accuracy, consistency, timeliness
    column: str             # which column to check (or "__dataset__" to pass the whole DataFrame)
    check: Callable         # function(series_or_df) -> float (0.0 to 1.0)
    weight: float = 1.0     # relative importance
    threshold: float = 0.9  # minimum acceptable score


def completeness_rule(column, threshold=0.95):
    """Check what percentage of values are non-null."""
    return QualityRule(
        name=f"{column}_completeness",
        dimension="completeness",
        column=column,
        check=lambda s: 1 - (s.isna().sum() / len(s)) if len(s) > 0 else 0,
        threshold=threshold,
    )


def range_rule(column, min_val=None, max_val=None, threshold=0.95):
    """Check what percentage of values fall within an expected range."""
    def check(s):
        s = pd.to_numeric(s, errors="coerce").dropna()
        if len(s) == 0:
            return 0
        in_range = s.copy()
        if min_val is not None:
            in_range = in_range[in_range >= min_val]
        if max_val is not None:
            in_range = in_range[in_range <= max_val]
        return len(in_range) / len(s)

    return QualityRule(
        name=f"{column}_range",
        dimension="accuracy",
        column=column,
        check=check,
        threshold=threshold,
    )


def uniqueness_rule(column, threshold=0.99):
    """Check what percentage of values are unique."""
    return QualityRule(
        name=f"{column}_uniqueness",
        dimension="consistency",
        column=column,
        check=lambda s: s.nunique() / len(s) if len(s) > 0 else 0,
        threshold=threshold,
    )


def format_rule(column, pattern, threshold=0.95):
    """Check what percentage of values match a regex pattern."""
    import re
    compiled = re.compile(pattern)

    def check(s):
        s = s.dropna().astype(str)
        if len(s) == 0:
            return 0
        matches = s.apply(lambda x: bool(compiled.match(x)))
        return matches.sum() / len(s)

    return QualityRule(
        name=f"{column}_format",
        dimension="consistency",
        column=column,
        check=check,
        threshold=threshold,
    )


def freshness_rule(column, max_age_hours=24, threshold=0.9):
    """Check if the most recent record is within expected age."""
    def check(s):
        s = pd.to_datetime(s, errors="coerce").dropna()
        if len(s) == 0:
            return 0
        most_recent = s.max()
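        # Assumes timezone-naive timestamps; a tz-aware column raises TypeError here,
        # which the engine records as a failed check
        age = datetime.now() - most_recent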
        max_age = timedelta(hours=max_age_hours)
        if age <= max_age:
            return 1.0
        elif age <= max_age * 2:
            return 0.5
        else:
            return 0.0

    return QualityRule(
        name=f"{column}_freshness",
        dimension="timeliness",
        column=column,
        check=check,
        threshold=threshold,
    )


def referential_rule(column, valid_values, threshold=0.95):
    """Check what percentage of values are in a valid set."""
    def check(s):
        s = s.dropna()
        if len(s) == 0:
            return 0
        valid = s.isin(valid_values)
        return valid.sum() / len(s)

    return QualityRule(
        name=f"{column}_referential",
        dimension="consistency",
        column=column,
        check=check,
        threshold=threshold,
    )
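
The `"__dataset__"` marker lets a rule inspect the whole DataFrame rather than a single column, but none of the factories above use it. As one possible illustration, the sketch below defines a hypothetical `duplicate_row_rule` that scores the share of rows that are not exact duplicates. The `QualityRule` class is repeated in compact form only so the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Callable

import pandas as pd


@dataclass
class QualityRule:
    """Compact copy of the article's QualityRule so this sketch runs on its own."""
    name: str
    dimension: str
    column: str
    check: Callable
    weight: float = 1.0
    threshold: float = 0.9


def duplicate_row_rule(threshold=0.99):
    """Hypothetical helper: score the share of rows that are not exact duplicates."""
    def check(df):
        if len(df) == 0:
            return 0
        # duplicated() marks every repeat of an earlier row as True
        return 1 - df.duplicated().sum() / len(df)

    return QualityRule(
        name="dataset_duplicate_rows",
        dimension="consistency",
        column="__dataset__",   # the check receives the whole DataFrame
        check=check,
        threshold=threshold,
    )


# Invented sample: the third row repeats the second.
rule = duplicate_row_rule()
sample = pd.DataFrame({"order_id": [1, 2, 2, 3], "revenue": [10, 20, 20, 30]})
score = rule.check(sample)
print(f"{rule.name}: {score:.2f}")
```

One of the four rows is a repeat, so the score is 0.75, well under the 0.99 threshold.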

Step 2: The Quality Engine

Run all rules against a dataset and produce scores.

@dataclass
class QualityResult:
    """Result of a single quality check."""

    rule_name: str
    dimension: str
    column: str
    score: float
    threshold: float
    passed: bool
    details: str = ""


class QualityEngine:
    """Run quality rules against a dataset and produce scores."""

    def __init__(self, rules):
        self.rules = rules

    def evaluate(self, df):
        """Evaluate all rules against a DataFrame."""
        results = []

        for rule in self.rules:
            try:
                if rule.column == "__dataset__":
                    score = rule.check(df)
                elif rule.column in df.columns:
                    score = rule.check(df[rule.column])
                else:
                    results.append(QualityResult(
                        rule_name=rule.name,
                        dimension=rule.dimension,
                        column=rule.column,
                        score=0.0,
                        threshold=rule.threshold,
                        passed=False,
                        details=f"Column '{rule.column}' not found",
                    ))
                    continue

                score = round(float(score), 4)
                passed = score >= rule.threshold

                results.append(QualityResult(
                    rule_name=rule.name,
                    dimension=rule.dimension,
                    column=rule.column,
                    score=score,
                    threshold=rule.threshold,
                    passed=passed,
                    details=f"{'PASS' if passed else 'FAIL'}: {score:.1%} (threshold: {rule.threshold:.1%})",
                ))

                if not passed:
                    logger.warning(f"Quality check FAILED: {rule.name}: {score:.1%} < {rule.threshold:.1%}")

            except Exception as e:
                results.append(QualityResult(
                    rule_name=rule.name,
                    dimension=rule.dimension,
                    column=rule.column,
                    score=0.0,
                    threshold=rule.threshold,
                    passed=False,
                    details=f"Error: {e}",
                ))
                logger.error(f"Quality check error: {rule.name}: {e}")

        return results

    def calculate_overall_score(self, results):
        """Calculate weighted overall quality score."""
        if not results:
            return 0

        rule_map = {r.name: r for r in self.rules}
        weighted_sum = 0
        total_weight = 0

        for result in results:
            rule = rule_map.get(result.rule_name)
            weight = rule.weight if rule else 1.0
            weighted_sum += result.score * weight
            total_weight += weight

        return round(weighted_sum / total_weight * 100, 1) if total_weight > 0 else 0

    def generate_report(self, df):
        """Generate a complete quality report."""
        results = self.evaluate(df)
        overall = self.calculate_overall_score(results)

        # Group by dimension
        dimension_scores = {}
        for result in results:
            if result.dimension not in dimension_scores:
                dimension_scores[result.dimension] = []
            dimension_scores[result.dimension].append(result.score)

        dimension_averages = {
            dim: round(np.mean(scores) * 100, 1)
            for dim, scores in dimension_scores.items()
        }

        failed = [r for r in results if not r.passed]

        report = {
            "overall_score": overall,
            "dimension_scores": dimension_averages,
            "total_checks": len(results),
            "passed": len(results) - len(failed),
            "failed": len(failed),
            "failed_checks": [
                {
                    "rule": r.rule_name,
                    "dimension": r.dimension,
                    "score": f"{r.score:.1%}",
                    "threshold": f"{r.threshold:.1%}",
                }
                for r in failed
            ],
            "details": results,
            "timestamp": datetime.now().isoformat(),
            "rows": len(df),
            "columns": len(df.columns),
        }

        logger.info(
            f"Quality report: {overall}/100 overall | "
            f"{len(results) - len(failed)}/{len(results)} checks passed"
        )

        return report
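
The overall score is a weighted mean of the individual rule scores, scaled to 0-100. The sketch below reproduces that arithmetic in isolation with a hypothetical `weighted_score` helper and invented scores, to show how raising one rule's weight moves the result.

```python
def weighted_score(scores_and_weights):
    """Weighted mean of 0-1 scores, scaled to 0-100 like calculate_overall_score."""
    total_weight = sum(weight for _, weight in scores_and_weights)
    if total_weight == 0:
        return 0
    weighted_sum = sum(score * weight for score, weight in scores_and_weights)
    return round(weighted_sum / total_weight * 100, 1)


# Two rules scoring 1.0 and 0.8, first with equal weights...
equal = weighted_score([(1.0, 1.0), (0.8, 1.0)])
# ...then with the weaker rule counted three times as heavily.
emphasised = weighted_score([(1.0, 1.0), (0.8, 3.0)])

print(equal, emphasised)
```

With equal weights the result is 90.0; tripling the weight of the weaker rule pulls it down to 85.0. The factory functions in Step 1 do not take a `weight` argument, so one option is to set it on the returned rule, for example `rule.weight = 3.0`.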

Step 3: Define Rules for Your Dataset

# Define quality rules for a sales dataset
sales_quality_rules = [
    # Completeness
    completeness_rule("order_id", threshold=1.0),     # Must never be null
    completeness_rule("revenue", threshold=0.98),
    completeness_rule("customer_id", threshold=0.95),
    completeness_rule("region", threshold=0.90),

    # Accuracy
    range_rule("revenue", min_val=0, max_val=100000),  # No negative or extreme values
    range_rule("quantity", min_val=1, max_val=1000),

    # Consistency
    uniqueness_rule("order_id", threshold=0.99),       # Should be mostly unique
    format_rule("email", r'^[\w.+-]+@[\w-]+\.[\w.]+$', threshold=0.90),
    referential_rule("region", ["North", "South", "East", "West"], threshold=0.95),

    # Timeliness
    freshness_rule("created_at", max_age_hours=24),
]

# Create engine
engine = QualityEngine(sales_quality_rules)

Running a Quality Check

# Load your data
df = pd.read_csv("sales_data.csv")

# Generate report
report = engine.generate_report(df)

# Display results
print(f"\nOverall Quality Score: {report['overall_score']}/100")
print(f"Checks: {report['passed']}/{report['total_checks']} passed")

print("\nDimension Scores:")
for dim, score in report["dimension_scores"].items():
    print(f"  {dim}: {score}/100")

if report["failed_checks"]:
    print("\nFailed Checks:")
    for check in report["failed_checks"]:
        print(f"  FAIL {check['rule']}: {check['score']} (threshold: {check['threshold']})")

Example Output

Overall Quality Score: 87.3/100
Checks: 7/10 passed

Dimension Scores:
  completeness: 95.2/100
  accuracy: 92.1/100
  consistency: 78.5/100
  timeliness: 100.0/100

Failed Checks:
  FAIL email_format: 82.3% (threshold: 90.0%)
  FAIL region_referential: 88.7% (threshold: 95.0%)
  FAIL order_id_uniqueness: 96.2% (threshold: 99.0%)
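
If the report needs to be written to a file or sent to another system, note that its `details` entry holds `QualityResult` dataclass instances, which `json.dumps` cannot serialise on its own. One possible approach, sketched here with a hypothetical `report_to_json` helper and a hand-made report fragment, is to convert dataclasses through the `default` hook.

```python
import json
from dataclasses import dataclass, asdict, is_dataclass


@dataclass
class QualityResult:
    """Compact copy of the article's QualityResult so this sketch runs on its own."""
    rule_name: str
    dimension: str
    column: str
    score: float
    threshold: float
    passed: bool
    details: str = ""


def report_to_json(report):
    """Hypothetical helper: serialise a report dict, converting dataclasses to dicts."""
    def convert(obj):
        if is_dataclass(obj) and not isinstance(obj, type):
            return asdict(obj)
        raise TypeError(f"Cannot serialise {type(obj).__name__}")

    return json.dumps(report, default=convert, indent=2)


# Hand-made fragment with invented values, shaped like generate_report's output.
report = {
    "overall_score": 50.0,
    "details": [QualityResult("email_format", "consistency", "email", 0.5, 0.9, False)],
}
restored = json.loads(report_to_json(report))
print(restored["details"][0]["rule_name"])
```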

Step 4: Track Quality Over Time

Quality scores are most valuable as trends. A score of 87 means nothing in isolation — but a drop from 95 to 87 in two weeks signals a problem.

import sqlite3
import json

class QualityTracker:
    """Track quality scores over time for trend analysis."""

    def __init__(self, db_path="quality_history.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create tracking tables."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS quality_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                dataset_name TEXT NOT NULL,
                run_timestamp TEXT NOT NULL,
                overall_score REAL NOT NULL,
                dimension_scores TEXT NOT NULL,
                total_checks INTEGER NOT NULL,
                passed_checks INTEGER NOT NULL,
                failed_checks TEXT,
                row_count INTEGER,
                column_count INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def record(self, dataset_name, report):
        """Record a quality report for trend tracking."""
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """INSERT INTO quality_runs
               (dataset_name, run_timestamp, overall_score, dimension_scores,
                total_checks, passed_checks, failed_checks, row_count, column_count)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                dataset_name,
                report["timestamp"],
                report["overall_score"],
                json.dumps(report["dimension_scores"]),
                report["total_checks"],
                report["passed"],
                json.dumps(report["failed_checks"]),
                report["rows"],
                report["columns"],
            ),
        )
        conn.commit()
        conn.close()
        logger.info(f"Quality recorded: {dataset_name} = {report['overall_score']}/100")

    def get_trend(self, dataset_name, days=30):
        """Get quality score trend for a dataset."""
        conn = sqlite3.connect(self.db_path)
        since = (datetime.now() - timedelta(days=days)).isoformat()

        df = pd.read_sql(
            """SELECT run_timestamp, overall_score, dimension_scores,
                      passed_checks, total_checks, row_count
               FROM quality_runs
               WHERE dataset_name = ? AND run_timestamp >= ?
               ORDER BY run_timestamp""",
            conn,
            params=(dataset_name, since),
        )
        conn.close()

        if not df.empty:
            df["run_timestamp"] = pd.to_datetime(df["run_timestamp"])

        return df

    def detect_degradation(self, dataset_name, window=7, threshold_drop=5.0):
        """Detect if quality has dropped significantly."""
        trend = self.get_trend(dataset_name, days=window * 2)

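        if len(trend) < 2:
            return None  # Not enough data

        # Keep the two windows from overlapping when fewer than 2 * window runs exist
        recent_n = min(window, len(trend) // 2)
        recent_avg = trend.tail(recent_n)["overall_score"].mean()
        previous_avg = trend.iloc[:-recent_n].head(window)["overall_score"].mean()
        change = recent_avg - previous_avg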

        result = {
            "dataset": dataset_name,
            "recent_average": round(recent_avg, 1),
            "previous_average": round(previous_avg, 1),
            "change": round(change, 1),
            "degraded": change < -threshold_drop,
        }

        if result["degraded"]:
            logger.warning(
                f"Quality DEGRADATION detected: {dataset_name} "
                f"dropped {abs(change):.1f} points "
                f"({previous_avg:.1f} -> {recent_avg:.1f})"
            )

        return result
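
The degradation check comes down to comparing the average of the oldest runs with the average of the newest runs. The worked example below uses fourteen invented daily scores and the default `window=7` and `threshold_drop=5.0`, so the two windows cover separate weeks.

```python
import pandas as pd

# Invented daily scores: a stable first week followed by a steady slide.
scores = pd.Series(
    [95, 96, 94, 95, 96, 95, 94, 92, 90, 89, 88, 87, 86, 85], dtype=float
)
window = 7
threshold_drop = 5.0

previous_avg = scores.head(window).mean()   # oldest seven runs
recent_avg = scores.tail(window).mean()     # newest seven runs
change = recent_avg - previous_avg
degraded = change < -threshold_drop

print(f"{previous_avg:.1f} -> {recent_avg:.1f} ({change:+.1f}), degraded={degraded}")
```

The first week averages 95.0 and the second about 88.1, a drop of roughly 6.9 points, which exceeds the 5-point threshold. In this sketch the slices only stay separate when at least `2 * window` scores are available.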

Step 5: Quality-Gated Pipelines

Stop the pipeline if data quality is below acceptable levels.

class QualityGate:
    """Gate pipeline stages based on quality scores."""

    def __init__(self, engine, tracker, dataset_name):
        self.engine = engine
        self.tracker = tracker
        self.dataset_name = dataset_name

    def check(self, df, min_overall=80, fail_on_critical=True):
        """Run quality check and decide if pipeline should continue.

        Returns:
            (bool, dict) — (should_continue, quality_report)
        """
        report = self.engine.generate_report(df)
        self.tracker.record(self.dataset_name, report)

        # Check overall score
        if report["overall_score"] < min_overall:
            logger.error(
                f"Quality gate FAILED: {report['overall_score']}/100 "
                f"(minimum: {min_overall})"
            )
            return False, report

        # Check for critical failures (completeness of required fields)
        if fail_on_critical:
            critical_failures = [
                f for f in report["failed_checks"]
                if f["dimension"] == "completeness"
            ]
            if critical_failures:
                logger.error(
                    f"Quality gate FAILED: critical completeness checks failed — "
                    f"{[f['rule'] for f in critical_failures]}"
                )
                return False, report

        # Check for degradation
        degradation = self.tracker.detect_degradation(self.dataset_name)
        if degradation and degradation["degraded"]:
            logger.warning(
                f"Quality degradation detected but passing: "
                f"{degradation['change']:+.1f} points"
            )

        logger.info(f"Quality gate PASSED: {report['overall_score']}/100")
        return True, report

Using Quality Gates in a Pipeline

def run_quality_gated_pipeline():
    """Pipeline that stops if data quality is insufficient."""
    engine = QualityEngine(sales_quality_rules)
    tracker = QualityTracker()
    gate = QualityGate(engine, tracker, "daily_sales")

    # Extract
    df = extract_sales_data()
    logger.info(f"Extracted {len(df)} rows")

    # Quality gate — stop if quality is too low
    should_continue, report = gate.check(df, min_overall=80)
    if not should_continue:
        logger.error("Pipeline stopped: data quality below threshold")
        send_quality_alert(report)
        return {"status": "blocked", "quality_score": report["overall_score"]}

    # Transform (only if quality passed)
    transformed = transform_data(df)

    # Load
    load_to_database(transformed)

    return {
        "status": "success",
        "rows": len(transformed),
        "quality_score": report["overall_score"],
    }
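
The pipeline above calls `send_quality_alert`, which is not defined in this guide. A minimal sketch of what it could look like follows; the `deliver` parameter and the `format_quality_alert` helper are assumptions of this sketch, chosen so any channel (email, chat webhook, pager) can be plugged in as a function that accepts a string.

```python
import logging

logger = logging.getLogger("quality")


def format_quality_alert(report, dataset_name="daily_sales"):
    """Build a plain-text alert from a quality report dict."""
    lines = [
        f"Data quality alert: {dataset_name}",
        f"Overall score: {report['overall_score']}/100",
        f"Failed checks: {report['failed']} of {report['total_checks']}",
    ]
    for check in report["failed_checks"]:
        lines.append(
            f"  - {check['rule']}: {check['score']} (threshold: {check['threshold']})"
        )
    return "\n".join(lines)


def send_quality_alert(report, deliver=None):
    """Hypothetical alert hook: format the report and hand it to a delivery callable.

    By default the message is only logged; pass `deliver` to route it elsewhere.
    """
    message = format_quality_alert(report)
    if deliver is None:
        logger.error(message)
    else:
        deliver(message)
    return message


# Invented report fragment using the keys produced by generate_report.
sample_report = {
    "overall_score": 72.4,
    "total_checks": 10,
    "failed": 1,
    "failed_checks": [
        {"rule": "email_format", "dimension": "consistency",
         "score": "82.3%", "threshold": "90.0%"},
    ],
}
sent = []
send_quality_alert(sample_report, deliver=sent.append)
print(sent[0])
```

Keeping formatting separate from delivery makes the message easy to test without touching a real notification channel.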

Step 6: Quality Reporting

Generate a quality summary for stakeholders.

def generate_quality_summary(tracker, dataset_name, days=30):
    """Generate a human-readable quality summary."""
    trend = tracker.get_trend(dataset_name, days=days)

    if trend.empty:
        return "No quality data available."

    current = trend.iloc[-1]
    avg = trend["overall_score"].mean()
    min_score = trend["overall_score"].min()
    max_score = trend["overall_score"].max()

    lines = [
        f"Quality Summary: {dataset_name}",
        f"   Period: last {days} days ({len(trend)} runs)",
        f"   Current score: {current['overall_score']}/100",
        f"   Average: {avg:.1f}/100",
        f"   Range: {min_score:.1f} - {max_score:.1f}",
        f"   Latest row count: {current['row_count']:,}",
    ]

    # Parse dimension scores from latest run
    dim_scores = json.loads(current["dimension_scores"])
    lines.append("\n   Dimension breakdown:")
    for dim, score in dim_scores.items():
        bar = "█" * int(score / 5) + "░" * (20 - int(score / 5))
        lines.append(f"   {dim:15s} {bar} {score}/100")

    # Check degradation
    degradation = tracker.detect_degradation(dataset_name)
    if degradation:
        if degradation["degraded"]:
            lines.append(f"\n   WARNING -- DEGRADATION: {degradation['change']:+.1f} points over last week")
        else:
            lines.append(f"\n   OK -- Trend: {degradation['change']:+.1f} points (stable)")

    return "\n".join(lines)

Sample Quality Summary

Quality Summary: daily_sales
   Period: last 30 days (28 runs)
   Current score: 87.3/100
   Average: 91.2/100
   Range: 85.1 - 97.4

   Dimension breakdown:
   completeness    ███████████████████░ 95.2/100
   accuracy        ██████████████████░░ 92.1/100
   consistency     ███████████████░░░░░ 78.5/100
   timeliness      ████████████████████ 100.0/100

   WARNING -- DEGRADATION: -4.2 points over last week
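
The dimension bars use one block per five points, so a 20-character bar spans 0 to 100. Here is a minimal sketch of that rendering with a hypothetical `render_bar` helper. The clamping step is an addition of this sketch, not part of the summary function above.

```python
def render_bar(score, width=20):
    """Render a 0-100 score as a fixed-width text bar."""
    filled = int(score / (100 / width))
    # Clamp so out-of-range scores cannot break the layout
    filled = max(0, min(width, filled))
    return "█" * filled + "░" * (width - filled)


for label, value in [("consistency", 78.5), ("timeliness", 100.0)]:
    print(f"{label:15s} {render_bar(value)} {value}/100")
```

A score of 78.5 fills 15 of the 20 blocks, because partial blocks are rounded down.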

What This Replaces

| Ad-hoc approach | Quality framework equivalent |
| --- | --- |
| "The numbers look off" | Automated quality score with thresholds |
| Manual spot-checking | Systematic rule evaluation |
| No quality history | Trend tracking with degradation detection |
| Quality issues found in reports | Quality gates stop bad data before loading |
| "Is our data getting worse?" | Time-series quality scores with alerting |
| Different checks per pipeline | Reusable rule library |

Quick-Start Rule Sets

| Dataset type | Key rules |
| --- | --- |
| Sales/Orders | Revenue ≥ 0, order_id unique, date freshness, customer_id completeness |
| Customer | Email format, phone format, country in valid list, no exact duplicates |
| Product | Price > 0, category in valid set, name completeness, SKU uniqueness |
| Time series | No gaps in dates, values within historical range, freshness |
| Survey/Form | Required fields complete, ratings in valid range, no spam patterns |
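
The "no gaps in dates" rule for time series is not covered by the factories in Step 1. One way it might be scored, assuming daily data, is the share of expected calendar days between the first and last date that actually appear. The `date_gap_score` function below is a hypothetical helper, and the dates are invented.

```python
import pandas as pd


def date_gap_score(dates):
    """Hypothetical check: share of expected calendar days that are present.

    Assumes daily data; other frequencies would need a different date_range.
    """
    dates = pd.to_datetime(pd.Series(dates), errors="coerce").dropna()
    if len(dates) == 0:
        return 0
    # Collapse timestamps to midnight so several records per day count once
    observed = pd.DatetimeIndex(dates.dt.normalize().unique())
    expected = pd.date_range(observed.min(), observed.max(), freq="D")
    return len(observed) / len(expected)


# Invented series with 3 January missing.
sample_dates = ["2024-01-01", "2024-01-02", "2024-01-04", "2024-01-05"]
gap_score = date_gap_score(sample_dates)
print(f"date coverage: {gap_score:.0%}")
```

Four of the five expected days are present, so the score is 0.8. Because the function takes a Series and returns a value between 0 and 1, it could be passed as the `check` of a `QualityRule`.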

Next Steps

Start with completeness and accuracy rules — they catch the most impactful issues. Add consistency and timeliness rules as your framework matures. The quality gate pattern is particularly valuable: it prevents bad data from ever reaching your reports.

The trend tracking pays off over weeks. When you can see quality scores declining gradually, you can investigate and fix the root cause before it becomes an incident.

For cleaning the data when quality checks fail, see How to Clean Messy Excel Data Using Python. For building the pipelines that these quality checks protect, see How to Design Data Pipelines for Reliable Reporting.
