Python asyncio: async/await with Real Examples (APIs, Scraping, Database)
Most Python code you write is synchronous: each line runs, finishes, and only then does the next line start. That works fine until your program spends most of its time waiting — waiting for an HTTP response, a database query, or a file read. asyncio fixes that.
Why async? Blocking vs non-blocking I/O
When a regular requests.get() call fires, your Python thread is frozen. It cannot do anything else while the network round-trip completes. With 10 URLs that each take 200 ms, you wait 2 seconds total — sequentially.
Async I/O lets the event loop keep a list of pending operations. When one task is waiting on the network, the loop switches to another task that is ready to run. The 10 URLs that took 2 seconds sequentially can finish in ~200 ms concurrently, because all ten waits overlap instead of queuing one after another.
Key insight: async helps with I/O-bound work (network, disk, DB). It does not help with CPU-bound work (number crunching, image processing). For CPU-bound tasks, use concurrent.futures.ProcessPoolExecutor instead — more on that at the end.
The event loop is the scheduler. It runs in a single thread, continuously checking which coroutines have data ready and resuming them.
async def and await: coroutines
A function defined with async def is a coroutine function. Calling it does not execute the body; it returns a coroutine object. You need to await it (or schedule it as a task) to actually run it.
import asyncio

async def fetch_data(url: str) -> str:
    # Simulate a network call
    await asyncio.sleep(0.2)
    return f"data from {url}"

async def main():
    result = await fetch_data("https://api.example.com/users")
    print(result)

asyncio.run(main())
await suspends the current coroutine and gives control back to the event loop until the awaited thing completes. Only async functions can use await.
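You can see the coroutine-object behavior by calling the function without await:

coro = fetch_data("https://api.example.com/users")
print(type(coro))  # <class 'coroutine'>; nothing has run yet
# If it is never awaited, Python warns: "coroutine 'fetch_data' was never awaited"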
asyncio.run(): the entry point
asyncio.run() is the standard way to launch an async program. It creates a fresh event loop, runs the given coroutine to completion, and then closes the loop.
asyncio.run(main())
asyncio.run() has accepted a debug flag since it was added in Python 3.7. Python 3.12 added a loop_factory argument for plugging in alternative event loops like uvloop.
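For example, a minimal sketch assuming Python 3.12+ and uvloop installed (pip install uvloop):

import asyncio
import uvloop

# loop_factory is called once to build the loop that runs main()
asyncio.run(main(), loop_factory=uvloop.new_event_loop)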
Never call asyncio.run() inside an already-running event loop (e.g., inside Jupyter — use await main() there instead).
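In a notebook cell the pattern looks like this:

# Inside Jupyter, an event loop is already running:
# asyncio.run(main())  # RuntimeError: asyncio.run() cannot be called from a running event loop
result = await main()   # IPython supports top-level await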
Real example 1: Fetch 10 URLs concurrently with httpx
httpx is a modern HTTP client with first-class async support. Install it with pip install httpx.
import asyncio
import time
import httpx
# Ten identical slow endpoints (~1 s each) so the timing comparison below is fair
URLS = ["https://httpbin.org/delay/1"] * 10
async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=10)
    response.raise_for_status()
    return {"url": url, "status": response.status_code}

async def fetch_all_concurrent():
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url) for url in URLS]
        results = await asyncio.gather(*tasks)
        return results
# Sequential baseline for comparison
def fetch_all_sequential():
    results = []
    with httpx.Client() as client:
        for url in URLS:
            r = client.get(url, timeout=10)
            results.append({"url": url, "status": r.status_code})
    return results
if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(fetch_all_concurrent())
    concurrent_time = time.perf_counter() - start
    print(f"Concurrent: {concurrent_time:.2f}s")

    start = time.perf_counter()
    fetch_all_sequential()
    sequential_time = time.perf_counter() - start
    print(f"Sequential: {sequential_time:.2f}s")
    print(f"Speedup: {sequential_time / concurrent_time:.1f}x")
Typical output against httpbin.org/delay/1 (each request takes ~1 second):
Concurrent: 1.12s
Sequential: 10.84s
Speedup: 9.7x
That is close to 10x because all 10 requests are in-flight at the same time.
asyncio.gather(): run multiple coroutines concurrently
asyncio.gather() takes multiple coroutines (or tasks) and runs them concurrently. It returns a list of results in the same order as the inputs.
results = await asyncio.gather(
    fetch(client, "https://api.example.com/users"),
    fetch(client, "https://api.example.com/posts"),
    fetch(client, "https://api.example.com/comments"),
)
By default, if any coroutine raises an exception, gather() immediately propagates it to the awaiting code; the other coroutines are not cancelled and keep running in the background. Use return_exceptions=True to collect exceptions as values instead:
results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {i} failed: {result}")
    else:
        print(f"Task {i} succeeded: {result}")
This is essential for production code where some URLs may be down.
asyncio.create_task(): background tasks
create_task() schedules a coroutine to run concurrently without waiting for it immediately. The task starts running as soon as the event loop gets control.
async def log_metrics():
    while True:
        print("heartbeat")
        await asyncio.sleep(60)

async def main():
    # Fire and forget: runs in the background
    task = asyncio.create_task(log_metrics())

    # Do the real work
    await fetch_all_concurrent()

    # Clean up the background task when done
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
Always keep a reference to the task object. The event loop holds only weak references to tasks, so if you discard yours, the garbage collector may destroy the task before it finishes.
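The pattern recommended by the asyncio docs is to keep tasks in a set and let each one remove itself when it finishes:

background_tasks: set[asyncio.Task] = set()

task = asyncio.create_task(log_metrics())
background_tasks.add(task)                        # strong reference keeps the task alive
task.add_done_callback(background_tasks.discard)  # drop the reference once it finishes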
Real example 2: Async web scraping with aiohttp + BeautifulSoup
aiohttp is another popular async HTTP library, particularly common in scraping pipelines. Install with pip install aiohttp beautifulsoup4.
import asyncio
import aiohttp
from bs4 import BeautifulSoup
PAGES = [
    "https://news.ycombinator.com/",
    "https://lobste.rs/",
    "https://tildes.net/",
    "https://old.reddit.com/r/python/",
    "https://old.reddit.com/r/programming/",
]
async def scrape_page(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}) as resp:
        html = await resp.text()
        soup = BeautifulSoup(html, "html.parser")
        title = soup.find("title")
        links = soup.find_all("a", limit=5)
        return {
            "url": url,
            "title": title.text.strip() if title else "N/A",
            "sample_links": [a.get("href", "") for a in links],
        }
async def scrape_all():
    timeout = aiohttp.ClientTimeout(total=15)
    connector = aiohttp.TCPConnector(limit=10)  # max 10 concurrent connections
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = [
            asyncio.create_task(scrape_page(session, url))
            for url in PAGES
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"Scrape error: {result}")
        else:
            print(f"{result['title']} — {result['url']}")

asyncio.run(scrape_all())
TCPConnector(limit=10) caps the total number of open connections, preventing you from accidentally hammering a server or exhausting file descriptors.
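When scraping many sites at once, you can also cap connections per host so a single slow server cannot monopolize the pool:

# At most 10 connections total, and at most 2 to any one host
connector = aiohttp.TCPConnector(limit=10, limit_per_host=2)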
Async database with SQLAlchemy
SQLAlchemy 1.4+ supports async via sqlalchemy.ext.asyncio. Install with pip install sqlalchemy[asyncio] asyncpg (for PostgreSQL).
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"
engine = create_async_engine(DATABASE_URL, pool_size=10, max_overflow=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]

async def get_user(session: AsyncSession, user_id: int) -> User | None:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()
Note async_sessionmaker (introduced in SQLAlchemy 2.0) instead of the older sessionmaker. It produces AsyncSession objects automatically.
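A minimal usage sketch, assuming the Postgres instance above is reachable (create_all is fine for demos; use migrations in production):

async def main():
    # Create the users table if it does not exist yet
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSessionLocal() as session:
        user = await get_user(session, 1)
        print(user.name if user else "no such user")

asyncio.run(main())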
Real example 3: Process 1000 DB records with asyncio.Semaphore
Firing off 1000 concurrent queries with no limit will exhaust your connection pool, and with no pool cap it can overwhelm the database outright. asyncio.Semaphore limits concurrent operations:
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# engine and AsyncSessionLocal come from the previous section

async def process_record(session: AsyncSession, record_id: int) -> dict:
    result = await session.execute(
        text("SELECT id, data FROM records WHERE id = :id"),
        {"id": record_id},
    )
    row = result.fetchone()
    # Simulate some processing
    await asyncio.sleep(0.01)
    return {"id": row.id, "processed": True}
async def process_all_records():
    record_ids = list(range(1, 1001))  # 1000 records
    semaphore = asyncio.Semaphore(20)  # max 20 concurrent DB operations

    async def bounded_process(record_id: int) -> dict:
        async with semaphore:
            async with AsyncSessionLocal() as session:
                return await process_record(session, record_id)

    tasks = [bounded_process(rid) for rid in record_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"Processed: {len(successes)}, Failed: {len(failures)}")

asyncio.run(process_all_records())
With Semaphore(20), at most 20 sessions are open at any moment, keeping your database connection pool healthy.
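If you want progress as results arrive instead of one big gather() at the end, asyncio.as_completed yields awaitables in completion order. A sketch, assuming a bounded_process helper like the one above is in scope:

async def process_with_progress(record_ids: list[int]):
    tasks = [bounded_process(rid) for rid in record_ids]
    done = 0
    for coro in asyncio.as_completed(tasks):
        await coro               # whichever task finishes next
        done += 1
        if done % 100 == 0:
            print(f"{done}/{len(tasks)} records processed")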
asyncio.Queue: producer-consumer pattern
asyncio.Queue is perfect for streaming pipelines where data arrives faster than it can be processed, or when you want to decouple producers from consumers.
import asyncio
import httpx

async def producer(queue: asyncio.Queue, urls: list[str]):
    async with httpx.AsyncClient() as client:
        for url in urls:
            response = await client.get(url)
            await queue.put(response.text)
    await queue.put(None)  # Sentinel to signal completion

async def consumer(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()
        if item is None:
            break
        # Process the page content
        word_count = len(item.split())
        results.append(word_count)
        queue.task_done()

async def pipeline():
    urls = ["https://httpbin.org/uuid"] * 10
    queue = asyncio.Queue(maxsize=5)  # Buffer of 5 items
    results = []

    producer_task = asyncio.create_task(producer(queue, urls))
    consumer_task = asyncio.create_task(consumer(queue, results))
    await asyncio.gather(producer_task, consumer_task)
    print(f"Processed {len(results)} pages")

asyncio.run(pipeline())
maxsize=5 means await queue.put() suspends the producer whenever 5 items are already waiting, providing natural backpressure.
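With several consumers, one sentinel per consumer gets clumsy. A common alternative, sketched here against the same httpx setup, is queue.join() plus worker cancellation:

async def pipeline_with_workers(n_workers: int = 3):
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list[int] = []

    async def worker():
        while True:
            item = await queue.get()
            results.append(len(item.split()))
            queue.task_done()           # lets queue.join() track completion

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    async with httpx.AsyncClient() as client:
        for _ in range(10):
            resp = await client.get("https://httpbin.org/uuid")
            await queue.put(resp.text)

    await queue.join()                  # wait until every item is processed
    for w in workers:                   # workers loop forever, so cancel them
        w.cancel()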
Error handling in async functions
Error handling works exactly like synchronous Python — use try/except:
async def safe_fetch(client: httpx.AsyncClient, url: str) -> str | None:
    try:
        response = await client.get(url, timeout=5)
        response.raise_for_status()
        return response.text
    except httpx.TimeoutException:
        print(f"Timeout fetching {url}")
        return None
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code} from {url}")
        return None
For batch operations, combine return_exceptions=True with isinstance checks:
results = await asyncio.gather(*tasks, return_exceptions=True)
good = [r for r in results if not isinstance(r, Exception)]
bad = [r for r in results if isinstance(r, Exception)]
asyncio.timeout(): Python 3.11+ context manager
Before 3.11, timeouts required asyncio.wait_for(coro, timeout=5). Python 3.11 introduced asyncio.timeout() as a cleaner context manager:
import asyncio
import httpx

async def fetch_with_timeout(url: str):
    async with asyncio.timeout(5.0):  # 5 second deadline
        async with httpx.AsyncClient() as client:
            return await client.get(url)

# You can also update the deadline dynamically
async def adaptive_fetch(url: str):
    async with asyncio.timeout(10.0) as cm:
        response = await fetch_something_quick()   # placeholder coroutine
        if response.needs_more_data:
            # Push the deadline to 20 s from now
            cm.reschedule(asyncio.get_running_loop().time() + 20.0)
        return await fetch_the_rest()              # placeholder coroutine
asyncio.timeout() raises the built-in TimeoutError on expiry (since 3.11, asyncio.TimeoutError is simply an alias for it).
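Catching it is ordinary exception handling (slow_operation here is a hypothetical coroutine standing in for your own code):

try:
    async with asyncio.timeout(5.0):
        await slow_operation()
except TimeoutError:
    print("operation timed out")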
When NOT to use async: CPU-bound tasks
Async is not a magic speed-up for all code. If your bottleneck is the CPU — image resizing, JSON parsing of huge files, ML inference, encryption — async will not help. The event loop is single-threaded, and a CPU-intensive coroutine will block it just as badly as blocking I/O.
Use concurrent.futures.ProcessPoolExecutor for CPU-bound work:
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # Purely CPU-bound: sum of squares
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Runs in separate processes, so the event loop stays free
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy, 10_000_000),
            loop.run_in_executor(pool, cpu_heavy, 10_000_000),
            loop.run_in_executor(pool, cpu_heavy, 10_000_000),
        )
    print(results)

if __name__ == "__main__":  # required: worker processes re-import this module
    asyncio.run(main())
asyncio.to_thread(): run blocking code without blocking the event loop
Sometimes you must use a synchronous library (legacy code, a library with no async version). asyncio.to_thread() runs a blocking function in a thread pool without freezing the event loop:
import asyncio
import time

def blocking_read_file(path: str) -> str:
    time.sleep(1)  # Simulate slow disk I/O
    with open(path) as f:
        return f.read()

async def main():
    # Runs in a worker thread; the event loop stays responsive
    content = await asyncio.to_thread(blocking_read_file, "/etc/hosts")
    print(f"Read {len(content)} bytes")

asyncio.run(main())
asyncio.to_thread() is roughly loop.run_in_executor(None, fn, *args) with a cleaner API: it uses the event loop's default ThreadPoolExecutor, accepts keyword arguments, and propagates the caller's contextvars into the thread.
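For instance, wrapping the synchronous requests library (assuming it is installed):

import asyncio
import requests

async def fetch_with_requests(url: str) -> int:
    # requests.get blocks its worker thread, but not the event loop
    resp = await asyncio.to_thread(requests.get, url, timeout=5)
    return resp.status_code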
Quick-reference summary
| Concept | Use when |
|---|---|
| asyncio.gather() | Run a fixed set of coroutines concurrently |
| asyncio.create_task() | Schedule background work, fire-and-forget |
| asyncio.Semaphore | Limit concurrent access to a shared resource |
| asyncio.Queue | Producer-consumer pipelines, backpressure |
| asyncio.timeout() | Set a deadline on any awaitable (Python 3.11+) |
| asyncio.to_thread() | Wrap a blocking function to avoid blocking the loop |
| ProcessPoolExecutor | CPU-bound work that would block the event loop |
Conclusion
asyncio turns Python's single thread into a highly efficient I/O scheduler. The pattern is straightforward: write async def functions, await anything that does I/O, and let asyncio.gather() or create_task() run them concurrently. Real-world gains of 5-10x on network-heavy workloads are common and easy to achieve with httpx, aiohttp, or SQLAlchemy's async engine. Just remember: async is for waiting, not for computing — keep your CPU-heavy work in a ProcessPoolExecutor.