Python asyncio: async/await with Real Examples (APIs, Scraping, Database)

Most Python code you write is synchronous: each line runs, finishes, and only then does the next line start. That works fine until your program spends most of its time waiting — waiting for an HTTP response, a database query, or a file read. asyncio fixes that.

Why async? Blocking vs non-blocking I/O

When a regular requests.get() call fires, your Python thread is frozen. It cannot do anything else while the network round-trip completes. With 10 URLs that each take 200 ms, you wait 2 seconds total — sequentially.

Async I/O lets the event loop keep a list of pending operations. When one task is waiting on the network, the loop switches to another task that is ready to run. The 10 URLs that took 2 seconds sequentially can finish in ~200 ms, because all ten waits overlap instead of queuing up.

Key insight: async helps with I/O-bound work (network, disk, DB). It does not help with CPU-bound work (number crunching, image processing). For CPU-bound tasks, use concurrent.futures.ProcessPoolExecutor instead — more on that at the end.

The event loop is the scheduler. It runs in a single thread, continuously checking which coroutines have data ready and resuming them.
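To see that interleaving in action, here is a minimal self-contained sketch: two coroutines running on one thread, with the loop switching between them whenever one sleeps.

import asyncio

async def tick(name: str, delay: float) -> None:
    for i in range(3):
        await asyncio.sleep(delay)  # suspends this coroutine; the loop runs others
        print(f"{name} tick {i}")

async def main():
    # One thread, two coroutines: the printed output from A and B interleaves
    await asyncio.gather(tick("A", 0.1), tick("B", 0.15))

asyncio.run(main())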

async def and await: coroutines

An async def function is a coroutine. Calling it does not execute it — it returns a coroutine object. You need to await it (or schedule it as a task) to actually run it.

import asyncio

async def fetch_data(url: str) -> str:
    # Simulate a network call
    await asyncio.sleep(0.2)
    return f"data from {url}"

async def main():
    result = await fetch_data("https://api.example.com/users")
    print(result)

asyncio.run(main())

await suspends the current coroutine and gives control back to the event loop until the awaited thing completes. Only async functions can use await.
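A quick way to verify that calling a coroutine function only builds an object: reusing fetch_data from the snippet above, nothing runs until the object is awaited.

async def demo():
    coro = fetch_data("https://api.example.com/users")
    print(type(coro))    # <class 'coroutine'>; the body has not run yet
    result = await coro  # the body executes only now
    print(result)

asyncio.run(demo())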

asyncio.run(): the entry point

asyncio.run() is the standard way to launch an async program. It creates a fresh event loop, runs the given coroutine to completion, and then closes the loop.

asyncio.run(main())

asyncio.run() has accepted a debug keyword argument since it was introduced in Python 3.7. Python 3.12 added a loop_factory argument for plugging in alternative event loops like uvloop.
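A minimal sketch of that, assuming Python 3.12+ with uvloop installed (pip install uvloop):

import asyncio
import uvloop

async def main():
    await asyncio.sleep(0.1)

# loop_factory must be a zero-argument callable that returns a new event loop
asyncio.run(main(), loop_factory=uvloop.new_event_loop)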

Never call asyncio.run() inside an already-running event loop (e.g., inside Jupyter — use await main() there instead).

Real example 1: Fetch 10 URLs concurrently with httpx

httpx is a modern HTTP client with first-class async support. Install it with pip install httpx.

import asyncio
import time
import httpx

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/uuid",
    "https://httpbin.org/uuid",
    "https://httpbin.org/uuid",
    "https://httpbin.org/uuid",
    "https://httpbin.org/uuid",
]

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=10)
    response.raise_for_status()
    return {"url": url, "status": response.status_code}

async def fetch_all_concurrent():
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url) for url in URLS]
        results = await asyncio.gather(*tasks)
    return results

# Sequential baseline for comparison
def fetch_all_sequential():
    results = []
    with httpx.Client() as client:
        for url in URLS:
            r = client.get(url, timeout=10)
            results.append({"url": url, "status": r.status_code})
    return results

if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(fetch_all_concurrent())
    concurrent_time = time.perf_counter() - start
    print(f"Concurrent: {concurrent_time:.2f}s")

    start = time.perf_counter()
    fetch_all_sequential()
    sequential_time = time.perf_counter() - start
    print(f"Sequential: {sequential_time:.2f}s")
    print(f"Speedup: {sequential_time / concurrent_time:.1f}x")

Typical output against httpbin.org/delay/1 (each request takes ~1 second):

Concurrent: 1.12s
Sequential: 10.84s
Speedup: 9.7x

That is close to 10x because all 10 requests are in-flight at the same time.

asyncio.gather(): run multiple coroutines concurrently

asyncio.gather() takes multiple coroutines (or tasks) and runs them concurrently. It returns a list of results in the same order as the inputs.

results = await asyncio.gather(
    fetch(client, "https://api.example.com/users"),
    fetch(client, "https://api.example.com/posts"),
    fetch(client, "https://api.example.com/comments"),
)

By default, if any awaitable raises, gather() immediately re-raises that exception to the code awaiting it; the remaining tasks are not cancelled and continue running in the background. Use return_exceptions=True to collect exceptions as values instead:

results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {i} failed: {result}")
    else:
        print(f"Task {i} succeeded: {result}")

This is essential for production code where some URLs may be down.

asyncio.create_task(): background tasks

create_task() schedules a coroutine to run concurrently without waiting for it immediately. The task starts running as soon as the event loop gets control.

async def log_metrics():
    while True:
        print("heartbeat")
        await asyncio.sleep(60)

async def main():
    # Fire and forget — runs in background
    task = asyncio.create_task(log_metrics())

    # Do the real work
    await fetch_all_concurrent()

    # Clean up the background task when done
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

Always keep a reference to the task object. The event loop holds only a weak reference to it, so a task with no other reference can be garbage-collected before it finishes.
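One common way to guarantee that reference, following the pattern recommended in the asyncio docs, is to park tasks in a module-level set and let each task remove itself on completion. The spawn() helper here is a hypothetical name:

import asyncio

background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)  # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task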

Real example 2: Async web scraping with aiohttp + BeautifulSoup

aiohttp is another popular async HTTP library, particularly common in scraping pipelines. Install with pip install aiohttp beautifulsoup4.

import asyncio
import aiohttp
from bs4 import BeautifulSoup

PAGES = [
    "https://news.ycombinator.com/",
    "https://lobste.rs/",
    "https://tildes.net/",
    "https://old.reddit.com/r/python/",
    "https://old.reddit.com/r/programming/",
]

async def scrape_page(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}) as resp:
        html = await resp.text()

    soup = BeautifulSoup(html, "html.parser")
    title = soup.find("title")
    links = soup.find_all("a", limit=5)

    return {
        "url": url,
        "title": title.text.strip() if title else "N/A",
        "sample_links": [a.get("href", "") for a in links],
    }

async def scrape_all():
    timeout = aiohttp.ClientTimeout(total=15)
    connector = aiohttp.TCPConnector(limit=10)  # max 10 concurrent connections

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = [
            asyncio.create_task(scrape_page(session, url))
            for url in PAGES
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for result in results:
        if isinstance(result, Exception):
            print(f"Scrape error: {result}")
        else:
            print(f"{result['title']} — {result['url']}")

asyncio.run(scrape_all())

TCPConnector(limit=10) caps the total number of open connections, preventing you from accidentally hammering a server or exhausting file descriptors.
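When many of your URLs point at the same site, TCPConnector also accepts a limit_per_host parameter, for example:

# Cap total connections at 10 and connections to any single host at 2
connector = aiohttp.TCPConnector(limit=10, limit_per_host=2)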

Async database with SQLAlchemy

SQLAlchemy 1.4+ supports async via sqlalchemy.ext.asyncio. Install with pip install sqlalchemy[asyncio] asyncpg (for PostgreSQL).

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, pool_size=10, max_overflow=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]

async def get_user(session: AsyncSession, user_id: int) -> User | None:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()

Note async_sessionmaker (introduced in SQLAlchemy 2.0) instead of the older sessionmaker. It produces AsyncSession objects automatically.
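A minimal usage sketch, reusing the definitions above and assuming the users table exists at the connection URL:

import asyncio

async def main():
    async with AsyncSessionLocal() as session:
        user = await get_user(session, 1)
        print(user.name if user else "user 1 not found")

asyncio.run(main())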

Real example 3: Process 1000 DB records with asyncio.Semaphore

Firing off 1000 concurrent queries with no limit will try to open far more sessions than your database allows, exhausting its connection limit and failing requests. asyncio.Semaphore caps how many run at once:

import asyncio
from sqlalchemy import text  # needed for the raw SQL below
from sqlalchemy.ext.asyncio import AsyncSession

# Reuses the engine and AsyncSessionLocal defined in the previous snippet

async def process_record(session: AsyncSession, record_id: int) -> dict:
    result = await session.execute(
        text("SELECT id, data FROM records WHERE id = :id"),
        {"id": record_id}
    )
    row = result.fetchone()
    # Simulate some processing
    await asyncio.sleep(0.01)
    return {"id": row.id, "processed": True}

async def process_all_records():
    record_ids = list(range(1, 1001))  # 1000 records
    semaphore = asyncio.Semaphore(20)  # max 20 concurrent DB operations

    async def bounded_process(record_id: int) -> dict:
        async with semaphore:
            async with AsyncSessionLocal() as session:
                return await process_record(session, record_id)

    tasks = [bounded_process(rid) for rid in record_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"Processed: {len(successes)}, Failed: {len(failures)}")

asyncio.run(process_all_records())

With Semaphore(20), at most 20 sessions are open at any moment, keeping your database connection pool healthy.

asyncio.Queue: producer-consumer pattern

asyncio.Queue is perfect for streaming pipelines where data arrives faster than it can be processed, or when you want to decouple producers from consumers.

import asyncio
import httpx

async def producer(queue: asyncio.Queue, urls: list[str]):
    async with httpx.AsyncClient() as client:
        for url in urls:
            response = await client.get(url)
            await queue.put(response.text)
    await queue.put(None)  # Sentinel to signal completion

async def consumer(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()
        if item is None:
            break
        # Process the page content
        word_count = len(item.split())
        results.append(word_count)
        queue.task_done()

async def pipeline():
    urls = [f"https://httpbin.org/uuid" for _ in range(10)]
    queue = asyncio.Queue(maxsize=5)  # Buffer of 5 items
    results = []

    producer_task = asyncio.create_task(producer(queue, urls))
    consumer_task = asyncio.create_task(consumer(queue, results))

    await asyncio.gather(producer_task, consumer_task)
    print(f"Processed {len(results)} pages")

asyncio.run(pipeline())

maxsize=5 means the producer blocks when the queue has 5 items waiting, providing natural backpressure.
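The None sentinel works for a single consumer. With several consumers, one common variant is queue.join() plus explicit cancellation, sketched here with hypothetical page strings:

import asyncio

async def worker(queue: asyncio.Queue, results: list[int]):
    while True:
        item = await queue.get()
        results.append(len(item.split()))
        queue.task_done()  # pairs with queue.join() below

async def pipeline_many():
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]

    for page in ["alpha beta", "gamma", "delta epsilon zeta"]:
        await queue.put(page)

    await queue.join()  # block until every item has been marked task_done()
    for w in workers:
        w.cancel()      # workers loop forever, so cancel them explicitly

asyncio.run(pipeline_many())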

Error handling in async functions

Error handling works exactly like synchronous Python — use try/except:

async def safe_fetch(client: httpx.AsyncClient, url: str) -> str | None:
    try:
        response = await client.get(url, timeout=5)
        response.raise_for_status()
        return response.text
    except httpx.TimeoutException:
        print(f"Timeout fetching {url}")
        return None
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code} from {url}")
        return None

For batch operations, combine return_exceptions=True with isinstance checks:

results = await asyncio.gather(*tasks, return_exceptions=True)
good = [r for r in results if not isinstance(r, Exception)]
bad  = [r for r in results if isinstance(r, Exception)]

asyncio.timeout(): Python 3.11+ context manager

Before 3.11, timeouts required asyncio.wait_for(coro, timeout=5). Python 3.11 introduced asyncio.timeout() as a cleaner context manager:

import asyncio

async def fetch_with_timeout(url: str):
    async with asyncio.timeout(5.0):  # 5 second deadline
        async with httpx.AsyncClient() as client:
            return await client.get(url)

# You can also update the deadline dynamically
async def adaptive_fetch(url: str):
    async with asyncio.timeout(10.0) as cm:
        response = await fetch_something_quick()  # placeholder coroutine
        if response.needs_more_data:
            cm.reschedule(asyncio.get_running_loop().time() + 20.0)  # push the deadline out
        return await fetch_the_rest()

asyncio.timeout() raises TimeoutError (the built-in, not asyncio.TimeoutError) on expiry.
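Handling it is a plain try/except; slow_operation below is a placeholder:

import asyncio

async def slow_operation():
    await asyncio.sleep(10)

async def main():
    try:
        async with asyncio.timeout(5.0):
            await slow_operation()
    except TimeoutError:
        print("deadline exceeded")

asyncio.run(main())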

When NOT to use async: CPU-bound tasks

Async is not a magic speed-up for all code. If your bottleneck is the CPU — image resizing, JSON parsing of huge files, ML inference, encryption — async will not help. The event loop is single-threaded, and a CPU-intensive coroutine will block it just as badly as blocking I/O.

Use concurrent.futures.ProcessPoolExecutor for CPU-bound work:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # Purely CPU-bound: sum of squares
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Runs in a separate process — does not block the event loop
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy, 10_000_000),
            loop.run_in_executor(pool, cpu_heavy, 10_000_000),
            loop.run_in_executor(pool, cpu_heavy, 10_000_000),
        )
    print(results)

asyncio.run(main())

asyncio.to_thread(): run blocking code without blocking the event loop

Sometimes you must use a synchronous library (legacy code, a library with no async version). asyncio.to_thread() runs a blocking function in a thread pool without freezing the event loop:

import asyncio
import time

def blocking_read_file(path: str) -> str:
    time.sleep(1)  # Simulate slow disk I/O
    with open(path) as f:
        return f.read()

async def main():
    # Runs in a thread — event loop stays responsive
    content = await asyncio.to_thread(blocking_read_file, "/etc/hosts")
    print(f"Read {len(content)} bytes")

asyncio.run(main())

asyncio.to_thread() is equivalent to loop.run_in_executor(None, fn, *args) with a cleaner API, and it additionally propagates the current contextvars context to the worker thread. It uses the default ThreadPoolExecutor managed by the event loop.

Quick-reference summary

asyncio.gather(): run a fixed set of coroutines concurrently
asyncio.create_task(): schedule background work, fire-and-forget
asyncio.Semaphore: limit concurrent access to a shared resource
asyncio.Queue: producer-consumer pipelines, backpressure
asyncio.timeout(): set a deadline on any awaitable (Python 3.11+)
asyncio.to_thread(): wrap a blocking function to avoid blocking the loop
ProcessPoolExecutor: CPU-bound work that would block the event loop

Conclusion

asyncio turns Python's single thread into a highly efficient I/O scheduler. The pattern is straightforward: write async def functions, await anything that does I/O, and let asyncio.gather() or create_task() run them concurrently. Real-world gains of 5-10x on network-heavy workloads are common and easy to achieve with httpx, aiohttp, or SQLAlchemy's async engine. Just remember: async is for waiting, not for computing — keep your CPU-heavy work in a ProcessPoolExecutor.

Leonardo Lazzaro

Software engineer and technical writer. 10+ years experience in DevOps, Python, and Linux systems.