How to Add Structured Logging to Python Data Pipelines


Replace print statements with structured logging that makes debugging production pipelines fast. Covers Python logging, structlog, JSON output, and correlation IDs.

Every data pipeline starts with print(). Every production incident ends with someone wishing they had proper logging instead.

When a pipeline fails at 3 AM, the difference between a five-minute fix and a two-hour investigation is how much context your logs carry. Structured logging replaces wall-of-text output with queryable, machine-readable records that tell you exactly what happened, where, and why.

This guide builds a structured logging setup for Python data pipelines — from replacing print statements to production-ready JSON logs with correlation IDs.

# Architecture Overview

mermaid
flowchart LR
  P["Pipeline Step"] --> L["structlog"]
  L --> C["Console\n(dev, human-readable)"]
  L --> J["JSON\n(prod, machine-readable)"]
  J --> A["Log Aggregator\n(ELK / CloudWatch)"]
  A --> D["Dashboards\n& Alerts"]

In development, logs render as coloured human-readable output. In production, the same code emits JSON that log aggregators can parse, index, and alert on.

# What You Will Need

bash
pip install structlog python-json-logger

  • structlog — structured logging library with context binding
  • python-json-logger — JSON formatter for the standard library (only needed if you stay on stdlib logging after Step 1)

# The Problem: Why Print Statements Fail

Print statements seem fine until they do not:

python
# Typical pipeline output (broken in production)
print("Starting extraction...")
print(f"Got {len(rows)} rows")
print("Transform complete")
print("ERROR: connection timed out")  # which connection? which step?

When this runs in production across three pipelines simultaneously, the output becomes an interleaved mess with no timestamps, no severity levels, and no context.

# What You Actually Need

text
2026-05-05T07:15:32Z [INFO] extraction.started pipeline_id=daily-sales source=shopify_api
2026-05-05T07:15:34Z [INFO] extraction.complete pipeline_id=daily-sales rows=2847 duration_ms=1823
2026-05-05T07:15:35Z [ERROR] load.failed pipeline_id=daily-sales table=orders error="connection timed out" retry_attempt=1

Every line answers: what happened, when, where in the pipeline, and how severe it is.

# Step 1: Replace Print with Standard Logging

The first improvement costs nothing — Python's logging module is built in.

python
import logging

# Configure once at the top of your script
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)

logger = logging.getLogger("pipeline")

# Replace print() calls
logger.info("Extraction started")
logger.info("Got %d rows", len(rows))
logger.error("Connection timed out", exc_info=True)

This gives you timestamps, severity levels, and logger names. But the messages are still unstructured strings — hard to parse, filter, or aggregate.
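
For contrast, the stdlib's own mechanism for per-call context is the extra= parameter. This sketch (the logger name pipeline.demo and the StringIO capture are illustrative) shows why it does not scale: every log call that the formatter touches must supply the same keys.

```python
import io
import logging

# stdlib logging can attach context via extra=, but every call the
# formatter handles must supply the same keys, or formatting breaks
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter(
    "[%(levelname)s] %(name)s: %(message)s pipeline=%(pipeline)s"
))

logger = logging.getLogger("pipeline.demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Extraction started", extra={"pipeline": "daily-sales"})
print(buffer.getvalue().strip())
# [INFO] pipeline.demo: Extraction started pipeline=daily-sales
```

Forgetting extra= on a single call breaks the format string for that line, which is exactly the bookkeeping that structlog's bind() removes.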

# Step 2: Structured Logging with structlog

structlog lets you bind key-value context to every log line:

python
import logging
import structlog

# Configure structlog for development (human-readable)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

# Logging with Context

python
# Bind context that persists across log calls
log = logger.bind(pipeline_id="daily-sales", source="shopify_api")

log.info("extraction.started")
# => 2026-05-05T07:15:32Z [info] extraction.started pipeline_id=daily-sales source=shopify_api

log.info("extraction.complete", rows=2847, duration_ms=1823)
# => 2026-05-05T07:15:34Z [info] extraction.complete pipeline_id=daily-sales source=shopify_api rows=2847 duration_ms=1823

log.error("load.failed", table="orders", error="connection timed out")
# => 2026-05-05T07:15:35Z [error] load.failed pipeline_id=daily-sales source=shopify_api table=orders error=connection timed out

The bind() method attaches context once. Every subsequent log call includes it automatically — no string formatting, no forgetting to include the pipeline name.

# Step 3: JSON Output for Production

Switch from human-readable to JSON by changing one processor:

python
import structlog
import logging
import sys

def configure_logging(environment="development"):
    """Configure structured logging based on environment."""

    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    if environment == "production":
        # JSON output for log aggregators
        shared_processors.append(structlog.processors.JSONRenderer())
    else:
        # Coloured console output for development
        shared_processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=shared_processors,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
    )

# Development Output

text
2026-05-05T07:15:32Z [info     ] extraction.started     pipeline_id=daily-sales source=shopify_api

# Production Output (JSON)

json
{"event": "extraction.started", "level": "info", "timestamp": "2026-05-05T07:15:32Z", "pipeline_id": "daily-sales", "source": "shopify_api"}

Same code, same log calls — the output format changes based on environment. JSON lines are parseable by CloudWatch, ELK Stack, Datadog, and every other log aggregator.
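
To make the benefit concrete, here is a sketch of what downstream tooling can do with those JSON lines. The sample records are illustrative, not real pipeline output:

```python
import json

# JSON logs are trivially machine-readable: one json.loads per line,
# then ordinary list comprehensions for filtering
raw_lines = [
    '{"event": "extraction.started", "level": "info", "pipeline_id": "daily-sales"}',
    '{"event": "load.failed", "level": "error", "pipeline_id": "daily-sales"}',
    '{"event": "extraction.started", "level": "info", "pipeline_id": "inventory"}',
]

events = [json.loads(line) for line in raw_lines]
errors = [e for e in events if e["level"] == "error"]
print(errors[0]["event"])  # load.failed
```

The same filter expressed as a query in CloudWatch or ELK is a one-liner against the indexed `level` field.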

# Step 4: Correlation IDs Across Pipeline Steps

When multiple pipelines run simultaneously, you need a way to trace a single execution across all its log lines:

python
import uuid
import structlog

# structlog.contextvars stores bound values in contextvars under the
# hood, so the context is isolated per thread and per async task

def start_pipeline_run(pipeline_name):
    """Generate a correlation ID and bind it to all subsequent logs."""
    run_id = str(uuid.uuid4())[:8]
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        correlation_id=run_id,
        pipeline=pipeline_name,
    )
    return run_id

# Using Correlation IDs

python
logger = structlog.get_logger()

def run_daily_sales_pipeline():
    run_id = start_pipeline_run("daily-sales")
    logger.info("pipeline.started")

    rows = extract_from_api()
    logger.info("extraction.complete", rows=len(rows))

    cleaned = transform(rows)
    logger.info("transform.complete", rows_in=len(rows), rows_out=len(cleaned))

    load_to_warehouse(cleaned)
    logger.info("pipeline.complete")

Every log line from this execution shares the same correlation_id, even across function calls and modules:

json
{"event": "pipeline.started", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "timestamp": "..."}
{"event": "extraction.complete", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "rows": 2847, "timestamp": "..."}
{"event": "transform.complete", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "rows_in": 2847, "rows_out": 2831, "timestamp": "..."}
{"event": "pipeline.complete", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "timestamp": "..."}

To find everything about one pipeline run: grep "a1b2c3d4" or query correlation_id = "a1b2c3d4" in your log aggregator.
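
The same idea in miniature: grouping mixed log lines by correlation_id takes a few lines of Python (the sample records are illustrative):

```python
import json
from collections import defaultdict

# Reassemble per-run timelines from an interleaved JSON log stream
raw_lines = [
    '{"event": "pipeline.started", "correlation_id": "a1b2c3d4"}',
    '{"event": "pipeline.started", "correlation_id": "e5f6a7b8"}',
    '{"event": "pipeline.complete", "correlation_id": "a1b2c3d4"}',
]

runs = defaultdict(list)
for line in raw_lines:
    entry = json.loads(line)
    runs[entry["correlation_id"]].append(entry["event"])

print(runs["a1b2c3d4"])  # ['pipeline.started', 'pipeline.complete']
```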

# Step 5: Timing and Performance Metrics

Add automatic duration tracking to pipeline steps:

python
import time
import structlog
from contextlib import contextmanager

logger = structlog.get_logger()

@contextmanager
def log_duration(step_name, **extra_context):
    """Log the duration of a pipeline step."""
    log = logger.bind(step=step_name, **extra_context)
    log.info(f"{step_name}.started")
    start = time.perf_counter()

    try:
        yield log
    except Exception as exc:
        duration_ms = round((time.perf_counter() - start) * 1000)
        log.error(
            f"{step_name}.failed",
            duration_ms=duration_ms,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        raise
    else:
        duration_ms = round((time.perf_counter() - start) * 1000)
        log.info(f"{step_name}.complete", duration_ms=duration_ms)

# Usage

python
def run_pipeline():
    start_pipeline_run("daily-sales")

    with log_duration("extract", source="shopify_api") as log:
        rows = extract_from_api()
        log.info("rows.fetched", count=len(rows))

    with log_duration("transform"):
        cleaned = transform(rows)

    with log_duration("load", target="warehouse"):
        load_to_warehouse(cleaned)

Output:

text
extract.started       source=shopify_api
rows.fetched          source=shopify_api count=2847
extract.complete      source=shopify_api duration_ms=1823
transform.started
transform.complete    duration_ms=342
load.started          target=warehouse
load.complete         target=warehouse duration_ms=567

# Step 6: Error Context That Actually Helps

When errors happen, log the context needed to reproduce and fix the issue:

python
import structlog

logger = structlog.get_logger()

def process_batch(batch, batch_number, total_batches):
    """Process a batch of records with rich error context."""
    log = logger.bind(
        batch_number=batch_number,
        total_batches=total_batches,
        batch_size=len(batch),
    )

    processed = 0
    for i, record in enumerate(batch):
        try:
            transform_record(record)
            processed += 1
        except Exception as exc:
            log.error(
                "record.transform_failed",
                record_index=i,
                record_id=record.get("id", "unknown"),
                error=str(exc),
                error_type=type(exc).__name__,
                # Include a sample of the problematic data (never log secrets)
                record_keys=list(record.keys()),
            )
            continue  # skip bad records, log and move on

    log.info("batch.complete", processed=processed, failed=len(batch) - processed)

# What This Logs on Failure

json
{
  "event": "record.transform_failed",
  "batch_number": 3,
  "total_batches": 10,
  "batch_size": 500,
  "record_index": 247,
  "record_id": "ord_8842",
  "error": "invalid literal for int() with base 10: 'N/A'",
  "error_type": "ValueError",
  "record_keys": ["id", "amount", "currency", "date"]
}

This tells you: which batch, which record, what the error was, and what the record looks like — without dumping sensitive data.
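
Enforcing the "never log secrets" rule by convention alone is fragile; a custom structlog processor can mask known-sensitive keys centrally. This is a sketch under assumptions: SENSITIVE_KEYS and redact_sensitive are hypothetical names, and the key list must match your own data.

```python
# Hypothetical key list; adjust to whatever your records contain
SENSITIVE_KEYS = {"password", "api_key", "token", "card_number"}

def redact_sensitive(logger, method_name, event_dict):
    """structlog processor: mask sensitive values before rendering.

    Processors receive (logger, method_name, event_dict) and return
    the (possibly modified) event_dict.
    """
    for key in list(event_dict):
        if key in SENSITIVE_KEYS:
            event_dict[key] = "***redacted***"
    return event_dict

# Wire it into the configuration, before the renderer:
#   processors=[..., redact_sensitive, structlog.processors.JSONRenderer()]
```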

# Wiring It All Together

Here is the full logging configuration for a production pipeline:

python
import logging
import os
import structlog

def setup_pipeline_logging():
    """Configure structured logging for the pipeline."""
    env = os.environ.get("ENVIRONMENT", "development")
    log_level = os.environ.get("LOG_LEVEL", "INFO")

    # Shared processors for all environments
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
    ]

    if env == "production":
        shared_processors.append(structlog.processors.JSONRenderer())
    else:
        shared_processors.append(
            structlog.dev.ConsoleRenderer(colors=True)
        )

    structlog.configure(
        processors=shared_processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper(), logging.INFO)
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

    return structlog.get_logger()

# Running a Pipeline with Full Logging

python
def main():
    logger = setup_pipeline_logging()
    run_id = start_pipeline_run("daily-sales")
    logger.info("pipeline.initialised", run_id=run_id)

    try:
        with log_duration("extract", source="shopify_api"):
            rows = extract_from_api()

        with log_duration("transform"):
            cleaned = transform(rows)

        with log_duration("load", target="warehouse"):
            load_to_warehouse(cleaned)

        logger.info("pipeline.success", total_rows=len(cleaned))

    except Exception as exc:
        logger.error(
            "pipeline.failed",
            error=str(exc),
            error_type=type(exc).__name__,
            exc_info=True,
        )
        raise

if __name__ == "__main__":
    main()

# Log Level Reference

| Level | When to Use | Example |
|---|---|---|
| DEBUG | Internal state during development | debug("cache.check", key="orders_2026", hit=True) |
| INFO | Normal pipeline events | info("extraction.complete", rows=2847) |
| WARNING | Recoverable issues | warning("retry.attempt", attempt=2, max=5) |
| ERROR | Failed operations | error("load.failed", table="orders") |
| CRITICAL | Pipeline cannot continue | critical("database.unreachable") |

Use INFO as your baseline. If your logs are too noisy, you have too many INFO messages — move some to DEBUG.

# What This Replaces

| Before (print statements) | After (structured logging) |
|---|---|
| No timestamps | ISO 8601 timestamps on every line |
| No severity levels | DEBUG / INFO / WARNING / ERROR / CRITICAL |
| Interleaved output from multiple runs | Correlation IDs isolate each run |
| No duration tracking | Automatic timing with log_duration |
| String formatting for context | Key-value pairs, machine-parseable |
| Debugging by reading scrollback | Querying by field in log aggregator |

# Next Steps

Structured logging is the foundation for pipeline observability. Once your logs carry context, you can query individual runs by correlation ID, chart step durations, and alert on failure events in your log aggregator.

If your automation scripts still use print(), start with Step 1. The migration is incremental — each step makes debugging measurably easier.
