Web Scraping to Structured Data: Building Reliable Extraction Pipelines

· 9 min read · Data & Dashboards

Turn web pages into clean, structured data with Python — using resilient selectors, rate limiting, change detection, and robots.txt compliance for reliable extraction pipelines.

Not every data source has an API. Price lists live on web pages. Competitor information is in HTML tables. Government statistics are published as web pages, not endpoints. When the data you need is on the web but not available through an API, scraping is the answer.

But scraping is fragile if done carelessly. A CSS class changes and your entire pipeline breaks. You hit the server too hard and get blocked. You scrape pages you are not supposed to.

This guide builds a reliable web scraping pipeline in Python. Every pattern is designed for production use — resilient selectors, polite request behaviour, change detection, and clean structured output.

Who This Is For

  • Data engineers who need to extract data from websites that do not offer APIs
  • Analysts who manually copy data from web pages into spreadsheets and want to automate it
  • Vibe coders building price trackers, lead scrapers, or competitive monitoring tools
  • Anyone who needs structured data from the web but has had scrapers break after a week

Basic Python is all you need. The guide covers HTML structure, CSS selectors, and request handling from scratch — no web development experience required.

The Scraping Pipeline

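The pipeline runs in stages: check robots.txt, fetch with rate limiting, cache the response, parse with fallback selectors, validate the result, and emit structured output. Every stage matters. Skip the robots.txt check and you risk getting banned. Skip rate limiting and you risk taking down the server. Skip validation and you get garbage data.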

What You Will Need

pip install httpx beautifulsoup4 lxml pandas
  • httpx — modern HTTP client with a requests-like API and optional async support
  • beautifulsoup4 — HTML parsing
  • lxml — fast HTML parser backend
  • pandas — structured data output

Step 1: Respectful Fetching

Check robots.txt First

import httpx
from urllib.parse import urljoin, urlparse
import time
import logging

logger = logging.getLogger("scraper")

class RespectfulFetcher:
    """HTTP client that respects robots.txt and rate limits."""

    def __init__(self, base_url, requests_per_second=1.0, user_agent=None):
        self.base_url = base_url
        self.delay = 1.0 / requests_per_second
        self.last_request_time = 0
        self.user_agent = user_agent or "DataPipelineBot/1.0 ([email protected])"
        self.client = httpx.Client(
            headers={"User-Agent": self.user_agent},
            timeout=30.0,
            follow_redirects=True,
        )
        self.disallowed_paths = []
        self._load_robots()

    def _load_robots(self):
        """Parse robots.txt to find disallowed paths."""
        robots_url = urljoin(self.base_url, "/robots.txt")
        try:
            response = self.client.get(robots_url)
            if response.status_code == 200:
                current_agent_matches = False
                for line in response.text.splitlines():
                    line = line.strip()
                    if line.lower().startswith("user-agent:"):
                        agent = line.split(":", 1)[1].strip()
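                        # Match the wildcard, or a non-empty agent token case-insensitively
                        current_agent_matches = agent == "*" or (
                            bool(agent) and agent.lower() in self.user_agent.lower()
                        )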
                    elif line.lower().startswith("disallow:") and current_agent_matches:
                        path = line.split(":", 1)[1].strip()
                        if path:
                            self.disallowed_paths.append(path)

                logger.info(f"robots.txt loaded: {len(self.disallowed_paths)} disallowed paths")
        except httpx.HTTPError:
            logger.warning("Could not fetch robots.txt — proceeding with caution")

    def is_allowed(self, url):
        """Check if a URL is allowed by robots.txt."""
        path = urlparse(url).path
        for disallowed in self.disallowed_paths:
            if path.startswith(disallowed):
                logger.warning(f"Blocked by robots.txt: {path}")
                return False
        return True

    def fetch(self, url):
        """Fetch a URL with rate limiting and robots.txt compliance."""
        if not self.is_allowed(url):
            raise PermissionError(f"URL blocked by robots.txt: {url}")

        # Rate limiting (the monotonic clock is unaffected by system clock changes)
        elapsed = time.monotonic() - self.last_request_time
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)

        response = self.client.get(url)
        self.last_request_time = time.monotonic()

        response.raise_for_status()
        # response.content is the raw body, so its length is a true byte count
        logger.info(f"Fetched: {url} ({len(response.content)} bytes)")
        return response.text

    def close(self):
        """Close the HTTP client."""
        self.client.close()
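
The hand-rolled parser above covers plain Disallow rules. As a possible alternative, Python's standard library ships urllib.robotparser, which also understands Allow rules and Crawl-delay. The sketch below parses a robots.txt body that has already been downloaded rather than fetching one; the sample rules and the short bot name are made up for illustration.

```python
from urllib.robotparser import RobotFileParser

# Sample robots.txt body. In the pipeline this would be response.text
# from the robots.txt request; these rules are illustrative only.
SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /admin/
Allow: /products/
Crawl-delay: 2
"""


def build_robot_parser(robots_text):
    """Build a RobotFileParser from an already-fetched robots.txt body."""
    rp = RobotFileParser()
    rp.parse(robots_text.splitlines())
    return rp


rp = build_robot_parser(SAMPLE_ROBOTS)
agent = "DataPipelineBot"

allowed_product = rp.can_fetch(agent, "https://example-store.com/products/widget-pro")
allowed_admin = rp.can_fetch(agent, "https://example-store.com/admin/login")
crawl_delay = rp.crawl_delay(agent)  # None when the file sets no Crawl-delay
```

When a Crawl-delay is present, it could be used in place of the fixed delay in RespectfulFetcher so the server's own preference sets the pace.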

Step 2: Resilient Parsing

The most common scraping failure: selectors break because the page layout changes. Use multiple fallback selectors and validate what you extract.

from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ExtractionRule:
    """Define how to extract a field with fallback selectors."""

    name: str
    selectors: list                       # Try each selector in order
    attribute: Optional[str] = None       # Extract an attribute instead of text
    required: bool = True                 # Record an error if not found
    transform: Optional[Callable] = None  # Post-processing function


class ResilientParser:
    """Parse HTML with fallback selectors and validation."""

    def __init__(self, rules):
        self.rules = rules

    def parse(self, html, source_url=""):
        """Extract structured data from HTML."""
        soup = BeautifulSoup(html, "lxml")
        record = {"_source_url": source_url}
        errors = []

        for rule in self.rules:
            value = self._extract_field(soup, rule)

            if value is None and rule.required:
                errors.append(f"Required field '{rule.name}' not found")
                continue

            if value and rule.transform:
                try:
                    value = rule.transform(value)
                except Exception as e:
                    errors.append(f"Transform failed for '{rule.name}': {e}")
                    continue

            record[rule.name] = value

        if errors:
            logger.warning(f"Parse issues for {source_url}: {errors}")

        return record, errors

    def _extract_field(self, soup, rule):
        """Try multiple selectors until one works."""
        for selector in rule.selectors:
            try:
                element = soup.select_one(selector)
                if element:
                    if rule.attribute:
                        return element.get(rule.attribute)
                    return element.get_text(strip=True)
            except Exception:
                continue
        return None

    def parse_table(self, html, table_selector="table"):
        """Extract an HTML table into a list of dicts."""
        soup = BeautifulSoup(html, "lxml")
        table = soup.select_one(table_selector)

        if not table:
            return []

        # Get headers
        headers = []
        header_row = table.select_one("thead tr") or table.select_one("tr")
        if header_row:
            headers = [th.get_text(strip=True) for th in header_row.select("th, td")]

        # Get rows
        rows = []
        body_rows = table.select("tbody tr") or table.select("tr")[1:]
        for tr in body_rows:
            cells = [td.get_text(strip=True) for td in tr.select("td")]
            if cells and len(cells) == len(headers):
                rows.append(dict(zip(headers, cells)))

        return rows
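
Fallback selectors keep the pipeline running, but they can also hide the fact that the primary selector has stopped matching. One way to surface that, sketched below, is to return the index of the selector that matched alongside the value, so anything above 0 can be logged or counted. The first_match helper, the select_text callable, and the sample data are stand-ins for illustration (with BeautifulSoup the callable would wrap soup.select_one); they are not part of the classes above.

```python
from typing import Callable, Optional, Sequence, Tuple


def first_match(
    selectors: Sequence[str],
    select_text: Callable[[str], Optional[str]],
) -> Tuple[Optional[int], Optional[str]]:
    """Return (index, value) for the first selector that yields text."""
    for index, selector in enumerate(selectors):
        value = select_text(selector)
        if value:
            return index, value
    return None, None


# Stand-in for a parsed page: maps selectors to the text they would return.
fake_page = {".product-info h1": "Widget Pro"}

selectors = ["h1.product-title", ".product-info h1", "h1"]
index, value = first_match(selectors, fake_page.get)

# An index above 0 means the primary selector missed and a fallback was used
used_fallback = index is not None and index > 0
```

Counting fallback hits per field over a run gives an early warning that a layout change is under way, before the last selector in the list also fails.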

Defining Extraction Rules

import re

def parse_price(text):
    """Extract a numeric price from text like '$1,234.56'.

    Assumes commas are thousands separators and the dot is the decimal mark.
    """
    match = re.search(r"\d+(?:\.\d+)?", text.replace(",", ""))
    return float(match.group()) if match else None

# Define rules for a product page
product_rules = [
    ExtractionRule(
        name="title",
        selectors=[
            "h1.product-title",
            "h1[data-testid='product-name']",
            ".product-info h1",
            "h1",  # Last resort
        ],
    ),
    ExtractionRule(
        name="price",
        selectors=[
            "span.price-current",
            "[data-testid='price']",
            ".product-price span",
            ".price",
        ],
        transform=parse_price,
    ),
    ExtractionRule(
        name="description",
        selectors=[
            "div.product-description",
            "[data-testid='description']",
            ".description p",
        ],
        required=False,
    ),
    ExtractionRule(
        name="image_url",
        selectors=[
            "img.product-image",
            ".product-gallery img",
            ".product img",
        ],
        attribute="src",
        required=False,
    ),
    ExtractionRule(
        name="availability",
        selectors=[
            "span.stock-status",
            "[data-testid='availability']",
            ".availability",
        ],
        required=False,
    ),
]

Step 3: Caching

Avoid re-fetching pages you fetched recently and that are unlikely to have changed. A time-based cache saves time and is polite to the server.

import hashlib
import json
import os
from datetime import datetime, timedelta

class PageCache:
    """Cache fetched pages to avoid unnecessary requests."""

    def __init__(self, cache_dir="scraper_cache", ttl_hours=24):
        self.cache_dir = cache_dir
        self.ttl = timedelta(hours=ttl_hours)
        os.makedirs(cache_dir, exist_ok=True)

    def _cache_key(self, url):
        """Generate a filesystem-safe cache key."""
        return hashlib.sha256(url.encode()).hexdigest()

    def get(self, url):
        """Get cached page if it exists and is fresh."""
        key = self._cache_key(url)
        meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")
        content_path = os.path.join(self.cache_dir, f"{key}.html")

        if not os.path.exists(meta_path):
            return None

        with open(meta_path, "r") as f:
            meta = json.load(f)

        cached_at = datetime.fromisoformat(meta["cached_at"])
        if datetime.now() - cached_at > self.ttl:
            return None  # Cache expired

        with open(content_path, "r", encoding="utf-8") as f:
            return f.read()

    def set(self, url, content):
        """Cache a page."""
        key = self._cache_key(url)
        meta_path = os.path.join(self.cache_dir, f"{key}.meta.json")
        content_path = os.path.join(self.cache_dir, f"{key}.html")

        with open(content_path, "w", encoding="utf-8") as f:
            f.write(content)

        with open(meta_path, "w") as f:
            json.dump({
                "url": url,
                "cached_at": datetime.now().isoformat(),
                # Encode first: len() of a str counts characters, not bytes
                "size_bytes": len(content.encode("utf-8")),
            }, f)
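
A TTL cache cannot tell whether a page actually changed. HTTP conditional requests can: if the server sent an ETag or Last-Modified header, sending it back as If-None-Match or If-Modified-Since lets the server answer 304 Not Modified with no body. The sketch below only builds those request headers from stored metadata. The etag and last_modified keys are assumed additions to the .meta.json file; PageCache as shown does not store them.

```python
def conditional_headers(meta):
    """Build conditional request headers from cached response metadata.

    `meta` may hold "etag" and "last_modified" values copied from the
    previous response's ETag / Last-Modified headers (hypothetical keys,
    not written by PageCache as shown above).
    """
    headers = {}
    if meta.get("etag"):
        headers["If-None-Match"] = meta["etag"]
    if meta.get("last_modified"):
        headers["If-Modified-Since"] = meta["last_modified"]
    return headers


meta = {"etag": '"abc123"', "last_modified": "Mon, 27 Apr 2026 14:30:00 GMT"}
headers = conditional_headers(meta)

# Metadata without validators produces no conditional headers
no_headers = conditional_headers({"url": "https://example-store.com/"})
```

To wire this in, the headers could be passed to client.get(url, headers=...), with a 304 status treated as "use the cached copy". The 304 check would need to come before raise_for_status(), which, depending on the httpx version, may treat it as an error.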

Step 4: The Complete Scraping Pipeline

import pandas as pd
from datetime import datetime

class ScrapingPipeline:
    """Complete web scraping pipeline with all production patterns."""

    def __init__(self, base_url, rules, requests_per_second=1.0):
        self.fetcher = RespectfulFetcher(base_url, requests_per_second)
        self.parser = ResilientParser(rules)
        self.cache = PageCache()
        self.results = []
        self.errors = []

    def scrape_urls(self, urls):
        """Scrape a list of URLs and return structured data."""
        for i, url in enumerate(urls):
            logger.info(f"Scraping {i + 1}/{len(urls)}: {url}")

            try:
                # Check cache first
                html = self.cache.get(url)
                if html:
                    logger.info(f"Cache hit: {url}")
                else:
                    html = self.fetcher.fetch(url)
                    self.cache.set(url, html)

                # Parse
                record, parse_errors = self.parser.parse(html, source_url=url)
                record["_scraped_at"] = datetime.now().isoformat()

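                if parse_errors:
                    # Use the same {"url", "error"} shape as fetch errors so
                    # get_error_summary() builds a consistent DataFrame
                    self.errors.extend(
                        {"url": url, "error": msg} for msg in parse_errors
                    )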

                self.results.append(record)

            except PermissionError as e:
                logger.warning(f"Skipped (robots.txt): {url}")
                self.errors.append({"url": url, "error": str(e)})

            except httpx.HTTPStatusError as e:
                logger.error(f"HTTP error for {url}: {e.response.status_code}")
                self.errors.append({"url": url, "error": str(e)})

            except Exception as e:
                logger.error(f"Failed to scrape {url}: {e}")
                self.errors.append({"url": url, "error": str(e)})

        return self.to_dataframe()

    def to_dataframe(self):
        """Convert results to a clean DataFrame."""
        if not self.results:
            return pd.DataFrame()

        df = pd.DataFrame(self.results)
        logger.info(f"Scraped {len(df)} records with {len(self.errors)} errors")
        return df

    def get_error_summary(self):
        """Return a summary of scraping errors."""
        return pd.DataFrame(self.errors) if self.errors else pd.DataFrame()

    def close(self):
        """Clean up resources."""
        self.fetcher.close()

Running the Pipeline

# Define what to extract
rules = [
    ExtractionRule(
        name="title",
        selectors=["h1.product-title", "h1"],
    ),
    ExtractionRule(
        name="price",
        selectors=["span.price", ".product-price"],
        transform=parse_price,
    ),
    ExtractionRule(
        name="category",
        selectors=[".breadcrumb li:last-child", ".category-name"],
        required=False,
    ),
]

# Scrape
pipeline = ScrapingPipeline(
    base_url="https://example-store.com",
    rules=rules,
    requests_per_second=0.5,  # One request every 2 seconds
)

urls = [
    "https://example-store.com/product/widget-pro",
    "https://example-store.com/product/gadget-x",
    "https://example-store.com/product/doohickey-3000",
]

df = pipeline.scrape_urls(urls)
pipeline.close()

print(df.to_string(index=False))
          title    price    category         _source_url                _scraped_at
     Widget Pro  1249.99  Electronics  .../widget-pro    2026-04-27T14:30:00
       Gadget X   899.50    Gadgets    .../gadget-x      2026-04-27T14:30:02
 Doohickey 3000   349.99   Accessories .../doohickey-3000 2026-04-27T14:30:04
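
The parser flags missing required fields, but a value can be present and still wrong, for example a price of 0 picked up by a fallback selector that matched the wrong element. A minimal validation sketch follows; validate_record is a hypothetical helper, not part of the pipeline classes, and the price ceiling is an arbitrary illustrative threshold.

```python
def validate_record(record, max_price=100_000):
    """Return a list of problems found in a scraped product record."""
    problems = []

    title = record.get("title")
    if not title or not title.strip():
        problems.append("title is empty")

    price = record.get("price")
    if not isinstance(price, (int, float)):
        problems.append("price is not numeric")
    elif price <= 0 or price > max_price:
        problems.append(f"price out of range: {price}")

    return problems


good = {"title": "Widget Pro", "price": 1249.99}
bad = {"title": "  ", "price": 0.0}

good_problems = validate_record(good)
bad_problems = validate_record(bad)
```

Records with a non-empty problem list could be routed to the error summary instead of the results, so bad values never reach the DataFrame.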

Step 5: Change Detection

Monitor pages for changes and only process what has actually updated.

import hashlib
from datetime import datetime  # has_changed() timestamps each check

class ChangeDetector:
    """Detect when scraped content has changed."""

    def __init__(self, db_path="scraper_hashes.db"):
        import sqlite3
        self.db_path = db_path
        conn = sqlite3.connect(db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS page_hashes (
                url TEXT PRIMARY KEY,
                content_hash TEXT NOT NULL,
                last_checked TEXT NOT NULL,
                last_changed TEXT
            )
        """)
        conn.commit()
        conn.close()

    def has_changed(self, url, content):
        """Check if page content has changed since last check."""
        import sqlite3
        content_hash = hashlib.sha256(content.encode()).hexdigest()

        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute(
            "SELECT content_hash FROM page_hashes WHERE url = ?", (url,)
        )
        row = cursor.fetchone()

        now = datetime.now().isoformat()

        if row is None:
            # First time seeing this URL
            conn.execute(
                "INSERT INTO page_hashes (url, content_hash, last_checked, last_changed) "
                "VALUES (?, ?, ?, ?)",
                (url, content_hash, now, now),
            )
            conn.commit()
            conn.close()
            return True  # New page = changed

        old_hash = row[0]
        changed = content_hash != old_hash

        if changed:
            conn.execute(
                "UPDATE page_hashes SET content_hash = ?, last_checked = ?, last_changed = ? "
                "WHERE url = ?",
                (content_hash, now, now, url),
            )
            logger.info(f"Content changed: {url}")
        else:
            conn.execute(
                "UPDATE page_hashes SET last_checked = ? WHERE url = ?",
                (now, url),
            )

        conn.commit()
        conn.close()
        return changed

Using Change Detection

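# Assumes `urls` and `product_rules` from the earlier examples.
# save_to_database() is a placeholder for your own storage function.
fetcher = RespectfulFetcher("https://example-store.com")
parser = ResilientParser(product_rules)
detector = ChangeDetector()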

for url in urls:
    html = fetcher.fetch(url)

    if detector.has_changed(url, html):
        # Parse and store — content is new or updated
        record, errors = parser.parse(html, source_url=url)
        save_to_database(record)
        logger.info(f"Updated: {url}")
    else:
        logger.info(f"No change: {url}")
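
Hashing the raw HTML treats any byte difference as a change, so pages with rotating ad markup, embedded timestamps, or session tokens may register as changed on every run. One possible refinement is to hash the extracted fields instead. The record_hash helper below is an illustration, not part of ChangeDetector; it skips the underscore-prefixed metadata keys the pipeline adds.

```python
import hashlib
import json


def record_hash(record):
    """Hash only the extracted fields, ignoring '_'-prefixed metadata."""
    payload = {k: v for k, v in record.items() if not k.startswith("_")}
    # sort_keys makes the serialisation independent of dict ordering
    canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


monday = {"title": "Widget Pro", "price": 1249.99, "_scraped_at": "2026-04-27T14:30:00"}
tuesday = {"price": 1249.99, "title": "Widget Pro", "_scraped_at": "2026-04-28T14:30:00"}
repriced = {"title": "Widget Pro", "price": 1199.99, "_scraped_at": "2026-04-28T14:30:00"}

unchanged = record_hash(monday) == record_hash(tuesday)   # same fields, new timestamp
changed = record_hash(monday) != record_hash(repriced)    # price differs
```

The trade-off is that a change outside the extracted fields goes unnoticed, which is usually the desired behaviour for price or stock monitoring.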

Step 6: HTML Table Extraction

Many data sources are just HTML tables. Extract them directly into DataFrames.

def scrape_html_table(url, table_selector="table", fetcher=None):
    """Scrape an HTML table and return a DataFrame."""
    owns_fetcher = fetcher is None
    if owns_fetcher:
        fetcher = RespectfulFetcher(url)

    try:
        html = fetcher.fetch(url)
    finally:
        if owns_fetcher:
            fetcher.close()  # Close only a client created here

    parser = ResilientParser([])
    rows = parser.parse_table(html, table_selector)

    df = pd.DataFrame(rows)
    logger.info(f"Extracted table: {len(df)} rows, {len(df.columns)} columns")
    return df


# Example: scrape a government statistics table
df = scrape_html_table(
    "https://example.gov/statistics/population",
    table_selector="table.data-table",
)
print(df.head())
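
parse_table returns every cell as a string, so a population column arrives as text like "1,234,567". A small sketch of converting numeric-looking cells before building the DataFrame; coerce_cell is a hypothetical helper and assumes commas are thousands separators, and the sample row is invented.

```python
def coerce_cell(text):
    """Convert a cell to int or float if Python can parse it once commas are removed."""
    cleaned = text.replace(",", "").strip()
    try:
        return int(cleaned)
    except ValueError:
        pass
    try:
        return float(cleaned)
    except ValueError:
        return text  # Leave non-numeric cells untouched


rows = [{"Region": "North", "Population": "1,234,567", "Growth %": "1.8"}]
typed_rows = [{key: coerce_cell(value) for key, value in row.items()} for row in rows]
```

Columns that mix numbers with markers such as "n/a" will end up with mixed types, so it is worth checking the resulting dtypes before doing arithmetic.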

Ethical Scraping Checklist

Rule                               | Implementation
Check robots.txt                   | RespectfulFetcher._load_robots()
Rate limit requests                | time.sleep() between requests
Identify your bot                  | Custom User-Agent with contact info
Cache responses                    | PageCache avoids re-fetching
Do not scrape login-required pages | Only fetch public URLs
Respect noindex / nofollow         | Check meta tags before indexing
Stop if asked                      | Monitor for 429/403 responses
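
The fetcher as written raises on any error status but does not slow down or stop. A sketch of the decision logic for 429 and 403 responses, kept separate from the HTTP client so it can be tested offline. The function name, the attempt limit, and the backoff values are assumptions, not part of RespectfulFetcher.

```python
def next_action(status_code, retry_after=None, attempt=1, max_attempts=3):
    """Decide what to do after a response: 'ok', 'wait', or 'stop'.

    Returns (action, delay_seconds). `retry_after` is the raw Retry-After
    header value, if the server sent one.
    """
    if status_code == 403:
        # Forbidden: treat as a request to stop, do not retry
        return "stop", 0.0
    if status_code == 429:
        if attempt >= max_attempts:
            return "stop", 0.0
        if retry_after is not None and retry_after.strip().isdigit():
            # The server said how many seconds to wait
            return "wait", float(retry_after)
        # No usable header (missing, or in HTTP-date form):
        # exponential backoff, capped at 5 minutes
        return "wait", min(2.0 ** attempt * 5, 300.0)
    return "ok", 0.0
```

In fetch(), the status could be checked with this function before raise_for_status(), sleeping on "wait" and abandoning the remaining URLs for that host on "stop".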

What This Replaces

Manual process                   | Scraping pipeline equivalent
Copy-paste from web pages        | Automated extraction to DataFrame
Manually check for price changes | Change detection with alerting
Open 50 tabs, compare data       | Scrape all URLs, diff in one table
“I think they updated the page”  | Content hash comparison
No data from sites without APIs  | Structured extraction from any HTML

Common Pitfalls

Pitfall             | Why it fails                      | Fix
Single CSS selector | Breaks when layout changes        | Multiple fallback selectors
No rate limiting    | Server blocks your IP             | 1-2 requests per second max
Ignoring robots.txt | Legal and ethical issues          | Always check before scraping
No caching          | Slow, wasteful, gets you blocked  | Cache with configurable TTL
No error handling   | One bad page crashes the pipeline | Try/except with error collection
Storing raw HTML    | Hard to analyse                   | Parse to structured data immediately

Next Steps

Start with a small set of target URLs and well-defined extraction rules. Validate that your selectors work across the pages you need, then add caching and change detection. The same pipeline structure handles 10 URLs or 10,000, but run time grows linearly: at 1 request per second, 10,000 uncached pages take about 2.8 hours.

For storing and processing the data you scrape, see How to Design Data Pipelines for Reliable Reporting. For cleaning the messy data that scraping often produces, see How to Clean Messy Excel Data Using Python.
