Build an LLM-Powered Data Pipeline with Python and OpenAI
Build a production data pipeline that uses OpenAI's API to classify, extract, and enrich unstructured text — with structured output parsing, cost controls, rate limiting, and fallback logic so it actually works at scale.
Every pipeline eventually hits a wall where the data is too messy for rules. Customer feedback is free text. Support tickets have no consistent format. Product descriptions from suppliers arrive in three languages with typos. You can write regex for a week or you can let a language model handle it.
But calling an LLM inside a data pipeline is not the same as calling it from a chat window. You need structured output, not prose. You need cost tracking, not open-ended billing. You need retry logic for rate limits, validation for bad responses, and fallback paths when the model hallucinates. Most "LLM pipeline" tutorials skip all of this.
This guide builds a production pipeline that processes unstructured text through OpenAI's API — classification, extraction, and enrichment — with every guardrail you need to run it on real data at scale.
# Who This Is For
- Data engineers who have unstructured text fields sitting in warehouses that nobody can query
- Developers building classification or extraction features who want to use LLMs without the unpredictability
- Teams processing customer feedback, support tickets, or supplier data that arrives in inconsistent formats
- Vibe coders who have played with ChatGPT and want to embed that capability into an actual pipeline
Basic Python and a passing familiarity with APIs is all you need. No ML background required — the LLM does the heavy lifting.
# Pipeline Architecture
flowchart LR
    SRC["Source Data\n(CSV / DB / API)"] --> PRE["Pre-process\n& Batch"]
    PRE --> RL["Rate Limiter"]
    RL --> LLM["OpenAI API\n(structured output)"]
    LLM --> VAL["Validate\n(Pydantic)"]
    VAL -->|Valid| OUT["Output\n(DB / Parquet)"]
    VAL -->|Invalid| DLQ["Dead Letter\nQueue"]
    DLQ --> LLM
    RL -.-> COST["Cost Tracker\n(token budget)"]
    COST -.->|Budget hit| STOP["Halt Pipeline"]
The key difference from a normal pipeline: the LLM stage is non-deterministic. Same input can produce different output. That means validation is not optional — it is the only thing between you and garbage data landing in your warehouse.
# What You Will Need
pip install openai pandas pydantic tenacity tiktoken
- openai — official Python client (v1.x)
- pandas — data wrangling
- pydantic — response validation and schema enforcement
- tenacity — retry logic for rate limits and transient errors
- tiktoken — token counting for cost estimation
You will also need an OpenAI API key. Set it as an environment variable:
export OPENAI_API_KEY="sk-..."
# Step 1: Define Your Output Schema
The most important decision in the entire pipeline. If you do not define what the LLM should return, you will spend all your time parsing free-text responses.
from pydantic import BaseModel, Field
from enum import Enum


class Sentiment(str, Enum):
    positive = "positive"
    negative = "negative"
    neutral = "neutral"
    mixed = "mixed"


class FeedbackCategory(str, Enum):
    product_quality = "product_quality"
    shipping = "shipping"
    customer_service = "customer_service"
    pricing = "pricing"
    website_ux = "website_ux"
    other = "other"


class FeedbackAnalysis(BaseModel):
    """What we expect back from the LLM for each feedback record."""

    sentiment: Sentiment
    category: FeedbackCategory
    key_issues: list[str] = Field(
        description="Specific problems mentioned, max 3",
        max_length=3,
    )
    action_required: bool = Field(
        description="True if this needs a human response",
    )
    summary: str = Field(
        description="One sentence summary of the feedback",
        max_length=200,
    )
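This is a Pydantic model, not a prompt. It is the contract every response is validated against: if the model's JSON does not match, we know immediately. Note that the API call in Step 4 uses plain JSON mode and never sends this schema to the model, so the model only returns these fields with these values if the prompt spells them out.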
# Step 2: Build the Prompt Template
Prompt engineering for pipelines is different from chat prompts. You want consistency, not creativity.
SYSTEM_PROMPT = """You are a data processing assistant. Your job is to analyse
customer feedback and return structured JSON. Be precise and consistent.

Rules:
- Classify sentiment based on overall tone, not individual words
- Pick the single most relevant category
- Extract only issues explicitly mentioned; do not infer
- Set action_required to true only if the customer expects a response
- Keep summaries factual, no editorialising"""


def build_user_prompt(text: str) -> str:
    """Format a feedback record for the LLM.

    Strips whitespace and truncates to avoid blowing the context window
    on a single record.
    """
    cleaned = " ".join(text.split())
    # 2000 chars is ~500 tokens, plenty for a feedback record
    if len(cleaned) > 2000:
        cleaned = cleaned[:2000] + "..."
    return f"Analyse this customer feedback:\n\n{cleaned}"
Two things to notice. First, the system prompt sets rules, not vibes. "Do not infer" is specific. "Be helpful" is useless. Second, the user prompt truncates long input — one verbose customer review should not eat your entire token budget.
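To see what the cleaning and truncation do to a real record, here is a small sketch that runs the same logic on a messy string and on an oversized one. The `max_chars` parameter is an addition for illustration; the article's version hard-codes 2000.

```python
def build_user_prompt(text: str, max_chars: int = 2000) -> str:
    """Same logic as the helper above, with the cap exposed as a parameter."""
    cleaned = " ".join(text.split())  # collapse newlines, tabs, repeated spaces
    if len(cleaned) > max_chars:
        cleaned = cleaned[:max_chars] + "..."
    return f"Analyse this customer feedback:\n\n{cleaned}"


messy = "  The   box arrived\n\ncrushed.\tNo reply from support.  "
print(build_user_prompt(messy))

long_review = "word " * 1000  # roughly 5,000 characters of input
prompt = build_user_prompt(long_review)
print(prompt.endswith("..."), len(prompt))
```

The cut is character-based, so it can land mid-word. For classification that rarely matters, but if you extract quotes from the text you may prefer to cut at the last space before the limit.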
# Step 3: Rate Limiting and Cost Tracking
OpenAI enforces rate limits per model. Hit them and you get 429 errors. Worse, if you are processing 50,000 records, an uncapped pipeline can rack up hundreds of dollars before you notice.
import time
import tiktoken
import logging

logger = logging.getLogger("llm_pipeline")


class BudgetExceeded(Exception):
    pass


class CostTracker:
    """Track token usage and enforce budget limits."""

    # pricing per 1M tokens as of mid-2026
    MODEL_COSTS = {
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4o": {"input": 2.50, "output": 10.00},
    }

    def __init__(self, model: str, budget_usd: float = 10.0):
        self.model = model
        self.budget = budget_usd
        self.input_tokens = 0
        self.output_tokens = 0
        costs = self.MODEL_COSTS.get(model)
        if not costs:
            raise ValueError(f"Unknown model: {model}")
        self.input_cost_per_token = costs["input"] / 1_000_000
        self.output_cost_per_token = costs["output"] / 1_000_000

    @property
    def total_cost(self) -> float:
        return (
            self.input_tokens * self.input_cost_per_token
            + self.output_tokens * self.output_cost_per_token
        )

    def record(self, usage):
        """Record token usage from an API response."""
        self.input_tokens += usage.prompt_tokens
        self.output_tokens += usage.completion_tokens
        if self.total_cost >= self.budget:
            raise BudgetExceeded(
                f"Budget of ${self.budget:.2f} exceeded: "
                f"spent ${self.total_cost:.2f} after "
                f"{self.input_tokens + self.output_tokens:,} tokens"
            )

    def summary(self) -> dict:
        return {
            "model": self.model,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "total_cost_usd": round(self.total_cost, 4),
            "budget_remaining_usd": round(self.budget - self.total_cost, 4),
        }


class RateLimiter:
    """Simple fixed-interval rate limiter for API calls."""

    def __init__(self, requests_per_minute: int = 500):
        self.rpm = requests_per_minute
        self.interval = 60.0 / requests_per_minute
        self.last_call = 0.0

    def wait(self):
        """Block until we are allowed to make the next request."""
        now = time.monotonic()
        elapsed = now - self.last_call
        if elapsed < self.interval:
            time.sleep(self.interval - elapsed)
        self.last_call = time.monotonic()
The CostTracker raises an exception when the budget is hit. This is intentional — you want the pipeline to stop, not silently keep billing. The RateLimiter is deliberately simple. For most pipelines, a fixed delay between requests is enough. If you need adaptive rate limiting, use the response headers from OpenAI.
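If the fixed delay turns out to be too rigid, one common alternative is a token bucket, which allows short bursts while keeping the same average rate. The sketch below is not part of the pipeline above; the `TokenBucket` class, its `capacity` parameter, and the injectable `clock` and `sleep` arguments are all assumptions made so the behaviour can be tested without real waiting.

```python
import time


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at a steady rate."""

    def __init__(self, rate_per_minute: float, capacity: int,
                 clock=time.monotonic, sleep=time.sleep):
        self.rate = rate_per_minute / 60.0  # tokens added per second
        self.capacity = float(capacity)
        self.tokens = float(capacity)       # start with a full bucket
        self.clock = clock
        self.sleep = sleep
        self.updated = clock()

    def _refill(self) -> None:
        now = self.clock()
        gained = (now - self.updated) * self.rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.updated = now

    def acquire(self) -> float:
        """Take one token, sleeping if the bucket is empty. Returns seconds waited."""
        self._refill()
        waited = 0.0
        if self.tokens < 1.0:
            waited = (1.0 - self.tokens) / self.rate
            self.sleep(waited)
            self._refill()
        self.tokens -= 1.0
        return waited


bucket = TokenBucket(rate_per_minute=500, capacity=5)
waits = [bucket.acquire() for _ in range(5)]
print(waits)  # the first five calls fit inside the burst allowance
```

In the batch loop it would stand in for `rate_limiter.wait()`. A burst allowance mostly helps when records arrive unevenly; for a steady loop over a CSV the fixed-interval limiter behaves almost identically.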
# Step 4: The LLM Processing Stage
This is the core of the pipeline — calling the model, parsing the response, and handling failures.
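import json
from typing import Any

from openai import (
    APIConnectionError,
    APITimeoutError,
    InternalServerError,
    OpenAI,
    RateLimitError,
)
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

client = OpenAI()  # reads OPENAI_API_KEY from env

# Only transient failures are worth retrying. Auth errors and malformed
# responses will fail the same way every time, so they go straight to the caller.
TRANSIENT_ERRORS = (
    RateLimitError,
    APITimeoutError,
    APIConnectionError,
    InternalServerError,
)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    reraise=True,  # surface the original exception, not tenacity's RetryError
)
def call_llm(text: str, model: str = "gpt-4o-mini") -> tuple[dict, Any]:
    """Send a single record to the LLM and get structured output back.

    Returns the parsed JSON and the token usage object from the response.
    Retries on transient errors (rate limits, timeouts).
    Raises on persistent failures after 3 attempts.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": build_user_prompt(text)},
        ],
        response_format={"type": "json_object"},
        temperature=0.1,  # low temp = more consistent output
        max_tokens=300,
    )
    raw = response.choices[0].message.content
    parsed = json.loads(raw)
    return parsed, response.usage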
Temperature is set to 0.1, not 0. Zero temperature sounds good in theory but some models behave oddly at exactly zero. 0.1 gives near-deterministic output while avoiding edge cases.
We ask for json_object response format. This forces the model to return valid JSON. But valid JSON is not the same as correct JSON — the fields might be wrong, the values might be nonsensical. That is what validation handles.
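A quick illustration of the gap between valid and correct: the reply below parses without error but is useless to the pipeline. This is only a sketch to make the point, with a hand-written example reply and a hypothetical `missing_keys` helper; the real check is the Pydantic validation in the next step.

```python
import json

# Field names taken from the FeedbackAnalysis schema in Step 1.
REQUIRED_KEYS = {"sentiment", "category", "key_issues", "action_required", "summary"}


def missing_keys(raw_json: str) -> set[str]:
    """Parse a model reply and report which expected fields are absent."""
    data = json.loads(raw_json)  # succeeds for any valid JSON object
    return REQUIRED_KEYS - set(data)


# Valid JSON, wrong shape: a free-text sentiment and an invented "topic" field.
reply = '{"sentiment": "Mostly happy", "topic": "shipping"}'
print(sorted(missing_keys(reply)))
# ['action_required', 'category', 'key_issues', 'summary']
```

Even the one field that is present would fail validation, because "Mostly happy" is not one of the allowed sentiment values.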
# Step 5: Validation and Dead Letter Queue
Every LLM response goes through Pydantic validation. Records that fail validation go to a dead letter queue instead of polluting downstream data.
from pydantic import ValidationError
from pathlib import Path
from datetime import datetime, timezone


class DeadLetterQueue:
    """Store records that failed processing for later review."""

    def __init__(self, path: str = "dead_letters.jsonl"):
        self.path = Path(path)

    def add(self, record: dict, error: str, attempt: int = 1):
        entry = {
            # timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "record": record,
            "error": error,
            "attempt": attempt,
        }
        with open(self.path, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def pending_count(self) -> int:
        if not self.path.exists():
            return 0
        with open(self.path) as f:
            return sum(1 for _ in f)


def validate_response(raw: dict) -> FeedbackAnalysis:
    """Validate LLM output against our schema.

    Raises ValidationError if the output is malformed.
    """
    return FeedbackAnalysis(**raw)
The DLQ is a JSONL file — one JSON object per line. Simple, appendable, easy to grep through. For production systems you might use a database table, but a file works fine for thousands of records.
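The `attempt` field only earns its keep if something reads it back. Below is a minimal sketch of that read side, assuming the entry format written by `DeadLetterQueue.add`; the `retryable_entries` helper and the `max_attempts` cut-off are hypothetical additions, not part of the pipeline above.

```python
import json
from typing import Iterable


def retryable_entries(lines: Iterable[str], max_attempts: int = 3) -> list[dict]:
    """Return DLQ entries that have not yet used up their attempts.

    `lines` can be any iterable of JSONL strings, including an open file.
    """
    entries = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in a hand-edited file
        entry = json.loads(line)
        if entry.get("attempt", 1) < max_attempts:
            entries.append(entry)
    return entries


sample = [
    '{"record": {"id": 1, "text": "late delivery"}, "error": "bad enum", "attempt": 1}',
    '{"record": {"id": 2, "text": "???"}, "error": "bad enum", "attempt": 3}',
]
print([e["record"]["id"] for e in retryable_entries(sample)])  # [1]
```

A retry pass would call `call_llm` again on each returned record and, on another failure, write it back with `attempt` incremented. Records that reach the cap stay in the file for manual review.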
# Step 6: Batch Processing with Progress Tracking
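Calling call_llm by hand for each record gives you no shared budget, no pacing, and no progress reporting. The batch function below still sends one request per record, but it runs them all through a single cost tracker, rate limiter, and dead letter queue, which is what makes cost tracking meaningful.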
import pandas as pd


def process_batch(
    records: list[dict],
    model: str = "gpt-4o-mini",
    budget_usd: float = 10.0,
    rpm: int = 500,
) -> pd.DataFrame:
    """Process a batch of text records through the LLM pipeline.

    Returns a DataFrame of successfully processed records.
    Failed records go to the dead letter queue.
    """
    cost_tracker = CostTracker(model=model, budget_usd=budget_usd)
    rate_limiter = RateLimiter(requests_per_minute=rpm)
    dlq = DeadLetterQueue()
    results = []

    for i, record in enumerate(records):
        text = record.get("text", "")
        record_id = record.get("id", i)

        if not text or len(text.strip()) < 10:
            logger.debug(f"Skipping record {record_id}: too short")
            continue

        try:
            rate_limiter.wait()
            raw_output, usage = call_llm(text, model=model)
            cost_tracker.record(usage)
            validated = validate_response(raw_output)
            results.append({
                "id": record_id,
                "original_text": text[:200],
                **validated.model_dump(),
            })
        except BudgetExceeded:
            logger.error(f"Budget exceeded at record {i}/{len(records)}")
            logger.info(f"Cost summary: {cost_tracker.summary()}")
            break
        except ValidationError as e:
            logger.warning(f"Validation failed for record {record_id}: {e}")
            dlq.add(record, error=str(e))
        except Exception as e:
            logger.error(f"Failed to process record {record_id}: {e}")
            dlq.add(record, error=str(e))

        if (i + 1) % 100 == 0:
            logger.info(
                f"Progress: {i + 1}/{len(records)} | "
                f"Cost: ${cost_tracker.total_cost:.4f} | "
                f"DLQ: {dlq.pending_count()}"
            )

    logger.info(f"Batch complete: {len(results)} processed, {dlq.pending_count()} failed")
    logger.info(f"Final cost: {cost_tracker.summary()}")
    return pd.DataFrame(results)
Notice the progress logging every 100 records. When you are processing 10,000 records at 3 seconds each, you want to know where you are. The budget check happens after every record — if you hit your limit, the pipeline stops cleanly and tells you exactly how far it got.
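It helps to put a number on that before you start. The sketch below estimates wall-clock time for the sequential loop, assuming each record costs whichever is larger: the API latency or the limiter's interval. The function name and the 3-second latency are illustrative; measure your own latency on a small sample first.

```python
def estimate_runtime_hours(n_records: int, seconds_per_record: float, rpm: int) -> float:
    """Rough sequential runtime: one request at a time, paced by the limiter."""
    min_interval = 60.0 / rpm                     # spacing enforced by RateLimiter
    per_record = max(seconds_per_record, min_interval)
    return n_records * per_record / 3600


print(round(estimate_runtime_hours(10_000, 3.0, rpm=500), 1))  # 8.3
```

At 3 seconds per call, latency dominates and the 500 rpm limiter never sleeps. That is the case where concurrency pays off, not a higher rate limit.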
# Step 7: Putting It All Together
A complete pipeline from CSV input to Parquet output:
import sys


def run_pipeline(
    input_path: str,
    output_path: str,
    model: str = "gpt-4o-mini",
    budget_usd: float = 10.0,
    sample_size: int | None = None,
):
    """Run the full LLM enrichment pipeline.

    Args:
        input_path: CSV with at least 'id' and 'text' columns
        output_path: where to write the enriched parquet file
        model: which OpenAI model to use
        budget_usd: max spend for this run
        sample_size: process a subset for testing (None = all)
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    # load input
    df = pd.read_csv(input_path)
    logger.info(f"Loaded {len(df)} records from {input_path}")

    if sample_size:
        df = df.sample(n=min(sample_size, len(df)), random_state=42)
        logger.info(f"Sampled {len(df)} records for processing")

    records = df.to_dict("records")

    # process
    results = process_batch(records, model=model, budget_usd=budget_usd)

    if results.empty:
        logger.error("No records processed successfully")
        sys.exit(1)

    # save
    results.to_parquet(output_path, index=False)
    logger.info(f"Saved {len(results)} enriched records to {output_path}")
    return results


if __name__ == "__main__":
    run_pipeline(
        input_path="data/customer_feedback.csv",
        output_path="data/feedback_enriched.parquet",
        model="gpt-4o-mini",
        budget_usd=5.0,
        sample_size=50,  # start small, validate, then remove the cap
    )
Start with sample_size=50. Look at the output. Check the DLQ. Adjust your prompt. Then remove the cap and let it run. Never start with the full dataset — you will waste money on a bad prompt.
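Before removing the cap, a back-of-the-envelope estimate tells you what budget to set. This sketch reuses the article's rule of thumb of about 4 characters per token and the prices from the CostTracker table. The `overhead_tokens` and `output_tokens` defaults are guesses for the system prompt and a typical reply, so replace them with the averages from your 50-record sample.

```python
# Prices per 1M tokens, copied from the CostTracker above; check current pricing.
PRICES_PER_1M = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}


def estimate_cost_usd(
    texts: list[str],
    model: str = "gpt-4o-mini",
    overhead_tokens: int = 120,   # assumed: system prompt + message framing
    output_tokens: int = 100,     # assumed: average size of the JSON reply
    max_chars: int = 2000,        # same truncation limit as build_user_prompt
) -> float:
    """Estimate spend for a run using ~4 characters per token."""
    prices = PRICES_PER_1M[model]
    total_input = sum(min(len(t), max_chars) / 4 + overhead_tokens for t in texts)
    total_output = output_tokens * len(texts)
    return (total_input * prices["input"] + total_output * prices["output"]) / 1_000_000


feedback = ["x" * 400] * 50_000  # 50,000 records of about 400 characters each
print(round(estimate_cost_usd(feedback, "gpt-4o-mini"), 2))  # 4.65
print(round(estimate_cost_usd(feedback, "gpt-4o"), 2))       # 77.5
```

Treat the result as an order-of-magnitude check, not a quote. The character heuristic is rough, and non-English text usually tokenizes less efficiently.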
# Step 8: Improving Prompt Quality
The first version of your prompt will not be great. Here is how to iterate.
# Check the Dead Letter Queue
def review_dlq(path: str = "dead_letters.jsonl"):
    """Print failed records grouped by error type."""
    errors = {}
    with open(path) as f:
        for line in f:
            entry = json.loads(line)
            error_type = entry["error"].split("\n")[0]
            errors.setdefault(error_type, []).append(entry["record"])

    for error_type, records in errors.items():
        print(f"\n--- {error_type} ({len(records)} records) ---")
        for r in records[:3]:
            print(f"  Input: {r.get('text', '')[:100]}...")
Common patterns you will find:
| DLQ pattern | Fix |
|---|---|
| Sentiment is a full sentence instead of enum value | Add explicit enum values to prompt |
| key_issues has 10 items instead of max 3 | Add "return at most 3 issues" to prompt |
| category is "product" instead of "product_quality" | List exact allowed values in prompt |
| summary is 500 characters | Add character limit to prompt, not just schema |
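Several of these fixes come down to putting the exact allowed values into the prompt. Rather than typing them by hand and letting them drift from the schema, you can generate the lines from the enums. This is a sketch: the two enums are repeated from Step 1 so the block runs on its own, and `allowed_values_hint` and `PROMPT_SCHEMA_HINT` are hypothetical names.

```python
from enum import Enum


class Sentiment(str, Enum):
    positive = "positive"
    negative = "negative"
    neutral = "neutral"
    mixed = "mixed"


class FeedbackCategory(str, Enum):
    product_quality = "product_quality"
    shipping = "shipping"
    customer_service = "customer_service"
    pricing = "pricing"
    website_ux = "website_ux"
    other = "other"


def allowed_values_hint(field: str, enum_cls: type[Enum]) -> str:
    """Build one prompt rule listing every allowed value for a field."""
    values = ", ".join(f'"{member.value}"' for member in enum_cls)
    return f"- {field} must be exactly one of: {values}"


PROMPT_SCHEMA_HINT = "\n".join([
    allowed_values_hint("sentiment", Sentiment),
    allowed_values_hint("category", FeedbackCategory),
    "- key_issues is a list of at most 3 short strings",
    "- summary is one sentence of at most 200 characters",
])
print(PROMPT_SCHEMA_HINT)
```

Appending this text to the system prompt keeps the prompt and the validation schema in sync when you add a category later.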
# Run a Consistency Check
def consistency_check(df: pd.DataFrame, text_col: str = "original_text"):
    """Spot-check a few records for obvious mistakes."""
    # find records where sentiment is positive but action_required is True
    suspicious = df[(df["sentiment"] == "positive") & df["action_required"]]
    if len(suspicious) > 0:
        logger.warning(
            f"{len(suspicious)} records are positive but flagged for action, "
            f"review these manually"
        )

    # check category distribution: if >30% is "other", the categories need work
    cat_dist = df["category"].value_counts(normalize=True)
    if cat_dist.get("other", 0) > 0.3:
        logger.warning(
            f"'other' category is {cat_dist['other']:.0%}, "
            f"consider adding more specific categories"
        )

    return suspicious
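Because the same input can produce different output, another useful check is to run one record several times and measure how often the label agrees. The sketch below assumes a `classify` callable that returns a single label; in the real pipeline it would wrap `call_llm` and pull out the sentiment. Here a canned sequence of answers stands in for the model so the example runs offline.

```python
from collections import Counter
from typing import Callable


def agreement_rate(classify: Callable[[str], str], text: str, runs: int = 5) -> float:
    """Fraction of runs that returned the most common label for this text."""
    labels = [classify(text) for _ in range(runs)]
    most_common_count = Counter(labels).most_common(1)[0][1]
    return most_common_count / runs


# Stand-in for the model: four runs agree, one disagrees.
canned = iter(["positive", "positive", "mixed", "positive", "positive"])
rate = agreement_rate(lambda text: next(canned), "Love it, but shipping was slow")
print(rate)  # 0.8
```

Each run is a paid API call, so use this on a handful of borderline records, not the whole dataset. Records with low agreement are good candidates for clearer prompt rules.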
# Step 9: Scaling Up
Once your prompt is stable and the DLQ rate is below 5%, you can scale.
# Async Processing for Higher Throughput
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()


async def process_record_async(
    record: dict, semaphore: asyncio.Semaphore, model: str = "gpt-4o-mini"
) -> dict | None:
    """Process a single record with concurrency control."""
    async with semaphore:
        text = record.get("text", "")
        try:
            response = await async_client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": build_user_prompt(text)},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
                max_tokens=300,
            )
            raw = json.loads(response.choices[0].message.content)
            validated = validate_response(raw)
            return {"id": record["id"], **validated.model_dump()}
        except Exception as e:
            logger.error(f"Record {record.get('id')} failed: {e}")
            return None


async def process_batch_async(records: list[dict], concurrency: int = 10):
    """Process records concurrently with a semaphore to control parallelism."""
    sem = asyncio.Semaphore(concurrency)
    tasks = [process_record_async(r, sem) for r in records]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r is not None]
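The semaphore limits concurrent requests. Start with 10, increase until you start hitting rate limits, then back off. Do not set this to 100: you will get rate limited instantly. Note that this async version skips the retry decorator, cost tracker, and dead letter queue from the synchronous path (failed records are only logged), so wire those back in before running it on a full dataset.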
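If you want to convince yourself the semaphore really caps parallelism, you can test the pattern without spending tokens. This sketch replaces the API call with `asyncio.sleep` and records the peak number of requests in flight; `run_bounded` and `fake_request` are illustrative names, not part of the pipeline.

```python
import asyncio


async def run_bounded(n_tasks: int, concurrency: int, latency: float = 0.01) -> int:
    """Run n_tasks fake requests and return the peak number in flight."""
    sem = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with sem:                    # same pattern as process_record_async
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency)   # stands in for the API round trip
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_bounded(n_tasks=50, concurrency=10))
print(peak)  # never more than the concurrency limit
```

All 50 tasks are created up front, but only 10 get past the semaphore at any moment. The same holds when `gather` is handed thousands of records.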
# Model Selection by Record Complexity
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o-mini")


def pick_model(text: str) -> str:
    """Use the cheap model for simple records, expensive one for complex ones.

    Short, clear feedback goes to gpt-4o-mini.
    Long, ambiguous text gets gpt-4o for better classification.
    """
    token_count = len(enc.encode(text))
    has_multiple_topics = text.count(".") > 8  # rough heuristic
    is_long = token_count > 300

    if is_long and has_multiple_topics:
        return "gpt-4o"
    return "gpt-4o-mini"
This is a rough heuristic. In practice, you should look at your DLQ — if certain types of records consistently fail with the cheap model, route those to the expensive one. There is no point paying for GPT-4o on "Great product, fast shipping".
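One way to act on that without guessing up front is to escalate only on failure: try the cheap model, and call the expensive one only if validation rejects the result. The sketch below takes the model call and the validator as parameters so it runs with stand-ins; `classify_with_escalation` is a hypothetical helper. It relies on validation errors being `ValueError` subclasses, which holds for Pydantic's `ValidationError` in current releases.

```python
from typing import Callable, Sequence


def classify_with_escalation(
    text: str,
    call_model: Callable[[str, str], dict],   # (text, model) -> raw dict
    validate: Callable[[dict], dict],         # raises ValueError on bad output
    models: Sequence[str] = ("gpt-4o-mini", "gpt-4o"),
) -> tuple[str, dict]:
    """Try each model in order and return (model_used, validated_output)."""
    if not models:
        raise ValueError("at least one model is required")
    last_error: Exception | None = None
    for model in models:
        try:
            return model, validate(call_model(text, model))
        except ValueError as exc:
            last_error = exc  # remember why this model failed, try the next one
    raise last_error


# Stand-ins: the cheap model returns an invalid label, the larger one a valid one.
def fake_call(text: str, model: str) -> dict:
    return {"sentiment": "happy" if model == "gpt-4o-mini" else "positive"}


def fake_validate(raw: dict) -> dict:
    if raw["sentiment"] not in {"positive", "negative", "neutral", "mixed"}:
        raise ValueError(f"bad sentiment: {raw['sentiment']}")
    return raw


print(classify_with_escalation("Great product, fast shipping", fake_call, fake_validate))
```

You pay for the expensive model only on records the cheap one could not handle. If every model fails, the last error propagates and the record can go to the DLQ as usual.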
# What This Replaces
| Manual approach | LLM pipeline equivalent |
|---|---|
| Reading feedback one by one and tagging manually | Automated classification with structured output |
| Regex-based sentiment analysis | Context-aware sentiment that handles sarcasm and nuance |
| Outsourcing data labelling to a team | LLM processing at a fraction of the cost and time |
| Ignoring unstructured text fields entirely | Turning text into queryable columns |
| Building a custom NLP model for each task | One pipeline that handles classification, extraction, and enrichment |
# Next Steps
For building the data pipelines that feed into this LLM stage, see How to Design Data Pipelines for Reliable Reporting. For adding retry and recovery logic beyond what tenacity provides, see Build Self-Healing Data Pipelines That Recover from Failures. For securing your API keys in production, see Python Secrets Management for Automation Pipelines.
# Frequently Asked Questions
- How much does it cost to run an LLM data pipeline?
- Costs depend on model choice and volume. GPT-4o-mini processes roughly 1,000 short documents for under $0.50. The pipeline in this guide tracks token usage per batch and sets budget caps so you never get a surprise bill.
- Can I use a local LLM instead of OpenAI?
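- Yes. Every request in the synchronous pipeline goes through call_llm, so that is the only function to change: point the client at an Ollama or vLLM server that exposes an OpenAI-compatible endpoint and the validation, DLQ, and batching code stay the same. Local models eliminate API costs but need GPU hardware.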
- How do I handle rate limits from the OpenAI API?
- The pipeline spaces requests with a simple fixed-interval rate limiter set to a requests-per-minute target. It does not track tokens per minute, so lower the rpm setting if you hit token limits. Tenacity handles transient 429 errors with exponential backoff.
- What happens when the LLM returns bad output?
- Every response is validated against a Pydantic schema. If the output fails validation, the record goes to a dead letter queue for retry with a stricter prompt or manual review. The pipeline never silently passes garbage downstream.