Web Scraping to Structured Data: Building Reliable Extraction Pipelines
Turn web pages into clean, structured data with Python — using resilient selectors, rate limiting, change detection, and robots.txt compliance for reliable extraction pipelines.

Not every data source has an API. Price lists live on web pages. Competitor information is in HTML tables. Government statistics are published as web pages, not endpoints. When the data you need is on the web but not available through an API, scraping is the answer.
But scraping is fragile if done carelessly. A CSS class changes and your entire pipeline breaks. You hit the server too hard and get blocked. You scrape pages you are not supposed to.
This guide builds a reliable web scraping pipeline in Python. Every pattern is designed for production use — resilient selectors, polite request behaviour, change detection, and clean structured output.
# The Scraping Pipeline
flowchart LR
    T["Target URLs"] --> R["robots.txt\nCheck"]
    R --> F["Fetch\n(rate limited)"]
    F --> P["Parse\n(BeautifulSoup)"]
    P --> V["Validate\n(schema check)"]
    V --> S["Store\n(DataFrame / DB)"]
    S --> M["Monitor\n(change detection)"]
    F -.-> C["Cache\n(avoid re-fetching)"]
Every stage matters. Skip the robots.txt check and you risk getting banned. Skip rate limiting and you risk taking down the server. Skip validation and you get garbage data.
# What You Will Need
pip install httpx beautifulsoup4 lxml pandas
- httpx — modern HTTP client with sync and async APIs (and optional HTTP/2 support)
- beautifulsoup4 — HTML parsing
- lxml — fast HTML parser backend
- pandas — structured data output
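Before building the full pipeline, it is worth confirming the toolchain works end to end. A minimal sketch that fetches one page and prints its title; the URL is a placeholder, so swap in a page you are allowed to scrape.
import httpx
from bs4 import BeautifulSoup
# Placeholder URL: replace with a page you are allowed to scrape
url = "https://example.com/"
response = httpx.get(url, timeout=30.0, follow_redirects=True)
response.raise_for_status()
soup = BeautifulSoup(response.text, "lxml")
print(soup.title.get_text(strip=True) if soup.title else "No title found")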
# Step 1: Respectful Fetching
# Check robots.txt First
import httpx
from urllib.parse import urljoin, urlparse
import time
import logging
logger = logging.getLogger("scraper")
class RespectfulFetcher:
"""HTTP client that respects robots.txt and rate limits."""
def __init__(self, base_url, requests_per_second=1.0, user_agent=None):
self.base_url = base_url
self.delay = 1.0 / requests_per_second
self.last_request_time = 0
        self.user_agent = user_agent or "DataPipelineBot/1.0 (contact@example.com)"
self.client = httpx.Client(
headers={"User-Agent": self.user_agent},
timeout=30.0,
follow_redirects=True,
)
self.disallowed_paths = []
self._load_robots()
def _load_robots(self):
"""Parse robots.txt to find disallowed paths."""
robots_url = urljoin(self.base_url, "/robots.txt")
try:
response = self.client.get(robots_url)
if response.status_code == 200:
current_agent_matches = False
for line in response.text.splitlines():
line = line.strip()
if line.lower().startswith("user-agent:"):
agent = line.split(":", 1)[1].strip()
current_agent_matches = agent == "*" or agent in self.user_agent
elif line.lower().startswith("disallow:") and current_agent_matches:
path = line.split(":", 1)[1].strip()
if path:
self.disallowed_paths.append(path)
logger.info(f"robots.txt loaded: {len(self.disallowed_paths)} disallowed paths")
except httpx.HTTPError:
logger.warning("Could not fetch robots.txt — proceeding with caution")
def is_allowed(self, url):
"""Check if a URL is allowed by robots.txt."""
path = urlparse(url).path
for disallowed in self.disallowed_paths:
if path.startswith(disallowed):
logger.warning(f"Blocked by robots.txt: {path}")
return False
return True
def fetch(self, url):
"""Fetch a URL with rate limiting and robots.txt compliance."""
if not self.is_allowed(url):
raise PermissionError(f"URL blocked by robots.txt: {url}")
# Rate limiting
elapsed = time.time() - self.last_request_time
if elapsed < self.delay:
time.sleep(self.delay - elapsed)
response = self.client.get(url)
self.last_request_time = time.time()
response.raise_for_status()
logger.info(f"Fetched: {url} ({len(response.text)} bytes)")
return response.text
def close(self):
"""Close the HTTP client."""
self.client.close()
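A minimal usage sketch, assuming a hypothetical example.com site. If you would rather not hand-roll the robots.txt parsing, the standard library's urllib.robotparser covers the same ground with RobotFileParser.can_fetch().
# Usage sketch: the URLs are placeholders
fetcher = RespectfulFetcher("https://example.com", requests_per_second=1.0)
try:
    html = fetcher.fetch("https://example.com/products/widget")
    print(f"Fetched {len(html)} characters")
finally:
    fetcher.close()

# Alternative: the standard library robots.txt parser
from urllib.robotparser import RobotFileParser
robots = RobotFileParser("https://example.com/robots.txt")
robots.read()
print(robots.can_fetch("DataPipelineBot", "https://example.com/products/widget"))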
# Step 2: Resilient Parsing
The most common scraping failure: selectors break because the page layout changes. Use multiple fallback selectors and validate what you extract.
from typing import Callable, Optional
from bs4 import BeautifulSoup
from dataclasses import dataclass
@dataclass
class ExtractionRule:
    """Define how to extract a field with fallback selectors."""
    name: str
    selectors: list  # Try each selector in order
    attribute: Optional[str] = None  # Extract an attribute instead of text
    required: bool = True  # Fail if not found?
    transform: Optional[Callable] = None  # Post-processing function
class ResilientParser:
"""Parse HTML with fallback selectors and validation."""
def __init__(self, rules):
self.rules = rules
def parse(self, html, source_url=""):
"""Extract structured data from HTML."""
soup = BeautifulSoup(html, "lxml")
record = {"_source_url": source_url}
errors = []
for rule in self.rules:
value = self._extract_field(soup, rule)
if value is None and rule.required:
errors.append(f"Required field '{rule.name}' not found")
continue
if value and rule.transform:
try:
value = rule.transform(value)
except Exception as e:
errors.append(f"Transform failed for '{rule.name}': {e}")
continue
record[rule.name] = value
if errors:
logger.warning(f"Parse issues for {source_url}: {errors}")
return record, errors
def _extract_field(self, soup, rule):
"""Try multiple selectors until one works."""
for selector in rule.selectors:
try:
element = soup.select_one(selector)
if element:
if rule.attribute:
return element.get(rule.attribute)
return element.get_text(strip=True)
except Exception:
continue
return None
def parse_table(self, html, table_selector="table"):
"""Extract an HTML table into a list of dicts."""
soup = BeautifulSoup(html, "lxml")
table = soup.select_one(table_selector)
if not table:
return []
# Get headers
headers = []
header_row = table.select_one("thead tr") or table.select_one("tr")
if header_row:
headers = [th.get_text(strip=True) for th in header_row.select("th, td")]
# Get rows
rows = []
body_rows = table.select("tbody tr") or table.select("tr")[1:]
for tr in body_rows:
cells = [td.get_text(strip=True) for td in tr.select("td")]
if cells and len(cells) == len(headers):
rows.append(dict(zip(headers, cells)))
return rows
# Defining Extraction Rules
import re
def parse_price(text):
"""Extract numeric price from text like '$1,234.56'."""
match = re.search(r'[\d,]+\.?\d*', text.replace(",", ""))
return float(match.group()) if match else None
# Define rules for a product page
product_rules = [
ExtractionRule(
name="title",
selectors=[
"h1.product-title",
"h1[data-testid='product-name']",
".product-info h1",
"h1", # Last resort
],
),
ExtractionRule(
name="price",
selectors=[
"span.price-current",
"[data-testid='price']",
".product-price span",
".price",
],
transform=parse_price,
),
ExtractionRule(
name="description",
selectors=[
"div.product-description",
"[data-testid='description']",
".description p",
],
required=False,
),
ExtractionRule(
name="image_url",
selectors=[
"img.product-image",
".product-gallery img",
".product img",
],
attribute="src",
required=False,
),
ExtractionRule(
name="availability",
selectors=[
"span.stock-status",
"[data-testid='availability']",
".availability",
],
required=False,
),
]
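With the rules defined, extraction is two lines. This sketch assumes html holds a product page fetched earlier (for example with RespectfulFetcher); the URL is a placeholder.
# Assumes `html` was fetched earlier, e.g. with RespectfulFetcher
parser = ResilientParser(product_rules)
record, errors = parser.parse(html, source_url="https://example.com/product/widget")
if errors:
    print("Extraction issues:", errors)
print(record)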
# Step 3: Caching
Avoid re-fetching pages that have not changed. Saves time and is polite to the server.
import hashlib
import json
import os
from datetime import datetime, timedelta
class PageCache:
"""Cache fetched pages to avoid unnecessary requests."""
def __init__(self, cache_dir="scraper_cache", ttl_hours=24):
self.cache_dir = cache_dir
self.ttl = timedelta(hours=ttl_hours)
os.makedirs(cache_dir, exist_ok=True)
def _cache_key(self, url):
"""Generate a filesystem-safe cache key."""
return hashlib.sha256(url.encode()).hexdigest()
def get(self, url):
"""Get cached page if it exists and is fresh."""
key = self._cache_key(url)
meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")
content_path = os.path.join(self.cache_dir, f"{key}.html")
        if not os.path.exists(meta_path) or not os.path.exists(content_path):
            return None
with open(meta_path, "r") as f:
meta = json.load(f)
cached_at = datetime.fromisoformat(meta["cached_at"])
if datetime.now() - cached_at > self.ttl:
return None # Cache expired
with open(content_path, "r", encoding="utf-8") as f:
return f.read()
def set(self, url, content):
"""Cache a page."""
key = self._cache_key(url)
meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")
content_path = os.path.join(self.cache_dir, f"{key}.html")
with open(content_path, "w", encoding="utf-8") as f:
f.write(content)
with open(meta_path, "w") as f:
json.dump({
"url": url,
"cached_at": datetime.now().isoformat(),
"size_bytes": len(content),
}, f)
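Using the cache is a classic cache-aside pattern: check the cache first, fetch only on a miss, then store the result. The URL and TTL below are illustrative, and fetcher is the RespectfulFetcher from Step 1.
# Cache-aside usage sketch; URL and TTL are illustrative
cache = PageCache(cache_dir="scraper_cache", ttl_hours=6)
url = "https://example.com/product/widget"
html = cache.get(url)
if html is None:
    html = fetcher.fetch(url)  # fetcher from Step 1
    cache.set(url, html)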
# Step 4: The Complete Scraping Pipeline
import pandas as pd
from datetime import datetime
class ScrapingPipeline:
"""Complete web scraping pipeline with all production patterns."""
def __init__(self, base_url, rules, requests_per_second=1.0):
self.fetcher = RespectfulFetcher(base_url, requests_per_second)
self.parser = ResilientParser(rules)
self.cache = PageCache()
self.results = []
self.errors = []
def scrape_urls(self, urls):
"""Scrape a list of URLs and return structured data."""
for i, url in enumerate(urls):
logger.info(f"Scraping {i + 1}/{len(urls)}: {url}")
try:
# Check cache first
html = self.cache.get(url)
if html:
logger.info(f"Cache hit: {url}")
else:
html = self.fetcher.fetch(url)
self.cache.set(url, html)
# Parse
record, parse_errors = self.parser.parse(html, source_url=url)
record["_scraped_at"] = datetime.now().isoformat()
                if parse_errors:
                    self.errors.extend({"url": url, "error": err} for err in parse_errors)
self.results.append(record)
except PermissionError as e:
logger.warning(f"Skipped (robots.txt): {url}")
self.errors.append({"url": url, "error": str(e)})
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error for {url}: {e.response.status_code}")
self.errors.append({"url": url, "error": str(e)})
except Exception as e:
logger.error(f"Failed to scrape {url}: {e}")
self.errors.append({"url": url, "error": str(e)})
return self.to_dataframe()
def to_dataframe(self):
"""Convert results to a clean DataFrame."""
if not self.results:
return pd.DataFrame()
df = pd.DataFrame(self.results)
logger.info(f"Scraped {len(df)} records with {len(self.errors)} errors")
return df
def get_error_summary(self):
"""Return a summary of scraping errors."""
return pd.DataFrame(self.errors) if self.errors else pd.DataFrame()
def close(self):
"""Clean up resources."""
self.fetcher.close()
# Running the Pipeline
# Define what to extract
rules = [
ExtractionRule(
name="title",
selectors=["h1.product-title", "h1"],
),
ExtractionRule(
name="price",
selectors=["span.price", ".product-price"],
transform=parse_price,
),
ExtractionRule(
name="category",
selectors=[".breadcrumb li:last-child", ".category-name"],
required=False,
),
]
# Scrape
pipeline = ScrapingPipeline(
base_url="https://example-store.com",
rules=rules,
requests_per_second=0.5, # One request every 2 seconds
)
urls = [
"https://example-store.com/product/widget-pro",
"https://example-store.com/product/gadget-x",
"https://example-store.com/product/doohickey-3000",
]
df = pipeline.scrape_urls(urls)
pipeline.close()
print(df.to_string(index=False))
title price category _source_url _scraped_at
Widget Pro 1249.99 Electronics .../widget-pro 2026-04-27T14:30:00
Gadget X 899.50 Gadgets .../gadget-x 2026-04-27T14:30:02
Doohickey 3000 349.99 Accessories .../doohickey-3000 2026-04-27T14:30:04
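The Store stage from the flowchart can be as simple as writing the DataFrame to disk and keeping the error summary alongside it. A short sketch with placeholder file names:
# Persist results and review failures; file names are placeholders
df.to_csv("products.csv", index=False)
errors_df = pipeline.get_error_summary()
if not errors_df.empty:
    errors_df.to_csv("scrape_errors.csv", index=False)
    print(f"{len(errors_df)} issues logged during scraping")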
# Step 5: Change Detection
Monitor pages for changes and only process what has actually updated.
import hashlib
from datetime import datetime
class ChangeDetector:
"""Detect when scraped content has changed."""
def __init__(self, db_path="scraper_hashes.db"):
import sqlite3
self.db_path = db_path
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS page_hashes (
url TEXT PRIMARY KEY,
content_hash TEXT NOT NULL,
last_checked TEXT NOT NULL,
last_changed TEXT
)
""")
conn.commit()
conn.close()
def has_changed(self, url, content):
"""Check if page content has changed since last check."""
import sqlite3
content_hash = hashlib.sha256(content.encode()).hexdigest()
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"SELECT content_hash FROM page_hashes WHERE url = ?", (url,)
)
row = cursor.fetchone()
now = datetime.now().isoformat()
if row is None:
# First time seeing this URL
conn.execute(
"INSERT INTO page_hashes (url, content_hash, last_checked, last_changed) "
"VALUES (?, ?, ?, ?)",
(url, content_hash, now, now),
)
conn.commit()
conn.close()
return True # New page = changed
old_hash = row[0]
changed = content_hash != old_hash
if changed:
conn.execute(
"UPDATE page_hashes SET content_hash = ?, last_checked = ?, last_changed = ? "
"WHERE url = ?",
(content_hash, now, now, url),
)
logger.info(f"Content changed: {url}")
else:
conn.execute(
"UPDATE page_hashes SET last_checked = ? WHERE url = ?",
(now, url),
)
conn.commit()
conn.close()
return changed
# Using Change Detection
# fetcher and parser are the Step 1 fetcher and Step 2 parser; save_to_database is your own persistence function
detector = ChangeDetector()
for url in urls:
html = fetcher.fetch(url)
if detector.has_changed(url, html):
# Parse and store — content is new or updated
record, errors = parser.parse(html, source_url=url)
save_to_database(record)
logger.info(f"Updated: {url}")
else:
logger.info(f"No change: {url}")
# Step 6: HTML Table Extraction
Many data sources are just HTML tables. Extract them directly into DataFrames.
def scrape_html_table(url, table_selector="table", fetcher=None):
"""Scrape an HTML table and return a DataFrame."""
if fetcher is None:
fetcher = RespectfulFetcher(url)
html = fetcher.fetch(url)
parser = ResilientParser([])
rows = parser.parse_table(html, table_selector)
df = pd.DataFrame(rows)
logger.info(f"Extracted table: {len(df)} rows, {len(df.columns)} columns")
return df
# Example: scrape a government statistics table
df = scrape_html_table(
"https://example.gov/statistics/population",
table_selector="table.data-table",
)
print(df.head())
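For simple, well-formed tables, pandas can do the parsing itself. pandas.read_html returns one DataFrame per table found in the markup; the sketch below assumes html was already fetched politely with RespectfulFetcher.
from io import StringIO
import pandas as pd
# read_html returns one DataFrame per <table> found in the markup
tables = pd.read_html(StringIO(html))
print(f"Found {len(tables)} tables")
df = tables[0]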
# Ethical Scraping Checklist
| Rule | Implementation |
|---|---|
| Check robots.txt | RespectfulFetcher._load_robots() |
| Rate limit requests | time.sleep() between requests |
| Identify your bot | Custom User-Agent with contact info |
| Cache responses | PageCache avoids re-fetching |
| Do not scrape login-required pages | Only fetch public URLs |
| Respect noindex / nofollow | Check meta tags before indexing |
| Stop if asked | Monitor for 429/403 responses |
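The "stop if asked" row is worth making concrete. A minimal sketch, building on the fetcher from Step 1: back off when the server returns 429, honouring the Retry-After header when present. The retry count and fallback delay are arbitrary, and the sketch assumes the header is given in seconds.
def fetch_with_backoff(fetcher, url, max_retries=3, default_delay=30):
    """Retry politely on 429 responses, honouring Retry-After when present."""
    for attempt in range(max_retries):
        try:
            return fetcher.fetch(url)
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:
                raise
            # Retry-After may also be an HTTP date; this sketch assumes seconds
            delay = int(e.response.headers.get("Retry-After", default_delay))
            logger.warning(f"429 received, waiting {delay}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)
    raise RuntimeError(f"Gave up on {url} after {max_retries} retries")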
# What This Replaces
| Manual process | Scraping pipeline equivalent |
|---|---|
| Copy-paste from web pages | Automated extraction to DataFrame |
| Manually check for price changes | Change detection with alerting (sketched below) |
| Open 50 tabs, compare data | Scrape all URLs, diff in one table |
| "I think they updated the page" | Content hash comparison |
| No data from sites without APIs | Structured extraction from any HTML |
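The "change detection with alerting" row can be as simple as diffing two runs of the pipeline. A sketch, assuming each run's DataFrame has been saved to CSV (file names are hypothetical):
import pandas as pd
# Compare the current run against the previous one; file names are hypothetical
previous = pd.read_csv("products_previous.csv")
current = pd.read_csv("products.csv")
merged = current.merge(previous, on="_source_url", suffixes=("", "_prev"))
price_changes = merged[merged["price"] != merged["price_prev"]]
for _, row in price_changes.iterrows():
    print(f"{row['_source_url']}: {row['price_prev']} -> {row['price']}")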
# Common Pitfalls
| Pitfall | Why it fails | Fix |
|---|---|---|
| Single CSS selector | Breaks when layout changes | Multiple fallback selectors |
| No rate limiting | Server blocks your IP | 1-2 requests per second max |
| Ignoring robots.txt | Legal and ethical issues | Always check before scraping |
| No caching | Slow, wasteful, gets you blocked | Cache with configurable TTL |
| No error handling | One bad page crashes the pipeline | Try/except with error collection |
| Storing raw HTML | Hard to analyse | Parse to structured data immediately |
# Next Steps
Start with a small set of target URLs and well-defined extraction rules. Validate that your selectors work across the pages you need, then add caching and change detection. The pipeline structure here scales from 10 URLs to 10,000 with the same code.
For storing and processing the data you scrape, see How to Design Data Pipelines for Reliable Reporting. For cleaning the messy data that scraping often produces, see How to Clean Messy Excel Data Using Python.
Data analytics services include building reliable data extraction pipelines from web sources.
Get in touch to discuss automating your data extraction.