Async Python for Faster Data Collection and Processing

8 min read · Data & Dashboards

Speed up API calls, web scraping, and file processing with async Python. Covers asyncio, aiohttp, semaphores for rate limiting, and patterns for mixing sync and async code.

A pipeline that calls 50 APIs sequentially spends most of its runtime waiting for network responses. Each call takes 200–500 milliseconds, and the next one cannot start until the previous one finishes. At roughly 300 ms per call, a 50-endpoint extraction that could finish in about a second takes 15.

Async Python fixes this. Instead of waiting for each response before making the next request, you fire all of them concurrently and process results as they arrive. The same 50 API calls complete in under a second.

This guide covers practical async patterns for data collection and processing — from basic asyncio to production-ready patterns with rate limiting, error handling, and graceful degradation.

Who This Is For

  • Data engineers whose pipelines spend most of their runtime waiting for API responses
  • Developers pulling data from multiple sources who want to cut extraction time dramatically
  • Vibe coders who built a working scraper or API client and want to make it 10x faster
  • Anyone running sequential HTTP calls in a loop and wondering why it takes so long

You need solid Python fundamentals — functions, loops, error handling. No prior async experience required. This guide introduces the concepts with clear before/after comparisons.

Sync vs Async: The Core Difference

In synchronous code, each I/O operation blocks the entire program. In async code, while one request waits for a response, the event loop starts the next request.
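
To make the difference concrete, here is a minimal sketch that needs no network access. It assumes asyncio.sleep() is a fair stand-in for waiting on a response; fake_request and the 0.1 second delay are made up for illustration.

```python
import asyncio
import time


async def fake_request(name: str, delay: float) -> str:
    """Stand-in for a network call: sleeping yields control to the event loop."""
    await asyncio.sleep(delay)
    return name


async def run_sequential() -> float:
    """Await each fake request in turn, so the waits add up."""
    start = time.perf_counter()
    for name in ("a", "b", "c"):
        await fake_request(name, 0.1)
    return time.perf_counter() - start


async def run_concurrent() -> float:
    """Start all fake requests together, so the waits overlap."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(n, 0.1) for n in ("a", "b", "c")))
    return time.perf_counter() - start


sequential_s = asyncio.run(run_sequential())
concurrent_s = asyncio.run(run_concurrent())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

The sequential run should take about three times the single delay, while the concurrent run should take about one delay, because all three waits happen at the same time.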

What You Will Need

pip install aiohttp aiofiles
  • aiohttp — async HTTP client (replaces requests for concurrent calls)
  • aiofiles — async file I/O
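  • asyncio — part of the standard library, so there is nothing to install. The examples below also use asyncio.to_thread() (Python 3.9+) and dict | None type hints (Python 3.10+)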

Step 1: Your First Async Pipeline

The Slow Way (Synchronous)

import requests
import time

def fetch_all_sync(urls: list[str]) -> list[dict]:
    """Fetch URLs one at a time. Painfully slow."""
    results = []
    for url in urls:
        response = requests.get(url, timeout=10)
        results.append(response.json())
    return results

# urls is assumed to be a list of 50 endpoint URLs defined elsewhere
# 50 URLs × 300ms average = ~15 seconds
start = time.perf_counter()
data = fetch_all_sync(urls)
print(f"Sync: {time.perf_counter() - start:.1f}s")
# => Sync: 14.8s

The Fast Way (Asynchronous)

import aiohttp
import asyncio
import time

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch a single URL asynchronously."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        return await response.json()

async def fetch_all_async(urls: list[str]) -> list[dict]:
    """Fetch all URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 50 URLs concurrently = ~0.4 seconds (limited by slowest response)
start = time.perf_counter()
data = asyncio.run(fetch_all_async(urls))
print(f"Async: {time.perf_counter() - start:.1f}s")
# => Async: 0.4s

Same result, 37× faster. The speedup comes from overlapping wait times — while one request is in flight, the event loop processes others.
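
One detail worth knowing when swapping a loop for asyncio.gather(): results come back in the order the awaitables were passed in, not the order they finished. The sketch below illustrates this with a hypothetical fake_fetch that sleeps instead of calling a real API.

```python
import asyncio


async def fake_fetch(url: str, delay: float) -> dict:
    """Hypothetical stand-in for fetch_one: waits, then returns a payload."""
    await asyncio.sleep(delay)
    return {"url": url}


async def main() -> tuple[list[str], list[str]]:
    finished = []  # completion order, recorded as each task ends

    async def tracked(url: str, delay: float) -> dict:
        result = await fake_fetch(url, delay)
        finished.append(url)
        return result

    # The first job is the slowest, so it finishes last.
    jobs = [("slow", 0.15), ("medium", 0.1), ("fast", 0.05)]
    results = await asyncio.gather(*(tracked(u, d) for u, d in jobs))
    return [r["url"] for r in results], finished


returned_order, completion_order = asyncio.run(main())
print(returned_order)    # ['slow', 'medium', 'fast']
print(completion_order)  # ['fast', 'medium', 'slow']
```

This means the list returned by fetch_all_async lines up with the input urls list, so the two can be zipped together safely.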

Step 2: Rate Limiting with Semaphores

Firing 1,000 requests simultaneously will get you rate-limited or banned. Use a semaphore to cap concurrency:

import aiohttp
import asyncio

async def fetch_with_limit(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict]:
    """Fetch URLs with a concurrency limit."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:  # at most max_concurrent requests at once
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                return await resp.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 1,000 URLs, max 10 at a time
data = asyncio.run(fetch_with_limit(urls, max_concurrent=10))
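
To check that a semaphore really caps concurrency, a small offline experiment can count how many tasks are in flight at once. This is a sketch with simulated requests; measure_peak and its counters are illustrative names, not part of the pipeline above.

```python
import asyncio


async def measure_peak(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks fake requests and return the highest number in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated network wait
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(measure_peak(n_tasks=100, max_concurrent=10))
print(f"peak concurrency: {peak}")
```

All 100 coroutines are created up front, but only 10 get past the semaphore at any moment. The rest wait cheaply inside the event loop.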

Choosing the Right Concurrency Limit

| API Type | Suggested Limit | Reasoning |
|---|---|---|
| Your own API | 20–50 | You control the server capacity |
| Third-party API (paid) | 5–10 | Respect rate limits in their docs |
| Third-party API (free tier) | 2–5 | Aggressive limits, err on the safe side |
| Web scraping | 3–5 | Polite crawling avoids IP bans |
| Database queries | Pool size (e.g., 10) | Match your connection pool |

Step 3: Error Handling in Async Pipelines

By default, asyncio.gather() raises the first exception to the caller. The remaining tasks are not cancelled and keep running, but their results are lost. Use return_exceptions=True or handle errors per-task:

import aiohttp
import asyncio
import logging

logger = logging.getLogger("async_pipeline")

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_base: float = 1.0,
) -> dict | None:
    """Fetch a URL, retrying with backoff (exponential on 429, linear on 5xx and timeouts)."""
    for attempt in range(1, max_retries + 1):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                return await resp.json()

        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # rate limited
                wait = backoff_base * (2 ** attempt)
                logger.warning("Rate limited on %s, waiting %.1fs", url, wait)
                await asyncio.sleep(wait)
            elif e.status >= 500:  # server error, retry
                wait = backoff_base * attempt
                logger.warning("Server error %d on %s, retry %d/%d", e.status, url, attempt, max_retries)
                await asyncio.sleep(wait)
            else:
                logger.error("Client error %d on %s, not retrying", e.status, url)
                return None

        except asyncio.TimeoutError:
            logger.warning("Timeout on %s, retry %d/%d", url, attempt, max_retries)
            await asyncio.sleep(backoff_base * attempt)

        except Exception as e:
            logger.error("Unexpected error on %s: %s", url, str(e))
            return None

    logger.error("All %d retries failed for %s", max_retries, url)
    return None
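
The wait times in fetch_with_retry follow two different schedules, which is easy to miss when reading the branches. The helper below is only an illustration (backoff_schedule is not part of the pipeline); it reproduces the two formulas from the code above with the default arguments.

```python
def backoff_schedule(max_retries: int = 3, backoff_base: float = 1.0) -> dict[str, list[float]]:
    """Reproduce the wait times fetch_with_retry uses for attempts 1..max_retries."""
    attempts = range(1, max_retries + 1)
    return {
        # HTTP 429: exponential, backoff_base * 2 ** attempt
        "rate_limited": [backoff_base * (2 ** a) for a in attempts],
        # HTTP 5xx and timeouts: linear, backoff_base * attempt
        "server_error_or_timeout": [backoff_base * a for a in attempts],
    }


schedule = backoff_schedule()
print(schedule["rate_limited"])             # [2.0, 4.0, 8.0]
print(schedule["server_error_or_timeout"])  # [1.0, 2.0, 3.0]
```

With the defaults, a URL that is rate limited on every attempt waits 14 seconds in total before the function gives up, so it is worth choosing max_retries and backoff_base with the overall pipeline deadline in mind.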

Using Retry Logic in a Pipeline

async def extract_all_endpoints(endpoints: list[dict]) -> list[dict]:
    """Extract data from multiple API endpoints with error handling."""
    semaphore = asyncio.Semaphore(10)

    async def fetch_endpoint(session, endpoint):
        async with semaphore:
            result = await fetch_with_retry(session, endpoint["url"])
            if result is not None:
                return {"source": endpoint["name"], "data": result}
            return {"source": endpoint["name"], "data": None, "error": True}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_endpoint(session, ep) for ep in endpoints]
        results = await asyncio.gather(*tasks)

    # Report extraction summary
    success = sum(1 for r in results if not r.get("error"))
    failed = len(results) - success
    logger.info("Extraction complete: %d success, %d failed", success, failed)

    return [r for r in results if not r.get("error")]
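
The pipeline above handles errors inside each task. The other option, return_exceptions=True, can be sketched as follows. The flaky coroutine is a hypothetical stand-in that fails for one input so the behaviour is visible without a network.

```python
import asyncio


async def flaky(n: int) -> int:
    """Hypothetical task: fails for n == 2, succeeds otherwise."""
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n * 10


async def main() -> tuple[list[int], list[BaseException]]:
    results = await asyncio.gather(
        *(flaky(n) for n in range(4)),
        return_exceptions=True,  # exceptions come back as values in the list
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return ok, errors


ok, errors = asyncio.run(main())
print(ok)      # [0, 10, 30]
print(errors)  # [ValueError('task 2 failed')]
```

This variant suits cases where the caller wants to inspect each failure afterwards. Per-task handling, as in fetch_with_retry, suits cases where failures should be retried or logged where they occur.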

Step 4: Async File Processing

Read and parse many files concurrently without blocking the event loop:

import aiofiles
import asyncio
import json
import logging
from pathlib import Path

logger = logging.getLogger("async_pipeline")

async def process_json_files(directory: str) -> list[dict]:
    """Read and parse multiple JSON files concurrently."""
    files = list(Path(directory).glob("*.json"))
    semaphore = asyncio.Semaphore(20)  # limit open file handles

    async def read_one(filepath: Path) -> dict | None:
        async with semaphore:
            try:
                async with aiofiles.open(filepath, mode="r") as f:
                    content = await f.read()
                    return json.loads(content)
            except Exception as e:
                logger.error("Failed to read %s: %s", filepath.name, str(e))
                return None

    tasks = [read_one(f) for f in files]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r is not None]

Writing Results Asynchronously

async def save_results(results: list[dict], output_path: str):
    """Write results to a JSON file asynchronously."""
    async with aiofiles.open(output_path, mode="w") as f:
        await f.write(json.dumps(results, indent=2))
    logger.info("Saved %d results to %s", len(results), output_path)
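
If adding aiofiles as a dependency is not an option, a similar effect can be had with the standard library alone. The sketch below assumes Python 3.9+ and pushes ordinary blocking reads onto worker threads with asyncio.to_thread(). As far as I know, aiofiles also delegates file operations to a thread pool, so the two approaches are close in spirit. The names read_json and read_all are illustrative.

```python
import asyncio
import json
import tempfile
from pathlib import Path


def read_json(path: Path) -> dict:
    """Plain blocking read and parse; runs in a worker thread."""
    return json.loads(path.read_text(encoding="utf-8"))


async def read_all(directory: Path, max_open: int = 20) -> list[dict]:
    """Read every *.json file in directory, at most max_open at a time."""
    semaphore = asyncio.Semaphore(max_open)

    async def read_one(path: Path) -> dict:
        async with semaphore:
            return await asyncio.to_thread(read_json, path)

    paths = sorted(directory.glob("*.json"))
    return await asyncio.gather(*(read_one(p) for p in paths))


# Demo on a throwaway directory holding three small files.
with tempfile.TemporaryDirectory() as tmp:
    for i in range(3):
        (Path(tmp) / f"part{i}.json").write_text(json.dumps({"id": i}), encoding="utf-8")
    records = asyncio.run(read_all(Path(tmp)))

print(records)  # [{'id': 0}, {'id': 1}, {'id': 2}]
```

A side benefit of this variant is that json.loads also runs in the worker thread, so parsing a large file does not stall the event loop either.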

Step 5: Mixing Sync and Async Code

Most data pipelines have sync components (pandas, database drivers). Use asyncio.to_thread() (Python 3.9+) to run them in a worker thread so the event loop stays responsive:

import asyncio
import pandas as pd

def transform_sync(data: list[dict]) -> pd.DataFrame:
    """Synchronous pandas transformation (CPU-bound work)."""
    df = pd.DataFrame(data)
    df["amount"] = df["amount"].astype(float)
    df["date"] = pd.to_datetime(df["date"])
    df = df.dropna(subset=["amount"])
    return df.groupby("date")["amount"].sum().reset_index()

async def async_pipeline():
    """Full pipeline mixing async I/O with sync transforms."""
    # Step 1: Async extraction (I/O bound)
    raw_data = await fetch_all_async(api_urls)

    # Step 2: Sync transformation (CPU bound) — run in thread
    df = await asyncio.to_thread(transform_sync, raw_data)

    # Step 3: Async file write (I/O bound)
    await save_results(df.to_dict(orient="records"), "output.json")

    return df

# Run the full pipeline
result = asyncio.run(async_pipeline())

When to Use What

| Operation | Type | Use |
|---|---|---|
| HTTP requests | I/O bound | aiohttp (async) |
| File reads/writes | I/O bound | aiofiles (async) |
| Database queries | I/O bound | asyncpg or aiomysql (async) |
| pandas transforms | CPU bound | asyncio.to_thread() |
| NumPy calculations | CPU bound | asyncio.to_thread() |
| Image processing | CPU bound | asyncio.to_thread() or ProcessPoolExecutor |
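
What asyncio.to_thread() buys you is a responsive event loop while the sync work runs. The sketch below shows that with a heartbeat coroutine ticking alongside a blocking call. It assumes time.sleep() is an acceptable stand-in for a slow pandas step; blocking_transform and heartbeat are illustrative names.

```python
import asyncio
import time


def blocking_transform(seconds: float) -> str:
    """Stand-in for a slow synchronous step such as a pandas transform."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    """Record a timestamp every 20 ms for as long as the loop stays responsive."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def main() -> tuple[str, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # The blocking call runs in a worker thread, so heartbeat keeps ticking.
    result = await asyncio.to_thread(blocking_transform, 0.2)
    stop.set()
    await beat
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

Calling blocking_transform(0.2) directly inside main() would freeze the loop for the full 0.2 seconds and the heartbeat would not tick during that time. Note that threads keep the loop free but do not run pure-Python CPU work in parallel, which is why the table lists ProcessPoolExecutor as an alternative for heavy processing.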

Step 6: Production-Ready Async Extractor

Combining all patterns into a reusable class:

import aiohttp
import asyncio
import logging
import time

logger = logging.getLogger("extractor")

class AsyncExtractor:
    """Production async data extractor with rate limiting and retries."""

    def __init__(self, max_concurrent=10, max_retries=3, timeout=10):
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.stats = {"success": 0, "failed": 0, "retried": 0}

    async def extract(self, urls: list[str]) -> list[dict]:
        """Extract data from multiple URLs concurrently."""
        start = time.perf_counter()

        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            tasks = [self._fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks)

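        duration = time.perf_counter() - start
        valid = [r for r in results if r is not None]
        # Keep running totals so callers can inspect extractor.stats afterwards
        self.stats["success"] += len(valid)
        self.stats["failed"] += len(results) - len(valid)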

        logger.info(
            "Extraction complete: %d/%d success in %.1fs (%.0f req/s)",
            len(valid),
            len(urls),
            duration,
            len(urls) / max(duration, 0.001),
        )

        return valid

    async def _fetch(self, session, url):
        async with self.semaphore:
            return await fetch_with_retry(
                session, url, max_retries=self.max_retries
            )

Usage

async def main():
    extractor = AsyncExtractor(max_concurrent=10, max_retries=3)
    data = await extractor.extract(urls)
    df = await asyncio.to_thread(transform_sync, data)
    print(f"Processed {len(df)} records")

asyncio.run(main())

Performance Comparison

| Approach | 50 APIs | 200 APIs | 1000 APIs |
|---|---|---|---|
| Synchronous (requests) | 15s | 60s | 300s |
| Threading (concurrent.futures) | 3s | 10s | 45s |
| Async (aiohttp, limit=10) | 1.5s | 6s | 30s |
| Async (aiohttp, limit=50) | 0.4s | 1.5s | 7s |

Async with a reasonable concurrency limit gives the best balance of speed and reliability. Threading has higher memory overhead and is harder to manage at scale.
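
The synchronous and async rows in the table are close to what a simple back-of-envelope model predicts: requests run in waves of max_concurrent, and each wave takes about one average latency. The function below is only a rough estimate under that assumption (300 ms average latency, no overhead, no retries), not a benchmark.

```python
import math


def estimate_seconds(n_requests: int, max_concurrent: int, avg_latency_s: float = 0.3) -> float:
    """Rough lower bound: requests run in waves of max_concurrent, one latency per wave."""
    waves = math.ceil(n_requests / max_concurrent)
    return waves * avg_latency_s


for limit in (1, 10, 50):
    row = [round(estimate_seconds(n, limit), 1) for n in (50, 200, 1000)]
    print(f"limit={limit}: {row}")
# limit=1: [15.0, 60.0, 300.0]
# limit=10: [1.5, 6.0, 30.0]
# limit=50: [0.3, 1.2, 6.0]
```

Real runs land a little above these numbers because of connection setup, slow outliers, and retries. The model is still handy for picking a concurrency limit that meets a time budget.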

What This Replaces

| Sequential Pipeline | Async Pipeline |
|---|---|
| One request at a time | Concurrent requests with controlled parallelism |
| 15 seconds for 50 API calls | Under 1 second for 50 API calls |
| No rate limiting | Semaphore-based concurrency caps |
| One failure blocks everything | Per-request retry with backoff |
| requests library only | aiohttp + asyncio.to_thread() for sync code |
| Wastes 95% of time waiting | Overlaps I/O wait with useful work |

Next Steps

Async extraction is the first step to faster data pipelines.

Start by replacing your slowest extraction script. If it makes sequential API calls, wrap them in asyncio.gather() with a semaphore. The speedup is immediate and the code change is minimal.
