Async Python for Faster Data Collection and Processing

8 min read · Data & Dashboards

Speed up API calls, web scraping, and file processing with async Python. Covers asyncio, aiohttp, semaphores for rate limiting, and patterns for mixing sync and async code.


A pipeline that calls 50 APIs sequentially spends 95% of its time waiting for network responses. Each call takes 200–500 milliseconds, and the next one cannot start until the previous one finishes. A 50-endpoint extraction that could finish in under a second ends up taking 15.

Async Python fixes this. Instead of waiting for each response before making the next request, you fire all of them concurrently and process results as they arrive. The same 50 API calls complete in under a second.

This guide covers practical async patterns for data collection and processing — from basic asyncio to production-ready patterns with rate limiting, error handling, and graceful degradation.

# Who This Is For

  • Data engineers whose pipelines spend most of their runtime waiting for API responses
  • Developers pulling data from multiple sources who want to cut extraction time dramatically
  • Vibe coders who built a working scraper or API client and want to make it 10x faster
  • Anyone running sequential HTTP calls in a loop and wondering why it takes so long

You need solid Python fundamentals — functions, loops, error handling. No prior async experience required. This guide introduces the concepts with clear before/after comparisons.

# Sync vs Async: The Core Difference

mermaid
flowchart LR
  subgraph Synchronous
    direction TB
    S1["Call API 1\n(wait 300ms)"] --> S2["Call API 2\n(wait 250ms)"]
    S2 --> S3["Call API 3\n(wait 400ms)"]
    S3 --> SR["Total: 950ms"]
  end

  subgraph Asynchronous
    direction TB
    A1["Call API 1"] --> AR["Total: 400ms\n(slowest call)"]
    A2["Call API 2"] --> AR
    A3["Call API 3"] --> AR
  end

In synchronous code, each I/O operation blocks the entire program. In async code, while one request waits for a response, the event loop starts the next request.
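
The whole trick is that await hands control back to the event loop while a task waits. A minimal sketch of that overlap, using asyncio.sleep() as a stand-in for network latency: three one-second waits finish in roughly one second, not three.

python
import asyncio
import time

async def pretend_request(name: str, delay: float) -> str:
    """Stand-in for an HTTP call: awaiting sleep yields control to the event loop."""
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        pretend_request("API 1", 1.0),
        pretend_request("API 2", 1.0),
        pretend_request("API 3", 1.0),
    )
    print(results, f"in {time.perf_counter() - start:.1f}s")  # ~1.0s, not 3.0s

asyncio.run(main())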

# What You Will Need

bash
pip install aiohttp aiofiles

  • aiohttp — async HTTP client (replaces requests for concurrent calls)
  • aiofiles — async file I/O
  • asyncio — part of the standard library since Python 3.7, nothing to install

# Step 1: Your First Async Pipeline

# The Slow Way (Synchronous)

python
import requests
import time

def fetch_all_sync(urls: list[str]) -> list[dict]:
    """Fetch URLs one at a time. Painfully slow."""
    results = []
    for url in urls:
        response = requests.get(url, timeout=10)
        results.append(response.json())
    return results

# 50 URLs × 300ms average = ~15 seconds
start = time.perf_counter()
data = fetch_all_sync(urls)
print(f"Sync: {time.perf_counter() - start:.1f}s")
# => Sync: 14.8s

# The Fast Way (Asynchronous)

python
import aiohttp
import asyncio
import time

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch a single URL asynchronously."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        return await response.json()

async def fetch_all_async(urls: list[str]) -> list[dict]:
    """Fetch all URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 50 URLs concurrently = ~0.4 seconds (limited by slowest response)
start = time.perf_counter()
data = asyncio.run(fetch_all_async(urls))
print(f"Async: {time.perf_counter() - start:.1f}s")
# => Async: 0.4s

Same result, 37× faster. The speedup comes from overlapping wait times — while one request is in flight, the event loop processes others.

# Step 2: Rate Limiting with Semaphores

Firing 1,000 requests simultaneously will get you rate-limited or banned. Use a semaphore to cap concurrency:

python
import aiohttp
import asyncio

async def fetch_with_limit(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict]:
    """Fetch URLs with a concurrency limit."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:  # at most max_concurrent requests at once
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                return await resp.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 1,000 URLs, max 10 at a time
data = asyncio.run(fetch_with_limit(urls, max_concurrent=10))

# Choosing the Right Concurrency Limit

| API Type | Suggested Limit | Reasoning |
|---|---|---|
| Your own API | 20–50 | You control the server capacity |
| Third-party API (paid) | 5–10 | Respect rate limits in their docs |
| Third-party API (free tier) | 2–5 | Aggressive limits, err on the safe side |
| Web scraping | 3–5 | Polite crawling avoids IP bans |
| Database queries | Pool size (e.g., 10) | Match your connection pool |
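
If one pipeline talks to sources with different limits, a separate semaphore per source keeps each cap independent. A minimal sketch, with hypothetical source names and limits loosely following the table above:

python
import aiohttp
import asyncio

# Hypothetical per-source concurrency caps
LIMITS = {"internal_api": 30, "paid_api": 10, "free_api": 3}

async def fetch_by_source(jobs: list[tuple[str, str]]) -> list[dict]:
    """jobs is a list of (source_name, url) pairs; each source gets its own cap."""
    semaphores = {name: asyncio.Semaphore(n) for name, n in LIMITS.items()}

    async def fetch_one(session, source, url):
        async with semaphores[source]:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                return await resp.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, source, url) for source, url in jobs]
        return await asyncio.gather(*tasks)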

# Step 3: Error Handling in Async Pipelines

By default, asyncio.gather() propagates the first exception it hits, and you lose the results of every other task. Pass return_exceptions=True to collect errors alongside results, or handle errors per task.
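
A minimal sketch of the return_exceptions=True approach, reusing fetch_one from Step 1: failed tasks come back as exception objects that you filter out, so one bad URL no longer aborts the whole batch.

python
import aiohttp
import asyncio

async def fetch_all_tolerant(urls: list[str]) -> list[dict]:
    """Gather results and exceptions together, then separate them."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    ok = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    print(f"{len(ok)} succeeded, {len(errors)} failed")
    return ok

For finer control over rate limits and flaky endpoints, handle errors per task with retries and exponential backoff: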

python
import aiohttp
import asyncio
import logging

logger = logging.getLogger("async_pipeline")

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_base: float = 1.0,
) -> dict | None:
    """Fetch a URL with exponential backoff retry."""
    for attempt in range(1, max_retries + 1):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                return await resp.json()

        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # rate limited
                wait = backoff_base * (2 ** attempt)
                logger.warning("Rate limited on %s, waiting %.1fs", url, wait)
                await asyncio.sleep(wait)
            elif e.status >= 500:  # server error, retry
                wait = backoff_base * attempt
                logger.warning("Server error %d on %s, retry %d/%d", e.status, url, attempt, max_retries)
                await asyncio.sleep(wait)
            else:
                logger.error("Client error %d on %s, not retrying", e.status, url)
                return None

        except asyncio.TimeoutError:
            logger.warning("Timeout on %s, retry %d/%d", url, attempt, max_retries)
            await asyncio.sleep(backoff_base * attempt)

        except Exception as e:
            logger.error("Unexpected error on %s: %s", url, str(e))
            return None

    logger.error("All %d retries failed for %s", max_retries, url)
    return None

# Using Retry Logic in a Pipeline

python
async def extract_all_endpoints(endpoints: list[dict]) -> list[dict]:
    """Extract data from multiple API endpoints with error handling."""
    semaphore = asyncio.Semaphore(10)

    async def fetch_endpoint(session, endpoint):
        async with semaphore:
            result = await fetch_with_retry(session, endpoint["url"])
            if result is not None:
                return {"source": endpoint["name"], "data": result}
            return {"source": endpoint["name"], "data": None, "error": True}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_endpoint(session, ep) for ep in endpoints]
        results = await asyncio.gather(*tasks)

    # Report extraction summary
    success = sum(1 for r in results if not r.get("error"))
    failed = len(results) - success
    logger.info("Extraction complete: %d success, %d failed", success, failed)

    return [r for r in results if not r.get("error")]
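
A usage sketch, with hypothetical endpoint names and URLs standing in for your real sources:

python
endpoints = [
    {"name": "orders", "url": "https://api.example.com/orders"},
    {"name": "customers", "url": "https://api.example.com/customers"},
    {"name": "products", "url": "https://api.example.com/products"},
]

# Returns only the endpoints that succeeded; failures are logged and dropped
results = asyncio.run(extract_all_endpoints(endpoints))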

# Step 4: Async File Processing

Process large files without blocking the event loop:

python
import aiofiles
import asyncio
import json
from pathlib import Path

async def process_json_files(directory: str) -> list[dict]:
    """Read and parse multiple JSON files concurrently."""
    files = list(Path(directory).glob("*.json"))
    semaphore = asyncio.Semaphore(20)  # limit open file handles

    async def read_one(filepath: Path) -> dict | None:
        async with semaphore:
            try:
                async with aiofiles.open(filepath, mode="r") as f:
                    content = await f.read()
                    return json.loads(content)
            except Exception as e:
                logger.error("Failed to read %s: %s", filepath.name, str(e))
                return None

    tasks = [read_one(f) for f in files]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r is not None]

# Writing Results Asynchronously

python
async def save_results(results: list[dict], output_path: str):
    """Write results to a JSON file asynchronously."""
    async with aiofiles.open(output_path, mode="w") as f:
        await f.write(json.dumps(results, indent=2))
    logger.info("Saved %d results to %s", len(results), output_path)
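
Putting the two together, a short usage sketch that reads every JSON file in a hypothetical data/raw directory and writes the merged output to a hypothetical data/merged.json:

python
async def merge_json_directory():
    """Read all JSON files concurrently, then write the combined list."""
    records = await process_json_files("data/raw")    # hypothetical input directory
    await save_results(records, "data/merged.json")   # hypothetical output path

asyncio.run(merge_json_directory())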

# Step 5: Mixing Sync and Async Code

Most data pipelines have sync components (pandas, database drivers). Use asyncio.to_thread() (Python 3.9+) to run them in a worker thread without blocking the event loop:

python
import asyncio
import pandas as pd

def transform_sync(data: list[dict]) -> pd.DataFrame:
    """Synchronous pandas transformation (CPU-bound work)."""
    df = pd.DataFrame(data)
    df["amount"] = df["amount"].astype(float)
    df["date"] = pd.to_datetime(df["date"])
    df = df.dropna(subset=["amount"])
    return df.groupby("date")["amount"].sum().reset_index()

async def async_pipeline():
    """Full pipeline mixing async I/O with sync transforms."""
    # Step 1: Async extraction (I/O bound)
    raw_data = await fetch_all_async(api_urls)

    # Step 2: Sync transformation (CPU bound) — run in thread
    df = await asyncio.to_thread(transform_sync, raw_data)

    # Step 3: Async file write (I/O bound)
    await save_results(df.to_dict(orient="records"), "output.json")

    return df

# Run the full pipeline
result = asyncio.run(async_pipeline())

# When to Use What

| Operation | Type | Use |
|---|---|---|
| HTTP requests | I/O bound | aiohttp (async) |
| File reads/writes | I/O bound | aiofiles (async) |
| Database queries | I/O bound | asyncpg or aiomysql (async) |
| pandas transforms | CPU bound | asyncio.to_thread() |
| NumPy calculations | CPU bound | asyncio.to_thread() |
| Image processing | CPU bound | asyncio.to_thread() or ProcessPoolExecutor |
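
For heavy pure-Python CPU work, asyncio.to_thread() is still limited by the GIL. A minimal sketch of the ProcessPoolExecutor option from the table, with a hypothetical resize_image function standing in for the real work:

python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def resize_image(path: str) -> str:
    """Hypothetical CPU-bound work; imagine Pillow resizing an image here."""
    ...
    return path

async def process_images(paths: list[str]) -> list[str]:
    """Offload each call to a separate process so CPU work runs in parallel."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, resize_image, p) for p in paths]
        return await asyncio.gather(*tasks)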

# Step 6: Production-Ready Async Extractor

Combining all patterns into a reusable class:

python
import aiohttp
import asyncio
import logging
import time

logger = logging.getLogger("extractor")

class AsyncExtractor:
    """Production async data extractor with rate limiting and retries."""

    def __init__(self, max_concurrent=10, max_retries=3, timeout=10):
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def extract(self, urls: list[str]) -> list[dict]:
        """Extract data from multiple URLs concurrently."""
        start = time.perf_counter()

        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            tasks = [self._fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks)

        duration = time.perf_counter() - start
        valid = [r for r in results if r is not None]

        logger.info(
            "Extraction complete: %d/%d success in %.1fs (%.0f req/s)",
            len(valid),
            len(urls),
            duration,
            len(urls) / max(duration, 0.001),
        )

        return valid

    async def _fetch(self, session, url):
        async with self.semaphore:
            return await fetch_with_retry(
                session, url, max_retries=self.max_retries
            )

# Usage

python
async def main():
    extractor = AsyncExtractor(max_concurrent=10, max_retries=3)
    data = await extractor.extract(urls)
    df = await asyncio.to_thread(transform_sync, data)
    print(f"Processed {len(df)} records")

asyncio.run(main())

# Performance Comparison

| Approach | 50 APIs | 200 APIs | 1000 APIs |
|---|---|---|---|
| Synchronous (requests) | 15s | 60s | 300s |
| Threading (concurrent.futures) | 3s | 10s | 45s |
| Async (aiohttp, limit=10) | 1.5s | 6s | 30s |
| Async (aiohttp, limit=50) | 0.4s | 1.5s | 7s |

Async with a reasonable concurrency limit gives the best balance of speed and reliability. Threading has higher memory overhead and is harder to manage at scale.
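
For reference, a minimal sketch of the threading row in that table, using concurrent.futures with the same requests code from Step 1. It is easy to retrofit onto sync code, but each thread carries more overhead than an asyncio task:

python
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_all_threaded(urls: list[str], max_workers: int = 10) -> list[dict]:
    """Run the synchronous fetch in a thread pool instead of an event loop."""
    def fetch_one(url: str) -> dict:
        return requests.get(url, timeout=10).json()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch_one, urls))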

# What This Replaces

| Sequential Pipeline | Async Pipeline |
|---|---|
| One request at a time | Concurrent requests with controlled parallelism |
| 15 seconds for 50 API calls | Under 1 second for 50 API calls |
| No rate limiting | Semaphore-based concurrency caps |
| One failure blocks everything | Per-request retry with backoff |
| requests library only | aiohttp + asyncio.to_thread() for sync code |
| Wastes 95% of time waiting | Overlaps I/O wait with useful work |

# Next Steps

Async extraction is the first step to faster data pipelines. Start by replacing your slowest extraction script: if it makes sequential API calls, wrap them in asyncio.gather() with a semaphore (see the helper sketched below). The speedup is immediate and the code change is minimal.
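
As a starting point, here is a small reusable helper along those lines. The gather_with_limit name is hypothetical, not a standard-library function; it simply combines asyncio.gather() with a semaphore.

python
import asyncio

async def gather_with_limit(coros, max_concurrent: int = 10):
    """Run awaitables concurrently, but never more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(limited(c) for c in coros))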

Need help optimising your data collection pipelines? Get in touch or explore our data analytics services.

