Web Scraping to Structured Data: Building Reliable Extraction Pipelines


Turn web pages into clean, structured data with Python — using resilient selectors, rate limiting, change detection, and robots.txt compliance for reliable extraction pipelines.


Not every data source has an API. Price lists live on web pages. Competitor information is in HTML tables. Government statistics are published as web pages, not endpoints. When the data you need is on the web but not available through an API, scraping is the answer.

But scraping is fragile if done carelessly. A CSS class changes and your entire pipeline breaks. You hit the server too hard and get blocked. You scrape pages you are not supposed to.

This guide builds a reliable web scraping pipeline in Python. Every pattern is designed for production use — resilient selectors, polite request behaviour, change detection, and clean structured output.

# The Scraping Pipeline

mermaid
flowchart LR
  T["Target URLs"] --> R["robots.txt\nCheck"]
  R --> F["Fetch\n(rate limited)"]
  F --> P["Parse\n(BeautifulSoup)"]
  P --> V["Validate\n(schema check)"]
  V --> S["Store\n(DataFrame / DB)"]
  S --> M["Monitor\n(change detection)"]
  F -.-> C["Cache\n(avoid re-fetching)"]

Every stage matters. Skip the robots.txt check and you risk getting banned. Skip rate limiting and you risk taking down the server. Skip validation and you get garbage data.

# What You Will Need

bash
pip install httpx beautifulsoup4 lxml pandas

  • httpx — modern HTTP client (async-capable, a more robust alternative to requests for scraping)
  • beautifulsoup4 — HTML parsing
  • lxml — fast HTML parser backend
  • pandas — structured data output

# Step 1: Respectful Fetching

# Check robots.txt First

python
import httpx
from urllib.parse import urljoin, urlparse
import time
import logging

logger = logging.getLogger("scraper")

class RespectfulFetcher:
    """HTTP client that respects robots.txt and rate limits."""

    def __init__(self, base_url, requests_per_second=1.0, user_agent=None):
        self.base_url = base_url
        self.delay = 1.0 / requests_per_second
        self.last_request_time = 0
        # Identify your bot and give site owners a way to contact you
        self.user_agent = user_agent or "DataPipelineBot/1.0 (contact@example.com)"
        self.client = httpx.Client(
            headers={"User-Agent": self.user_agent},
            timeout=30.0,
            follow_redirects=True,
        )
        self.disallowed_paths = []
        self._load_robots()

    def _load_robots(self):
        """Parse robots.txt to find disallowed paths."""
        robots_url = urljoin(self.base_url, "/robots.txt")
        try:
            response = self.client.get(robots_url)
            if response.status_code == 200:
                current_agent_matches = False
                for line in response.text.splitlines():
                    line = line.strip()
                    if line.lower().startswith("user-agent:"):
                        agent = line.split(":", 1)[1].strip()
                        # Agent tokens are case-insensitive; this is a simplified match
                        current_agent_matches = agent == "*" or agent.lower() in self.user_agent.lower()
                    elif line.lower().startswith("disallow:") and current_agent_matches:
                        path = line.split(":", 1)[1].strip()
                        if path:
                            self.disallowed_paths.append(path)

                logger.info(f"robots.txt loaded: {len(self.disallowed_paths)} disallowed paths")
        except httpx.HTTPError:
            logger.warning("Could not fetch robots.txt — proceeding with caution")

    def is_allowed(self, url):
        """Check if a URL is allowed by robots.txt."""
        path = urlparse(url).path
        for disallowed in self.disallowed_paths:
            if path.startswith(disallowed):
                logger.warning(f"Blocked by robots.txt: {path}")
                return False
        return True

    def fetch(self, url):
        """Fetch a URL with rate limiting and robots.txt compliance."""
        if not self.is_allowed(url):
            raise PermissionError(f"URL blocked by robots.txt: {url}")

        # Rate limiting
        elapsed = time.time() - self.last_request_time
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)

        response = self.client.get(url)
        self.last_request_time = time.time()

        response.raise_for_status()
        logger.info(f"Fetched: {url} ({len(response.text)} bytes)")
        return response.text

    def close(self):
        """Close the HTTP client."""
        self.client.close()
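
The hand-rolled robots.txt parser above is deliberately simple. The standard library's `urllib.robotparser` handles agent groups and rule precedence more strictly and can serve as an alternative; a minimal sketch, with an illustrative agent string and paths:

```python
from urllib.robotparser import RobotFileParser

# Parse rules offline; in production you would fetch robots.txt
# and pass response.text.splitlines() instead
rp = RobotFileParser()
rp.parse([
    "User-agent: *",
    "Disallow: /admin/",
])

print(rp.can_fetch("DataPipelineBot/1.0", "https://example.com/products"))     # True
print(rp.can_fetch("DataPipelineBot/1.0", "https://example.com/admin/panel"))  # False
```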

# Step 2: Resilient Parsing

The most common scraping failure: selectors break because the page layout changes. Use multiple fallback selectors and validate what you extract.

python
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ExtractionRule:
    """Define how to extract a field with fallback selectors."""

    name: str
    selectors: list                       # Try each selector in order
    attribute: Optional[str] = None       # Extract an attribute instead of text
    required: bool = True                 # Fail if not found?
    transform: Optional[Callable] = None  # Post-processing function


class ResilientParser:
    """Parse HTML with fallback selectors and validation."""

    def __init__(self, rules):
        self.rules = rules

    def parse(self, html, source_url=""):
        """Extract structured data from HTML."""
        soup = BeautifulSoup(html, "lxml")
        record = {"_source_url": source_url}
        errors = []

        for rule in self.rules:
            value = self._extract_field(soup, rule)

            if value is None and rule.required:
                errors.append(f"Required field '{rule.name}' not found")
                continue

            if value and rule.transform:
                try:
                    value = rule.transform(value)
                except Exception as e:
                    errors.append(f"Transform failed for '{rule.name}': {e}")
                    continue

            record[rule.name] = value

        if errors:
            logger.warning(f"Parse issues for {source_url}: {errors}")

        return record, errors

    def _extract_field(self, soup, rule):
        """Try multiple selectors until one works."""
        for selector in rule.selectors:
            try:
                element = soup.select_one(selector)
                if element:
                    if rule.attribute:
                        return element.get(rule.attribute)
                    return element.get_text(strip=True)
            except Exception:
                continue
        return None

    def parse_table(self, html, table_selector="table"):
        """Extract an HTML table into a list of dicts."""
        soup = BeautifulSoup(html, "lxml")
        table = soup.select_one(table_selector)

        if not table:
            return []

        # Get headers
        headers = []
        header_row = table.select_one("thead tr") or table.select_one("tr")
        if header_row:
            headers = [th.get_text(strip=True) for th in header_row.select("th, td")]

        # Get rows
        rows = []
        body_rows = table.select("tbody tr") or table.select("tr")[1:]
        for tr in body_rows:
            cells = [td.get_text(strip=True) for td in tr.select("td")]
            if cells and len(cells) == len(headers):
                rows.append(dict(zip(headers, cells)))

        return rows

# Defining Extraction Rules

python
import re

def parse_price(text):
    """Extract numeric price from text like '$1,234.56'."""
    match = re.search(r'[\d,]+\.?\d*', text.replace(",", ""))
    return float(match.group()) if match else None

# Define rules for a product page
product_rules = [
    ExtractionRule(
        name="title",
        selectors=[
            "h1.product-title",
            "h1[data-testid='product-name']",
            ".product-info h1",
            "h1",  # Last resort
        ],
    ),
    ExtractionRule(
        name="price",
        selectors=[
            "span.price-current",
            "[data-testid='price']",
            ".product-price span",
            ".price",
        ],
        transform=parse_price,
    ),
    ExtractionRule(
        name="description",
        selectors=[
            "div.product-description",
            "[data-testid='description']",
            ".description p",
        ],
        required=False,
    ),
    ExtractionRule(
        name="image_url",
        selectors=[
            "img.product-image",
            ".product-gallery img",
            ".product img",
        ],
        attribute="src",
        required=False,
    ),
    ExtractionRule(
        name="availability",
        selectors=[
            "span.stock-status",
            "[data-testid='availability']",
            ".availability",
        ],
        required=False,
    ),
]
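
A quick sanity check of the transform against a few realistic strings (the function is repeated here so the snippet runs standalone):

```python
import re

def parse_price(text):
    """Extract numeric price from text like '$1,234.56'."""
    match = re.search(r'[\d,]+\.?\d*', text.replace(",", ""))
    return float(match.group()) if match else None

print(parse_price("$1,249.99"))      # 1249.99
print(parse_price("From £349"))      # 349.0
print(parse_price("Out of stock"))   # None
```

Attaching the transform to the rule means a failed conversion is recorded as a parse error rather than silently storing raw text.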

# Step 3: Caching

Avoid re-fetching pages that have not changed. Saves time and is polite to the server.

python
import hashlib
import json
import os
from datetime import datetime, timedelta

class PageCache:
    """Cache fetched pages to avoid unnecessary requests."""

    def __init__(self, cache_dir="scraper_cache", ttl_hours=24):
        self.cache_dir = cache_dir
        self.ttl = timedelta(hours=ttl_hours)
        os.makedirs(cache_dir, exist_ok=True)

    def _cache_key(self, url):
        """Generate a filesystem-safe cache key."""
        return hashlib.sha256(url.encode()).hexdigest()

    def get(self, url):
        """Get cached page if it exists and is fresh."""
        key = self._cache_key(url)
        meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")
        content_path = os.path.join(self.cache_dir, f"{key}.html")

        if not (os.path.exists(meta_path) and os.path.exists(content_path)):
            return None

        with open(meta_path, "r") as f:
            meta = json.load(f)

        cached_at = datetime.fromisoformat(meta["cached_at"])
        if datetime.now() - cached_at > self.ttl:
            return None  # Cache expired

        with open(content_path, "r", encoding="utf-8") as f:
            return f.read()

    def set(self, url, content):
        """Cache a page."""
        key = self._cache_key(url)
        meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")
        content_path = os.path.join(self.cache_dir, f"{key}.html")

        with open(content_path, "w", encoding="utf-8") as f:
            f.write(content)

        with open(meta_path, "w") as f:
            json.dump({
                "url": url,
                "cached_at": datetime.now().isoformat(),
                "size_bytes": len(content),
            }, f)
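
The on-disk layout and freshness check can be exercised standalone. This sketch mirrors what `set()` writes and what `get()` checks, using a temporary directory:

```python
import hashlib
import json
import os
import tempfile
from datetime import datetime, timedelta

cache_dir = tempfile.mkdtemp()
url = "https://example-store.com/product/widget-pro"
key = hashlib.sha256(url.encode()).hexdigest()  # filesystem-safe, fixed length

# Write content and metadata side by side, as PageCache.set() does
with open(os.path.join(cache_dir, f"{key}.html"), "w", encoding="utf-8") as f:
    f.write("<html>...</html>")
with open(os.path.join(cache_dir, f"{key}.meta.json"), "w") as f:
    json.dump({"url": url, "cached_at": datetime.now().isoformat()}, f)

# Freshness check, as PageCache.get() does
with open(os.path.join(cache_dir, f"{key}.meta.json")) as f:
    meta = json.load(f)
is_fresh = datetime.now() - datetime.fromisoformat(meta["cached_at"]) <= timedelta(hours=24)
print(is_fresh)  # True — just written, well within the TTL
```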

# Step 4: The Complete Scraping Pipeline

python
import pandas as pd
from datetime import datetime

class ScrapingPipeline:
    """Complete web scraping pipeline with all production patterns."""

    def __init__(self, base_url, rules, requests_per_second=1.0):
        self.fetcher = RespectfulFetcher(base_url, requests_per_second)
        self.parser = ResilientParser(rules)
        self.cache = PageCache()
        self.results = []
        self.errors = []

    def scrape_urls(self, urls):
        """Scrape a list of URLs and return structured data."""
        for i, url in enumerate(urls):
            logger.info(f"Scraping {i + 1}/{len(urls)}: {url}")

            try:
                # Check cache first
                html = self.cache.get(url)
                if html:
                    logger.info(f"Cache hit: {url}")
                else:
                    html = self.fetcher.fetch(url)
                    self.cache.set(url, html)

                # Parse
                record, parse_errors = self.parser.parse(html, source_url=url)
                record["_scraped_at"] = datetime.now().isoformat()

                if parse_errors:
                    self.errors.extend(parse_errors)

                self.results.append(record)

            except PermissionError as e:
                logger.warning(f"Skipped (robots.txt): {url}")
                self.errors.append({"url": url, "error": str(e)})

            except httpx.HTTPStatusError as e:
                logger.error(f"HTTP error for {url}: {e.response.status_code}")
                self.errors.append({"url": url, "error": str(e)})

            except Exception as e:
                logger.error(f"Failed to scrape {url}: {e}")
                self.errors.append({"url": url, "error": str(e)})

        return self.to_dataframe()

    def to_dataframe(self):
        """Convert results to a clean DataFrame."""
        if not self.results:
            return pd.DataFrame()

        df = pd.DataFrame(self.results)
        logger.info(f"Scraped {len(df)} records with {len(self.errors)} errors")
        return df

    def get_error_summary(self):
        """Return a summary of scraping errors."""
        return pd.DataFrame(self.errors) if self.errors else pd.DataFrame()

    def close(self):
        """Clean up resources."""
        self.fetcher.close()

# Running the Pipeline

python
# Define what to extract
rules = [
    ExtractionRule(
        name="title",
        selectors=["h1.product-title", "h1"],
    ),
    ExtractionRule(
        name="price",
        selectors=["span.price", ".product-price"],
        transform=parse_price,
    ),
    ExtractionRule(
        name="category",
        selectors=[".breadcrumb li:last-child", ".category-name"],
        required=False,
    ),
]

# Scrape
pipeline = ScrapingPipeline(
    base_url="https://example-store.com",
    rules=rules,
    requests_per_second=0.5,  # One request every 2 seconds
)

urls = [
    "https://example-store.com/product/widget-pro",
    "https://example-store.com/product/gadget-x",
    "https://example-store.com/product/doohickey-3000",
]

df = pipeline.scrape_urls(urls)
pipeline.close()

print(df.to_string(index=False))
text
          title    price     category         _source_url          _scraped_at
     Widget Pro  1249.99  Electronics      .../widget-pro  2026-04-27T14:30:00
       Gadget X   899.50      Gadgets        .../gadget-x  2026-04-27T14:30:02
 Doohickey 3000   349.99  Accessories  .../doohickey-3000  2026-04-27T14:30:04

# Step 5: Change Detection

Monitor pages for changes and only process what has actually updated.

python
import hashlib
import sqlite3

class ChangeDetector:
    """Detect when scraped content has changed."""

    def __init__(self, db_path="scraper_hashes.db"):
        self.db_path = db_path
        conn = sqlite3.connect(db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS page_hashes (
                url TEXT PRIMARY KEY,
                content_hash TEXT NOT NULL,
                last_checked TEXT NOT NULL,
                last_changed TEXT
            )
        """)
        conn.commit()
        conn.close()

    def has_changed(self, url, content):
        """Check if page content has changed since last check."""
        content_hash = hashlib.sha256(content.encode()).hexdigest()

        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute(
            "SELECT content_hash FROM page_hashes WHERE url = ?", (url,)
        )
        row = cursor.fetchone()

        now = datetime.now().isoformat()

        if row is None:
            # First time seeing this URL
            conn.execute(
                "INSERT INTO page_hashes (url, content_hash, last_checked, last_changed) "
                "VALUES (?, ?, ?, ?)",
                (url, content_hash, now, now),
            )
            conn.commit()
            conn.close()
            return True  # New page = changed

        old_hash = row[0]
        changed = content_hash != old_hash

        if changed:
            conn.execute(
                "UPDATE page_hashes SET content_hash = ?, last_checked = ?, last_changed = ? "
                "WHERE url = ?",
                (content_hash, now, now, url),
            )
            logger.info(f"Content changed: {url}")
        else:
            conn.execute(
                "UPDATE page_hashes SET last_checked = ? WHERE url = ?",
                (now, url),
            )

        conn.commit()
        conn.close()
        return changed

# Using Change Detection

python
# fetcher and parser come from Steps 1 and 2; save_to_database is a
# placeholder for your own storage layer
detector = ChangeDetector()

for url in urls:
    html = fetcher.fetch(url)

    if detector.has_changed(url, html):
        # Parse and store — content is new or updated
        record, errors = parser.parse(html, source_url=url)
        save_to_database(record)
        logger.info(f"Updated: {url}")
    else:
        logger.info(f"No change: {url}")

# Step 6: HTML Table Extraction

Many data sources are just HTML tables. Extract them directly into DataFrames.

python
def scrape_html_table(url, table_selector="table", fetcher=None):
    """Scrape an HTML table and return a DataFrame."""
    if fetcher is None:
        fetcher = RespectfulFetcher(url)

    html = fetcher.fetch(url)
    parser = ResilientParser([])
    rows = parser.parse_table(html, table_selector)

    df = pd.DataFrame(rows)
    logger.info(f"Extracted table: {len(df)} rows, {len(df.columns)} columns")
    return df


# Example: scrape a government statistics table
df = scrape_html_table(
    "https://example.gov/statistics/population",
    table_selector="table.data-table",
)
print(df.head())
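
For simple, well-formed tables, pandas can skip the manual parsing entirely: `pd.read_html` returns a list of DataFrames, one per table found in the document (wrap literal strings in `StringIO`; the lxml backend from the install list is used under the hood):

```python
from io import StringIO
import pandas as pd

html = """
<table class="data-table">
  <thead><tr><th>Region</th><th>Population</th></tr></thead>
  <tbody>
    <tr><td>North</td><td>1200</td></tr>
    <tr><td>South</td><td>3400</td></tr>
  </tbody>
</table>
"""

# One DataFrame per <table> element in the document
tables = pd.read_html(StringIO(html))
df = tables[0]
print(df.shape)  # (2, 2)
```

The custom `parse_table` still earns its keep on messy markup, where you need to pick one specific table by selector or normalise cells yourself.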

# Ethical Scraping Checklist

| Rule | Implementation |
| --- | --- |
| Check robots.txt | RespectfulFetcher._load_robots() |
| Rate limit requests | time.sleep() between requests |
| Identify your bot | Custom User-Agent with contact info |
| Cache responses | PageCache avoids re-fetching |
| Do not scrape login-required pages | Only fetch public URLs |
| Respect noindex / nofollow | Check meta tags before indexing |
| Stop if asked | Monitor for 429/403 responses |

# What This Replaces

| Manual process | Scraping pipeline equivalent |
| --- | --- |
| Copy-paste from web pages | Automated extraction to DataFrame |
| Manually check for price changes | Change detection with alerting |
| Open 50 tabs, compare data | Scrape all URLs, diff in one table |
| "I think they updated the page" | Content hash comparison |
| No data from sites without APIs | Structured extraction from any HTML |

# Common Pitfalls

| Pitfall | Why it fails | Fix |
| --- | --- | --- |
| Single CSS selector | Breaks when layout changes | Multiple fallback selectors |
| No rate limiting | Server blocks your IP | 1-2 requests per second max |
| Ignoring robots.txt | Legal and ethical issues | Always check before scraping |
| No caching | Slow, wasteful, gets you blocked | Cache with configurable TTL |
| No error handling | One bad page crashes the pipeline | Try/except with error collection |
| Storing raw HTML | Hard to analyse | Parse to structured data immediately |

# Next Steps

Start with a small set of target URLs and well-defined extraction rules. Validate that your selectors work across the pages you need, then add caching and change detection. The pipeline structure here scales from 10 URLs to 10,000 with the same code.

For storing and processing the data you scrape, see How to Design Data Pipelines for Reliable Reporting. For cleaning the messy data that scraping often produces, see How to Clean Messy Excel Data Using Python.

Data analytics services include building reliable data extraction pipelines from web sources.

Get in touch to discuss automating your data extraction.
