Async Python for Faster Data Collection and Processing
Speed up API calls, web scraping, and file processing with async Python. Covers asyncio, aiohttp, semaphores for rate limiting, and patterns for mixing sync and async code.
A pipeline that calls 50 APIs sequentially spends 95% of its time waiting for network responses. Each call takes 200–500 milliseconds, and the next one cannot start until the previous one finishes. A 50-endpoint extraction that should take 2 seconds takes 15.
Async Python fixes this. Instead of waiting for each response before making the next request, you fire all of them concurrently and process results as they arrive. The same 50 API calls complete in under a second.
This guide covers practical async patterns for data collection and processing — from basic asyncio to production-ready patterns with rate limiting, error handling, and graceful degradation.
# Who This Is For
- Data engineers whose pipelines spend most of their runtime waiting for API responses
- Developers pulling data from multiple sources who want to cut extraction time dramatically
- Vibe coders who built a working scraper or API client and want to make it 10x faster
- Anyone running sequential HTTP calls in a loop and wondering why it takes so long
You need solid Python fundamentals — functions, loops, error handling. No prior async experience required. This guide introduces the concepts with clear before/after comparisons.
# Sync vs Async: The Core Difference
flowchart LR
subgraph Synchronous
direction TB
S1["Call API 1\n(wait 300ms)"] --> S2["Call API 2\n(wait 250ms)"]
S2 --> S3["Call API 3\n(wait 400ms)"]
S3 --> SR["Total: 950ms"]
end
subgraph Asynchronous
direction TB
A1["Call API 1"] --> AR["Total: 400ms\n(slowest call)"]
A2["Call API 2"] --> AR
A3["Call API 3"] --> AR
end

In synchronous code, each I/O operation blocks the entire program. In async code, while one request waits for a response, the event loop starts the next request.
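You can see the effect with a toy example that only sleeps; a minimal sketch where asyncio.sleep(0.3) stands in for a 300 ms network round trip:

import asyncio
import time

async def fake_api_call(i: int) -> int:
    await asyncio.sleep(0.3)  # stands in for a 300 ms network round trip
    return i

async def main() -> None:
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_api_call(i) for i in range(50)))
    print(f"{len(results)} calls in {time.perf_counter() - start:.2f}s")  # ~0.3s, not ~15s

asyncio.run(main())

While one coroutine is suspended in await asyncio.sleep(), the event loop runs the others, so the 50 waits overlap instead of stacking up.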
# What You Will Need
pip install aiohttp aiofiles
- aiohttp — async HTTP client (replaces requests for concurrent calls)
- aiofiles — async file I/O
- asyncio — in the standard library, nothing to install (the examples use asyncio.to_thread() and X | None type hints, so Python 3.10+ is recommended)
# Step 1: Your First Async Pipeline
# The Slow Way (Synchronous)
import requests
import time

def fetch_all_sync(urls: list[str]) -> list[dict]:
    """Fetch URLs one at a time. Painfully slow."""
    results = []
    for url in urls:
        response = requests.get(url, timeout=10)
        results.append(response.json())
    return results

# 50 URLs × 300ms average = ~15 seconds
start = time.perf_counter()
data = fetch_all_sync(urls)
print(f"Sync: {time.perf_counter() - start:.1f}s")
# => Sync: 14.8s
# The Fast Way (Asynchronous)
import aiohttp
import asyncio
import time

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch a single URL asynchronously."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        return await response.json()

async def fetch_all_async(urls: list[str]) -> list[dict]:
    """Fetch all URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 50 URLs concurrently = ~0.4 seconds (limited by slowest response)
start = time.perf_counter()
data = asyncio.run(fetch_all_async(urls))
print(f"Async: {time.perf_counter() - start:.1f}s")
# => Async: 0.4s
Same result, 37× faster. The speedup comes from overlapping wait times — while one request is in flight, the event loop processes others.
# Step 2: Rate Limiting with Semaphores
Firing 1,000 requests simultaneously will get you rate-limited or banned. Use a semaphore to cap concurrency:
import aiohttp
import asyncio

async def fetch_with_limit(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict]:
    """Fetch URLs with a concurrency limit."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:  # at most max_concurrent requests at once
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                return await resp.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 1,000 URLs, max 10 at a time
data = asyncio.run(fetch_with_limit(urls, max_concurrent=10))
# Choosing the Right Concurrency Limit
| API Type | Suggested Limit | Reasoning |
|---|---|---|
| Your own API | 20–50 | You control the server capacity |
| Third-party API (paid) | 5–10 | Respect rate limits in their docs |
| Third-party API (free tier) | 2–5 | Aggressive limits, err on the safe side |
| Web scraping | 3–5 | Polite crawling avoids IP bans |
| Database queries | Pool size (e.g., 10) | Match your connection pool |
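The semaphore caps how many requests are in flight, but not how fast they fire. If you also need to space requests out (polite scraping, strict requests-per-second quotas), one simple option is to hold each semaphore slot for a minimum interval; a sketch, where min_interval is an illustrative parameter rather than anything mandated by a specific API:

async def fetch_paced(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    min_interval: float = 0.5,
) -> dict:
    """Fetch under a concurrency cap, holding the slot for at least min_interval seconds."""
    async with semaphore:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            data = await resp.json()
        # Keep the slot occupied briefly so throughput stays near
        # max_concurrent / min_interval requests per second
        await asyncio.sleep(min_interval)
        return data

With a Semaphore(5) and min_interval=0.5, throughput tops out around 10 requests per second no matter how quickly the server responds.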
# Step 3: Error Handling in Async Pipelines
asyncio.gather() fails fast by default: the first exception propagates to the caller and the remaining results become inaccessible, even though the other tasks keep running. Either pass return_exceptions=True so failures come back as values you can inspect, or handle errors inside each task, as the retry helper below does.
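A minimal sketch of the return_exceptions route, reusing the fetch_one coroutine from Step 1:

async def fetch_all_tolerant(urls: list[str]) -> list[dict]:
    """Gather results without letting one failure abort the whole batch."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        # Failures come back as exception objects in the results list
        # instead of being raised out of gather()
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, BaseException)]

That keeps the successes but discards which URLs failed and why, so for pipelines the per-task retry pattern below is usually the better fit.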
import aiohttp
import asyncio
import logging

logger = logging.getLogger("async_pipeline")

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_base: float = 1.0,
) -> dict | None:
    """Fetch a URL with exponential backoff retry."""
    for attempt in range(1, max_retries + 1):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                return await resp.json()
        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # rate limited
                wait = backoff_base * (2 ** attempt)
                logger.warning("Rate limited on %s, waiting %.1fs", url, wait)
                await asyncio.sleep(wait)
            elif e.status >= 500:  # server error, retry
                wait = backoff_base * attempt
                logger.warning("Server error %d on %s, retry %d/%d", e.status, url, attempt, max_retries)
                await asyncio.sleep(wait)
            else:
                logger.error("Client error %d on %s, not retrying", e.status, url)
                return None
        except asyncio.TimeoutError:
            logger.warning("Timeout on %s, retry %d/%d", url, attempt, max_retries)
            await asyncio.sleep(backoff_base * attempt)
        except Exception as e:
            logger.error("Unexpected error on %s: %s", url, str(e))
            return None
    logger.error("All %d retries failed for %s", max_retries, url)
    return None
# Using Retry Logic in a Pipeline
async def extract_all_endpoints(endpoints: list[dict]) -> list[dict]:
    """Extract data from multiple API endpoints with error handling."""
    semaphore = asyncio.Semaphore(10)

    async def fetch_endpoint(session, endpoint):
        async with semaphore:
            result = await fetch_with_retry(session, endpoint["url"])
            if result is not None:
                return {"source": endpoint["name"], "data": result}
            return {"source": endpoint["name"], "data": None, "error": True}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_endpoint(session, ep) for ep in endpoints]
        results = await asyncio.gather(*tasks)

    # Report extraction summary
    success = sum(1 for r in results if not r.get("error"))
    failed = len(results) - success
    logger.info("Extraction complete: %d success, %d failed", success, failed)
    return [r for r in results if not r.get("error")]
# Step 4: Async File Processing
Read and parse many files concurrently without blocking the event loop on disk I/O:
import aiofiles
import asyncio
import json
from pathlib import Path

async def process_json_files(directory: str) -> list[dict]:
    """Read and parse multiple JSON files concurrently."""
    files = list(Path(directory).glob("*.json"))
    semaphore = asyncio.Semaphore(20)  # limit open file handles

    async def read_one(filepath: Path) -> dict | None:
        async with semaphore:
            try:
                async with aiofiles.open(filepath, mode="r") as f:
                    content = await f.read()
                    return json.loads(content)
            except Exception as e:
                logger.error("Failed to read %s: %s", filepath.name, str(e))
                return None

    tasks = [read_one(f) for f in files]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r is not None]
# Writing Results Asynchronously
async def save_results(results: list[dict], output_path: str):
    """Write results to a JSON file asynchronously."""
    async with aiofiles.open(output_path, mode="w") as f:
        await f.write(json.dumps(results, indent=2))
    logger.info("Saved %d results to %s", len(results), output_path)
# Step 5: Mixing Sync and Async Code
Most data pipelines have sync components (pandas, database drivers). Use asyncio.to_thread() to run them without blocking:
import asyncio
import pandas as pd

def transform_sync(data: list[dict]) -> pd.DataFrame:
    """Synchronous pandas transformation (CPU-bound work)."""
    df = pd.DataFrame(data)
    df["amount"] = df["amount"].astype(float)
    df["date"] = pd.to_datetime(df["date"])
    df = df.dropna(subset=["amount"])
    return df.groupby("date")["amount"].sum().reset_index()

async def async_pipeline():
    """Full pipeline mixing async I/O with sync transforms."""
    # Step 1: Async extraction (I/O bound)
    raw_data = await fetch_all_async(api_urls)
    # Step 2: Sync transformation (CPU bound) — run in thread
    df = await asyncio.to_thread(transform_sync, raw_data)
    # Step 3: Async file write (I/O bound)
    await save_results(df.to_dict(orient="records"), "output.json")
    return df

# Run the full pipeline
result = asyncio.run(async_pipeline())
# When to Use What
| Operation | Type | Use |
|---|---|---|
| HTTP requests | I/O bound | aiohttp (async) |
| File reads/writes | I/O bound | aiofiles (async) |
| Database queries | I/O bound | asyncpg or aiomysql (async) |
| pandas transforms | CPU bound | asyncio.to_thread() |
| NumPy calculations | CPU bound | asyncio.to_thread() |
| Image processing | CPU bound | asyncio.to_thread() or ProcessPoolExecutor |
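asyncio.to_thread() moves blocking work off the event loop thread, but a heavy transform still competes for the GIL and runs on a single core. For genuinely CPU-heavy work you can hand it to worker processes instead; a sketch using the standard library's ProcessPoolExecutor, where heavy_transform is a placeholder for your own function:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_transform(chunk: list[dict]) -> list[dict]:
    """Placeholder for CPU-bound work: parsing, feature engineering, image processing."""
    return [{**row, "processed": True} for row in chunk]

async def transform_in_processes(chunks: list[list[dict]]) -> list[list[dict]]:
    """Run CPU-bound transforms in worker processes without blocking the event loop."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        futures = [loop.run_in_executor(pool, heavy_transform, chunk) for chunk in chunks]
        return await asyncio.gather(*futures)

The function and its arguments must be picklable, and on Windows and macOS the script entry point needs the usual if __name__ == "__main__": guard.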
# Step 6: Production-Ready Async Extractor
Combining all patterns into a reusable class:
import aiohttp
import asyncio
import logging
import time

logger = logging.getLogger("extractor")

class AsyncExtractor:
    """Production async data extractor with rate limiting and retries."""

    def __init__(self, max_concurrent=10, max_retries=3, timeout=10):
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.stats = {"success": 0, "failed": 0}

    async def extract(self, urls: list[str]) -> list[dict]:
        """Extract data from multiple URLs concurrently."""
        start = time.perf_counter()
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            tasks = [self._fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        duration = time.perf_counter() - start
        valid = [r for r in results if r is not None]
        logger.info(
            "Extraction complete: %d/%d success in %.1fs (%.0f req/s)",
            len(valid),
            len(urls),
            duration,
            len(urls) / max(duration, 0.001),
        )
        return valid

    async def _fetch(self, session, url):
        async with self.semaphore:
            result = await fetch_with_retry(
                session, url, max_retries=self.max_retries
            )
            # Track outcomes so callers can inspect extractor.stats afterwards
            self.stats["success" if result is not None else "failed"] += 1
            return result

# Usage
async def main():
    extractor = AsyncExtractor(max_concurrent=10, max_retries=3)
    data = await extractor.extract(urls)
    df = await asyncio.to_thread(transform_sync, data)
    print(f"Processed {len(df)} records")

asyncio.run(main())
# Performance Comparison
| Approach | 50 APIs | 200 APIs | 1000 APIs |
|---|---|---|---|
| Synchronous (requests) | 15s | 60s | 300s |
| Threading (concurrent.futures) | 3s | 10s | 45s |
| Async (aiohttp, limit=10) | 1.5s | 6s | 30s |
| Async (aiohttp, limit=50) | 0.4s | 1.5s | 7s |
Async with a reasonable concurrency limit gives the best balance of speed and reliability. Threading has higher memory overhead and is harder to manage at scale.
# What This Replaces
| Sequential Pipeline | Async Pipeline |
|---|---|
| One request at a time | Concurrent requests with controlled parallelism |
| 15 seconds for 50 API calls | Under 1 second for 50 API calls |
| No rate limiting | Semaphore-based concurrency caps |
| One failure blocks everything | Per-request retry with backoff |
| requests library only | aiohttp + asyncio.to_thread() for sync code |
| Wastes 95% of time waiting | Overlaps I/O wait with useful work |
# Next Steps
Async extraction is the first step to faster data pipelines. Build on it with:
- Process extracted data through reliable pipeline architectures
- Add structured logging to track request timings and failures
- Feed async extractions into event-driven pipelines for real-time processing
- Test async code with pytest-asyncio fixtures (see the sketch after this list)
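A minimal sketch of an async test, assuming pytest and pytest-asyncio are installed; it exercises the Step 2 semaphore pattern without touching the network:

import asyncio
import pytest

@pytest.mark.asyncio
async def test_semaphore_caps_concurrency():
    semaphore = asyncio.Semaphore(2)
    active = 0
    peak = 0

    async def worker():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate a short I/O wait
            active -= 1

    await asyncio.gather(*(worker() for _ in range(10)))
    assert peak <= 2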
Start by replacing your slowest extraction script. If it makes sequential API calls, wrap them in asyncio.gather() with a semaphore. The speedup is immediate and the code change is minimal.
Need help optimising your data collection pipelines? Get in touch or explore our data analytics services.