How to Design Idempotent Data Pipelines That Are Safe to Re-Run
Build data pipelines that produce the same result whether they run once or ten times — using upserts, deduplication, and staging patterns in Python.
Your pipeline fails at 2 AM. You re-run it at 7 AM. Now you have duplicate rows, double-counted revenue, and a dashboard that says Tuesday's sales were twice what they actually were.
This happens because most pipelines assume they only run once. In production, that assumption breaks constantly. Retries, scheduler restarts, partial failures, and manual re-runs all mean your pipeline will execute the same work more than once. If it is not idempotent — meaning it produces the same result regardless of how many times it runs — every re-run is a gamble.
This guide builds idempotent patterns in Python that make your pipelines safe to re-run at any time, from any point.
# Who This Is For
- Data engineers who have seen duplicate rows in production after re-running a failed job
- Developers building ETL processes that may run more than once due to retries or scheduler restarts
- Vibe coders whose automations occasionally double-process data and cause downstream issues
- Anyone who has manually cleaned up duplicates after a pipeline re-run gone wrong
Basic Python and SQL knowledge is helpful. The core concept is simple: if you run the same pipeline twice, you should get the same result — not twice the data. This guide shows you how to guarantee that.
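A toy illustration of that guarantee, using hypothetical in-memory "targets" rather than real tables: appending on every run is not idempotent, while writing by key is.

```python
# Hypothetical in-memory "targets" to contrast the two write styles.
target_append = []   # naive append-only table
target_keyed = {}    # keyed table: one slot per record ID

batch = [("order-1", 19.99), ("order-2", 5.00)]

for _ in range(2):  # simulate the pipeline running twice
    for key, amount in batch:
        target_append.append((key, amount))  # duplicates on re-run
        target_keyed[key] = amount           # same state on re-run

# The append-style target doubled; the keyed target did not.
```

Every pattern in this guide is a production-grade version of that one-line dict write: identify each record by a stable key, then write by key.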
# The Idempotent Pipeline Architecture
flowchart TD
S[Source Data] --> H[Hash & Tag Records]
H --> ST[Staging Table]
ST --> D{Dedup Check}
D -->|New| U[Upsert to Target]
D -->|Duplicate| SK[Skip]
U --> V[Verify Row Counts]
SK --> V
V --> CL[Clean Staging]
CL --> Done[Complete ✓]

Every record gets a deterministic identifier. The pipeline writes to a staging area first, deduplicates against what already exists, then upserts into the final target. Whether this runs once or five times, the result is identical.
# What You Will Need
pip install pandas sqlalchemy psycopg2-binary
- pandas — data processing and DataFrame operations
- sqlalchemy — database abstraction for upserts
- psycopg2-binary — PostgreSQL adapter
- hashlib — deterministic record hashing (standard library)
# Step 1: Deterministic Record Identifiers
The foundation of idempotency is identifying each record uniquely based on its content, not on when it arrived or which run produced it.
import hashlib
import json
from datetime import datetime

def generate_record_id(record: dict, key_fields: list[str]) -> str:
    """Generate a deterministic hash ID from specific fields.

    Same input always produces the same ID, regardless of
    when or how many times the pipeline runs.
    """
    key_data = {field: record[field] for field in sorted(key_fields)}
    raw = json.dumps(key_data, sort_keys=True, default=str)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

def tag_records(df, key_fields: list[str], run_id: str):
    """Add deterministic IDs and run metadata to every record."""
    df = df.copy()
    df["record_id"] = df.apply(
        lambda row: generate_record_id(row.to_dict(), key_fields), axis=1
    )
    df["run_id"] = run_id
    df["loaded_at"] = datetime.utcnow().isoformat()
    return df
# Why This Matters
| Approach | Re-run safe? | Problem |
|---|---|---|
| Auto-increment IDs | No | Every insert gets a new ID |
| Timestamp-based IDs | No | Different time = different ID |
| Content-based hash | Yes | Same data = same ID every time |
The hash approach means that if the same order appears in two pipeline runs, it gets the same record_id both times — making deduplication trivial.
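A quick check of that property, using `generate_record_id` from above (inlined here so the snippet is self-contained) with a made-up order record:

```python
import hashlib
import json

def generate_record_id(record: dict, key_fields: list[str]) -> str:
    key_data = {field: record[field] for field in sorted(key_fields)}
    raw = json.dumps(key_data, sort_keys=True, default=str)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

# The same order seen in two different runs, fields in a different order
run1_order = {"order_id": 1001, "sku": "WIDGET-A", "amount": 19.99}
run2_order = {"amount": 19.99, "order_id": 1001, "sku": "WIDGET-A"}

id1 = generate_record_id(run1_order, ["order_id", "sku"])
id2 = generate_record_id(run2_order, ["order_id", "sku"])
# id1 == id2: same content, same ID, no matter when or how often it runs
```

Because `json.dumps(..., sort_keys=True)` canonicalizes the key fields before hashing, field ordering and arrival time have no effect on the ID.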
# Step 2: Run IDs for Auditability
Every pipeline execution should carry a unique run ID. This lets you trace exactly which run produced which records and clean up partial runs.
import hashlib
from datetime import datetime

def generate_run_id(pipeline_name: str, target_date: str) -> str:
    """Generate a deterministic run ID for a given pipeline and date.

    Same pipeline + date always produces the same run ID.
    This means re-running Tuesday's pipeline on Wednesday
    still targets Tuesday's data.
    """
    raw = f"{pipeline_name}:{target_date}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]

class PipelineRun:
    """Track a single pipeline execution."""

    def __init__(self, pipeline_name: str, target_date: str):
        self.pipeline_name = pipeline_name
        self.target_date = target_date
        self.run_id = generate_run_id(pipeline_name, target_date)
        self.started_at = datetime.utcnow()

    def __repr__(self):
        return f"PipelineRun({self.pipeline_name}, {self.target_date}, run={self.run_id})"
A deterministic run ID means re-running the same pipeline for the same date always produces the same identifier. This is the key to replacing old results instead of duplicating them.
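You can see the determinism directly (using `generate_run_id` from above, inlined to keep the snippet self-contained, with a hypothetical `daily_orders` pipeline):

```python
import hashlib

def generate_run_id(pipeline_name: str, target_date: str) -> str:
    raw = f"{pipeline_name}:{target_date}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]

monday_run = generate_run_id("daily_orders", "2026-04-27")
monday_rerun = generate_run_id("daily_orders", "2026-04-27")  # days later: same ID
tuesday_run = generate_run_id("daily_orders", "2026-04-28")

# Re-running Monday's pipeline always targets Monday's run_id;
# Tuesday gets its own.
```

Contrast this with `uuid.uuid4()`: a random run ID would make every re-run look like a brand-new run, defeating the replace-not-duplicate behavior.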
# Step 3: Stage Before You Load
Never write directly to your production table. Write to a staging table first, then merge.
import logging

import pandas as pd
from sqlalchemy import create_engine, text

logger = logging.getLogger("pipeline")

class StagingLoader:
    """Load data through a staging table for safe, idempotent writes."""

    def __init__(self, engine, schema="staging"):
        self.engine = engine
        self.schema = schema

    def write_to_staging(self, df: pd.DataFrame, table_name: str, run_id: str):
        """Write records to staging, replacing any previous run data."""
        staging_table = f"{self.schema}.{table_name}"
        with self.engine.begin() as conn:
            # Clear previous data from this run (idempotent cleanup)
            conn.execute(
                text(f"DELETE FROM {staging_table} WHERE run_id = :run_id"),
                {"run_id": run_id},
            )
            # Write fresh data
            df.to_sql(
                table_name,
                conn,
                schema=self.schema,
                if_exists="append",
                index=False,
            )
            logger.info(
                f"Staged {len(df)} rows to {staging_table} (run: {run_id})"
            )

    def clear_staging(self, table_name: str, run_id: str):
        """Clean up staging data after successful merge."""
        staging_table = f"{self.schema}.{table_name}"
        with self.engine.begin() as conn:
            conn.execute(
                text(f"DELETE FROM {staging_table} WHERE run_id = :run_id"),
                {"run_id": run_id},
            )
            logger.info(f"Cleared staging: {staging_table} (run: {run_id})")
The DELETE before INSERT pattern means that even if the staging step runs multiple times, the staging table contains exactly one copy of the data.
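Here is that pattern in miniature with the stdlib `sqlite3` driver and a hypothetical `staging_orders` table standing in for the real staging schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_orders (record_id TEXT, amount REAL, run_id TEXT)")

def write_to_staging(conn, rows, run_id):
    # DELETE before INSERT: a re-run replaces its own rows instead of stacking them
    conn.execute("DELETE FROM staging_orders WHERE run_id = ?", (run_id,))
    conn.executemany(
        "INSERT INTO staging_orders VALUES (?, ?, ?)",
        [(record_id, amount, run_id) for record_id, amount in rows],
    )

rows = [("abc123", 19.99), ("def456", 42.00)]
write_to_staging(conn, rows, "run_a")
write_to_staging(conn, rows, "run_a")  # simulated re-run

count = conn.execute("SELECT COUNT(*) FROM staging_orders").fetchone()[0]
# count is 2, not 4: exactly one copy of the run's data survives
```

Because the DELETE is scoped to `run_id`, concurrent runs for other dates are untouched; each run only ever replaces its own rows.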
# Step 4: Upsert — Insert or Update, Never Duplicate
The upsert is the core idempotent write operation. If a record exists, update it. If not, insert it. Either way, one run or ten runs produces the same final state.
# PostgreSQL Upsert
from sqlalchemy import text

def upsert_from_staging(engine, staging_table, target_table, key_column, columns):
    """Merge staged data into target using INSERT ON CONFLICT.

    This is the idempotent write — running it multiple times
    produces the same result.
    """
    update_cols = [c for c in columns if c != key_column]
    update_clause = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in update_cols
    )
    col_list = ", ".join(columns)
    query = text(f"""
        INSERT INTO {target_table} ({col_list})
        SELECT {col_list} FROM {staging_table}
        ON CONFLICT ({key_column})
        DO UPDATE SET {update_clause}
    """)
    with engine.begin() as conn:
        result = conn.execute(query)
        logger.info(
            f"Upserted {result.rowcount} rows from {staging_table} → {target_table}"
        )
        return result.rowcount
# SQLite Upsert (For Local Development)
def upsert_sqlite(engine, staging_table, target_table, key_column, columns):
    """SQLite equivalent using INSERT OR REPLACE.

    Note: OR REPLACE deletes the conflicting row and inserts a new one,
    which fires DELETE triggers and can cascade foreign keys. The final
    state is still the same on every run, which is what matters here.
    """
    col_list = ", ".join(columns)
    query = text(f"""
        INSERT OR REPLACE INTO {target_table} ({col_list})
        SELECT {col_list} FROM {staging_table}
    """)
    with engine.begin() as conn:
        result = conn.execute(query)
        logger.info(f"Upserted {result.rowcount} rows into {target_table}")
        return result.rowcount
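If the delete-and-reinsert behavior of INSERT OR REPLACE is a problem (triggers, foreign keys), SQLite 3.24+ also supports the same ON CONFLICT ... DO UPDATE syntax as PostgreSQL. A sketch with the stdlib `sqlite3` driver and a hypothetical `orders` table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (record_id TEXT PRIMARY KEY, amount REAL)")

# True upsert: updates the existing row in place on key conflict
upsert = """
    INSERT INTO orders (record_id, amount) VALUES (?, ?)
    ON CONFLICT (record_id) DO UPDATE SET amount = excluded.amount
"""
conn.execute(upsert, ("abc123", 19.99))
conn.execute(upsert, ("abc123", 24.99))  # same key, corrected amount

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
amount = conn.execute(
    "SELECT amount FROM orders WHERE record_id = ?", ("abc123",)
).fetchone()[0]
# One row remains, carrying the latest amount
```

Note that ON CONFLICT requires a unique or primary-key constraint on the conflict column — the same requirement as the PostgreSQL version above.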
# What Each Run Looks Like
| Run | Staging rows | New rows | Updated rows | Target total |
|---|---|---|---|---|
| 1st | 500 | 500 | 0 | 500 |
| 2nd (re-run) | 500 | 0 | 500 | 500 |
| 3rd (new data) | 520 | 20 | 500 | 520 |
The target table always reflects the correct state, regardless of how many times you run the pipeline.
# Step 5: Deduplication Within a Single Run
Sometimes the source data itself contains duplicates. Handle this before writing to staging.
def deduplicate_records(df: pd.DataFrame, key_fields: list[str]) -> pd.DataFrame:
    """Remove duplicate records, keeping the most recent version.

    Uses the deterministic record_id to identify duplicates.
    Keeps the last occurrence (assuming source data is ordered by time).
    """
    before = len(df)
    df = df.drop_duplicates(subset=["record_id"], keep="last")
    after = len(df)
    if before != after:
        logger.warning(f"Removed {before - after} duplicate records from source")
    return df

def deduplicate_against_target(engine, df: pd.DataFrame, target_table: str) -> pd.DataFrame:
    """Remove records that already exist unchanged in the target.

    Only processes records that are genuinely new or modified.
    Reduces write volume on re-runs.
    """
    if df.empty:
        return df
    record_ids = df["record_id"].tolist()
    placeholders = ", ".join([f":id_{i}" for i in range(len(record_ids))])
    params = {f"id_{i}": rid for i, rid in enumerate(record_ids)}
    query = text(
        f"SELECT record_id FROM {target_table} WHERE record_id IN ({placeholders})"
    )
    with engine.connect() as conn:
        existing = set(row[0] for row in conn.execute(query, params))
    new_records = df[~df["record_id"].isin(existing)]
    logger.info(
        f"Dedup check: {len(df)} total, {len(existing)} already exist, "
        f"{len(new_records)} to process"
    )
    return new_records
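The same keep-last semantics can be sketched without pandas, which makes the behavior easy to see: a plain dict keyed by record_id keeps only the final version of each record.

```python
# Hypothetical batch where "abc" appears twice; the later row should win
records = [
    {"record_id": "abc", "amount": 10.00},
    {"record_id": "def", "amount": 20.00},
    {"record_id": "abc", "amount": 12.50},  # corrected version of the same order
]

# Dicts keep the last value written per key, mirroring keep="last"
deduped = list({r["record_id"]: r for r in records}.values())
# Two records remain, and "abc" carries the corrected 12.50
```

The "keep last" choice only makes sense if the source is ordered oldest-to-newest; if your source is ordered the other way, keep the first occurrence instead.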
# Step 6: Verify and Reconcile
After each load, verify that the target contains what you expect. This catches silent data loss.
class LoadVerifier:
    """Verify data integrity after pipeline loads."""

    def __init__(self, engine):
        self.engine = engine

    def verify_row_counts(self, staging_table, target_table, run_id):
        """Check that all staged records made it to the target."""
        with self.engine.connect() as conn:
            staged = conn.execute(
                text(f"SELECT COUNT(*) FROM {staging_table} WHERE run_id = :run_id"),
                {"run_id": run_id},
            ).scalar()
            # Count records in target that match this run's record_ids
            target_count = conn.execute(
                text(f"""
                    SELECT COUNT(*) FROM {target_table}
                    WHERE record_id IN (
                        SELECT record_id FROM {staging_table}
                        WHERE run_id = :run_id
                    )
                """),
                {"run_id": run_id},
            ).scalar()
        if staged != target_count:
            raise DataIntegrityError(
                f"Row count mismatch: {staged} staged, {target_count} in target"
            )
        logger.info(f"Verified: {target_count} rows match between staging and target")
        return True

    def check_no_duplicates(self, target_table, key_column):
        """Verify no duplicates exist in the target table."""
        with self.engine.connect() as conn:
            dupes = conn.execute(
                text(f"""
                    SELECT {key_column}, COUNT(*) as cnt
                    FROM {target_table}
                    GROUP BY {key_column}
                    HAVING COUNT(*) > 1
                    LIMIT 10
                """)
            ).fetchall()
        if dupes:
            raise DataIntegrityError(
                f"Found {len(dupes)} duplicate keys in {target_table}: "
                f"{[row[0] for row in dupes]}"
            )
        logger.info(f"Verified: no duplicates in {target_table}")
        return True

class DataIntegrityError(Exception):
    """Raised when data verification fails."""
    pass
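The duplicate check is just GROUP BY plus HAVING; here it is in isolation against a throwaway SQLite table seeded with a deliberate duplicate:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (record_id TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO target VALUES (?, ?)",
    [("a", 1.0), ("b", 2.0), ("a", 1.0)],  # "a" slipped in twice
)

dupes = conn.execute(
    """
    SELECT record_id, COUNT(*) AS cnt
    FROM target
    GROUP BY record_id
    HAVING COUNT(*) > 1
    """
).fetchall()
# dupes flags ("a", 2); a clean table would return an empty list
```

In practice this check should never fire when the target has a unique constraint on the key column, but it is a cheap safety net against schema drift.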
# Step 7: The Complete Idempotent Pipeline
Wire everything together into a pipeline that is safe to run, re-run, and run again.
class IdempotentPipeline:
    """A pipeline that produces the same result regardless of how many times it runs."""

    def __init__(self, name: str, engine, key_fields: list[str]):
        self.name = name
        self.engine = engine
        self.key_fields = key_fields
        self.staging = StagingLoader(engine)
        self.verifier = LoadVerifier(engine)

    def run(self, target_date: str, extract_func, transform_func=None):
        """Execute the full idempotent pipeline.

        Safe to call multiple times for the same target_date.
        """
        run = PipelineRun(self.name, target_date)
        logger.info(f"Starting {run}")

        # 1. Extract
        logger.info(f"[{run.run_id}] Extracting data for {target_date}")
        raw_df = extract_func(target_date)
        logger.info(f"[{run.run_id}] Extracted {len(raw_df)} rows")

        # 2. Tag with deterministic IDs
        tagged_df = tag_records(raw_df, self.key_fields, run.run_id)

        # 3. Transform (if provided)
        if transform_func:
            tagged_df = transform_func(tagged_df)

        # 4. Deduplicate within the batch
        clean_df = deduplicate_records(tagged_df, self.key_fields)

        # 5. Stage
        self.staging.write_to_staging(clean_df, self.name, run.run_id)

        # 6. Upsert to target
        columns = clean_df.columns.tolist()
        upsert_from_staging(
            self.engine,
            f"staging.{self.name}",
            f"public.{self.name}",
            "record_id",
            columns,
        )

        # 7. Verify
        self.verifier.verify_row_counts(
            f"staging.{self.name}", f"public.{self.name}", run.run_id
        )
        self.verifier.check_no_duplicates(f"public.{self.name}", "record_id")

        # 8. Clean staging
        self.staging.clear_staging(self.name, run.run_id)

        logger.info(f"Completed {run} — {len(clean_df)} rows processed")
        return {"run_id": run.run_id, "rows": len(clean_df)}

# Usage
engine = create_engine("postgresql://user:pass@localhost:5432/analytics")

pipeline = IdempotentPipeline(
    name="daily_orders",
    engine=engine,
    key_fields=["order_id", "line_item_id"],
)

# Run it once
pipeline.run("2026-04-30", extract_orders)

# Run it again — same result, no duplicates
pipeline.run("2026-04-30", extract_orders)
# The Idempotency Checklist
Before considering your pipeline idempotent, verify each point:
| Check | Question | Pattern |
|---|---|---|
| Record identity | Can you identify the same record across runs? | Content-based hash |
| Write safety | Does re-running a write produce the same result? | Upsert / INSERT ON CONFLICT |
| Run isolation | Can you tell which run produced which data? | Deterministic run IDs |
| Staging cleanup | Does the pipeline clean up after itself? | DELETE + INSERT in staging |
| Source dedup | Does the pipeline handle source duplicates? | Hash-based deduplication |
| Target verification | Do you verify the final state is correct? | Row count + duplicate checks |
# What This Replaces
| Manual process | Idempotent equivalent |
|---|---|
| Deleting all data and reloading from scratch | Upsert — update existing, insert new |
| Manually checking for duplicates after re-runs | Deterministic record IDs + dedup |
| Hoping the pipeline only runs once | Design that is safe at any execution count |
| Tracking which runs succeeded in a spreadsheet | Deterministic run IDs with audit columns |
| Rebuilding target tables after partial failures | Staging + merge — partial runs are harmless |
# Next Steps
Idempotency is the foundation that makes every other pipeline pattern safe. Retries, scheduled runs, manual re-runs, and failover recovery all depend on your pipeline producing consistent results regardless of execution count.
For adding retry logic and automatic recovery, see How to Build Self-Healing Data Pipelines That Recover from Failures. For testing that your pipelines actually behave correctly under re-runs, see How to Test Your Data Pipelines with pytest. For scheduling pipelines that might need re-triggering, see How to Schedule and Orchestrate Multi-Step Workflows with Prefect. For deploying pipelines in containers where restarts are routine, see Containerizing Your Python Pipelines with Docker.
Automation services include designing idempotent pipeline systems that are safe to retry, re-run, and recover without data corruption.
Get in touch to discuss making your data pipelines safe to re-run.