How to Add Structured Logging to Python Data Pipelines
Replace print statements with structured logging that makes debugging production pipelines fast. Covers Python logging, structlog, JSON output, and correlation IDs.

Every data pipeline starts with print(). Every production incident ends with someone wishing they had proper logging instead.
When a pipeline fails at 3 AM, the difference between a five-minute fix and a two-hour investigation is how much context your logs carry. Structured logging replaces wall-of-text output with queryable, machine-readable records that tell you exactly what happened, where, and why.
This guide builds a structured logging setup for Python data pipelines — from replacing print statements to production-ready JSON logs with correlation IDs.
# Architecture Overview
```mermaid
flowchart LR
    P["Pipeline Step"] --> L["structlog"]
    L --> C["Console\n(dev, human-readable)"]
    L --> J["JSON\n(prod, machine-readable)"]
    J --> A["Log Aggregator\n(ELK / CloudWatch)"]
    A --> D["Dashboards\n& Alerts"]
```
In development, logs render as coloured human-readable output. In production, the same code emits JSON that log aggregators can parse, index, and alert on.
# What You Will Need
```bash
pip install structlog python-json-logger
```
- structlog — structured logging library with context binding
- python-json-logger — JSON formatter for the standard library
# The Problem: Why Print Statements Fail
Print statements seem fine until they do not:
```python
# Typical pipeline output (broken in production)
print("Starting extraction...")
print(f"Got {len(rows)} rows")
print("Transform complete")
print("ERROR: connection timed out")  # which connection? which step?
```
When this runs in production across three pipelines simultaneously, the output becomes an interleaved mess with no timestamps, no severity levels, and no context.
# What You Actually Need
```text
2026-05-05T07:15:32Z [INFO] extraction.started pipeline_id=daily-sales source=shopify_api
2026-05-05T07:15:34Z [INFO] extraction.complete pipeline_id=daily-sales rows=2847 duration_ms=1823
2026-05-05T07:15:35Z [ERROR] load.failed pipeline_id=daily-sales table=orders error="connection timed out" retry_attempt=1
```
Every line answers: what happened, when, where in the pipeline, and how severe it is.
# Step 1: Replace Print with Standard Logging
The first improvement costs nothing — Python's logging module is built in.
```python
import logging

# Configure once at the top of your script
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)

logger = logging.getLogger("pipeline")

# Replace print() calls
logger.info("Extraction started")
logger.info("Got %d rows", len(rows))
logger.error("Connection timed out", exc_info=True)
```
This gives you timestamps, severity levels, and logger names. But the messages are still unstructured strings — hard to parse, filter, or aggregate.
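To see why unstructured strings resist aggregation, compare pulling a row count back out of a formatted message with reading it from a structured record (a contrived sketch; the message text and field names are illustrative):

```python
import re

# Unstructured: extracting the row count needs a brittle regex
# that breaks the moment someone rewords the message
line = "2026-05-05T07:15:34Z [INFO] pipeline: Got 2847 rows"
match = re.search(r"Got (\d+) rows", line)
rows = int(match.group(1)) if match else None

# Structured: the same fact is a plain field lookup
record = {"event": "extraction.complete", "rows": 2847}
rows_structured = record["rows"]

print(rows, rows_structured)  # both 2847
```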
# Step 2: Structured Logging with structlog
structlog lets you bind key-value context to every log line:
```python
import logging

import structlog

# Configure structlog for development (human-readable)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()
```
# Logging with Context
```python
# Bind context that persists across log calls
log = logger.bind(pipeline_id="daily-sales", source="shopify_api")

log.info("extraction.started")
# => 2026-05-05T07:15:32Z [info] extraction.started pipeline_id=daily-sales source=shopify_api

log.info("extraction.complete", rows=2847, duration_ms=1823)
# => 2026-05-05T07:15:34Z [info] extraction.complete pipeline_id=daily-sales source=shopify_api rows=2847 duration_ms=1823

log.error("load.failed", table="orders", error="connection timed out")
# => 2026-05-05T07:15:35Z [error] load.failed pipeline_id=daily-sales source=shopify_api table=orders error=connection timed out
```
The bind() method attaches context once. Every subsequent log call includes it automatically — no string formatting, no forgetting to include the pipeline name.
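If you cannot add a dependency, the standard library's `logging.LoggerAdapter` gives a rough analogue of `bind()` — less flexible, but the same idea of attaching context once (a stdlib-only sketch; the logger and field names are illustrative):

```python
import io
import logging

# Capture output in a buffer so the example is self-contained
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(
    logging.Formatter("[%(levelname)s] %(message)s pipeline_id=%(pipeline_id)s")
)
base = logging.getLogger("pipeline.adapter_demo")
base.addHandler(handler)
base.setLevel(logging.INFO)

# LoggerAdapter injects its extra dict into every record it emits
log = logging.LoggerAdapter(base, {"pipeline_id": "daily-sales"})
log.info("extraction.started")
log.info("extraction.complete")

print(buffer.getvalue())
```

Both lines carry `pipeline_id=daily-sales` without repeating it at each call site. The limitation compared with `bind()` is that the formatter must know the context keys in advance.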
# Step 3: JSON Output for Production
Switch from human-readable to JSON by changing one processor:
```python
import logging

import structlog

def configure_logging(environment="development"):
    """Configure structured logging based on environment."""
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    if environment == "production":
        # JSON output for log aggregators
        shared_processors.append(structlog.processors.JSONRenderer())
    else:
        # Coloured console output for development
        shared_processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=shared_processors,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
    )
```
# Development Output
```text
2026-05-05T07:15:32Z [info ] extraction.started pipeline_id=daily-sales source=shopify_api
```
# Production Output (JSON)
```json
{"event": "extraction.started", "level": "info", "timestamp": "2026-05-05T07:15:32Z", "pipeline_id": "daily-sales", "source": "shopify_api"}
```
Same code, same log calls — the output format changes based on environment. JSON lines are parseable by CloudWatch, ELK Stack, Datadog, and every other log aggregator.
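The same JSON output can be approximated without structlog: python-json-logger (installed earlier) provides a ready-made JSON formatter for the standard library, and a minimal one is only a few lines. A stdlib-only sketch, with field names mirroring the structlog output above:

```python
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    """Minimal JSON formatter; python-json-logger offers a richer version."""

    def format(self, record):
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            # Local time by default; set Formatter.converter = time.gmtime for UTC
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "logger": record.name,
        }
        return json.dumps(payload)

# Demo: route a logger through the formatter and capture one line
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(JsonFormatter())
demo = logging.getLogger("pipeline.json_demo")
demo.addHandler(handler)
demo.setLevel(logging.INFO)
demo.info("extraction.started")

parsed = json.loads(buffer.getvalue())
```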
# Step 4: Correlation IDs Across Pipeline Steps
When multiple pipelines run simultaneously, you need a way to trace a single execution across all its log lines:
```python
import uuid

import structlog

def start_pipeline_run(pipeline_name):
    """Generate a correlation ID and bind it to all subsequent logs."""
    run_id = str(uuid.uuid4())[:8]
    # structlog's contextvars are thread- and async-safe, so the ID
    # follows this execution context across function calls
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        correlation_id=run_id,
        pipeline=pipeline_name,
    )
    return run_id
```
# Using Correlation IDs
```python
logger = structlog.get_logger()

def run_daily_sales_pipeline():
    run_id = start_pipeline_run("daily-sales")
    logger.info("pipeline.started")
    rows = extract_from_api()
    logger.info("extraction.complete", rows=len(rows))
    cleaned = transform(rows)
    logger.info("transform.complete", rows_in=len(rows), rows_out=len(cleaned))
    load_to_warehouse(cleaned)
    logger.info("pipeline.complete")
```
Every log line from this execution shares the same correlation_id, even across function calls and modules:
```json
{"event": "pipeline.started", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "timestamp": "..."}
{"event": "extraction.complete", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "rows": 2847, "timestamp": "..."}
{"event": "transform.complete", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "rows_in": 2847, "rows_out": 2831, "timestamp": "..."}
{"event": "pipeline.complete", "correlation_id": "a1b2c3d4", "pipeline": "daily-sales", "timestamp": "..."}
```
To find everything about one pipeline run, `grep a1b2c3d4` the log file or query `correlation_id = "a1b2c3d4"` in your log aggregator.
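Because each line is self-describing JSON, filtering one run out of interleaved output is a one-liner in Python too (the log lines below are illustrative):

```python
import json

# Interleaved output from two concurrent runs
log_lines = [
    '{"event": "pipeline.started", "correlation_id": "a1b2c3d4"}',
    '{"event": "pipeline.started", "correlation_id": "ffee0011"}',
    '{"event": "pipeline.complete", "correlation_id": "a1b2c3d4"}',
]

# Keep only the records belonging to one execution
run = [
    record for record in map(json.loads, log_lines)
    if record["correlation_id"] == "a1b2c3d4"
]

print([r["event"] for r in run])  # ['pipeline.started', 'pipeline.complete']
```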
# Step 5: Timing and Performance Metrics
Add automatic duration tracking to pipeline steps:
```python
import time
from contextlib import contextmanager

import structlog

logger = structlog.get_logger()

@contextmanager
def log_duration(step_name, **extra_context):
    """Log the duration of a pipeline step."""
    log = logger.bind(step=step_name, **extra_context)
    log.info(f"{step_name}.started")
    start = time.perf_counter()
    try:
        yield log
    except Exception as exc:
        duration_ms = round((time.perf_counter() - start) * 1000)
        log.error(
            f"{step_name}.failed",
            duration_ms=duration_ms,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        raise
    else:
        duration_ms = round((time.perf_counter() - start) * 1000)
        log.info(f"{step_name}.complete", duration_ms=duration_ms)
```
# Usage
```python
def run_pipeline():
    start_pipeline_run("daily-sales")

    with log_duration("extract", source="shopify_api") as log:
        rows = extract_from_api()
        log.info("rows.fetched", count=len(rows))

    with log_duration("transform"):
        cleaned = transform(rows)

    with log_duration("load", target="warehouse"):
        load_to_warehouse(cleaned)
```
Output:
```text
extract.started source=shopify_api
rows.fetched source=shopify_api count=2847
extract.complete source=shopify_api duration_ms=1823
transform.started
transform.complete duration_ms=342
load.started target=warehouse
load.complete target=warehouse duration_ms=567
```
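When a step maps one-to-one to a function, a decorator can replace the with-block. The sketch below uses the standard library's logger so it runs standalone; in the pipeline itself the wrapper body could bind through structlog exactly as `log_duration` does (the decorated function name is illustrative):

```python
import functools
import logging
import time

logger = logging.getLogger("pipeline")

def timed_step(step_name):
    """Decorator variant of log_duration: logs start, completion, and timing."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger.info("%s.started", step_name)
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                duration_ms = round((time.perf_counter() - start) * 1000)
                logger.error("%s.failed duration_ms=%d error=%s",
                             step_name, duration_ms, exc)
                raise
            duration_ms = round((time.perf_counter() - start) * 1000)
            logger.info("%s.complete duration_ms=%d", step_name, duration_ms)
            return result
        return wrapper
    return decorator

@timed_step("transform")
def double_rows(rows):
    return [r * 2 for r in rows]
```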
# Step 6: Error Context That Actually Helps
When errors happen, log the context needed to reproduce and fix the issue:
```python
import structlog

logger = structlog.get_logger()

def process_batch(batch, batch_number, total_batches):
    """Process a batch of records with rich error context."""
    log = logger.bind(
        batch_number=batch_number,
        total_batches=total_batches,
        batch_size=len(batch),
    )
    processed = 0
    for i, record in enumerate(batch):
        try:
            transform_record(record)
        except Exception as exc:
            log.error(
                "record.transform_failed",
                record_index=i,
                record_id=record.get("id", "unknown"),
                error=str(exc),
                error_type=type(exc).__name__,
                # Include the shape of the problematic data (never log secrets)
                record_keys=list(record.keys()),
            )
            continue  # skip bad records, log and move on
        processed += 1
    log.info("batch.complete", processed=processed, failed=len(batch) - processed)
```
# What This Logs on Failure
```json
{
  "event": "record.transform_failed",
  "batch_number": 3,
  "total_batches": 10,
  "batch_size": 500,
  "record_index": 247,
  "record_id": "ord_8842",
  "error": "invalid literal for int() with base 10: 'N/A'",
  "error_type": "ValueError",
  "record_keys": ["id", "amount", "currency", "date"]
}
```
This tells you: which batch, which record, what the error was, and what the record looks like — without dumping sensitive data.
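"Never log secrets" deserves enforcement rather than discipline: a small helper can mask sensitive fields before they reach a log call. A flat sketch — the key list is illustrative and should match your own schema:

```python
# Keys whose values must never appear in logs (adjust for your schema)
SENSITIVE_KEYS = {"password", "token", "api_key", "secret", "authorization"}

def redact(record, mask="***"):
    """Return a copy of the record with sensitive values masked."""
    return {
        key: mask if key.lower() in SENSITIVE_KEYS else value
        for key, value in record.items()
    }

safe = redact({"id": "ord_8842", "amount": 129.5, "api_key": "sk_live_abc"})
print(safe)  # {'id': 'ord_8842', 'amount': 129.5, 'api_key': '***'}
```

Nested payloads need a recursive version; this covers the common case of flat keyword arguments passed to a log call.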
# Wiring It All Together
Here is the full logging configuration for a production pipeline:
```python
import logging
import os

import structlog

def setup_pipeline_logging():
    """Configure structured logging for the pipeline."""
    env = os.environ.get("ENVIRONMENT", "development")
    log_level = os.environ.get("LOG_LEVEL", "INFO")

    # Shared processors for all environments
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
    ]

    if env == "production":
        shared_processors.append(structlog.processors.JSONRenderer())
    else:
        shared_processors.append(
            structlog.dev.ConsoleRenderer(colors=True)
        )

    structlog.configure(
        processors=shared_processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper(), logging.INFO)
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

    return structlog.get_logger()
```
# Running a Pipeline with Full Logging
```python
def main():
    logger = setup_pipeline_logging()
    run_id = start_pipeline_run("daily-sales")
    logger.info("pipeline.initialised", run_id=run_id)

    try:
        with log_duration("extract", source="shopify_api"):
            rows = extract_from_api()
        with log_duration("transform"):
            cleaned = transform(rows)
        with log_duration("load", target="warehouse"):
            load_to_warehouse(cleaned)
        logger.info("pipeline.success", total_rows=len(cleaned))
    except Exception as exc:
        logger.error(
            "pipeline.failed",
            error=str(exc),
            error_type=type(exc).__name__,
            exc_info=True,
        )
        raise

if __name__ == "__main__":
    main()
```
# Log Level Reference
| Level | When to Use | Example |
|---|---|---|
| DEBUG | Internal state during development | `debug("cache.check", key="orders_2026", hit=True)` |
| INFO | Normal pipeline events | `info("extraction.complete", rows=2847)` |
| WARNING | Recoverable issues | `warning("retry.attempt", attempt=2, max=5)` |
| ERROR | Failed operations | `error("load.failed", table="orders")` |
| CRITICAL | Pipeline cannot continue | `critical("database.unreachable")` |
Use INFO as your baseline. If your logs are too noisy, you have too many INFO messages — move some to DEBUG.
# What This Replaces
| Before (print statements) | After (structured logging) |
|---|---|
| No timestamps | ISO 8601 timestamps on every line |
| No severity levels | DEBUG / INFO / WARNING / ERROR / CRITICAL |
| Interleaved output from multiple runs | Correlation IDs isolate each run |
| No duration tracking | Automatic timing with log_duration |
| String formatting for context | Key-value pairs, machine-parseable |
| Debugging by reading scrollback | Querying by field in log aggregator |
# Next Steps
Structured logging is the foundation for pipeline observability. Once your logs carry context, you can:
- Set up self-healing pipelines that react to logged errors automatically
- Build notification systems triggered by log severity levels
- Add log-based alerts to your orchestration workflows
- Feed structured logs into the dashboards your team already uses
If your automation scripts still use print(), start with Step 1. The migration is incremental — each step makes debugging measurably easier.
Need help setting up observability for your data pipelines? Get in touch or explore our automation services.