Building a Lightweight Data Quality Framework from Scratch
Build a reusable data quality framework that scores datasets across completeness, accuracy, consistency, and timeliness — with trend tracking and automated alerting on quality degradation.
Clean data is not a one-time task. Data quality degrades constantly — sources change formats, new edge cases appear, upstream systems introduce bugs. A pipeline that produced perfect output last month can silently deliver garbage today.
The fix is not more cleaning code. It is a quality framework — a reusable system that scores your data against defined standards, tracks quality over time, and alerts when something degrades.
This guide builds a complete data quality framework in Python. By the end, you will have validators, scoring, trend tracking, and alerting that you can plug into any existing pipeline.
# Who This Is For
- Data engineers who have pipelines running but no systematic way to catch bad data
- Analysts who have been burned by reports built on silently corrupted data
- Vibe coders who automated a data flow and now need guardrails to keep it trustworthy
- Team leads who want confidence that dashboards and reports reflect reality
You need basic Python knowledge. The framework uses pandas for data handling — if you can read a CSV into a DataFrame, you can follow this guide.
# The Quality Framework Architecture
flowchart LR
    D["Dataset"] --> V["Validators\n(rules engine)"]
    V --> S["Quality Score\n(0–100)"]
    S --> T["Trend Tracker\n(historical)"]
    T --> A["Alerting\n(if degraded)"]
    S --> R["Quality Report\n(per-column detail)"]
Data goes in. Quality scores come out. Scores are tracked over time. Degradation triggers alerts. Every pipeline run produces a quality report alongside its business output.
# What You Will Need
pip install pandas numpy
No specialist libraries needed. The framework is built from scratch — portable and dependency-light.
# The Four Dimensions of Data Quality
flowchart TD
    DQ["Data Quality"] --> C["Completeness\n(are values present?)"]
    DQ --> AC["Accuracy\n(are values correct?)"]
    DQ --> CO["Consistency\n(are values uniform?)"]
    DQ --> TI["Timeliness\n(is data current?)"]
| Dimension | What it measures | Example check |
|---|---|---|
| Completeness | Missing values, null rates | "Revenue column is 98% filled" |
| Accuracy | Values within expected ranges | "No negative revenue values" |
| Consistency | Format uniformity, referential integrity | "All dates are in YYYY-MM-DD" |
| Timeliness | Data freshness, recency | "Most recent record is from today" |
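Each dimension reduces to a short pandas expression. A minimal sketch on an invented four-row dataset (the column names and values are illustrative only, not part of the framework):

```python
import pandas as pd

# Invented sample: one missing revenue value, one inconsistent date format
df = pd.DataFrame({
    "revenue": [120.0, 95.5, None, 40.0],
    "date": ["2024-01-01", "2024-01-02", "2024-01-03", "01/04/2024"],
})

# Completeness: share of non-null values
completeness = 1 - df["revenue"].isna().mean()

# Accuracy: share of present values inside an expected range
accuracy = (df["revenue"].dropna() >= 0).mean()

# Consistency: share of dates matching YYYY-MM-DD
consistency = df["date"].str.match(r"^\d{4}-\d{2}-\d{2}$").mean()

# Timeliness: age of the newest parseable record (unparseable dates become NaT)
latest = pd.to_datetime(df["date"], errors="coerce").max()

print(completeness, accuracy, consistency)  # 0.75 1.0 0.75
```

The framework below wraps exactly these kinds of expressions in reusable, weighted, thresholded rules.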
# Step 1: Define Quality Rules
Each rule is a function that checks one aspect of quality and returns a score between 0 and 1.
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable
import logging
logger = logging.getLogger("quality")
@dataclass
class QualityRule:
"""A single data quality check."""
name: str
dimension: str # completeness, accuracy, consistency, timeliness
column: str # which column to check (or "__dataset__" for row-level)
check: Callable # function(series_or_df) -> float (0.0 to 1.0)
weight: float = 1.0 # relative importance
threshold: float = 0.9 # minimum acceptable score
def completeness_rule(column, threshold=0.95):
"""Check what percentage of values are non-null."""
return QualityRule(
name=f"{column}_completeness",
dimension="completeness",
column=column,
check=lambda s: 1 - (s.isna().sum() / len(s)) if len(s) > 0 else 0,
threshold=threshold,
)
def range_rule(column, min_val=None, max_val=None, threshold=0.95):
"""Check what percentage of values fall within an expected range."""
def check(s):
s = pd.to_numeric(s, errors="coerce").dropna()
if len(s) == 0:
return 0
in_range = s.copy()
if min_val is not None:
in_range = in_range[in_range >= min_val]
if max_val is not None:
in_range = in_range[in_range <= max_val]
return len(in_range) / len(s)
return QualityRule(
name=f"{column}_range",
dimension="accuracy",
column=column,
check=check,
threshold=threshold,
)
def uniqueness_rule(column, threshold=0.99):
"""Check what percentage of values are unique."""
return QualityRule(
name=f"{column}_uniqueness",
dimension="consistency",
column=column,
check=lambda s: s.nunique() / len(s) if len(s) > 0 else 0,
threshold=threshold,
)
def format_rule(column, pattern, threshold=0.95):
"""Check what percentage of values match a regex pattern."""
import re
compiled = re.compile(pattern)
def check(s):
s = s.dropna().astype(str)
if len(s) == 0:
return 0
matches = s.apply(lambda x: bool(compiled.match(x)))
return matches.sum() / len(s)
return QualityRule(
name=f"{column}_format",
dimension="consistency",
column=column,
check=check,
threshold=threshold,
)
def freshness_rule(column, max_age_hours=24, threshold=0.9):
"""Check if the most recent record is within expected age."""
def check(s):
s = pd.to_datetime(s, errors="coerce").dropna()
if len(s) == 0:
return 0
most_recent = s.max()
        age = datetime.now() - most_recent  # assumes naive timestamps; tz-aware data needs converting first
max_age = timedelta(hours=max_age_hours)
if age <= max_age:
return 1.0
elif age <= max_age * 2:
return 0.5
else:
return 0.0
return QualityRule(
name=f"{column}_freshness",
dimension="timeliness",
column=column,
check=check,
threshold=threshold,
)
def referential_rule(column, valid_values, threshold=0.95):
"""Check what percentage of values are in a valid set."""
def check(s):
s = s.dropna()
if len(s) == 0:
return 0
valid = s.isin(valid_values)
return valid.sum() / len(s)
return QualityRule(
name=f"{column}_referential",
dimension="consistency",
column=column,
check=check,
threshold=threshold,
)
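Because each factory returns a plain `QualityRule` object, a rule's `check` can be exercised directly on sample data before it goes anywhere near a pipeline. A self-contained sketch (the `QualityRule` and `completeness_rule` definitions are repeated from above so the snippet runs standalone; the sample values are invented):

```python
import pandas as pd
from dataclasses import dataclass
from typing import Callable

@dataclass
class QualityRule:  # repeated from Step 1 so this snippet runs on its own
    name: str
    dimension: str
    column: str
    check: Callable
    weight: float = 1.0
    threshold: float = 0.9

def completeness_rule(column, threshold=0.95):
    return QualityRule(
        name=f"{column}_completeness", dimension="completeness", column=column,
        check=lambda s: 1 - (s.isna().sum() / len(s)) if len(s) > 0 else 0,
        threshold=threshold,
    )

# One null out of five values gives a score of 0.8, below the 0.95 threshold
rule = completeness_rule("revenue")
score = rule.check(pd.Series([100, 250, None, -5, 80]))
print(score, score >= rule.threshold)  # 0.8 False
```

Unit-testing rules this way is cheap, and it catches mistakes in thresholds or check logic long before they show up as confusing pipeline failures.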
# Step 2: The Quality Engine
Run all rules against a dataset and produce scores.
@dataclass
class QualityResult:
"""Result of a single quality check."""
rule_name: str
dimension: str
column: str
score: float
threshold: float
passed: bool
details: str = ""
class QualityEngine:
"""Run quality rules against a dataset and produce scores."""
def __init__(self, rules):
self.rules = rules
def evaluate(self, df):
"""Evaluate all rules against a DataFrame."""
results = []
for rule in self.rules:
try:
if rule.column == "__dataset__":
score = rule.check(df)
elif rule.column in df.columns:
score = rule.check(df[rule.column])
else:
results.append(QualityResult(
rule_name=rule.name,
dimension=rule.dimension,
column=rule.column,
score=0.0,
threshold=rule.threshold,
passed=False,
details=f"Column '{rule.column}' not found",
))
continue
score = round(float(score), 4)
passed = score >= rule.threshold
results.append(QualityResult(
rule_name=rule.name,
dimension=rule.dimension,
column=rule.column,
score=score,
threshold=rule.threshold,
passed=passed,
details=f"{'PASS' if passed else 'FAIL'}: {score:.1%} (threshold: {rule.threshold:.1%})",
))
if not passed:
logger.warning(f"Quality check FAILED: {rule.name} — {score:.1%} < {rule.threshold:.1%}")
except Exception as e:
results.append(QualityResult(
rule_name=rule.name,
dimension=rule.dimension,
column=rule.column,
score=0.0,
threshold=rule.threshold,
passed=False,
details=f"Error: {e}",
))
logger.error(f"Quality check error: {rule.name} — {e}")
return results
def calculate_overall_score(self, results):
"""Calculate weighted overall quality score."""
if not results:
return 0
rule_map = {r.name: r for r in self.rules}
weighted_sum = 0
total_weight = 0
for result in results:
rule = rule_map.get(result.rule_name)
weight = rule.weight if rule else 1.0
weighted_sum += result.score * weight
total_weight += weight
return round(weighted_sum / total_weight * 100, 1) if total_weight > 0 else 0
def generate_report(self, df):
"""Generate a complete quality report."""
results = self.evaluate(df)
overall = self.calculate_overall_score(results)
# Group by dimension
dimension_scores = {}
for result in results:
if result.dimension not in dimension_scores:
dimension_scores[result.dimension] = []
dimension_scores[result.dimension].append(result.score)
dimension_averages = {
dim: round(np.mean(scores) * 100, 1)
for dim, scores in dimension_scores.items()
}
failed = [r for r in results if not r.passed]
report = {
"overall_score": overall,
"dimension_scores": dimension_averages,
"total_checks": len(results),
"passed": len(results) - len(failed),
"failed": len(failed),
"failed_checks": [
{
"rule": r.rule_name,
"dimension": r.dimension,
"score": f"{r.score:.1%}",
"threshold": f"{r.threshold:.1%}",
}
for r in failed
],
"details": results,
"timestamp": datetime.now().isoformat(),
"rows": len(df),
"columns": len(df.columns),
}
logger.info(
f"Quality report: {overall}/100 overall | "
f"{len(results) - len(failed)}/{len(results)} checks passed"
)
return report
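The weighted average in `calculate_overall_score` is worth seeing with concrete numbers. A standalone sketch with invented scores and weights, showing why a heavily weighted rule pulls the overall score toward its value:

```python
# Three rule scores, with the last rule weighted double (values invented)
scores  = [1.0, 0.5, 0.75]
weights = [1.0, 1.0, 2.0]

weighted_sum = sum(s * w for s, w in zip(scores, weights))  # 1.0 + 0.5 + 1.5 = 3.0
total_weight = sum(weights)                                  # 4.0
overall = round(weighted_sum / total_weight * 100, 1)
print(overall)  # 75.0
```

An unweighted mean of the same scores would be 75.0 as well here by coincidence; in practice, raising the weight on a critical rule (say, `order_id` completeness) makes its failures dominate the overall score.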
# Step 3: Define Rules for Your Dataset
# Define quality rules for a sales dataset
sales_quality_rules = [
# Completeness
completeness_rule("order_id", threshold=1.0), # Must never be null
completeness_rule("revenue", threshold=0.98),
completeness_rule("customer_id", threshold=0.95),
completeness_rule("region", threshold=0.90),
# Accuracy
range_rule("revenue", min_val=0, max_val=100000), # No negative or extreme values
range_rule("quantity", min_val=1, max_val=1000),
# Consistency
uniqueness_rule("order_id", threshold=0.99), # Should be mostly unique
format_rule("email", r'^[\w.+-]+@[\w-]+\.[\w.]+$', threshold=0.90),
referential_rule("region", ["North", "South", "East", "West"], threshold=0.95),
# Timeliness
freshness_rule("created_at", max_age_hours=24),
]
# Create engine
engine = QualityEngine(sales_quality_rules)
# Running a Quality Check
# Load your data
df = pd.read_csv("sales_data.csv")
# Generate report
report = engine.generate_report(df)
# Display results
print(f"\nOverall Quality Score: {report['overall_score']}/100")
print(f"Checks: {report['passed']}/{report['total_checks']} passed")
print("\nDimension Scores:")
for dim, score in report["dimension_scores"].items():
print(f" {dim}: {score}/100")
if report["failed_checks"]:
print("\nFailed Checks:")
for check in report["failed_checks"]:
print(f" FAIL {check['rule']}: {check['score']} (threshold: {check['threshold']})")
# Example Output
Overall Quality Score: 87.3/100
Checks: 7/10 passed
Dimension Scores:
completeness: 95.2/100
accuracy: 92.1/100
consistency: 78.5/100
timeliness: 100.0/100
Failed Checks:
FAIL email_format: 82.3% (threshold: 90.0%)
FAIL region_referential: 88.7% (threshold: 95.0%)
FAIL order_id_uniqueness: 96.2% (threshold: 99.0%)
# Step 4: Track Quality Over Time
Quality scores are most valuable as trends. A score of 87 means nothing in isolation — but a drop from 95 to 87 in two weeks signals a problem.
import sqlite3
import json
class QualityTracker:
"""Track quality scores over time for trend analysis."""
def __init__(self, db_path="quality_history.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Create tracking tables."""
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS quality_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
dataset_name TEXT NOT NULL,
run_timestamp TEXT NOT NULL,
overall_score REAL NOT NULL,
dimension_scores TEXT NOT NULL,
total_checks INTEGER NOT NULL,
passed_checks INTEGER NOT NULL,
failed_checks TEXT,
row_count INTEGER,
column_count INTEGER
)
""")
conn.commit()
conn.close()
def record(self, dataset_name, report):
"""Record a quality report for trend tracking."""
conn = sqlite3.connect(self.db_path)
conn.execute(
"""INSERT INTO quality_runs
(dataset_name, run_timestamp, overall_score, dimension_scores,
total_checks, passed_checks, failed_checks, row_count, column_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
dataset_name,
report["timestamp"],
report["overall_score"],
json.dumps(report["dimension_scores"]),
report["total_checks"],
report["passed"],
json.dumps(report["failed_checks"]),
report["rows"],
report["columns"],
),
)
conn.commit()
conn.close()
logger.info(f"Quality recorded: {dataset_name} = {report['overall_score']}/100")
def get_trend(self, dataset_name, days=30):
"""Get quality score trend for a dataset."""
conn = sqlite3.connect(self.db_path)
since = (datetime.now() - timedelta(days=days)).isoformat()
df = pd.read_sql(
"""SELECT run_timestamp, overall_score, dimension_scores,
passed_checks, total_checks, row_count
FROM quality_runs
WHERE dataset_name = ? AND run_timestamp >= ?
ORDER BY run_timestamp""",
conn,
params=(dataset_name, since),
)
conn.close()
if not df.empty:
df["run_timestamp"] = pd.to_datetime(df["run_timestamp"])
return df
def detect_degradation(self, dataset_name, window=7, threshold_drop=5.0):
"""Detect if quality has dropped significantly."""
trend = self.get_trend(dataset_name, days=window * 2)
        if len(trend) < window * 2:
            return None  # Not enough runs for two non-overlapping windows
recent_avg = trend.tail(window)["overall_score"].mean()
previous_avg = trend.head(window)["overall_score"].mean()
change = recent_avg - previous_avg
result = {
"dataset": dataset_name,
"recent_average": round(recent_avg, 1),
"previous_average": round(previous_avg, 1),
"change": round(change, 1),
"degraded": change < -threshold_drop,
}
if result["degraded"]:
logger.warning(
f"Quality DEGRADATION detected: {dataset_name} "
f"dropped {abs(change):.1f} points "
f"({previous_avg:.1f} → {recent_avg:.1f})"
)
return result
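`detect_degradation` compares the mean score of the most recent `window` runs against the mean of the window before it. A standalone sketch of that comparison with invented daily scores, showing a slow decline that a single run would never reveal:

```python
import pandas as pd

# Fourteen invented daily scores: a stable week, then a declining week
scores = pd.Series([95, 94, 96, 95, 94, 95, 96,
                    93, 91, 90, 89, 88, 87, 86])

window, threshold_drop = 7, 5.0
previous_avg = scores.head(window).mean()  # 95.0
recent_avg = scores.tail(window).mean()    # ~89.1
change = recent_avg - previous_avg
print(round(change, 1), change < -threshold_drop)  # -5.9 True
```

Every individual run here would still pass a gate set at `min_overall=80`, which is exactly why the trend comparison matters: it flags the slide while there is still time to investigate.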
# Step 5: Quality-Gated Pipelines
Stop the pipeline if data quality is below acceptable levels.
class QualityGate:
"""Gate pipeline stages based on quality scores."""
def __init__(self, engine, tracker, dataset_name):
self.engine = engine
self.tracker = tracker
self.dataset_name = dataset_name
def check(self, df, min_overall=80, fail_on_critical=True):
"""Run quality check and decide if pipeline should continue.
Returns:
(bool, dict) — (should_continue, quality_report)
"""
report = self.engine.generate_report(df)
self.tracker.record(self.dataset_name, report)
# Check overall score
if report["overall_score"] < min_overall:
logger.error(
f"Quality gate FAILED: {report['overall_score']}/100 "
f"(minimum: {min_overall})"
)
return False, report
# Check for critical failures (completeness of required fields)
if fail_on_critical:
            critical_failures = [
                f for f in report["failed_checks"]
                if f["rule"].endswith("_completeness")
            ]
if critical_failures:
logger.error(
f"Quality gate FAILED: critical completeness checks failed — "
f"{[f['rule'] for f in critical_failures]}"
)
return False, report
# Check for degradation
degradation = self.tracker.detect_degradation(self.dataset_name)
if degradation and degradation["degraded"]:
logger.warning(
f"Quality degradation detected but passing: "
f"{degradation['change']:+.1f} points"
)
logger.info(f"Quality gate PASSED: {report['overall_score']}/100")
return True, report
# Using Quality Gates in a Pipeline
def run_quality_gated_pipeline():
"""Pipeline that stops if data quality is insufficient."""
engine = QualityEngine(sales_quality_rules)
tracker = QualityTracker()
gate = QualityGate(engine, tracker, "daily_sales")
# Extract
df = extract_sales_data()
logger.info(f"Extracted {len(df)} rows")
# Quality gate — stop if quality is too low
should_continue, report = gate.check(df, min_overall=80)
if not should_continue:
logger.error("Pipeline stopped: data quality below threshold")
send_quality_alert(report)
return {"status": "blocked", "quality_score": report["overall_score"]}
# Transform (only if quality passed)
transformed = transform_data(df)
# Load
load_to_database(transformed)
return {
"status": "success",
"rows": len(transformed),
"quality_score": report["overall_score"],
}
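The pipeline above calls four helpers that will be specific to your stack and are not defined in this guide. Hypothetical stubs, named to match the calls, sketch the expected signatures (replace each body with your real extract, transform, load, and alerting logic):

```python
import pandas as pd

def extract_sales_data() -> pd.DataFrame:
    """Pull the day's sales; a hard-coded frame stands in for a real source here."""
    return pd.DataFrame({"order_id": [1, 2], "revenue": [120.0, 80.0]})

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """Business transformations; a pass-through in this sketch."""
    return df

def load_to_database(df: pd.DataFrame) -> None:
    """Write to the warehouse; a no-op print here."""
    print(f"loaded {len(df)} rows")

def send_quality_alert(report: dict) -> None:
    """Notify the team (email, Slack, etc.); just prints in this sketch."""
    print(f"ALERT: quality score {report['overall_score']}/100")
```

The only contract the gate cares about is that extraction yields a DataFrame and that the alert helper accepts the quality report dict.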
# Step 6: Quality Reporting
Generate a quality summary for stakeholders.
def generate_quality_summary(tracker, dataset_name, days=30):
"""Generate a human-readable quality summary."""
trend = tracker.get_trend(dataset_name, days=days)
if trend.empty:
return "No quality data available."
current = trend.iloc[-1]
avg = trend["overall_score"].mean()
min_score = trend["overall_score"].min()
max_score = trend["overall_score"].max()
lines = [
f"Quality Summary: {dataset_name}",
f" Period: last {days} days ({len(trend)} runs)",
f" Current score: {current['overall_score']}/100",
f" Average: {avg:.1f}/100",
f" Range: {min_score:.1f} — {max_score:.1f}",
f" Latest row count: {current['row_count']:,}",
]
# Parse dimension scores from latest run
dim_scores = json.loads(current["dimension_scores"])
lines.append("\n Dimension breakdown:")
for dim, score in dim_scores.items():
bar = "█" * int(score / 5) + "░" * (20 - int(score / 5))
lines.append(f" {dim:15s} {bar} {score}/100")
# Check degradation
degradation = tracker.detect_degradation(dataset_name)
if degradation:
if degradation["degraded"]:
lines.append(f"\n WARNING -- DEGRADATION: {degradation['change']:+.1f} points over last week")
else:
lines.append(f"\n OK -- Trend: {degradation['change']:+.1f} points (stable)")
return "\n".join(lines)
# Sample Quality Summary
Quality Summary: daily_sales
Period: last 30 days (28 runs)
Current score: 87.3/100
Average: 91.2/100
Range: 85.1 — 97.4
Dimension breakdown:
completeness ███████████████████░ 95.2/100
accuracy ██████████████████░░ 92.1/100
consistency ███████████████░░░░░ 78.5/100
timeliness ████████████████████ 100.0/100
WARNING -- DEGRADATION: -4.2 points over last week
# What This Replaces
| Ad-hoc approach | Quality framework equivalent |
|---|---|
| "The numbers look off" | Automated quality score with thresholds |
| Manual spot-checking | Systematic rule evaluation |
| No quality history | Trend tracking with degradation detection |
| Quality issues found in reports | Quality gates stop bad data before loading |
| "Is our data getting worse?" | Time-series quality scores with alerting |
| Different checks per pipeline | Reusable rule library |
# Quick-Start Rule Sets
| Dataset type | Key rules |
|---|---|
| Sales/Orders | Revenue ≥ 0, order_id unique, date freshness, customer_id completeness |
| Customer | Email format, phone format, country in valid list, no exact duplicates |
| Product | Price > 0, category in valid set, name completeness, SKU uniqueness |
| Time series | No gaps in dates, values within historical range, freshness |
| Survey/Form | Required fields complete, ratings in valid range, no spam patterns |
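As one worked example, the Customer row of the table translates directly into a rule list using the Step 1 factories. This is a configuration sketch, not a prescription: the column names, regexes, and country list are assumptions about your schema and should be adjusted to match it.

```python
# Customer dataset rules, built from the Step 1 factories.
# Column names, patterns, and the country list are assumptions about your schema.
customer_quality_rules = [
    completeness_rule("customer_id", threshold=1.0),   # must never be null
    completeness_rule("email", threshold=0.98),
    format_rule("email", r"^[\w.+-]+@[\w-]+\.[\w.]+$", threshold=0.95),
    format_rule("phone", r"^\+?[\d\s().-]{7,20}$", threshold=0.90),
    referential_rule("country", ["US", "CA", "GB", "DE", "FR"], threshold=0.95),
    uniqueness_rule("customer_id", threshold=1.0),     # exact duplicates are errors
]
```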
# Next Steps
Start with completeness and accuracy rules — they catch the most impactful issues. Add consistency and timeliness rules as your framework matures. The quality gate pattern is particularly valuable: it prevents bad data from ever reaching your reports.
The trend tracking pays off over weeks. When you can see quality scores declining gradually, you can investigate and fix the root cause before it becomes an incident.
For cleaning the data when quality checks fail, see How to Clean Messy Excel Data Using Python. For building the pipelines that these quality checks protect, see How to Design Data Pipelines for Reliable Reporting.
Data analytics services include building data quality monitoring frameworks for production reporting systems.
Get in touch to discuss implementing data quality monitoring for your pipelines.