How to Design Idempotent Data Pipelines That Are Safe to Re-Run

Build data pipelines that produce the same result whether they run once or ten times — using upserts, deduplication, and staging patterns in Python.

Your pipeline fails at 2 AM. You re-run it at 7 AM. Now you have duplicate rows, double-counted revenue, and a dashboard that says Tuesday's sales were twice what they actually were.

This happens because most pipelines assume they only run once. In production, that assumption breaks constantly. Retries, scheduler restarts, partial failures, and manual re-runs all mean your pipeline will execute the same work more than once. If it is not idempotent — meaning it produces the same result regardless of how many times it runs — every re-run is a gamble.

This guide walks through idempotent patterns in Python that make your pipelines safe to re-run at any time, from any point.

# Who This Is For

  • Data engineers who have seen duplicate rows in production after re-running a failed job
  • Developers building ETL processes that may run more than once due to retries or scheduler restarts
  • Vibe coders whose automations occasionally double-process data and cause downstream issues
  • Anyone who has manually cleaned up duplicates after a pipeline re-run gone wrong

Basic Python and SQL knowledge is helpful. The core concept is simple: if you run the same pipeline twice, you should get the same result — not twice the data. This guide shows you how to guarantee that.

# The Idempotent Pipeline Architecture

mermaid
flowchart TD
  S[Source Data] --> H[Hash & Tag Records]
  H --> ST[Staging Table]
  ST --> D{Dedup Check}
  D -->|New| U[Upsert to Target]
  D -->|Duplicate| SK[Skip]
  U --> V[Verify Row Counts]
  SK --> V
  V --> CL[Clean Staging]
  CL --> Done[Complete ✓]

Every record gets a deterministic identifier. The pipeline writes to a staging area first, deduplicates against what already exists, then upserts into the final target. Whether this runs once or five times, the result is identical.

# What You Will Need

bash
pip install pandas sqlalchemy psycopg2-binary

  • pandas — data processing and DataFrame operations
  • sqlalchemy — database abstraction for upserts
  • psycopg2-binary — PostgreSQL adapter
  • hashlib — deterministic record hashing (standard library)

# Step 1: Deterministic Record Identifiers

The foundation of idempotency is identifying each record uniquely based on its content, not on when it arrived or which run produced it.

python
import hashlib
import json
from datetime import datetime

def generate_record_id(record: dict, key_fields: list[str]) -> str:
    """Generate a deterministic hash ID from specific fields.

    Same input always produces the same ID, regardless of
    when or how many times the pipeline runs.
    """
    key_data = {field: record[field] for field in sorted(key_fields)}
    raw = json.dumps(key_data, sort_keys=True, default=str)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


def tag_records(df, key_fields: list[str], run_id: str):
    """Add deterministic IDs and run metadata to every record."""
    df = df.copy()
    df["record_id"] = df.apply(
        lambda row: generate_record_id(row.to_dict(), key_fields), axis=1
    )
    df["run_id"] = run_id
    df["loaded_at"] = datetime.utcnow().isoformat()
    return df

# Why This Matters

| Approach | Re-run safe? | Behavior |
|---|---|---|
| Auto-increment IDs | No | Every insert gets a new ID |
| Timestamp-based IDs | No | Different time = different ID |
| Content-based hash | Yes | Same data = same ID every time |

The hash approach means that if the same order appears in two pipeline runs, it gets the same record_id both times — making deduplication trivial.
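
As a quick sanity check, hashing the same record twice yields the same ID. Here is a minimal sketch using hypothetical order fields:

python
record = {"order_id": "A-1001", "line_item_id": 2, "amount": 49.99}

# Same content in, same ID out, no matter when or how often this runs
first = generate_record_id(record, ["order_id", "line_item_id"])
second = generate_record_id(record, ["order_id", "line_item_id"])
assert first == second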

# Step 2: Run IDs for Auditability

Every pipeline execution should carry a unique run ID. This lets you trace exactly which run produced which records and clean up partial runs.

python
import hashlib
from datetime import datetime

def generate_run_id(pipeline_name: str, target_date: str) -> str:
    """Generate a deterministic run ID for a given pipeline and date.

    Same pipeline + date always produces the same run ID.
    This means re-running Tuesday's pipeline on Wednesday
    still targets Tuesday's data.
    """
    raw = f"{pipeline_name}:{target_date}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]


class PipelineRun:
    """Track a single pipeline execution."""

    def __init__(self, pipeline_name: str, target_date: str):
        self.pipeline_name = pipeline_name
        self.target_date = target_date
        self.run_id = generate_run_id(pipeline_name, target_date)
        self.started_at = datetime.utcnow()

    def __repr__(self):
        return f"PipelineRun({self.pipeline_name}, {self.target_date}, run={self.run_id})"

A deterministic run ID means re-running the same pipeline for the same date always produces the same identifier. This is the key to replacing old results instead of duplicating them.
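
To make that concrete, constructing the run object twice for the same pipeline and date yields the identical identifier. A small sketch using the class above:

python
run_first = PipelineRun("daily_orders", "2026-04-28")
run_retry = PipelineRun("daily_orders", "2026-04-28")  # re-run hours later

# Both executions target the same logical run
assert run_first.run_id == run_retry.run_id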

# Step 3: Stage Before You Load

Never write directly to your production table. Write to a staging table first, then merge.

python
import pandas as pd
from sqlalchemy import create_engine, text
import logging

logger = logging.getLogger("pipeline")

class StagingLoader:
    """Load data through a staging table for safe, idempotent writes."""

    def __init__(self, engine, schema="staging"):
        self.engine = engine
        self.schema = schema

    def write_to_staging(self, df: pd.DataFrame, table_name: str, run_id: str):
        """Write records to staging, replacing any previous run data."""
        staging_table = f"{self.schema}.{table_name}"

        with self.engine.begin() as conn:
            # Clear previous data from this run (idempotent cleanup)
            conn.execute(
                text(f"DELETE FROM {staging_table} WHERE run_id = :run_id"),
                {"run_id": run_id},
            )

            # Write fresh data
            df.to_sql(
                table_name,
                conn,
                schema=self.schema,
                if_exists="append",
                index=False,
            )

        logger.info(
            f"Staged {len(df)} rows to {staging_table} (run: {run_id})"
        )

    def clear_staging(self, table_name: str, run_id: str):
        """Clean up staging data after successful merge."""
        staging_table = f"{self.schema}.{table_name}"
        with self.engine.begin() as conn:
            conn.execute(
                text(f"DELETE FROM {staging_table} WHERE run_id = :run_id"),
                {"run_id": run_id},
            )
        logger.info(f"Cleared staging: {staging_table} (run: {run_id})")

The DELETE before INSERT pattern means that even if the staging step runs multiple times, the staging table contains exactly one copy of the data.
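
One prerequisite this pattern assumes: the staging table already exists and carries the record_id, run_id, and loaded_at columns that tag_records adds. The DDL below is a minimal sketch for a hypothetical daily_orders pipeline; the business columns are illustrative.

python
from sqlalchemy import text

# Hypothetical staging table matching the columns tag_records produces
create_staging = text("""
    CREATE TABLE IF NOT EXISTS staging.daily_orders (
        record_id    TEXT NOT NULL,
        order_id     TEXT,
        line_item_id TEXT,
        amount       NUMERIC,
        run_id       TEXT NOT NULL,
        loaded_at    TIMESTAMP
    )
""")

with engine.begin() as conn:  # engine as created in the usage example in Step 7
    conn.execute(text("CREATE SCHEMA IF NOT EXISTS staging"))
    conn.execute(create_staging)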

# Step 4: Upsert — Insert or Update, Never Duplicate

The upsert is the core idempotent write operation. If a record exists, update it. If not, insert it. Either way, one run or ten runs produces the same final state.

# PostgreSQL Upsert

python
from sqlalchemy import text

def upsert_from_staging(engine, staging_table, target_table, key_column, columns):
    """Merge staged data into target using INSERT ON CONFLICT.

    This is the idempotent write — running it multiple times
    produces the same result.
    """
    update_cols = [c for c in columns if c != key_column]
    update_clause = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in update_cols
    )
    col_list = ", ".join(columns)

    query = text(f"""
        INSERT INTO {target_table} ({col_list})
        SELECT {col_list} FROM {staging_table}
        ON CONFLICT ({key_column})
        DO UPDATE SET {update_clause}
    """)

    with engine.begin() as conn:
        result = conn.execute(query)
        logger.info(
            f"Upserted {result.rowcount} rows from {staging_table}{target_table}"
        )
        return result.rowcount
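
One detail worth calling out: ON CONFLICT (record_id) only fires if the target table has a unique constraint or unique index on record_id, so create one before the first load. The call below is an illustrative sketch against the hypothetical daily_orders tables used throughout this guide.

python
with engine.begin() as conn:
    # Required for ON CONFLICT to detect duplicates on record_id
    conn.execute(text(
        "CREATE UNIQUE INDEX IF NOT EXISTS ux_daily_orders_record_id "
        "ON public.daily_orders (record_id)"
    ))

upsert_from_staging(
    engine,
    staging_table="staging.daily_orders",
    target_table="public.daily_orders",
    key_column="record_id",
    columns=["record_id", "order_id", "line_item_id", "amount", "run_id", "loaded_at"],
)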

# SQLite Upsert (For Local Development)

python
def upsert_sqlite(engine, staging_table, target_table, key_column, columns):
    """SQLite equivalent using INSERT OR REPLACE."""
    col_list = ", ".join(columns)

    query = text(f"""
        INSERT OR REPLACE INTO {target_table} ({col_list})
        SELECT {col_list} FROM {staging_table}
    """)

    with engine.begin() as conn:
        result = conn.execute(query)
        logger.info(f"Upserted {result.rowcount} rows into {target_table}")
        return result.rowcount
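
Two caveats with the SQLite version: INSERT OR REPLACE only replaces a row when record_id is declared PRIMARY KEY or UNIQUE (otherwise it simply inserts a duplicate), and SQLite has no schemas, so staging tables need flat names. A local development sketch, assuming a hypothetical database file:

python
sqlite_engine = create_engine("sqlite:///local_dev.db")

upsert_sqlite(
    sqlite_engine,
    staging_table="staging_daily_orders",  # no schema prefix in SQLite
    target_table="daily_orders",           # record_id must be PRIMARY KEY or UNIQUE here
    key_column="record_id",
    columns=["record_id", "order_id", "line_item_id", "amount", "run_id", "loaded_at"],
)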

# What Each Run Looks Like

| Run | Staging rows | New rows | Updated rows | Target total |
|---|---|---|---|---|
| 1st | 500 | 500 | 0 | 500 |
| 2nd (re-run) | 500 | 0 | 500 | 500 |
| 3rd (new data) | 520 | 20 | 500 | 520 |

The target table always reflects the correct state, regardless of how many times you run the pipeline.

# Step 5: Deduplication Within a Single Run

Sometimes the source data itself contains duplicates. Handle this before writing to staging.

python
def deduplicate_records(df: pd.DataFrame, key_fields: list[str]) -> pd.DataFrame:
    """Remove duplicate records, keeping the most recent version.

    Uses the deterministic record_id to identify duplicates.
    Keeps the last occurrence (assuming source data is ordered by time).
    """
    before = len(df)
    df = df.drop_duplicates(subset=["record_id"], keep="last")
    after = len(df)

    if before != after:
        logger.warning(f"Removed {before - after} duplicate records from source")

    return df


def deduplicate_against_target(engine, df: pd.DataFrame, target_table: str) -> pd.DataFrame:
    """Remove records that already exist unchanged in the target.

    Only processes records that are genuinely new or modified.
    Reduces write volume on re-runs.
    """
    if df.empty:
        return df

    record_ids = df["record_id"].tolist()
    placeholders = ", ".join([f":id_{i}" for i in range(len(record_ids))])
    params = {f"id_{i}": rid for i, rid in enumerate(record_ids)}

    query = text(
        f"SELECT record_id FROM {target_table} WHERE record_id IN ({placeholders})"
    )

    with engine.connect() as conn:
        existing = set(row[0] for row in conn.execute(query, params))

    new_records = df[~df["record_id"].isin(existing)]
    logger.info(
        f"Dedup check: {len(df)} total, {len(existing)} already exist, "
        f"{len(new_records)} to process"
    )
    return new_records
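
In practice the two passes chain together just before staging, as in this sketch (it assumes a PipelineRun named run from Step 2, plus the raw_df and engine objects from the surrounding steps):

python
batch = tag_records(raw_df, ["order_id", "line_item_id"], run.run_id)
batch = deduplicate_records(batch, ["order_id", "line_item_id"])
batch = deduplicate_against_target(engine, batch, target_table="public.daily_orders")

One caveat: because record_id hashes only the key fields, the second pass drops any record whose key already exists in the target, even if its other fields have changed. It suits append-only sources; when existing rows can change, skip it and let the upsert apply the updates.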

# Step 6: Verify and Reconcile

After each load, verify that the target contains what you expect. This catches silent data loss.

python
class LoadVerifier:
    """Verify data integrity after pipeline loads."""

    def __init__(self, engine):
        self.engine = engine

    def verify_row_counts(self, staging_table, target_table, run_id):
        """Check that all staged records made it to the target."""
        with self.engine.connect() as conn:
            staged = conn.execute(
                text(f"SELECT COUNT(*) FROM {staging_table} WHERE run_id = :run_id"),
                {"run_id": run_id},
            ).scalar()

            # Count records in target that match this run's record_ids
            target_count = conn.execute(
                text(f"""
                    SELECT COUNT(*) FROM {target_table}
                    WHERE record_id IN (
                        SELECT record_id FROM {staging_table}
                        WHERE run_id = :run_id
                    )
                """),
                {"run_id": run_id},
            ).scalar()

        if staged != target_count:
            raise DataIntegrityError(
                f"Row count mismatch: {staged} staged, {target_count} in target"
            )

        logger.info(f"Verified: {target_count} rows match between staging and target")
        return True

    def check_no_duplicates(self, target_table, key_column):
        """Verify no duplicates exist in the target table."""
        with self.engine.connect() as conn:
            dupes = conn.execute(
                text(f"""
                    SELECT {key_column}, COUNT(*) as cnt
                    FROM {target_table}
                    GROUP BY {key_column}
                    HAVING COUNT(*) > 1
                    LIMIT 10
                """)
            ).fetchall()

        if dupes:
            raise DataIntegrityError(
                f"Found {len(dupes)} duplicate keys in {target_table}: "
                f"{[row[0] for row in dupes]}"
            )

        logger.info(f"Verified: no duplicates in {target_table}")
        return True


class DataIntegrityError(Exception):
    """Raised when data verification fails."""
    pass
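
Run on their own, the checks look like this (the table names and run ID are placeholders from the hypothetical daily_orders example; Step 7 wires these calls into the pipeline automatically):

python
verifier = LoadVerifier(engine)
verifier.verify_row_counts("staging.daily_orders", "public.daily_orders", run_id="a1b2c3d4e5f6")
verifier.check_no_duplicates("public.daily_orders", "record_id")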

# Step 7: The Complete Idempotent Pipeline

Wire everything together into a pipeline that is safe to run, re-run, and run again.

python
class IdempotentPipeline:
    """A pipeline that produces the same result regardless of how many times it runs."""

    def __init__(self, name: str, engine, key_fields: list[str]):
        self.name = name
        self.engine = engine
        self.key_fields = key_fields
        self.staging = StagingLoader(engine)
        self.verifier = LoadVerifier(engine)

    def run(self, target_date: str, extract_func, transform_func=None):
        """Execute the full idempotent pipeline.

        Safe to call multiple times for the same target_date.
        """
        run = PipelineRun(self.name, target_date)
        logger.info(f"Starting {run}")

        # 1. Extract
        logger.info(f"[{run.run_id}] Extracting data for {target_date}")
        raw_df = extract_func(target_date)
        logger.info(f"[{run.run_id}] Extracted {len(raw_df)} rows")

        # 2. Tag with deterministic IDs
        tagged_df = tag_records(raw_df, self.key_fields, run.run_id)

        # 3. Transform (if provided)
        if transform_func:
            tagged_df = transform_func(tagged_df)

        # 4. Deduplicate within the batch
        clean_df = deduplicate_records(tagged_df, self.key_fields)

        # 5. Stage
        self.staging.write_to_staging(clean_df, self.name, run.run_id)

        # 6. Upsert to target
        columns = clean_df.columns.tolist()
        upsert_from_staging(
            self.engine,
            f"staging.{self.name}",
            f"public.{self.name}",
            "record_id",
            columns,
        )

        # 7. Verify
        self.verifier.verify_row_counts(
            f"staging.{self.name}", f"public.{self.name}", run.run_id
        )
        self.verifier.check_no_duplicates(f"public.{self.name}", "record_id")

        # 8. Clean staging
        self.staging.clear_staging(self.name, run.run_id)

        logger.info(f"Completed {run}{len(clean_df)} rows processed")
        return {"run_id": run.run_id, "rows": len(clean_df)}


# Usage
engine = create_engine("postgresql://user:pass@localhost:5432/analytics")

pipeline = IdempotentPipeline(
    name="daily_orders",
    engine=engine,
    key_fields=["order_id", "line_item_id"],
)

# Run it once
pipeline.run("2026-04-30", extract_orders)

# Run it again — same result, no duplicates
pipeline.run("2026-04-30", extract_orders)

# The Idempotency Checklist

Before considering your pipeline idempotent, verify each point:

| Check | Question | Pattern |
|---|---|---|
| Record identity | Can you identify the same record across runs? | Content-based hash |
| Write safety | Does re-running a write produce the same result? | Upsert / INSERT ON CONFLICT |
| Run isolation | Can you tell which run produced which data? | Deterministic run IDs |
| Staging cleanup | Does the pipeline clean up after itself? | DELETE + INSERT in staging |
| Source dedup | Does the pipeline handle source duplicates? | Hash-based deduplication |
| Target verification | Do you verify the final state is correct? | Row count + duplicate checks |

# What This Replaces

| Manual process | Idempotent equivalent |
|---|---|
| Deleting all data and reloading from scratch | Upsert — update existing, insert new |
| Manually checking for duplicates after re-runs | Deterministic record IDs + dedup |
| Hoping the pipeline only runs once | Design that is safe at any execution count |
| Tracking which runs succeeded in a spreadsheet | Deterministic run IDs with audit columns |
| Rebuilding target tables after partial failures | Staging + merge — partial runs are harmless |

# Next Steps

Idempotency is the foundation that makes every other pipeline pattern safe. Retries, scheduled runs, manual re-runs, and failover recovery all depend on your pipeline producing consistent results regardless of execution count.

For adding retry logic and automatic recovery, see How to Build Self-Healing Data Pipelines That Recover from Failures. For testing that your pipelines actually behave correctly under re-runs, see How to Test Your Data Pipelines with pytest. For scheduling pipelines that might need re-triggering, see How to Schedule and Orchestrate Multi-Step Workflows with Prefect. For deploying pipelines in containers where restarts are routine, see Containerizing Your Python Pipelines with Docker.

Automation services include designing idempotent pipeline systems that are safe to retry, re-run, and recover without data corruption.

Get in touch to discuss making your data pipelines safe to re-run.
