Building a Lightweight Data Quality Framework from Scratch

Build a reusable data quality framework that scores datasets across completeness, accuracy, consistency, and timeliness — with trend tracking and automated alerting on quality degradation.

Clean data is not a one-time task. Data quality degrades constantly — sources change formats, new edge cases appear, upstream systems introduce bugs. A pipeline that produced perfect output last month can silently deliver garbage today.

The fix is not more cleaning code. It is a quality framework — a reusable system that scores your data against defined standards, tracks quality over time, and alerts when something degrades.

This guide builds a complete data quality framework in Python. By the end, you will have validators, scoring, trend tracking, and alerting that you can plug into any existing pipeline.

# Who This Is For

  • Data engineers who have pipelines running but no systematic way to catch bad data
  • Analysts who have been burned by reports built on silently corrupted data
  • Vibe coders who automated a data flow and now need guardrails to keep it trustworthy
  • Team leads who want confidence that dashboards and reports reflect reality

You need basic Python knowledge. The framework uses pandas for data handling — if you can read a CSV into a DataFrame, you can follow this guide.

# The Quality Framework Architecture

mermaid
flowchart LR
  D["Dataset"] --> V["Validators\n(rules engine)"]
  V --> S["Quality Score\n(0–100)"]
  S --> T["Trend Tracker\n(historical)"]
  T --> A["Alerting\n(if degraded)"]
  S --> R["Quality Report\n(per-column detail)"]

Data goes in. Quality scores come out. Scores are tracked over time. Degradation triggers alerts. Every pipeline run produces a quality report alongside its business output.

# What You Will Need

bash
pip install pandas numpy

No specialist libraries needed. The framework is built from scratch — portable and dependency-light.

# The Four Dimensions of Data Quality

mermaid
flowchart TD
  DQ["Data Quality"] --> C["Completeness\n(are values present?)"]
  DQ --> AC["Accuracy\n(are values correct?)"]
  DQ --> CO["Consistency\n(are values uniform?)"]
  DQ --> TI["Timeliness\n(is data current?)"]

| Dimension | What it measures | Example check |
| --- | --- | --- |
| Completeness | Missing values, null rates | "Revenue column is 98% filled" |
| Accuracy | Values within expected ranges | "No negative revenue values" |
| Consistency | Format uniformity, referential integrity | "All dates are in YYYY-MM-DD" |
| Timeliness | Data freshness, recency | "Most recent record is from today" |
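
Each dimension reduces to a short pandas computation. A self-contained sketch on a toy DataFrame (column names are illustrative):

```python
import pandas as pd
from datetime import datetime, timedelta

df = pd.DataFrame({
    "revenue": [100.0, 250.0, None, -5.0],
    "date": ["2024-01-01", "2024-01-02", "2024/01/03", "2024-01-04"],
    "updated_at": [datetime.now() - timedelta(hours=h) for h in (1, 2, 3, 4)],
})

# Completeness: share of non-null values
completeness = df["revenue"].notna().mean()                       # 0.75

# Accuracy: share of present values inside the expected range
accuracy = (df["revenue"].dropna() >= 0).mean()                   # 2 of 3

# Consistency: share of dates matching YYYY-MM-DD
consistency = df["date"].str.match(r"\d{4}-\d{2}-\d{2}$").mean()  # 0.75

# Timeliness: newest record is less than 24 hours old
timeliness = float(df["updated_at"].max() > datetime.now() - timedelta(hours=24))
```

The framework below wraps exactly these kinds of checks in reusable, weighted rules.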

# Step 1: Define Quality Rules

Each rule is a function that checks one aspect of quality and returns a score between 0 and 1.

python
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable
import logging

logger = logging.getLogger("quality")


@dataclass
class QualityRule:
    """A single data quality check."""

    name: str
    dimension: str          # completeness, accuracy, consistency, timeliness
    column: str             # which column to check (or "__dataset__" for row-level)
    check: Callable         # function(series_or_df) -> float (0.0 to 1.0)
    weight: float = 1.0     # relative importance
    threshold: float = 0.9  # minimum acceptable score


def completeness_rule(column, threshold=0.95):
    """Check what percentage of values are non-null."""
    return QualityRule(
        name=f"{column}_completeness",
        dimension="completeness",
        column=column,
        check=lambda s: 1 - (s.isna().sum() / len(s)) if len(s) > 0 else 0,
        threshold=threshold,
    )


def range_rule(column, min_val=None, max_val=None, threshold=0.95):
    """Check what percentage of values fall within an expected range."""
    def check(s):
        s = pd.to_numeric(s, errors="coerce").dropna()
        if len(s) == 0:
            return 0
        in_range = s.copy()
        if min_val is not None:
            in_range = in_range[in_range >= min_val]
        if max_val is not None:
            in_range = in_range[in_range <= max_val]
        return len(in_range) / len(s)

    return QualityRule(
        name=f"{column}_range",
        dimension="accuracy",
        column=column,
        check=check,
        threshold=threshold,
    )


def uniqueness_rule(column, threshold=0.99):
    """Check what percentage of values are unique."""
    return QualityRule(
        name=f"{column}_uniqueness",
        dimension="consistency",
        column=column,
        check=lambda s: s.nunique() / len(s) if len(s) > 0 else 0,
        threshold=threshold,
    )


def format_rule(column, pattern, threshold=0.95):
    """Check what percentage of values match a regex pattern."""
    import re
    compiled = re.compile(pattern)

    def check(s):
        s = s.dropna().astype(str)
        if len(s) == 0:
            return 0
        matches = s.apply(lambda x: bool(compiled.match(x)))
        return matches.sum() / len(s)

    return QualityRule(
        name=f"{column}_format",
        dimension="consistency",
        column=column,
        check=check,
        threshold=threshold,
    )


def freshness_rule(column, max_age_hours=24, threshold=0.9):
    """Check if the most recent record is within expected age."""
    def check(s):
        s = pd.to_datetime(s, errors="coerce").dropna()
        if len(s) == 0:
            return 0
        most_recent = s.max()
        age = datetime.now() - most_recent
        max_age = timedelta(hours=max_age_hours)
        if age <= max_age:
            return 1.0
        elif age <= max_age * 2:
            return 0.5
        else:
            return 0.0

    return QualityRule(
        name=f"{column}_freshness",
        dimension="timeliness",
        column=column,
        check=check,
        threshold=threshold,
    )


def referential_rule(column, valid_values, threshold=0.95):
    """Check what percentage of values are in a valid set."""
    def check(s):
        s = s.dropna()
        if len(s) == 0:
            return 0
        valid = s.isin(valid_values)
        return valid.sum() / len(s)

    return QualityRule(
        name=f"{column}_referential",
        dimension="consistency",
        column=column,
        check=check,
        threshold=threshold,
    )
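
A rule can be sanity-checked on its own before wiring it into an engine. Here is a standalone example (it repeats the `QualityRule` dataclass and `completeness_rule` factory from above so the snippet runs by itself):

```python
import pandas as pd
from dataclasses import dataclass
from typing import Callable

@dataclass
class QualityRule:
    name: str
    dimension: str
    column: str
    check: Callable
    weight: float = 1.0
    threshold: float = 0.9

def completeness_rule(column, threshold=0.95):
    """Check what percentage of values are non-null."""
    return QualityRule(
        name=f"{column}_completeness",
        dimension="completeness",
        column=column,
        check=lambda s: 1 - (s.isna().sum() / len(s)) if len(s) > 0 else 0,
        threshold=threshold,
    )

# One null out of five values -> 80% complete, below the 95% threshold
s = pd.Series([10.0, 20.0, None, 40.0, 50.0])
rule = completeness_rule("revenue")
score = rule.check(s)
print(f"{rule.name}: {score:.0%} (passed: {score >= rule.threshold})")
```

This prints `revenue_completeness: 80% (passed: False)`.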

# Step 2: The Quality Engine

Run all rules against a dataset and produce scores.

python
@dataclass
class QualityResult:
    """Result of a single quality check."""

    rule_name: str
    dimension: str
    column: str
    score: float
    threshold: float
    passed: bool
    details: str = ""


class QualityEngine:
    """Run quality rules against a dataset and produce scores."""

    def __init__(self, rules):
        self.rules = rules

    def evaluate(self, df):
        """Evaluate all rules against a DataFrame."""
        results = []

        for rule in self.rules:
            try:
                if rule.column == "__dataset__":
                    score = rule.check(df)
                elif rule.column in df.columns:
                    score = rule.check(df[rule.column])
                else:
                    results.append(QualityResult(
                        rule_name=rule.name,
                        dimension=rule.dimension,
                        column=rule.column,
                        score=0.0,
                        threshold=rule.threshold,
                        passed=False,
                        details=f"Column '{rule.column}' not found",
                    ))
                    continue

                score = round(float(score), 4)
                passed = score >= rule.threshold

                results.append(QualityResult(
                    rule_name=rule.name,
                    dimension=rule.dimension,
                    column=rule.column,
                    score=score,
                    threshold=rule.threshold,
                    passed=passed,
                    details=f"{'PASS' if passed else 'FAIL'}: {score:.1%} (threshold: {rule.threshold:.1%})",
                ))

                if not passed:
                    logger.warning(f"Quality check FAILED: {rule.name} scored {score:.1%} < {rule.threshold:.1%}")

            except Exception as e:
                results.append(QualityResult(
                    rule_name=rule.name,
                    dimension=rule.dimension,
                    column=rule.column,
                    score=0.0,
                    threshold=rule.threshold,
                    passed=False,
                    details=f"Error: {e}",
                ))
                logger.error(f"Quality check error in {rule.name}: {e}")

        return results

    def calculate_overall_score(self, results):
        """Calculate weighted overall quality score."""
        if not results:
            return 0

        rule_map = {r.name: r for r in self.rules}
        weighted_sum = 0
        total_weight = 0

        for result in results:
            rule = rule_map.get(result.rule_name)
            weight = rule.weight if rule else 1.0
            weighted_sum += result.score * weight
            total_weight += weight

        return round(weighted_sum / total_weight * 100, 1) if total_weight > 0 else 0

    def generate_report(self, df):
        """Generate a complete quality report."""
        results = self.evaluate(df)
        overall = self.calculate_overall_score(results)

        # Group by dimension
        dimension_scores = {}
        for result in results:
            if result.dimension not in dimension_scores:
                dimension_scores[result.dimension] = []
            dimension_scores[result.dimension].append(result.score)

        dimension_averages = {
            dim: round(np.mean(scores) * 100, 1)
            for dim, scores in dimension_scores.items()
        }

        failed = [r for r in results if not r.passed]

        report = {
            "overall_score": overall,
            "dimension_scores": dimension_averages,
            "total_checks": len(results),
            "passed": len(results) - len(failed),
            "failed": len(failed),
            "failed_checks": [
                {
                    "rule": r.rule_name,
                    "dimension": r.dimension,
                    "score": f"{r.score:.1%}",
                    "threshold": f"{r.threshold:.1%}",
                }
                for r in failed
            ],
            "details": results,
            "timestamp": datetime.now().isoformat(),
            "rows": len(df),
            "columns": len(df.columns),
        }

        logger.info(
            f"Quality report: {overall}/100 overall | "
            f"{len(results) - len(failed)}/{len(results)} checks passed"
        )

        return report
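
`calculate_overall_score` is just a weighted mean scaled to 100. A quick self-contained check of the arithmetic, with illustrative scores and weights:

```python
# Mirrors calculate_overall_score's math: three checks, one double-weighted
scores_and_weights = [(0.98, 2.0), (0.80, 1.0), (1.00, 1.0)]

weighted_sum = sum(score * weight for score, weight in scores_and_weights)  # 3.76
total_weight = sum(weight for _, weight in scores_and_weights)              # 4.0
overall = round(weighted_sum / total_weight * 100, 1)
print(overall)  # 94.0
```

Giving critical rules (such as `order_id` completeness) a higher `weight` makes the overall score reflect their importance.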

# Step 3: Define Rules for Your Dataset

python
# Define quality rules for a sales dataset
sales_quality_rules = [
    # Completeness
    completeness_rule("order_id", threshold=1.0),     # Must never be null
    completeness_rule("revenue", threshold=0.98),
    completeness_rule("customer_id", threshold=0.95),
    completeness_rule("region", threshold=0.90),

    # Accuracy
    range_rule("revenue", min_val=0, max_val=100000),  # No negative or extreme values
    range_rule("quantity", min_val=1, max_val=1000),

    # Consistency
    uniqueness_rule("order_id", threshold=0.99),       # Should be mostly unique
    format_rule("email", r'^[\w.+-]+@[\w-]+\.[\w.]+$', threshold=0.90),
    referential_rule("region", ["North", "South", "East", "West"], threshold=0.95),

    # Timeliness
    freshness_rule("created_at", max_age_hours=24),
]

# Create engine
engine = QualityEngine(sales_quality_rules)

# Running a Quality Check

python
# Load your data
df = pd.read_csv("sales_data.csv")

# Generate report
report = engine.generate_report(df)

# Display results
print(f"\nOverall Quality Score: {report['overall_score']}/100")
print(f"Checks: {report['passed']}/{report['total_checks']} passed")

print("\nDimension Scores:")
for dim, score in report["dimension_scores"].items():
    print(f"  {dim}: {score}/100")

if report["failed_checks"]:
    print("\nFailed Checks:")
    for check in report["failed_checks"]:
        print(f"  FAIL {check['rule']}: {check['score']} (threshold: {check['threshold']})")

# Example Output

text
Overall Quality Score: 87.3/100
Checks: 7/10 passed

Dimension Scores:
  completeness: 95.2/100
  accuracy: 92.1/100
  consistency: 78.5/100
  timeliness: 100.0/100

Failed Checks:
  FAIL email_format: 82.3% (threshold: 90.0%)
  FAIL region_referential: 88.7% (threshold: 95.0%)
  FAIL order_id_uniqueness: 96.2% (threshold: 99.0%)

# Step 4: Track Quality Over Time

Quality scores are most valuable as trends. A score of 87 means nothing in isolation — but a drop from 95 to 87 in two weeks signals a problem.

python
import sqlite3
import json

class QualityTracker:
    """Track quality scores over time for trend analysis."""

    def __init__(self, db_path="quality_history.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create tracking tables."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS quality_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                dataset_name TEXT NOT NULL,
                run_timestamp TEXT NOT NULL,
                overall_score REAL NOT NULL,
                dimension_scores TEXT NOT NULL,
                total_checks INTEGER NOT NULL,
                passed_checks INTEGER NOT NULL,
                failed_checks TEXT,
                row_count INTEGER,
                column_count INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def record(self, dataset_name, report):
        """Record a quality report for trend tracking."""
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """INSERT INTO quality_runs
               (dataset_name, run_timestamp, overall_score, dimension_scores,
                total_checks, passed_checks, failed_checks, row_count, column_count)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                dataset_name,
                report["timestamp"],
                report["overall_score"],
                json.dumps(report["dimension_scores"]),
                report["total_checks"],
                report["passed"],
                json.dumps(report["failed_checks"]),
                report["rows"],
                report["columns"],
            ),
        )
        conn.commit()
        conn.close()
        logger.info(f"Quality recorded: {dataset_name} = {report['overall_score']}/100")

    def get_trend(self, dataset_name, days=30):
        """Get quality score trend for a dataset."""
        conn = sqlite3.connect(self.db_path)
        since = (datetime.now() - timedelta(days=days)).isoformat()

        df = pd.read_sql(
            """SELECT run_timestamp, overall_score, dimension_scores,
                      passed_checks, total_checks, row_count
               FROM quality_runs
               WHERE dataset_name = ? AND run_timestamp >= ?
               ORDER BY run_timestamp""",
            conn,
            params=(dataset_name, since),
        )
        conn.close()

        if not df.empty:
            df["run_timestamp"] = pd.to_datetime(df["run_timestamp"])

        return df

    def detect_degradation(self, dataset_name, window=7, threshold_drop=5.0):
        """Detect if quality has dropped significantly."""
        trend = self.get_trend(dataset_name, days=window * 2)

        if len(trend) < window * 2:
            return None  # Need two full, non-overlapping windows of history

        recent_avg = trend.tail(window)["overall_score"].mean()
        previous_avg = trend.head(window)["overall_score"].mean()
        change = recent_avg - previous_avg

        result = {
            "dataset": dataset_name,
            "recent_average": round(recent_avg, 1),
            "previous_average": round(previous_avg, 1),
            "change": round(change, 1),
            "degraded": change < -threshold_drop,
        }

        if result["degraded"]:
            logger.warning(
                f"Quality DEGRADATION detected: {dataset_name} "
                f"dropped {abs(change):.1f} points "
                f"({previous_avg:.1f} → {recent_avg:.1f})"
            )

        return result
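
`detect_degradation` boils down to comparing two window averages. A self-contained illustration of that arithmetic on synthetic daily scores (the values are made up):

```python
import pandas as pd

# 14 synthetic daily scores: a stable week followed by a decline
scores = pd.Series([95, 94, 96, 95, 94, 95, 96,   # previous week
                    90, 89, 88, 87, 88, 87, 86])  # recent week

window, threshold_drop = 7, 5.0
previous_avg = scores.head(window).mean()   # 95.0
recent_avg = scores.tail(window).mean()     # ~87.9
change = recent_avg - previous_avg          # ~-7.1
degraded = change < -threshold_drop         # True: would trigger the warning
```

A drop of about 7 points against a 5-point threshold is exactly the gradual decline that per-run pass/fail checks miss.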

# Step 5: Quality-Gated Pipelines

Stop the pipeline if data quality is below acceptable levels.

python
class QualityGate:
    """Gate pipeline stages based on quality scores."""

    def __init__(self, engine, tracker, dataset_name):
        self.engine = engine
        self.tracker = tracker
        self.dataset_name = dataset_name

    def check(self, df, min_overall=80, fail_on_critical=True):
        """Run quality check and decide if pipeline should continue.

        Returns:
            (bool, dict) — (should_continue, quality_report)
        """
        report = self.engine.generate_report(df)
        self.tracker.record(self.dataset_name, report)

        # Check overall score
        if report["overall_score"] < min_overall:
            logger.error(
                f"Quality gate FAILED: {report['overall_score']}/100 "
                f"(minimum: {min_overall})"
            )
            return False, report

        # Check for critical failures (completeness of required fields)
        if fail_on_critical:
            critical_failures = [
                f for f in report["failed_checks"]
                if f["rule"].endswith("_completeness")
            ]
            if critical_failures:
                logger.error(
                    f"Quality gate FAILED: critical completeness checks failed — "
                    f"{[f['rule'] for f in critical_failures]}"
                )
                return False, report

        # Check for degradation
        degradation = self.tracker.detect_degradation(self.dataset_name)
        if degradation and degradation["degraded"]:
            logger.warning(
                f"Quality degradation detected but passing: "
                f"{degradation['change']:+.1f} points"
            )

        logger.info(f"Quality gate PASSED: {report['overall_score']}/100")
        return True, report

# Using Quality Gates in a Pipeline

python
def run_quality_gated_pipeline():
    """Pipeline that stops if data quality is insufficient.

    extract_sales_data, transform_data, load_to_database, and
    send_quality_alert are placeholders for your own pipeline steps.
    """
    engine = QualityEngine(sales_quality_rules)
    tracker = QualityTracker()
    gate = QualityGate(engine, tracker, "daily_sales")

    # Extract
    df = extract_sales_data()
    logger.info(f"Extracted {len(df)} rows")

    # Quality gate — stop if quality is too low
    should_continue, report = gate.check(df, min_overall=80)
    if not should_continue:
        logger.error("Pipeline stopped: data quality below threshold")
        send_quality_alert(report)
        return {"status": "blocked", "quality_score": report["overall_score"]}

    # Transform (only if quality passed)
    transformed = transform_data(df)

    # Load
    load_to_database(transformed)

    return {
        "status": "success",
        "rows": len(transformed),
        "quality_score": report["overall_score"],
    }

# Step 6: Quality Reporting

Generate a quality summary for stakeholders.

python
def generate_quality_summary(tracker, dataset_name, days=30):
    """Generate a human-readable quality summary."""
    trend = tracker.get_trend(dataset_name, days=days)

    if trend.empty:
        return "No quality data available."

    current = trend.iloc[-1]
    avg = trend["overall_score"].mean()
    min_score = trend["overall_score"].min()
    max_score = trend["overall_score"].max()

    lines = [
        f"Quality Summary: {dataset_name}",
        f"   Period: last {days} days ({len(trend)} runs)",
        f"   Current score: {current['overall_score']}/100",
        f"   Average: {avg:.1f}/100",
        f"   Range: {min_score:.1f}–{max_score:.1f}",
        f"   Latest row count: {current['row_count']:,}",
    ]

    # Parse dimension scores from latest run
    dim_scores = json.loads(current["dimension_scores"])
    lines.append("\n   Dimension breakdown:")
    for dim, score in dim_scores.items():
        bar = "█" * int(score / 5) + "░" * (20 - int(score / 5))
        lines.append(f"   {dim:15s} {bar} {score}/100")

    # Check degradation
    degradation = tracker.detect_degradation(dataset_name)
    if degradation:
        if degradation["degraded"]:
            lines.append(f"\n   WARNING -- DEGRADATION: {degradation['change']:+.1f} points over last week")
        else:
            lines.append(f"\n   OK -- Trend: {degradation['change']:+.1f} points (stable)")

    return "\n".join(lines)

# Sample Quality Summary

text
Quality Summary: daily_sales
   Period: last 30 days (28 runs)
   Current score: 87.3/100
   Average: 91.2/100
   Range: 85.1–97.4

   Dimension breakdown:
   completeness    ███████████████████░ 95.2/100
   accuracy        ██████████████████░░ 92.1/100
   consistency     ███████████████░░░░░ 78.5/100
   timeliness      ████████████████████ 100.0/100

   WARNING -- DEGRADATION: -4.2 points over last week

# What This Replaces

| Ad-hoc approach | Quality framework equivalent |
| --- | --- |
| "The numbers look off" | Automated quality score with thresholds |
| Manual spot-checking | Systematic rule evaluation |
| No quality history | Trend tracking with degradation detection |
| Quality issues found in reports | Quality gates stop bad data before loading |
| "Is our data getting worse?" | Time-series quality scores with alerting |
| Different checks per pipeline | Reusable rule library |

# Quick-Start Rule Sets

| Dataset type | Key rules |
| --- | --- |
| Sales/Orders | Revenue ≥ 0, order_id unique, date freshness, customer_id completeness |
| Customer | Email format, phone format, country in valid list, no exact duplicates |
| Product | Price > 0, category in valid set, name completeness, SKU uniqueness |
| Time series | No gaps in dates, values within historical range, freshness |
| Survey/Form | Required fields complete, ratings in valid range, no spam patterns |

# Next Steps

Start with completeness and accuracy rules — they catch the most impactful issues. Add consistency and timeliness rules as your framework matures. The quality gate pattern is particularly valuable: it prevents bad data from ever reaching your reports.

The trend tracking pays off over weeks. When you can see quality scores declining gradually, you can investigate and fix the root cause before it becomes an incident.

For cleaning the data when quality checks fail, see How to Clean Messy Excel Data Using Python. For building the pipelines that these quality checks protect, see How to Design Data Pipelines for Reliable Reporting.

My data analytics services include building data quality monitoring frameworks for production reporting systems.

Get in touch to discuss implementing data quality monitoring for your pipelines.
