Senior 18 min · March 05, 2026

Python asyncio Shutdown Hangs — CancelledError Swallowing

CancelledError caught by a bare except: or except BaseException block (or by except Exception on Python 3.7 and earlier) causes 30+ second SIGTERM hangs.

Naren, Founder & Principal Engineer

20+ years shipping production Python across data and backend systems. Written from production experience, not tutorials.

Last updated: May 23, 2026
Quick Answer
  • Coroutine = async def function that can yield control at each await point
  • Event loop runs on a single thread, switching tasks when they await I/O
  • await does not block the thread – it suspends the coroutine until the future resolves
  • asyncio.gather runs tasks concurrently; return_exceptions=True returns failures as results instead of raising the first one
  • CancelledError must be re-raised; swallowing it breaks shutdown
  • Blocking the loop (e.g. time.sleep) freezes all concurrent tasks
What Are Coroutines and asyncio in Python?

Python's asyncio is a concurrency framework built around an event loop that multiplexes I/O-bound tasks via cooperative multitasking. The problem this article addresses is a specific, production-critical failure mode: during graceful shutdown, asyncio.run() (or your own shutdown code) cancels the outstanding tasks and waits for them to finish, and the application hangs because one of those tasks catches CancelledError and keeps running instead of exiting.


This isn't a bug in asyncio. It's a consequence of the await contract: a cancelled task must let CancelledError propagate up through its call stack. Many developers break that contract by catching it with a bare except: or except BaseException (or, on Python 3.7 and earlier, where CancelledError still subclassed Exception, with except Exception) and not re-raising it, or by returning from a finally clause, which silently discards the in-flight exception. Either way, the loop is left waiting for a task that will never complete.
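The failure is easy to reproduce. In the minimal sketch below, both hypothetical workers loop over a stand-in I/O call: swallowing_worker catches BaseException and keeps going, while correct_worker re-raises CancelledError. stops_within() cancels a worker the way a shutdown path would and reports whether it actually finished within a grace period. The stop event is a demo-only escape hatch, so the buggy worker can be released and the script can exit.

```python
import asyncio


async def swallowing_worker(stop: asyncio.Event) -> None:
    while not stop.is_set():  # demo-only escape hatch
        try:
            await asyncio.sleep(0.01)  # stand-in for real I/O
        except BaseException:  # BUG: also catches CancelledError
            pass  # ...so cancel() is silently ignored


async def correct_worker(stop: asyncio.Event) -> None:
    while not stop.is_set():
        try:
            await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # do any cleanup here, then let cancellation finish the task
            raise
        except Exception:
            pass  # log and continue on ordinary errors


async def stops_within(worker, grace: float = 0.1) -> bool:
    """Cancel `worker` and report whether it finished within `grace` seconds."""
    stop = asyncio.Event()
    task = asyncio.create_task(worker(stop))
    await asyncio.sleep(0.03)  # let the worker get going
    task.cancel()  # what asyncio.run() or a SIGTERM handler would do
    done, _ = await asyncio.wait({task}, timeout=grace)
    stop.set()  # release the buggy worker so this demo can exit
    await asyncio.wait({task})
    return task in done


if __name__ == "__main__":
    print("swallowing_worker stopped:", asyncio.run(stops_within(swallowing_worker)))  # False
    print("correct_worker stopped:", asyncio.run(stops_within(correct_worker)))  # True
```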

In the broader Python ecosystem, asyncio competes with threading (for blocking I/O) and multiprocessing (for CPU-bound computation), but it's the standard for high-concurrency network services like web servers (aiohttp, FastAPI/uvicorn), database drivers (asyncpg, aiomysql), and message queues. You should not use asyncio for CPU-bound work, for legacy synchronous libraries that would block the event loop, or for simple scripts where the overhead of the event loop isn't justified.

The article goes beyond the simplified 'async/await is just syntactic sugar for callbacks' narrative to dissect the actual event loop architecture (the _run_once cycle, the _ready queue, and how Future objects interact with Task objects via __await__), because understanding this is the most reliable way to debug hangs where your asyncio.run() never returns.

Production teams at companies like Instagram (which runs one of the largest asyncio deployments) have encountered this exact issue: during rolling deploys or Kubernetes pod termination, a SIGTERM handler cancels all pending tasks, but if any task catches CancelledError without re-raising it, that task never finishes, and the shutdown code waiting on it (such as the cleanup step of asyncio.run()) blocks forever. The article covers practical detection techniques, like listing stuck tasks with asyncio.all_tasks() (asyncio.Task.all_tasks() was removed in Python 3.9) and inspecting each one with repr() or Task.print_stack(), or setting PYTHONASYNCIODEBUG=1 so that tasks record the traceback of where they were created. It also provides patterns for safe cancellation handling, including the correct way to use asyncio.shield() and asyncio.wait_for() without leaking exceptions.
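Here is a minimal sketch of the detection technique. It assumes the function is called from inside the running loop, for example from a shutdown path after the grace period expires. It lists every task except the caller, together with the coroutine and line where that task is suspended, using asyncio.all_tasks() and Task.get_stack(). describe_pending_tasks and stuck_worker are hypothetical names.

```python
import asyncio


def describe_pending_tasks() -> list[str]:
    """Describe every pending task except the caller (must run inside the loop)."""
    current = asyncio.current_task()
    lines = []
    for task in asyncio.all_tasks():  # only tasks that are not yet done
        if task is current:
            continue
        frames = task.get_stack(limit=1)  # the task's own coroutine frame
        if frames:
            frame = frames[0]
            where = f"{frame.f_code.co_name}:{frame.f_lineno}"
        else:
            where = "<no frame>"
        lines.append(f"{task.get_name()} waiting in {where}")
    return lines


async def stuck_worker() -> None:
    """Hypothetical task that waits on an event nobody will ever set."""
    await asyncio.Event().wait()


async def main() -> list[str]:
    worker = asyncio.create_task(stuck_worker(), name="kafka-consumer")
    await asyncio.sleep(0)  # let the worker start and suspend
    report = describe_pending_tasks()
    worker.cancel()  # clean up so asyncio.run() can exit
    await asyncio.gather(worker, return_exceptions=True)
    return report


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)  # e.g. kafka-consumer waiting in stuck_worker:<line>
```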

If you've ever had a Python service that refuses to shut down cleanly, this is the deep-dive you need.

Plain-English First

Imagine you're a chef cooking three dishes at once. Instead of standing over the pasta staring at it until it boils, you start the water, then chop vegetables, then check the sauce — switching tasks whenever one needs waiting. Python's asyncio works exactly like that chef: one worker (the event loop) juggles many tasks by switching between them the moment a task hits a waiting period, like a network request. You get the speed of doing many things at once without the chaos of hiring multiple cooks (threads).

Every modern Python backend eventually hits the same wall: your code spends 90% of its time waiting — waiting for a database to respond, waiting for an API call to return, waiting for a file to be read off disk. Threads are the classic answer, but they carry a tax: memory overhead, the GIL playing referee, and race conditions that appear only in production at 3am. There's a better tool for I/O-bound concurrency, and it's been in the standard library since Python 3.4, matured dramatically in 3.7, and is now the backbone of frameworks like FastAPI, aiohttp, and Starlette.

asyncio solves the 'waiting problem' through cooperative multitasking. Instead of the OS forcibly switching between threads, your coroutines voluntarily yield control back to an event loop whenever they'd otherwise block. One thread, one event loop, potentially thousands of concurrent operations — all without a mutex in sight. The catch is that everything in your call stack must understand this contract, which is what makes asyncio feel like learning a new dialect of Python at first.

By the end of this article you'll understand how the event loop actually schedules work, how coroutines differ from generators at the bytecode level, why blocking the event loop is the cardinal sin of async Python, and how to structure real production services — including proper cancellation, timeouts, error propagation, and the patterns that separate async code that scales from async code that silently serializes everything.

Why asyncio Shutdown Hangs — CancelledError Swallowing

asyncio is Python's built-in library for writing concurrent code using the async/await syntax. It implements cooperative multitasking on a single thread via an event loop that schedules coroutines — functions defined with async def that can suspend execution at await points. The core mechanic is explicit yielding: a coroutine voluntarily pauses, allowing the loop to run other coroutines until the awaited operation completes.

In practice, asyncio works by wrapping I/O-bound operations (network requests, subprocess pipes, timers such as asyncio.sleep) into awaitable objects. Regular file reads are not natively async and are usually offloaded to a thread. The event loop polls registered file descriptors and timers, resuming coroutines when data arrives or time passes. Key properties: all coroutines share one thread, so no other coroutine can run between two await points, but a critical section that spans an await still needs an asyncio.Lock. A long-running CPU-bound coroutine blocks the entire loop. Cancellation is cooperative: CancelledError is raised inside the coroutine at the await where it is suspended, and if the coroutine catches and ignores it, shutdown can hang.
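The following minimal sketch makes the shared-state point concrete with a hypothetical Account class. A check-then-update that awaits in the middle lets a second task interleave, and wrapping the critical section in an asyncio.Lock restores correctness. The await asyncio.sleep(0) stands in for any real I/O call between the check and the update.

```python
import asyncio


class Account:
    """Hypothetical shared state touched by several tasks."""

    def __init__(self, balance: int) -> None:
        self.balance = balance
        self.lock = asyncio.Lock()

    async def withdraw_unsafe(self, amount: int) -> bool:
        if self.balance >= amount:
            await asyncio.sleep(0)  # stand-in for I/O: other tasks run here
            self.balance -= amount
            return True
        return False

    async def withdraw_safe(self, amount: int) -> bool:
        # The lock makes the check and the update one critical section,
        # even though an await happens in between.
        async with self.lock:
            return await self.withdraw_unsafe(amount)


async def race(safe: bool) -> tuple[int, list[bool]]:
    account = Account(100)
    withdraw = account.withdraw_safe if safe else account.withdraw_unsafe
    results = await asyncio.gather(withdraw(80), withdraw(80))
    return account.balance, results


if __name__ == "__main__":
    print(asyncio.run(race(safe=False)))  # (-60, [True, True]): overdrawn
    print(asyncio.run(race(safe=True)))   # (20, [True, False])
```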

Use asyncio when your application is I/O-bound with many concurrent connections: web servers, API gateways, data pipelines. It matters because it enables handling tens of thousands of connections per process without the overhead of threads or processes. The critical nuance: asyncio does not make Python faster; it makes waiting faster. If your workload is CPU-bound, use multiprocessing; threads only help when the heavy work releases the GIL (as many C extensions do).

CancelledError Is Not Optional
Catching BaseException or using a bare except: in an async function silently swallows CancelledError (which subclasses BaseException since Python 3.8), and that can make shutdown hang. Catch Exception instead, or re-raise CancelledError explicitly.
Production Insight
A microservice that catches all exceptions in a long-lived background task (e.g., a Kafka consumer loop) swallows CancelledError during graceful shutdown.
The symptom: SIGTERM is sent, the process logs nothing, and after 30 seconds Kubernetes kills it with SIGKILL, leaving offsets uncommitted so messages are reprocessed.
Rule: never use bare except in async code; always re-raise CancelledError. asyncio.shield() does not suppress cancellation of the caller. It only keeps an inner operation (such as an offset commit) running when the awaiting task is cancelled, and the caller still receives CancelledError.
Key Takeaway
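CancelledError inherits from BaseException, not Exception (since Python 3.8), so a bare except: catches it silently.
When cancelling tasks yourself, call task.cancel() and then await the tasks, for example with asyncio.gather(*tasks, return_exceptions=True), so one task's CancelledError doesn't abort the wait for the rest.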
Test shutdown by sending SIGTERM and verifying all tasks complete within your grace period.
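The following minimal sketch shows the Kafka-style case under these rules. It assumes hypothetical consume() and commit_offsets() coroutines in place of a real client. On cancellation, the consumer commits its offsets inside asyncio.shield(). Shielding keeps the commit running as its own task even if the consumer is cancelled again while waiting for it. The consumer then re-raises, so the task still ends up cancelled.

```python
import asyncio


async def commit_offsets(log: list[str]) -> None:
    """Hypothetical offset commit; the sleep stands in for a broker round-trip."""
    await asyncio.sleep(0.05)
    log.append("committed")


async def consume(log: list[str]) -> None:
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for polling the broker
    except asyncio.CancelledError:
        # shield() protects the inner commit, not this task: if a second
        # cancel() arrives, this await raises but the commit keeps running.
        await asyncio.shield(commit_offsets(log))
        raise  # the task must still end as cancelled


async def main() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(consume(log))
    await asyncio.sleep(0.03)
    task.cancel()  # e.g. from a SIGTERM handler
    await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['committed'], True)
```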
[Figure: asyncio Shutdown Hang: CancelledError Swallowing. Panels: Event Loop Lifecycle; Task Cancellation; CancelledError Swallowing; Blocking Code Detection; run_in_executor Strategy; Structured Concurrency.]

How the Event Loop Actually Works — Not the Simplified Version

Most explanations stop at 'the event loop runs coroutines'. That's not enough when something breaks in production. The event loop is essentially a tight loop around a system call — select, epoll, or kqueue depending on your OS — that asks the kernel: 'which of these file descriptors are ready?'. When one is ready, the loop wakes up the coroutine that was waiting on it and resumes execution from the exact point it yielded.

A coroutine is a function defined with async def. Under the hood it compiles to a code object whose frame can be suspended and resumed. When you await something, Python calls __await__ on the awaitable, and for I/O that chain ultimately bottoms out in a Future object. The Task driving your coroutine registers a wake-up callback on that Future. The coroutine's frame is frozen, local variables and all, until the Future resolves, at which point the loop schedules the callback that resumes it.
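The following small sketch shows that mechanism using only public loop APIs. The coroutine awaits a bare Future created with loop.create_future(), and a callback scheduled with loop.call_later() resolves it later. wait_for_value is a hypothetical name; in real code the Future would be resolved by a transport or protocol when data arrives.

```python
import asyncio


async def wait_for_value(delay: float) -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # pending, low-level promise
    # Something else resolves the Future later; here, a timer callback.
    loop.call_later(delay, future.set_result, "ready")
    # The coroutine's frame is suspended here; the Task driving it has
    # registered a wake-up callback on `future`.
    value = await future
    return value  # resumed after set_result() ran


if __name__ == "__main__":
    print(asyncio.run(wait_for_value(0.01)))  # ready
```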

This is why you can have 10,000 concurrent HTTP connections on a single thread: no connection holds the thread while waiting. Each suspended coroutine frame costs on the order of 1-2 KB of heap memory, orders of magnitude less than the 1-8 MB of stack that an OS thread typically reserves.

Understanding this model is what makes the rule 'never block the event loop' feel obvious rather than arbitrary: if your coroutine calls time.sleep(5), the entire event loop freezes for five seconds because it's still on the same thread. Every other 'concurrent' coroutine is stuck waiting.
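A quick way to see this is to time two concurrent tasks that each 'wait' 0.1 seconds. With asyncio.sleep the waits overlap. With time.sleep each task holds the thread, so the waits add up. The helper names below are hypothetical.

```python
import asyncio
import time


async def polite_wait(seconds: float) -> None:
    await asyncio.sleep(seconds)  # yields to the loop while waiting


async def rude_wait(seconds: float) -> None:
    time.sleep(seconds)  # holds the thread: the whole loop freezes


async def run_two(waiter) -> float:
    """Run two waiters concurrently and return the wall time in seconds."""
    start = time.perf_counter()
    await asyncio.gather(waiter(0.1), waiter(0.1))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"asyncio.sleep: {asyncio.run(run_two(polite_wait)):.2f}s")  # roughly 0.10s
    print(f"time.sleep:    {asyncio.run(run_two(rude_wait)):.2f}s")    # roughly 0.20s
```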

event_loop_internals.py
import asyncio
import time

# ---------------------------------------------------------------------------
# This example shows TWO tasks running concurrently on ONE thread.
# Watch the timestamps: both tasks overlap in wall time.
# ---------------------------------------------------------------------------

# time.perf_counter() has an arbitrary reference point, so timestamps are
# printed relative to this module-level start time.
T0 = time.perf_counter()


def ts() -> str:
    """Seconds elapsed since T0, formatted for the log lines below."""
    return f"[{time.perf_counter() - T0:.3f}s]"


async def fetch_user_profile(user_id: int) -> dict:
    """Simulates a DB round-trip with a network delay."""
    print(f"{ts()} START  fetch_user_profile({user_id})")
    # asyncio.sleep yields control back to the event loop.
    # The loop can run OTHER coroutines while this one 'waits'.
    await asyncio.sleep(1.0)  # pretend this is: await db.fetch_one(query)
    print(f"{ts()} FINISH fetch_user_profile({user_id})")
    return {"id": user_id, "name": f"User_{user_id}"}


async def fetch_user_orders(user_id: int) -> list:
    """Simulates a second DB query running at the same time."""
    print(f"{ts()} START  fetch_user_orders({user_id})")
    await asyncio.sleep(1.5)  # slightly longer query
    print(f"{ts()} FINISH fetch_user_orders({user_id})")
    return [{"order_id": 101, "item": "Widget"}, {"order_id": 102, "item": "Gadget"}]


async def build_user_dashboard(user_id: int) -> None:
    wall_start = time.perf_counter()

    # asyncio.gather wraps BOTH coroutines in Tasks immediately.
    # They run concurrently; the loop switches between them at each 'await'.
    profile, orders = await asyncio.gather(
        fetch_user_profile(user_id),
        fetch_user_orders(user_id),
    )

    wall_elapsed = time.perf_counter() - wall_start
    print(f"\nDashboard ready in {wall_elapsed:.3f}s  (would be 2.5s if sequential)")
    print(f"Profile : {profile}")
    print(f"Orders  : {orders}")


if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion, then closes and cleans up the loop. Use this as your
    # single entry point; never nest asyncio.run() calls.
    asyncio.run(build_user_dashboard(user_id=42))
Output
[0.001s] START  fetch_user_profile(42)
[0.001s] START  fetch_user_orders(42)
[1.002s] FINISH fetch_user_profile(42)
[1.502s] FINISH fetch_user_orders(42)

Dashboard ready in 1.503s  (would be 2.5s if sequential)
Profile : {'id': 42, 'name': 'User_42'}
Orders  : [{'order_id': 101, 'item': 'Widget'}, {'order_id': 102, 'item': 'Gadget'}]

Event Loop Architecture — Visualising the Lifecycle

The event loop is not a black box. It follows a predictable cycle, and understanding each phase helps diagnose hangs and latency. At its core, the loop continuously runs three phases:

  1. Polling – the loop first computes a timeout from the nearest scheduled timer (zero if callbacks are already waiting, no timeout at all if nothing is scheduled), then asks the OS (via epoll/kqueue/select) which file descriptors are ready for reading or writing. This is where the loop blocks when there is truly nothing to do, and the timeout lets it wake up in time for the next timer.
  2. Scheduling – the callback for each ready FD (e.g., resuming a coroutine that was waiting on a socket) is appended to the ready queue. Any timers whose deadline has passed (e.g., from asyncio.sleep or call_later) are then popped from the deadline-ordered heap and appended too. Cancelled timers are discarded along the way.
  3. Running ready callbacks – the loop runs exactly the callbacks that were in the ready queue when this step began. Anything scheduled while they run waits for the next iteration, so a steady stream of new callbacks cannot starve I/O polling.

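The loop also maintains internal data structures: a ready deque, a heap of scheduled timers, and the selector's mapping of FDs to callbacks. Debug mode (the PYTHONASYNCIODEBUG=1 environment variable, or asyncio.run(..., debug=True)) does not print a per-phase timing breakdown. Instead, it logs callbacks that run longer than loop.slow_callback_duration, logs I/O selector calls that take too long, records where each task was created, and reports coroutines that were never awaited.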

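A useful diagnostic is lowering loop.slow_callback_duration (default 0.1 seconds). Set it to something aggressive (e.g., 10ms) in development to catch callbacks that hold the loop too long. The loop exposes no per-iteration hook, so in production you should measure event-loop lag instead. For example, use a heartbeat task that compares when it asked to wake up with when it actually woke, and emit that difference as a metric.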
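Here is a minimal sketch of such a heartbeat. It assumes you feed the samples to your own metrics system; here they are just appended to a list, and monitor_loop_lag is a hypothetical name. Each sample records how much later than requested the loop woke the task up. A deliberate time.sleep() shows what a blocking call does to that number.

```python
import asyncio
import time


async def monitor_loop_lag(samples: list[float], interval: float = 0.05,
                           count: int = 3) -> None:
    """Record how late the loop resumes this task after each sleep."""
    loop = asyncio.get_running_loop()
    for _ in range(count):  # in production: `while True` plus a metrics call
        start = loop.time()
        await asyncio.sleep(interval)
        lag = loop.time() - start - interval
        samples.append(max(lag, 0.0))


async def main() -> list[float]:
    samples: list[float] = []
    monitor = asyncio.create_task(monitor_loop_lag(samples))
    await asyncio.sleep(0.01)  # let the monitor start its first sleep
    time.sleep(0.2)  # deliberately block the loop for 200 ms
    await monitor
    return samples


if __name__ == "__main__":
    lags = asyncio.run(main())
    print([f"{lag * 1000:.0f} ms" for lag in lags])  # first sample is well over 100 ms
```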

Slow Loop Iterations Are a Red Flag
If the loop takes longer than a few milliseconds per iteration under load, you are almost certainly blocking it with synchronous code. The first step is to enable debug mode and look for 'Executing <Task...> took N seconds' messages.
Production Insight
Monitor event-loop lag as a key health metric. In high-throughput services, the callbacks run in one iteration should normally finish well under 1ms in total. Lag that is consistently above 10ms indicates a blocking call; profile it with debug mode and loop.slow_callback_duration=0.01.
Use structured logging to emit loop stats such as loop lag and the number of pending tasks (len(asyncio.all_tasks())).
Key Takeaway
Each event-loop iteration polls for I/O and runs the callbacks that are ready, including expired timers. Debug mode flags callbacks that run too long, and a slow iteration signals a blocking call.
[Figure: Event Loop Lifecycle, a flowchart of one loop iteration: calculate the poll timeout from scheduled tasks, handle ready I/O events, run ready and expired scheduled callbacks, then sleep until the next timer or I/O event.]

Tasks, Futures, and Awaitable Contracts — The Object Model Behind await

There are three main kinds of objects you can await in Python: coroutines, Tasks, and Futures (any object that implements __await__ also qualifies). Understanding the difference is critical for writing correct async code.

A coroutine object (what you get when you call an async def function without await) is a lazy, generator-like object. Nothing runs until it is awaited or wrapped in a Task. If you write fetch_user_profile(42) without awaiting it, Python creates the object, and when that object is garbage-collected it emits a RuntimeWarning saying the coroutine was never awaited.

A Future is a low-level promise object. It starts in a pending state and transitions to done (with a result or an exception) exactly once. You almost never create Futures manually in application code — they live inside the networking and I/O layers.

A Task wraps a coroutine and schedules it on the event loop via asyncio.create_task(). It starts running as soon as the current coroutine yields control to the loop. This is the key difference from a bare await: await coroutine() runs that coroutine sequentially from your perspective, while asyncio.create_task(coroutine()) schedules it concurrently and returns a handle you can await later.
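The following small sketch makes the scheduling order visible; child and parent are hypothetical names. The Task exists as soon as create_task() returns, but its coroutine only starts when the parent hits an await that yields to the loop.

```python
import asyncio


async def child(log: list[str]) -> None:
    log.append("child started")


async def parent() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(child(log))
    log.append("task created")  # the child has not run yet
    await asyncio.sleep(0)  # yield to the loop; the child runs now
    log.append("parent resumed")
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(parent()))
    # ['task created', 'child started', 'parent resumed']
```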

The await keyword calls __await__() on the right-hand side object, which must return an iterator. For Tasks and Futures, that iterator suspends the current coroutine and resumes it when the Future resolves. This is the same protocol that yield from used for the generator-based coroutines of Python 3.4; asyncio is built on top of that generator machinery.
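To see the protocol without any asyncio internals, here is a sketch of a hypothetical awaitable class. Its __await__ is a generator that uses yield from to delegate to a Future's own iterator, then post-processes the result. Any object written this way can be awaited like a Task or Future.

```python
import asyncio


class UppercaseResult:
    """Hypothetical awaitable: waits on a Future, returns its result upper-cased."""

    def __init__(self, future: asyncio.Future) -> None:
        self._future = future

    def __await__(self):
        # __await__ must return an iterator; a generator function qualifies.
        # `yield from` passes the Future's suspension through to the Task.
        result = yield from self._future.__await__()
        return result.upper()


async def main() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(future.set_result, "hello")
    return await UppercaseResult(future)


if __name__ == "__main__":
    print(asyncio.run(main()))  # HELLO
```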

tasks_vs_coroutines.py
import asyncio
import time


async def slow_api_call(endpoint: str
Output
--- Sequential approach ---
Results: Response from /users, Response from /products, Response from /inventory
Time: 3.01s ← 3 seconds: purely sequential
--- Task-based approach ---
Results: Response from /users, Response from /products, Response from /inventory
Time: 1.01s ← ~1 second: all tasks ran concurrently
--- gather approach ---
OK: Response from /users
OK: Response from /products
OK: Response from /inventory
Time: 1.01s
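The listing above is cut off after the slow_api_call signature. The sketch below is a hypothetical reconstruction of the three approaches its output describes, not the original code. It assumes slow_api_call just sleeps and returns a string, and it uses a 0.1-second delay instead of 1 second so it runs quickly. task_approach and gather_approach are assumed names; only sequential_approach is named in the text.

```python
import asyncio
import time

DELAY = 0.1  # assumption: shortened from ~1 s
ENDPOINTS = ["/users", "/products", "/inventory"]


async def slow_api_call(endpoint: str) -> str:
    await asyncio.sleep(DELAY)  # stand-in for a network round-trip
    return f"Response from {endpoint}"


async def sequential_approach() -> list:
    # Each await finishes before the next call starts: about 3 x DELAY.
    return [await slow_api_call(ep) for ep in ENDPOINTS]


async def task_approach() -> list:
    # All three Tasks are scheduled before we await any: about 1 x DELAY.
    tasks = [asyncio.create_task(slow_api_call(ep)) for ep in ENDPOINTS]
    return [await task for task in tasks]


async def gather_approach() -> list:
    # return_exceptions=True hands back failures as values instead of raising.
    results = await asyncio.gather(
        *(slow_api_call(ep) for ep in ENDPOINTS), return_exceptions=True
    )
    for result in results:
        print("FAILED:" if isinstance(result, Exception) else "OK:", result)
    return results


async def timed(approach) -> tuple[list, float]:
    start = time.perf_counter()
    results = await approach()
    return results, time.perf_counter() - start


async def main() -> None:
    for approach in (sequential_approach, task_approach, gather_approach):
        _, elapsed = await timed(approach)
        print(f"{approach.__name__}: {elapsed:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```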
Watch Out: Forgetting create_task Serializes Your 'Concurrent' Code
This is the #1 asyncio mistake in production. If you write result = await my_coroutine() inside a loop, you've written synchronous code with extra steps. The coroutine only becomes concurrent when it's wrapped in a Task via create_task() or gather(). The sequential_approach() example above takes 3x longer — and it's async code. Profile first if async 'isn't helping'.
Production Insight
Bare await inside a loop is synchronous code in disguise. Your async API will perform no better than sync code because each call waits for the previous to finish.
Always compare wall time with and without concurrency to validate.
Rule: Use create_task or gather whenever you have multiple independent I/O operations.
Key Takeaway
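Tasks start as soon as the current coroutine yields to the loop, and then run concurrently.
await suspends only the current coroutine, never the whole loop; awaiting a coroutine directly just runs it inside the current task instead of as a separate concurrent task.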
Rule: Never write await coro() inside a loop for independent operations — use gather or TaskGroup.

Cancellation, Timeouts, and Error Handling — The Production Minefield

Happy-path async code is easy. Production async code is defined by how it handles failure. Three scenarios trip up even experienced developers: task cancellation, timeouts, and exception propagation through gather.

Cancellation in asyncio is cooperative, not forcible. When you call task.cancel(), asyncio throws a CancelledError into the coroutine at the await where it is currently suspended (or before it starts, if it hasn't run yet) on the next iteration of the loop. If the coroutine catches CancelledError and doesn't re-raise it, the cancellation is silently swallowed, which is a serious bug. Always re-raise CancelledError, or do your cleanup in a finally block and let the exception propagate.
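Here is a minimal sketch of that rule with a hypothetical resource. Cleanup runs in finally, and a specific except CancelledError branch does its extra work and then re-raises. The caller can then confirm with task.cancelled() that the cancellation actually took effect.

```python
import asyncio


async def query_with_cleanup(events: list[str]) -> str:
    events.append("connection opened")  # hypothetical resource
    try:
        await asyncio.sleep(10)  # stand-in for a slow query
        return "rows"
    except asyncio.CancelledError:
        events.append("query cancelled")
        raise  # re-raise so the task really ends as cancelled
    finally:
        events.append("connection closed")  # runs on every exit path


async def main() -> tuple[list[str], bool]:
    events: list[str] = []
    task = asyncio.create_task(query_with_cleanup(events))
    await asyncio.sleep(0.01)  # let the query start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)  # wait for the cleanup
    return events, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
    # (['connection opened', 'query cancelled', 'connection closed'], True)
```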

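Timeouts are best handled with asyncio.timeout() (Python 3.11+) or asyncio.wait_for() on earlier versions. Both cancel the timed-out work internally and then convert that CancelledError into a TimeoutError, so you can distinguish 'took too long' from 'was cancelled by a parent task'. Since Python 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError.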
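The following minimal sketch uses asyncio.timeout() when it is available and falls back to asyncio.wait_for() otherwise. slow_query and the 'cached data' fallback are hypothetical stand-ins for a real query and a real cache.

```python
import asyncio
import sys


async def slow_query(seconds: float) -> str:
    await asyncio.sleep(seconds)  # stand-in for a slow database call
    return "fresh data"


async def query_with_timeout(seconds: float, limit: float) -> str:
    try:
        if sys.version_info >= (3, 11):
            async with asyncio.timeout(limit):
                return await slow_query(seconds)
        return await asyncio.wait_for(slow_query(seconds), timeout=limit)
    except asyncio.TimeoutError:  # alias of built-in TimeoutError on 3.11+
        return "cached data"


if __name__ == "__main__":
    print(asyncio.run(query_with_timeout(seconds=0.5, limit=0.05)))  # cached data
    print(asyncio.run(query_with_timeout(seconds=0.01, limit=1.0)))  # fresh data
```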

Exception propagation through gather has a sharp edge. By default, the first exception raised by any task propagates immediately to the code awaiting gather, but the other tasks are not cancelled: they keep running in the background, and you lose their results. Pass return_exceptions=True in production so exceptions come back as ordinary list entries and you can inspect every result individually.
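The following short sketch shows both behaviours; ok and boom are hypothetical helper coroutines. With return_exceptions=True, the failure comes back as a list entry next to the successes. With the default setting, the exception propagates while a sibling task keeps running to completion.

```python
import asyncio


async def ok(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} ok"


async def boom() -> str:
    await asyncio.sleep(0.01)
    raise ValueError("upstream failed")


async def collect_all() -> list:
    # Failures are returned in place, in the same order as the inputs.
    return await asyncio.gather(ok("a", 0.02), boom(), ok("b", 0.03),
                                return_exceptions=True)


async def sibling_survives() -> str:
    sibling = asyncio.create_task(ok("c", 0.05))
    try:
        await asyncio.gather(boom(), sibling)  # default: first error propagates
    except ValueError:
        pass
    # gather did not cancel the sibling; it is still running.
    return await sibling


if __name__ == "__main__":
    print(asyncio.run(collect_all()))  # ['a ok', ValueError('upstream failed'), 'b ok']
    print(asyncio.run(sibling_survives()))  # c ok
```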

asyncio.TaskGroup (Python 3.11+) is now the preferred pattern. It provides structured concurrency: if any child task fails, the remaining children are cancelled, and all of their exceptions (other than CancelledError) are raised together in an ExceptionGroup when the async with block exits.

cancellation_and_timeouts.py
import asyncio
import time


async def database_query(query_name: str
Output
=== wait_for timeout demo ===
[slow_analytics] starting...
[slow_analytics] cancelled — cleaning up resources
[slow_analytics] finally block ran
Query timed out — returning cached/default data instead
=== TaskGroup structured concurrency demo ===
[user_lookup] starting...
[order_history] starting...
[recommendations] starting...
[recommendations] finally block ran
[user_lookup] finally block ran
[order_history] finally block ran
[user_lookup] result after 0.8s
[order_history] result after 1.0s
[recommendations] result after 0.6s
TaskGroup finished in 1.01s
=== Cancellation propagation demo ===
[child_a] starting...
[child_b] starting...
[child_a] cancelled — cleaning up resources
[child_a] finally block ran
[child_b] cancelled — cleaning up resources
[child_b] finally block ran
Parent task was cancelled — both children were also cancelled
Never Silence CancelledError — It Breaks Shutdown
Graceful shutdown in asyncio works through CancelledError: asyncio.run() cancels whatever tasks are still pending when main() returns, and the SIGTERM handlers that servers install typically cancel tasks too. If a coroutine catches CancelledError and doesn't re-raise it, your app will hang on shutdown waiting for a task that thinks it's still running. This is a production incident waiting to happen. The rule: always re-raise CancelledError, or let it propagate through a finally block.
Production Insight
A swallowed CancelledError causes the application to hang on shutdown. The process won't exit until the watchdog kills it.
Use TaskGroup for structured concurrency — it ensures all children are cancelled when one fails.
Rule: always re-raise CancelledError in except blocks.
Key Takeaway
Cancellation is cooperative: cancel() injects CancelledError at the coroutine's next await.
A timeout converts that internal CancelledError into a TimeoutError.
Rule: Use TaskGroup for robust group cancellation (Python 3.11+).
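Putting the section together, here is a minimal graceful-shutdown sketch for Unix, assuming three hypothetical workers. A SIGTERM handler installed with loop.add_signal_handler() sets an asyncio.Event. The service then cancels its workers and waits for them with a bounded grace period, so a worker that swallowed CancelledError is reported instead of being waited on indefinitely. add_signal_handler() is not available on Windows event loops, so the sketch skips it there.

```python
import asyncio
import signal


async def worker(name: str, log: list[str]) -> None:
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for real I/O
    except asyncio.CancelledError:
        log.append(f"{name} cleaned up")  # flush, commit, close...
        raise  # never swallow it


async def serve(log: list[str], stop: asyncio.Event, grace: float = 5.0) -> None:
    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGTERM, stop.set)
    except (NotImplementedError, RuntimeError):
        pass  # Windows loops, or a loop outside the main thread
    workers = [asyncio.create_task(worker(f"w{i}", log)) for i in range(3)]
    await stop.wait()  # returns once SIGTERM arrives
    for task in workers:
        task.cancel()
    # Bounded wait: a worker that ignores cancel() is reported, not awaited forever.
    _, pending = await asyncio.wait(workers, timeout=grace)
    if pending:
        log.append(f"{len(pending)} task(s) ignored cancellation")


async def main() -> list[str]:
    log: list[str] = []
    stop = asyncio.Event()
    # For the demo, simulate SIGTERM after 50 ms; in production, run
    # `kill -TERM <pid>` or let Kubernetes send it.
    asyncio.get_running_loop().call_later(0.05, stop.set)
    await serve(log, stop, grace=1.0)
    return log


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
    # ['w0 cleaned up', 'w1 cleaned up', 'w2 cleaned up']
```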

Blocking the Event Loop — How to Detect It and What to Do Instead

Blocking the event loop is the cardinal sin of async Python and the most common source of 'asyncio isn't faster than sync code' complaints. Any call that holds the thread without yielding — a requests.get(), a time.sleep(), a CPU-heavy loop, even a naively-called json.loads() on a 50MB payload — freezes every other coroutine in your application.

The asyncio event loop has a built-in slow-callback detector: set loop.slow_callback_duration to a low threshold (e.g. 50ms; the default is 100ms) and enable debug mode. asyncio will then log a warning whenever a callback holds the loop longer than that threshold. This is invaluable when profiling in development and staging. Debug mode adds overhead, so enable it in production only briefly.
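The following small sketch shows the detector firing. It assumes you want to capture the warning in code; ListHandler, blocking_step, and main are hypothetical names, and in practice the warning simply goes to your normal logging setup. Debug mode is switched on with asyncio.run(..., debug=True), and the message comes from the 'asyncio' logger.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_step() -> None:
    time.sleep(0.1)  # blocks the loop for ~100 ms


async def main() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.05  # 50 ms threshold
    await blocking_step()


if __name__ == "__main__":
    handler = ListHandler()
    logging.getLogger("asyncio").addHandler(handler)
    asyncio.run(main(), debug=True)
    print(handler.messages)  # e.g. ['Executing <Task ...> took 0.100 seconds']
```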

For blocking I/O you can't make async (a synchronous DB driver, a legacy library), use loop.run_in_executor() to offload work to a thread pool. For CPU-bound work, use ProcessPoolExecutor — threads won't help here because of the GIL.

asyncio.to_thread() (Python 3.9+) is a clean shorthand for run_in_executor with the default thread pool. It's idiomatic for wrapping synchronous file I/O, synchronous HTTP calls, or any legacy synchronous function you can't replace yet.

The mental model: the event loop is the single thread. Think of it as a very important person's personal assistant. Every synchronous call is a task that physically occupies the assistant. Every await is handing a task to someone else while the assistant handles the next thing.

blocking_vs_nonblocking.py
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


# ---------------------------------------------------------------------------
# Simulating CPU-heavy work (e.g. image processing, parsing, encryption)
# ---------------------------------------------------------------------------

def cpu_bound_resize(image_id: int) -> str:
    """Pretend this is PIL image processing — pure Python, GIL-bound."""
    # Simulate work with a tight loop — this BLOCKS the event loop if called
    # directly from a coroutine.
    result = sum(i * i for i in range(500_000))  # ~50ms on typical hardware
    return f"image_{image_id}_resized (checksum={result % 9999})"


async def bad_blocking_example() -> None:
    """This coroutine BLOCKS the event loop for ~50ms per call."""
    print("[BAD]  Calling CPU-bound work directly in a coroutine...")
    start = time.perf_counter()

    # These run SEQUENTIALLY and BLOCK the loop; no other coroutine can run.
    results = [cpu_bound_resize(img_id) for img_id in range(5)]

    elapsed = time.perf_counter() - start
    print(f"[BAD]  Done in {elapsed:.3f}s; event loop was blocked the whole time")


async def good_thread_executor_example() -> None:
    """Offload blocking work to a thread pool; the loop stays free."""
    print("[GOOD] Offloading CPU-bound work to thread pool...")
    loop = asyncio.get_running_loop()
    start = time.perf_counter()

    # run_in_executor submits work to a ThreadPoolExecutor.
    # The event loop is FREE while the threads run, so other coroutines proceed.
    # On a standard (GIL) build of CPython, cpu_bound_resize holds the GIL, so
    # the threads take turns and the total time is about the same as the
    # blocking version. For a real speed-up on CPU-bound work, use a
    # ProcessPoolExecutor instead.
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [
            loop.run_in_executor(pool, cpu_bound_resize, img_id)
            for img_id in range(5)
        ]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"[GOOD] Done in {elapsed:.3f}s; event loop stayed free while threads worked")
    for r in results:
        print(f"  {r}")


async def good_asyncio_to_thread_example() -> None:
    """asyncio.to_thread (Python 3.9+): cleaner syntax for the same pattern."""
    print("\n[BEST] Using asyncio.to_thread for legacy sync functions...")
    start = time.perf_counter()

    # to_thread runs the sync function in the default executor automatically.
    # This is the idiomatic way to call blocking synchronous code from async code.
    tasks = [
        asyncio.to_thread(cpu_bound_resize, img_id)
        for img_id in range(5)
    ]
    results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"[BEST] Done in {elapsed:.3f}s")
    for r in results:
        print(f"  {r}")


async def demonstrate_slow_callback_detection() -> None:
    """Enable asyncio debug mode to catch blocking calls automatically."""
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # warn if any callback takes > 50ms
    # The threshold only takes effect in debug mode: set the
    # PYTHONASYNCIODEBUG=1 environment variable OR call
    # asyncio.run(main(), debug=True).
    # asyncio then logs: 'Executing <Task ...> took 0.123 seconds'
    print("Slow callback detection threshold set to 50ms")


if __name__ == "__main__":
    asyncio.run(bad_blocking_example())
    asyncio.run(good_thread_executor_example())
    asyncio.run(good_asyncio_to_thread_example())
    asyncio.run(demonstrate_slow_callback_detection())
Output (elapsed times vary by machine; shown here as X.XXX)
[BAD]  Calling CPU-bound work directly in a coroutine...
[BAD]  Done in X.XXXs; event loop was blocked the whole time
[GOOD] Offloading CPU-bound work to thread pool...
[GOOD] Done in X.XXXs; event loop stayed free while threads worked
  image_0_resized (checksum=3762)
  image_1_resized (checksum=3762)
  image_2_resized (checksum=3762)
  image_3_resized (checksum=3762)
  image_4_resized (checksum=3762)

[BEST] Using asyncio.to_thread for legacy sync functions...
[BEST] Done in X.XXXs
  image_0_resized (checksum=3762)
  image_1_resized (checksum=3762)
  image_2_resized (checksum=3762)
  image_3_resized (checksum=3762)
  image_4_resized (checksum=3762)
Slow callback detection threshold set to 50ms
Pro Tip: Run asyncio.run(main(), debug=True) During Development
Debug mode activates the slow-callback detector, logs unawaited coroutines along with where they were created, and makes many non-thread-safe asyncio APIs raise an exception when called from the wrong thread. All of these mistakes are silent in production mode. Set the PYTHONASYNCIODEBUG=1 environment variable in your dev Docker container. Your future self will thank you when you catch a blocking DB call before it reaches prod.
Production Insight
A single blocking call in the request path (e.g. time.sleep(0.5)) can degrade throughput from 10k req/s to about 2 req/s, because every request is serialised behind it: 1 / 0.5 s = 2 requests per second.
Use asyncio.to_thread() for legacy sync code; use ProcessPoolExecutor for CPU-bound tasks.
Rule: Profile with debug mode to detect blocking calls early.
Key Takeaway
Any call that holds the thread without yielding blocks all tasks.
Use loop.slow_callback_duration to catch it automatically.
Rule: Offload CPU work to a process pool; offload sync I/O to a thread pool.

Handling Blocking Code — A Strategy Guide for run_in_executor and to_thread

When you inevitably encounter a blocking call inside a coroutine, you have three options: replace it with an async alternative, offload it to a thread pool, or offload it to a process pool. The right choice depends on the nature of the work.

Decision Strategy:

  1. Is there an async-native library? Use it directly (e.g., httpx.AsyncClient instead of requests). This is usually the most efficient path, because no thread hand-off is involved. File I/O is the exception: aiofiles offers an async API, but it delegates to a thread pool under the hood.
  2. Is the work I/O-bound but synchronous? (e.g., legacy database driver, filesystem with open()). Use asyncio.to_thread() (Python 3.9+) or loop.run_in_executor(None, func, ...). Both submit the call to the default ThreadPoolExecutor, freeing the event loop. The default thread pool has min(32, os.cpu_count() + 4) workers, which is sufficient for most I/O workloads. Tune max_workers via a custom executor if you see thread starvation (e.g., many slow synchronous calls).
  3. Is the work CPU-bound in pure Python? (e.g., parsing JSON with the standard library, pure-Python image processing). Use ProcessPoolExecutor via loop.run_in_executor(pool, func, ...). Threads won't help because of the GIL; a process pool gives true parallelism. Be mindful of the overhead: each call pickles the function, its arguments, and its return value, so reserve the process pool for work that takes much longer than that transfer, typically at least several hundred milliseconds.
  4. Is the work a blocking C extension that releases the GIL? (e.g., some numpy routines). You can use a thread pool — the GIL is released during the operation, so threads provide parallelism. Profile to confirm.
| Scenario | Recommended approach | Pitfall to avoid |
|---|---|---|
| Synchronous HTTP call | httpx.AsyncClient or aiohttp | Using requests directly, which blocks the loop |
| Synchronous file read | aiofiles, or asyncio.to_thread() around a function that opens and reads the file | Blocking the loop on large files |
| CPU-heavy pure Python | ProcessPoolExecutor | Using threads, which are still GIL-bound |
| Legacy sync library (I/O) | asyncio.to_thread() | Forgetting to await the returned coroutine |
| Multiple independent blocking calls | asyncio.gather with to_thread | Offloading them one at a time, which defeats concurrency |

Wrap offloaded calls with a timeout using asyncio.wait_for on the offloaded awaitable, so you can detect when an executor worker is stuck. The timeout frees the coroutine that was waiting, but Python cannot kill the thread: the blocked call keeps running and keeps its worker slot until it returns.
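Here is a minimal sketch of that caveat with a hypothetical legacy_call(). wait_for() gives up after 50 ms and the coroutine moves on, but the worker thread keeps running until the blocking call returns on its own. The threading.Event exists only to observe the thread.

```python
import asyncio
import threading
import time


def legacy_call(finished: threading.Event) -> str:
    time.sleep(0.2)  # stand-in for a slow synchronous driver call
    finished.set()
    return "done"


async def call_with_timeout(finished: threading.Event) -> str:
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(legacy_call, finished), timeout=0.05
        )
    except asyncio.TimeoutError:
        return "timed out"


async def main() -> tuple[str, bool, bool]:
    finished = threading.Event()
    result = await call_with_timeout(finished)
    still_running = not finished.is_set()  # the thread was not stopped
    # Wait (without blocking the loop) for the abandoned call to finish.
    completed = await asyncio.to_thread(finished.wait, 1.0)
    return result, still_running, completed


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('timed out', True, True)
```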

blocking_strategy_examples.py
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cpu_heavy_sort(n: int) -> list:
    """CPU-bound: sorts a large list. Should use ProcessPoolExecutor."""
    data = [random.random() for _ in range(n)]
    data.sort()
    return data[:10]  # first 10 sorted values


def sync_file_read(path: str) -> str:
    """I/O-bound: reads a file. Use to_thread."""
    with open(path) as f:
        return f.read()


async def handle_blocking_code_example():
    loop = asyncio.get_running_loop()

    # Strategy 1: CPU-bound → ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=2) as pool:
        tasks = [
            loop.run_in_executor(pool, cpu_heavy_sort, 1_000_000),
            loop.run_in_executor(pool, cpu_heavy_sort, 2_000_000),
        ]
        results = await asyncio.gather(*tasks)
        print(f"CPU results: {results}")

    # Strategy 2: I/O-bound sync → to_thread
    file_results = await asyncio.gather(
        asyncio.to_thread(sync_file_read, "/etc/hostname"),
        asyncio.to_thread(sync_file_read, "/etc/resolv.conf"),
    )
    print(f"File results: {[r[:50] for r in file_results]}")

    # Strategy 3: custom ThreadPoolExecutor for heavy I/O.
    # Not used as a `with` block: its exit calls shutdown(wait=True), which
    # would block the event loop until any stuck worker finishes.
    custom_pool = ThreadPoolExecutor(max_workers=10)
    heavy_io_tasks = [
        loop.run_in_executor(custom_pool, sync_file_read, "/var/log/syslog"),
        loop.run_in_executor(custom_pool, sync_file_read, "/var/log/dmesg"),
    ]
    try:
        # The timeout stops the wait; it cannot interrupt the worker threads
        syslog, dmesg = await asyncio.wait_for(
            asyncio.gather(*heavy_io_tasks), timeout=5.0
        )
        print("Read heavy files successfully")
    except asyncio.TimeoutError:
        print("Timed out reading heavy files")
    except OSError as exc:  # file missing or insufficient permissions
        print(f"Could not read heavy files: {exc}")
    finally:
        # Release the pool without blocking the loop on still-running threads
        custom_pool.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":  # required for ProcessPoolExecutor on spawn-based platforms
    asyncio.run(handle_blocking_code_example())
Output
CPU results: [[0.0001, 0.0002, ...], [0.0001, 0.0002, ...]]
File results: ['myhost', 'nameserver 127.0.0.53']
Read heavy files successfully
Thread Pool Size Tuning for I/O
The default ThreadPoolExecutor uses min(32, os.cpu_count() + 4) workers. This is usually fine for short I/O calls. If you have many long-running synchronous calls (e.g., a DB driver that blocks for seconds), increase the pool size to avoid starvation. Monitor the number of active threads and adjust. For CPU-bound work, stick to ProcessPoolExecutor – the GIL limits threads even if you have many cores.
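asyncio.to_thread() and loop.run_in_executor(None, ...) both draw from the loop's default executor, so replacing that executor is one way to act on the tuning advice above. A minimal sketch, assuming a hypothetical blocking_db_call() and an illustrative pool size of 64:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_db_call() -> str:
    # Hypothetical slow synchronous driver call; reports which worker ran it
    return threading.current_thread().name


async def main() -> str:
    loop = asyncio.get_running_loop()
    # 64 is illustrative: size it from how many long blocking calls overlap
    loop.set_default_executor(
        ThreadPoolExecutor(max_workers=64, thread_name_prefix="db-io")
    )
    # to_thread() now runs on the custom pool instead of the built-in default
    return await asyncio.to_thread(blocking_db_call)


print(asyncio.run(main()))  # db-io_0
```

asyncio.run() shuts down the loop's default executor on exit, so the custom pool is cleaned up together with the loop.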
Production Insight
Always wrap offloaded blocking calls with a timeout to prevent hanging the entire application. The executor itself can become a bottleneck if workers are all busy on long-running tasks — use separate executors for different workloads (e.g., one for I/O, one for CPU).
Monitor executor queue depth as a health metric: if tasks pile up, you either need more workers or to replace the sync library with an async one.
Rule: Prefer asyncio.to_thread() for simple I/O offload; use loop.run_in_executor with a custom pool for advanced control.
Key Takeaway
Offload blocking code to threads for I/O, processes for CPU. Always set timeouts on offloaded work. Tune pool sizes based on workload duration.

Production Patterns: Structured Concurrency with TaskGroup vs gather

asyncio.gather() and asyncio.TaskGroup both run multiple tasks concurrently, but they differ fundamentally in failure handling. With the default return_exceptions=False, gather() propagates the first exception to the awaiting coroutine immediately, but it does not cancel the other awaitables: they keep running in the background, and any exceptions they raise later are never seen by the caller. TaskGroup (Python 3.11+) provides structured concurrency: every task is a child of the group, and if one fails, the remaining children are cancelled. Once all of them have finished, the exceptions are raised together as an ExceptionGroup.

The critical distinction: with gather(return_exceptions=False), you get the first exception, but the remaining tasks are neither cancelled nor awaited. They keep running unsupervised, and their results and any later exceptions are lost unless you hold references to them. With gather(return_exceptions=True), gather waits for every awaitable and returns a list mixing results and exception objects in input order; no sibling is cancelled. TaskGroup avoids orphaned tasks altogether: either every child succeeds, or the first failure cancels the rest and you handle the collected exceptions after the context manager exits.

In high-throughput services, prefer gather with return_exceptions=True when you want to preserve successful results despite some failures — for example, fetching data from multiple caches where one miss is acceptable. Use TaskGroup when you want atomicity: if any part of the workflow fails, the whole operation should be abandoned.

Another pattern: asyncio.wait() gives fine-grained control over FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED. It's useful for race patterns (e.g., fetch from primary, fallback to secondary on timeout).

gather_vs_taskgroup.py
import asyncio


async def fetch_cache(key: str
Output
Caught early: cache miss for b
Result: cached_value_a
Task failed: cache miss for b
Result: cached_value_c
TaskGroup raised ExceptionGroup with 1 exceptions
Winner: cached_value_fallback (or exception from primary)
gather vs TaskGroup vs wait
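  • gather(return_exceptions=False): the first failure is re-raised immediately, but siblings are not cancelled; they keep running unobserved. Use when a single failure should abort the caller's logic, and cancel the leftover tasks yourself if they must stop.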
  • gather(return_exceptions=True): no cancellation on failure; you get a list of mixed results/exceptions. Use for fault-tolerant batches (e.g. multiple cache lookups).
  • TaskGroup: if any child fails, all siblings are cancelled. Exceptions collected in ExceptionGroup after all children finish. Use for atomic workflows.
  • wait(FIRST_COMPLETED): get the first task that finishes, useful for race patterns (primary/fallback) or timeout as a task.
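The primary/fallback race from the last bullet can be sketched with asyncio.wait and FIRST_COMPLETED. fetch_primary() and fetch_fallback() are hypothetical stand-ins with made-up latencies:

```python
import asyncio


async def fetch_primary() -> str:
    await asyncio.sleep(0.5)  # hypothetical slow primary
    return "primary_value"


async def fetch_fallback() -> str:
    await asyncio.sleep(0.05)  # hypothetical fast secondary
    return "cached_value_fallback"


async def race() -> str:
    tasks = {
        asyncio.create_task(fetch_primary()),
        asyncio.create_task(fetch_fallback()),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the loser is no longer needed
    if pending:
        await asyncio.wait(pending)  # let cancellations finish without raising here
    winner = done.pop()
    return winner.result()  # re-raises if the first finisher failed


print(asyncio.run(race()))  # cached_value_fallback
```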
Production Insight
In production, gather with return_exceptions=False can cause the system to lose already-completed results when a sibling fails. If you're collecting non-critical metrics from multiple sources, a single failure shouldn't discard all data.
Use TaskGroup when you need transactional semantics: if one leg fails, stop the others.
Rule: default to return_exceptions=True in gather unless you explicitly need abort-on-error.
Key Takeaway
gather surfaces the first exception but leaves siblings running; TaskGroup cancels siblings and reports every exception together in an ExceptionGroup.
Know the difference: atomic vs fault-tolerant.
Rule: When in doubt, use gather with return_exceptions=True or switch to TaskGroup.
Choosing the Right Concurrency Primitive
  • If all tasks must succeed or none should (atomic workflow): use asyncio.TaskGroup (3.11+), or gather with return_exceptions=False and cancel the remaining tasks yourself when it raises.
  • If some failures are acceptable and you want to keep partial results: use asyncio.gather with return_exceptions=True and inspect each result.
  • If you need the first available result (race pattern, fallback): use asyncio.wait with FIRST_COMPLETED, or asyncio.as_completed.
  • If tasks should run in the background without awaiting them all at once: use asyncio.create_task for each, store the references, and await them later.
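For the background case, the asyncio docs warn that the event loop keeps only weak references to tasks, so a fire-and-forget task with no other reference can disappear mid-flight. A common pattern is to park tasks in a set and let a done callback remove them; spawn() and record_metric() below are hypothetical names:

```python
import asyncio

background_tasks: set[asyncio.Task] = set()


def spawn(coro) -> asyncio.Task:
    # Hold a strong reference until the task finishes
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop it when done
    return task


async def record_metric(name: str, sink: list[str]) -> None:
    await asyncio.sleep(0.01)  # hypothetical metrics call
    sink.append(name)


async def main() -> list[str]:
    sink: list[str] = []
    for name in ("latency", "errors"):
        spawn(record_metric(name, sink))
    # ... other work happens here ...
    await asyncio.gather(*background_tasks)  # await later, e.g. at shutdown
    return sink


print(sorted(asyncio.run(main())))  # ['errors', 'latency']
```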

Asyncio Batching Strategies — gather() vs TaskGroup vs as_completed

Choosing the right batching primitive is critical for both performance and correctness. Below is a comparison of the three main approaches. Use this as a quick reference when designing production async workflows.

| Feature | asyncio.gather() | asyncio.TaskGroup | asyncio.as_completed() |
|---|---|---|---|
| Python version | 3.5+ | 3.11+ | 3.5+ |
| Concurrency type | Wraps coroutines in Tasks implicitly | Structured concurrency with explicit task creation | Iterate over futures as they complete |
| Failure behavior (default) | First exception is raised immediately; other tasks are not cancelled and keep running | Remaining children cancelled; all exceptions collected in an ExceptionGroup | Each future yields its result or exception as it completes; others are not cancelled |
| Partial results on failure | Lost to the caller (use return_exceptions=True to keep them) | Lost for cancelled children | Preserved (each future independent) |
| Works with existing tasks? | Yes, pass any awaitables | Only via tg.create_task() | Yes, iterate over any iterable of awaitables |
| Memory overhead | Low (internal list of futures) | Low (tracked by the context manager) | Low (all tasks start up front; results consumed one at a time) |
| Use case | Fire-and-wait, when you need a single list of results | Atomic workflows, where all-or-nothing semantics are required | Streaming results, e.g. making many HTTP requests and processing each as soon as it arrives |
| Error handling pattern | Wrap with try/except or use return_exceptions=True | Use except* to handle multiple exception types | Handle each result as it comes; use asyncio.wait() for fine-grained control |

When to use each:

  • gather: Best for most cases where you have a fixed set of I/O operations and want to collect all results at once. Use return_exceptions=True for fault tolerance.
  • TaskGroup: Use when you need structured concurrency and atomicity — if one task fails, you want the entire group to abort cleanly. Excellent for complex workflows where cleanup matters.
  • as_completed: Use when you want to process results as soon as they arrive, rather than waiting for the slowest task. Useful for progressive UI updates or streaming pipelines.

All three can be combined: for example, use TaskGroup for a batch of related operations, and inside a task use gather to collect sub-results. The key is to match the semantic to your failure tolerance.

When to Use as_completed() Over gather()
If you are making 100 HTTP requests and each response takes between 200ms and 2s, gather will return only when the slowest request completes—meaning you wait 2s before processing the first result. as_completed gives you each result as soon as it finishes, allowing you to start processing faster, especially useful for real-time dashboards or progressive data loading.
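A minimal sketch of that streaming behaviour, with asyncio.sleep standing in for HTTP requests of different (made-up) latencies:

```python
import asyncio
import time


async def fetch(delay: float) -> float:
    await asyncio.sleep(delay)  # stand-in for an HTTP request
    return delay


async def stream_results() -> list[float]:
    start = time.monotonic()
    tasks = [asyncio.create_task(fetch(d)) for d in (0.3, 0.1, 0.2)]
    arrivals = []
    for next_done in asyncio.as_completed(tasks):
        result = await next_done  # completion order, not input order
        print(f"got {result} after {time.monotonic() - start:.2f}s")
        arrivals.append(result)
    return arrivals


print(asyncio.run(stream_results()))  # [0.1, 0.2, 0.3]
```

The first result can be processed after roughly 0.1 s, whereas gather() over the same tasks would hand back nothing until the 0.3 s request finished.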
Production Insight
In high-concurrency systems, as_completed can reduce time-to-first-byte for end users. However, it requires managing each future individually, which can increase code complexity. gather with return_exceptions=True remains the workhorse for most batch operations.
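If using as_completed, create the tasks yourself with create_task and keep the list. Passing bare coroutines still runs them concurrently (as_completed wraps each one in a Task up front), but you lose the handles you need to cancel or inspect them later. What does serialize the work is awaiting coroutines one by one in a plain for loop.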
Rule: Use gather for simplicity, TaskGroup for atomicity, and as_completed for streaming.
Key Takeaway
Choose batching primitive based on failure semantics and result delivery order: gather for batch collect, TaskGroup for atomic, as_completed for streaming.

The async/await Keyword Contract — What Really Happens Under the Hood

Stop treating async def as magic. When you call an async function, Python doesn't execute a single line of its body. It returns a coroutine object: a suspended frame, much like a generator's, that holds your function's arguments and local variables and can be resumed step by step. CPython does not rewrite the function into an explicit state machine; the paused frame itself records where execution stopped.

The await keyword marks a potential yield point. Awaiting another coroutine runs it inline; control returns to the event loop only when something down the chain awaits a Future that is not done yet. The Task wrapping your coroutine then registers a callback on that Future. When the underlying I/O completes or a timer fires, the callback is queued on the loop's ready queue, and the Task resumes your coroutine by calling .send(None) on it, exactly where it paused.

This is not threads. There's no preemption. Your coroutine runs until it reaches an await that actually suspends, then it parks itself. That's why blocking inside a coroutine kills concurrency: the loop can't switch to another task until your coroutine voluntarily yields.
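To see that only a genuine suspension hands control back, here is a small sketch with illustrative task names: awaiting a coroutine that never awaits a pending future does not let other tasks run, while await asyncio.sleep(0) does.

```python
import asyncio

order: list[str] = []


async def no_suspend() -> int:
    order.append("no_suspend")  # never awaits anything pending
    return 1


async def other() -> None:
    order.append("other")


async def main() -> None:
    task = asyncio.create_task(other())  # scheduled, not started
    await no_suspend()                   # runs inline; other() still hasn't run
    order.append("after no_suspend")
    await asyncio.sleep(0)               # real yield: the loop runs other()
    order.append("after sleep(0)")
    await task


asyncio.run(main())
print(order)  # ['no_suspend', 'after no_suspend', 'other', 'after sleep(0)']
```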

CoroutineContract.py
import asyncio

async def fetch_data(url):
    print(f"Fetching {url}")
    await asyncio.sleep(0.5)  # yield point
    print(f"Done {url}")
    return f"data from {url}"

async def main():
    # Calling async function returns a coroutine object
    coro = fetch_data("https://api.billing.io")
    print(f"Type: {type(coro)}")
    print(f"Is coroutine: {asyncio.iscoroutine(coro)}")
    print(f"Frame locals before await: {coro.cr_frame.f_locals}")
    
    # Only here does execution begin
    result = await coro
    print(f"Result: {result}")

asyncio.run(main())
Output
Type: <class 'coroutine'>
Is coroutine: True
Frame locals before await: {'url': 'https://api.billing.io'}
Fetching https://api.billing.io
Done https://api.billing.io
Result: data from https://api.billing.io
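Production Trap: The Missing await
Never call a coroutine function where you need its result without awaiting it. Forgetting await gives you the coroutine object, not the result. The body never runs, so any exception it would have raised never happens; the only trace is a "coroutine ... was never awaited" RuntimeWarning when the object is garbage-collected. Linters can miss this in complex call chains.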
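A small sketch of how the mistake surfaces at runtime, using a hypothetical charge_card() coroutine: the ValueError never happens, and the only evidence is the RuntimeWarning CPython emits when the unawaited coroutine is finalised.

```python
import asyncio
import gc
import warnings


async def charge_card(amount: int) -> str:
    # Hypothetical coroutine that rejects negative amounts
    if amount < 0:
        raise ValueError("negative amount")
    await asyncio.sleep(0)
    return f"charged {amount}"


async def main() -> list[str]:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        charge_card(-5)  # BUG: missing await; the body never runs, no ValueError
        gc.collect()     # CPython finalises it at once; this helps elsewhere
    return [str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)]


print(asyncio.run(main()))
```

Running tests with warnings turned into errors (for example python -W error::RuntimeWarning) makes this class of bug fail loudly instead of silently.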
Key Takeaway
async def returns a coroutine object with a suspended frame. await is where it may surrender control to the event loop: no preemption, only voluntary suspension.

Coroutine Chaining — Why Your Pipeline Must Be an Awaitable Stack

Real async applications chain coroutines. Your entry point awaits fetch_payment(), which awaits validate_card(), which awaits authorize_gateway(). Awaiting a coroutine runs it inline, so the chain only suspends at the innermost await on a pending Future, and each resume passes back up through every frame in the chain. That cost is small per layer, but it is not free.

The performance trap: if you chain too many shallow coroutines that just delegate to the next, every suspension and resume pays that per-layer cost, and it adds up on hot paths. To catch task steps that hold the loop too long, enable debug mode and lower loop.slow_callback_duration (default 0.1 s); asyncio then logs a warning for every step that exceeds it.

Design rule: flatten your chains. Combine multiple small async operations into one coroutine if they share I/O context. Use asyncio.gather() only when tasks are genuinely independent: fanning out work that then serializes on the same resource (one connection, one lock) adds scheduling overhead without any concurrency gain and can end up slower than plain synchronous code.

ChainedPipeline.py
import asyncio
import time

async def validate_payment(payment_id):
    await asyncio.sleep(0.1)  # simulate DB lookup
    return {"id": payment_id, "valid": True}

async def process_refund(payment):
    if not payment["valid"]:
        raise ValueError("Invalid payment")
    await asyncio.sleep(0.2)  # simulate API call
    return {"refund_id": f"ref_{payment['id']}"}

async def main():
    start = time.monotonic()
    
    # Chained awaits — each suspension adds overhead
    payment = await validate_payment(42)
    refund = await process_refund(payment)
    
    elapsed = time.monotonic() - start
    print(f"Chained: {elapsed:.3f}s — refund_id: {refund['refund_id']}")

asyncio.run(main())
Output
Chained: 0.304s — refund_id: ref_42
Senior Shortcut: Flatten With asyncio.TaskGroup
Use TaskGroup for concurrent I/O with structured error handling. It's not a magic bullet: for CPU-bound work inside coroutines, you still need run_in_executor. Reserve gather() for fan-out where you don't need siblings cancelled when one fails.
Key Takeaway
Chain coroutines vertically only when they share dependency state. For independent I/O, use TaskGroup or gather — every sequential await costs visible latency at scale.

The asyncio Event Loop — Multitasking Without a Scheduler (and Why It Matters)

The event loop is not a scheduler. There's no preemptive time-slicing. It's a select()-style reactor loop that polls registered file descriptors and runs ready callbacks. When you await asyncio.sleep(5), the loop registers a timer callback in its internal heap. When the timer fires, the callback completes the future your task is waiting on, and the task resumes your coroutine by calling .send() on it.

The critical difference from threads: the loop runs only one task at a time. With 10,000 concurrent I/O operations in flight, the loop asks the OS selector (epoll, kqueue, and so on) which sockets are ready and resumes only those tasks. No OS thread context switches. No GIL contention. But the moment any single coroutine does CPU work without yielding, such as parsing a large JSON response or hashing a password, the entire loop freezes. All 10,000 connections stall.

Design your tasks to yield frequently. For CPU work, use loop.run_in_executor(). Passing None selects the default ThreadPoolExecutor with min(32, os.cpu_count() + 4) workers, which keeps the loop responsive; but pure-Python CPU work in threads still contends for the GIL, so pass a ProcessPoolExecutor when you need real parallelism. Tune the pool size based on your I/O vs CPU ratio. Blocking for more than 50ms in a single coroutine is a production incident waiting to happen.
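A minimal sketch of "yield frequently" for CPU work you keep on the loop: process in chunks and await asyncio.sleep(0) between them, so a hypothetical heartbeat task keeps getting turns. score() and the chunk size are illustrative; this keeps the loop responsive but does not make the CPU work itself any faster.

```python
import asyncio


def score(record: int) -> int:
    # Hypothetical per-record CPU work (a stand-in for parsing or scoring)
    return sum(i * i for i in range(record % 100))


async def process_in_chunks(records: list[int], chunk_size: int = 1_000) -> int:
    total = 0
    for start in range(0, len(records), chunk_size):
        for record in records[start:start + chunk_size]:
            total += score(record)
        await asyncio.sleep(0)  # hand the loop back between chunks
    return total


async def heartbeat(ticks: list[int], stop: asyncio.Event) -> None:
    # Records each time the loop lets it run while the batch is in progress
    while not stop.is_set():
        ticks.append(1)
        await asyncio.sleep(0)


async def main() -> tuple[int, int]:
    ticks: list[int] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    total = await process_in_chunks(list(range(20_000)))
    stop.set()
    await hb
    return total, len(ticks)


total, tick_count = asyncio.run(main())
print(f"total={total}, heartbeat ran {tick_count} times")
```

Without the sleep(0), the heartbeat would not run at all until the whole batch finished.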

EventLoopYielding.py
import asyncio
import time

def cpu_intensive_task(n):
    # Simulates parsing a large payload
    total = 0
    for i in range(n):
        total += i ** 2
    return total

async def good_coroutine(task_id, n):
    # Yields to loop; allows other tasks to run
    start = time.monotonic()
    result = await asyncio.get_running_loop().run_in_executor(
        None, cpu_intensive_task, n
    )
    elapsed = time.monotonic() - start
    print(f"Task {task_id}: {elapsed:.3f}s — result {result}")

async def bad_coroutine(task_id, n):
    # Blocks the loop — no other task runs
    start = time.monotonic()
    result = cpu_intensive_task(n)
    elapsed = time.monotonic() - start
    print(f"Task {task_id}: {elapsed:.3f}s — blocked loop")

async def main():
    tasks = [good_coroutine(1, 10_000_000), good_coroutine(2, 10_000_000)]
    await asyncio.gather(*tasks)

asyncio.run(main())
Output
Task 1: X.XXXs — result 333333283333335000000
Task 2: X.XXXs — result 333333283333335000000
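Production Trap: Blocking the Loop Silently
If you swap bad_coroutine into main(), nothing raises or logs by default: it holds the loop thread until cpu_intensive_task returns, and every other task stalls in the meantime. Run with PYTHONASYNCIODEBUG=1 to get 'Executing <Task ...> took N seconds' warnings for steps like this.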
Key Takeaway
The event loop is a reactor, not a scheduler. Any coroutine that blocks for >50ms without yielding is a loop-level Denial of Service — use run_in_executor for CPU work.

Rate Limiting with Semaphores — Controlling Concurrency in Production

Without concurrency limits, async tasks can overwhelm external APIs, databases, or rate-limited services. asyncio.Semaphore caps the number of coroutines executing a critical section at once. Create one semaphore with the maximum concurrency count, then enter it with async with around each protected operation. When all slots are taken, further callers wait until a slot is released at the end of another task's async with block. This cuts down on 429 responses and file-descriptor exhaustion, but note that a semaphore caps concurrency, not requests per second: a per-second quota still needs pacing on top. Use it with TaskGroup or gather to batch parallel work. Always set the bound from the downstream service's documented limits rather than guessing. Combine with exponential backoff for retries to build resilient pipelines.

RateLimiter.py
import asyncio

async def fetch_url(sem: asyncio.Semaphore, url: str):
    async with sem:
        print(f"Fetching {url}")
        await asyncio.sleep(1)  # simulate I/O
        return url

async def main():
    sem = asyncio.Semaphore(3)  # max 3 concurrent
    tasks = [fetch_url(sem, f"https://api.example/{i}") for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Completed: {len(results)}")

asyncio.run(main())
Output
Fetching https://api.example/0
Fetching https://api.example/1
Fetching https://api.example/2
... (completing 3 at a time)
Completed: 10
Production Trap:
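Never create a new Semaphore per request or per task: it must be shared across tasks to throttle globally. Calling sem.acquire() without a matching release() on every path (for example, when an exception is raised) leaks a slot; once all slots leak, every caller waits forever. Use async with sem.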
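A quick way to check this trap is to count peak in-flight calls. In this sketch (fetch() and the 10 ms delay are illustrative), one shared Semaphore(3) keeps the peak at 3, while giving each call its own semaphore, the anti-pattern, lets all ten run at once:

```python
import asyncio


async def fetch(sem: asyncio.Semaphore, stats: dict) -> None:
    async with sem:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)  # simulated I/O
        stats["in_flight"] -= 1


async def run(shared: bool) -> int:
    stats = {"in_flight": 0, "peak": 0}
    shared_sem = asyncio.Semaphore(3)
    # Anti-pattern when shared=False: every call gets its own semaphore
    sems = [shared_sem if shared else asyncio.Semaphore(3) for _ in range(10)]
    await asyncio.gather(*(fetch(s, stats) for s in sems))
    return stats["peak"]


print(asyncio.run(run(shared=True)))   # 3
print(asyncio.run(run(shared=False)))  # 10
```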
Key Takeaway
Always share one Semaphore instance among all concurrent tasks to enforce a global concurrency cap.

Retry Logic with Exponential Backoff — Resilient Async Calls

Network calls fail. A retry strategy with exponential backoff prevents cascading failures and respects server recovery time. The classic pattern: on failure, wait 2^attempt seconds (plus jitter), then retry up to a max retry count. In asyncio, wrap the call in a loop: try the operation, catch expected exceptions, sleep asynchronously with asyncio.sleep (never time.sleep—that blocks the event loop). Add random jitter to avoid thundering herd. Use a timeout per attempt to avoid hanging. Integrate with semaphore rate limiting for production-grade reliability.

RetryBackoff.py
import asyncio
import random

async def fetch_with_retry(url: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            # simulate flaky fetch
            if random.random() < 0.7:
                raise ConnectionError("timeout")
            return f"OK: {url}"
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt+1} in {wait:.2f}s")
            await asyncio.sleep(wait)

async def main():
    result = await fetch_with_retry("https://api.example/data")
    print(result)

asyncio.run(main())
Output
Retry 1 in 1.23s
Retry 2 in 2.84s
OK: https://api.example/data
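To add the per-attempt timeout the paragraph recommends, one option is to wrap each attempt in asyncio.wait_for. A minimal sketch, with a hypothetical flaky_call() that hangs on its first attempt and millisecond-scale delays so it runs quickly:

```python
import asyncio
import random


async def flaky_call(attempt: int) -> str:
    # Hypothetical upstream: hangs on the first attempt, answers afterwards
    if attempt == 0:
        await asyncio.sleep(10)
    return "OK"


async def call_with_retry(
    max_retries: int = 3,
    per_attempt_timeout: float = 0.05,
    base_delay: float = 0.01,
) -> str:
    for attempt in range(max_retries):
        try:
            # wait_for cancels the attempt once it exceeds its time budget
            return await asyncio.wait_for(
                flaky_call(attempt), timeout=per_attempt_timeout
            )
        except (asyncio.TimeoutError, ConnectionError):
            # CancelledError is not caught here (BaseException since 3.8),
            # so a shutdown can still cancel the whole retry loop
            if attempt == max_retries - 1:
                raise
            delay = base_delay * 2 ** attempt + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
    raise ValueError("max_retries must be at least 1")


print(asyncio.run(call_with_retry()))  # OK
```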
Production Trap:
Mixing asyncio.sleep with synchronous time.sleep in a coroutine freezes the event loop—always use await asyncio.sleep.
Key Takeaway
Use exponential backoff with jitter and async sleep to avoid retry storms; always set a max retry count to prevent indefinite looping.

Async Iterators and Async Comprehensions — Streaming Data Efficiently

Async iterators let you consume data as it arrives, not all at once. Define an __aiter__ that returns self and an async __anext__ that returns the next item, raising StopAsyncIteration when done. Use async for loops to process chunks: ideal for streaming API responses or file lines. Async comprehensions (async for inside list/dict/set creation) build collections from async sources in one expression. Both require an async context (inside an async def). They avoid buffering entire datasets, reducing memory pressure. Combine with async generators (yield in async def) for clean, lazy pipelines.

AsyncIterator.py
import asyncio

class AsyncCounter:
    def __init__(self, limit: int):
        self.limit = limit
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulate I/O
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(f"Got: {num}")
    squares = [x * x async for x in AsyncCounter(3)]
    print(f"Squares: {squares}")

asyncio.run(main())
Output
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Squares: [1, 4, 9]
Production Trap:
Synchronous iterators passed to async for won't work—you must provide an async iterable. Verify it has __aiter__ and __anext__.
Key Takeaway
Use async for and async comprehensions for memory-efficient streaming; always implement __aiter__ and __anext__ for custom async iterables.
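The same streaming idea is usually shorter with async generators, which the section mentions as the lazy-pipeline option. A minimal two-stage sketch; read_records() and squares() are hypothetical names, with asyncio.sleep standing in for I/O:

```python
import asyncio
from collections.abc import AsyncIterator


async def read_records(limit: int) -> AsyncIterator[int]:
    # Async generator: no __aiter__/__anext__ boilerplate needed
    for i in range(1, limit + 1):
        await asyncio.sleep(0.01)  # simulated I/O per record
        yield i


async def squares(source: AsyncIterator[int]) -> AsyncIterator[int]:
    # Pipeline stage: transforms items lazily as they arrive
    async for value in source:
        yield value * value


async def main() -> list[int]:
    # Async comprehension over the two-stage pipeline
    return [x async for x in squares(read_records(4))]


print(asyncio.run(main()))  # [1, 4, 9, 16]
```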

Async I/O Isn’t Simple

Many developers treat async I/O as a magic performance switch: just add async/await and everything speeds up. The reality is far more subtle. Async Python runs all coroutines on a single thread, interleaving them via cooperative yielding. If any coroutine blocks, even for 50ms, the entire event loop stalls. Unlike with threading, the OS won't preempt a misbehaving task. Mixing synchronous libraries (requests, time.sleep) with async code silently serializes your program.

Shared state still races, too: every await is a point where another task can run and change it, and nothing protects you unless you add an asyncio.Lock yourself. Profiling async bottlenecks needs its own tooling, such as debug mode with loop.slow_callback_duration, and asyncio.all_tasks() to see what is still pending. The paradox: async I/O improves throughput for I/O-bound problems but adds complexity for every other concern: error propagation, cancellation, and resource cleanup. Before adopting asyncio, ask whether your bottleneck is genuinely I/O latency, not CPU work or developer productivity.
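The shared-state race is easy to reproduce. In this sketch, built around a hypothetical Account class, a read-modify-write that awaits in the middle loses updates without a lock, while an asyncio.Lock serializes the critical section:

```python
import asyncio


class Account:
    def __init__(self) -> None:
        self.balance = 0
        self.lock = asyncio.Lock()


async def deposit_unsafe(acct: Account) -> None:
    current = acct.balance      # read
    await asyncio.sleep(0)      # another task may run here
    acct.balance = current + 1  # write back a possibly stale value


async def deposit_safe(acct: Account) -> None:
    async with acct.lock:       # no other depositor can interleave
        current = acct.balance
        await asyncio.sleep(0)
        acct.balance = current + 1


async def run(deposit) -> int:
    acct = Account()
    await asyncio.gather(*(deposit(acct) for _ in range(100)))
    return acct.balance


print(asyncio.run(run(deposit_unsafe)))  # 1: every task read 0 before anyone wrote
print(asyncio.run(run(deposit_safe)))    # 100
```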

async_illusion.py
import asyncio

def sync_block():
    import time
    time.sleep(0.1)  # blocks event loop

async def bad():
    loop = asyncio.get_running_loop()
    print(loop.time())
    sync_block()  # Never do this — stalls all tasks
    print(loop.time())

asyncio.run(bad())
Output
10000.0
10000.1
Production Trap:
A single blocking call (time.sleep, requests.get) can freeze hundreds of concurrent coroutines. Use asyncio.to_thread() or loop.run_in_executor() to offload synchronously blocking work.
Key Takeaway
Async concurrency ≠ parallelism; one blocking call freezes all tasks.

Libraries Supporting Async I/O

The async ecosystem is fragmented but essential for production systems. For HTTP, aiohttp and httpx (with async support) replace requests. Databases: asyncpg (PostgreSQL) gives non-blocking queries, and aiosqlite (SQLite) offers an async API by running sqlite3 in a background thread. Redis: aioredis (merged into redis-py as redis.asyncio). For task queues, use aio-pika (RabbitMQ) or nats-py (NATS). If you need HTTP servers, aiohttp or FastAPI with uvicorn's async workers provide high throughput. The tricky part is that many popular libraries (Django ORM, SQLAlchemy before 1.4, boto3) lack native async support, forcing you to run them in executors. Newer tools like SQLAlchemy's asyncio extension (1.4+, mature in 2.0), beanie (MongoDB ODM), and motor (MongoDB driver) fill gaps. Always verify a library's async support by checking for async def methods or looking for an async submodule. Pro tip: use anyio's backends to write library-agnostic async code that works with both asyncio and trio, future-proofing your stack against event loop lock-in.

async_http.py
import asyncio
import httpx

async def fetch(url: str) -> str:
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.text

async def main():
    data = await fetch("https://httpbin.org/get")
    print(data)

asyncio.run(main())
Output
{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "python-httpx/0.27.0",
    "X-Amzn-Trace-Id": "Root=1-abc"
  },
  ...
}
Ecosystem Trap:
Not all async libraries are equal: some expose async APIs but still do blocking I/O internally. Always inspect the source, or run them under asyncio debug mode and watch for slow-callback warnings, before trusting them.
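One way to catch a "fake async" call: asyncio's debug mode logs a warning on the asyncio logger for any task step that holds the loop longer than loop.slow_callback_duration. A minimal sketch; suspicious_client_call() is a hypothetical stand-in for a library that blocks internally, and ListHandler just collects log messages for inspection:

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def suspicious_client_call() -> None:
    # Hypothetical "async" library call that blocks the loop internally
    time.sleep(0.1)
    await asyncio.sleep(0)


async def main() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.05  # seconds
    await suspicious_client_call()


handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)
asyncio.run(main(), debug=True)
print(handler.messages)  # e.g. "Executing <Task ...> took 0.100 seconds"
```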
Key Takeaway
Audit libraries for true async support; use anyio for portability.

asyncio.sleep() vs time.sleep() — The One That Kills Your Event Loop

You call time.sleep(2) inside an async function thinking you're just pausing. What you've actually done is freeze the entire event loop — all your other coroutines, their network calls, your semaphores, everything — stops dead for 2 seconds. time.sleep() is a blocking system call that suspends the whole Python thread. The event loop can't switch to another task because it never gets a chance to run. asyncio.sleep() yields control back to the event loop, which then schedules other pending coroutines during that 2-second pause. The fundamental distinction: blocking vs yielding. One starves the loop, the other feeds it. Never import time.sleep in async code unless you enjoy watching your concurrent system collapse into sequential misery.

sleep_showdown.py
import asyncio
import time

async def block_the_loop():
    print("Blocking starts")
    time.sleep(2)  # Freezes EVERYTHING
    print("Blocking done")

async def async_good():
    print("Async sleep starts")
    await asyncio.sleep(2)  # Yields to loop
    print("Async sleep done")

async def main():
    await asyncio.gather(block_the_loop(), async_good())

asyncio.run(main())
Output
Blocking starts
(2 second freeze)
Blocking done
Async sleep starts
(2 second wait)
Async sleep done
Production Trap:
Third-party sync libraries often sneak in time.sleep() via retry loops. Always wrap sync calls in loop.run_in_executor() to avoid accidentally nuking your event loop's throughput.
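A sketch of the difference offloading makes, assuming a hypothetical legacy_client_call() that sleeps internally: run directly on the loop, two calls serialize; wrapped in asyncio.to_thread(), they overlap because time.sleep releases the GIL.

```python
import asyncio
import time


def legacy_client_call() -> str:
    # Hypothetical sync library call that sleeps internally (e.g. a retry loop)
    time.sleep(0.2)
    return "ok"


async def call_on_loop() -> str:
    return legacy_client_call()  # runs on the loop thread and blocks it


async def blocking_version() -> float:
    start = time.monotonic()
    await asyncio.gather(call_on_loop(), call_on_loop())  # about 0.4s: serialized
    return time.monotonic() - start


async def offloaded_version() -> float:
    start = time.monotonic()
    await asyncio.gather(  # about 0.2s: each call gets a worker thread
        asyncio.to_thread(legacy_client_call),
        asyncio.to_thread(legacy_client_call),
    )
    return time.monotonic() - start


print(f"blocking: {asyncio.run(blocking_version()):.2f}s")
print(f"offloaded: {asyncio.run(offloaded_version()):.2f}s")
```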
Key Takeaway
time.sleep() blocks the thread, asyncio.sleep() yields to the loop — use the right one or watch your concurrency die.

Eager Task Factory — Fire Coroutines Now, Schedule Later

You write task = asyncio.create_task(my_coro()) and think it starts immediately. It doesn't: the coroutine is scheduled on the event loop's run queue and won't execute until the current task yields control via an await. Python 3.12 added eager tasks for when a coroutine should begin running right now. Install asyncio.eager_task_factory with loop.set_task_factory(), and every create_task() call on that loop runs the new coroutine synchronously until its first suspension point before returning. A single task can also be started this way with asyncio.Task(my_coro(), eager_start=True), though the docs recommend create_task() over instantiating Task directly. If the coroutine finishes without ever suspending, for example on a cache hit, it never touches the run queue at all; that is where the latency and overhead savings come from. It also suits background work like health checks or cache warming that should get going before your main coroutine continues. The trade-off is ordering: code that relied on create_task() deferring work can change behaviour, because the new task now runs before the caller's next line. Reserve it for operations where starting instantly beats waiting for the next event loop iteration.

eager_factory.py
import asyncio

async def speed_demon():
    print("I started immediately!")
    await asyncio.sleep(0)
    return 42

async def slow_poke():
    await asyncio.sleep(0.1)
    print("I had to wait for the loop")

async def main():
    # Python 3.12+: every create_task() on this loop now starts eagerly
    asyncio.get_running_loop().set_task_factory(asyncio.eager_task_factory)
    eager = asyncio.create_task(speed_demon())  # runs up to its first await right here
    other = asyncio.create_task(slow_poke())    # also eager; suspends at sleep(0.1)
    print("Main continues...")
    await asyncio.sleep(0.2)
    print(f"Eager result: {eager.result()}")

asyncio.run(main())
Output
I started immediately!
Main continues...
I had to wait for the loop
Eager result: 42
Production Trap:
Eager tasks still execute on the event loop thread — they don't spawn new threads. Use them only when you need immediate execution within the same event loop iteration, not for true parallelism.
Key Takeaway
Eager task factory runs your coroutine until first suspension before returning — no queue wait, but no thread escape either.
Production incident · Post-mortem · Severity: high

The Silent Shutdown Hang: A CancelledError Swallowed in Production

Symptom
Application takes >30 seconds to respond to SIGTERM. Kubernetes pod termination hangs. Clients see 'connection refused' for several seconds before the process finally exits.
Assumption
We assumed asyncio.run() handles cancellation automatically. We thought we didn't need to re-raise CancelledError because we caught all exceptions in the top-level coroutine.
Root cause
A deeply nested coroutine caught CancelledError in a broad except Exception: block and did not re-raise it. The parent task never completed its cleanup because it was waiting on a child that was effectively defying cancellation. The event loop kept the process alive until the watchdog killed it.
Fix
Changed all except blocks inside coroutines to either not catch CancelledError at all or to re-raise it immediately using raise. We also enabled debug mode (PYTHONASYNCIODEBUG=1) to log pending tasks during shutdown.
Key lesson
  • CancelledError inherits from BaseException (since Python 3.8), not Exception, so except Exception: does not catch it on current Python; on 3.7 and earlier it did. A bare except:, an except BaseException:, or an except Exception: on an old interpreter will swallow it. Always re-raise CancelledError.
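  • Prefer asyncio.TaskGroup, which propagates cancellation to its children and re-raises it. With gather(return_exceptions=True), a cancelled child shows up as a CancelledError object in the results list, so check for it explicitly rather than treating it as just another failure.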
  • Enable slow-callback detection and debug mode during dev and staging to catch hanging tasks before they reach production.
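The difference between swallowing and re-raising can be sketched with two hypothetical workers. The swallowing one returns normally, so its parent sees a successful finish and the cancel request is lost; had it kept looping instead of returning, it would never finish at all, which is the shutdown hang described above.

```python
import asyncio


async def swallowing_worker(log: list[str]) -> None:
    while True:
        try:
            await asyncio.sleep(0.01)  # simulated unit of work
        except BaseException:  # BUG: behaves like a bare except:
            log.append("swallowed cancel")
            return  # task ends "successfully"; the cancel request is lost


async def well_behaved_worker(log: list[str]) -> None:
    while True:
        try:
            await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            log.append("cleanup, then re-raise")
            raise  # let cancellation reach the parent
        except Exception as exc:  # ordinary errors only (Python 3.8+)
            log.append(f"error: {exc!r}")


async def cancel_and_check(worker) -> tuple[bool, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)
    task.cancel()
    await asyncio.wait({task})  # wait for it to finish without re-raising
    return task.cancelled(), log


print(asyncio.run(cancel_and_check(swallowing_worker)))
# (False, ['swallowed cancel'])
print(asyncio.run(cancel_and_check(well_behaved_worker)))
# (True, ['cleanup, then re-raise'])
```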
Production debug guide: symptom-based guide to diagnosing the most common asyncio failures
Symptom · 01
Application hangs on shutdown after SIGTERM
Fix
Set PYTHONASYNCIODEBUG=1 and look for messages like 'Task was destroyed but it is pending!'. Also check for coroutines that catch CancelledError without re-raising. Use asyncio.all_tasks(loop) to list all pending tasks.
Symptom · 02
Coroutines run sequentially despite using gather
Fix
Verify you're not awaiting the coroutines one by one. Use asyncio.gather(*coros) or asyncio.create_task() for each. Check for accidental blocking calls (e.g. time.sleep instead of asyncio.sleep). Enable debug mode and check slow-callback warnings.
Symptom · 03
Unexpected timeouts, but tasks complete later
Fix
If you use asyncio.wait_for() and a timeout occurs, the inner coroutine gets a CancelledError. Ensure the inner coroutine re-raises and doesn't hold resources. Use asyncio.to_thread() for any synchronous I/O that might have caused a real delay.
Symptom · 04
asyncio.run() raises 'Event loop is closed'
Fix
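This usually means something is still using a loop that asyncio.run() has already closed, for example a client or task created under one asyncio.run() call and reused in a second one. Keep a single asyncio.run() at the entry point and create loop-bound resources inside it. (Calling asyncio.run() from inside a running loop fails with a different error: "asyncio.run() cannot be called from a running event loop".) For testing, use pytest-asyncio with the @pytest.mark.asyncio decorator.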
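For the shutdown-hang symptom, a shutdown path that cancels tasks, waits with a deadline, and names the stragglers makes a swallowed cancellation visible instead of silent. A minimal sketch; drain() and both workers are hypothetical, and in a real service you would trigger drain() from a SIGTERM handler installed with loop.add_signal_handler() (Unix only):

```python
import asyncio


async def polite_worker() -> None:
    await asyncio.sleep(10)  # cancellation propagates normally


async def stubborn_worker() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass  # BUG under investigation: cancellation swallowed
    await asyncio.sleep(0.3)  # keeps working after the cancel request


async def drain(tasks: set[asyncio.Task], timeout: float) -> list[str]:
    # Cancel everything, wait up to `timeout`, then report who ignored it
    for task in tasks:
        task.cancel()
    _, pending = await asyncio.wait(tasks, timeout=timeout)
    return sorted(task.get_name() for task in pending)


async def main() -> list[str]:
    tasks = {
        asyncio.create_task(polite_worker(), name="polite"),
        asyncio.create_task(stubborn_worker(), name="stubborn"),
    }
    await asyncio.sleep(0)  # let both workers reach their first await
    stragglers = await drain(tasks, timeout=0.1)
    print(f"still running after cancel: {stragglers}")
    return stragglers


asyncio.run(main())  # still running after cancel: ['stubborn']
```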
Quick Debug Cheat Sheet for asyncio: fix common asyncio production issues fast with these commands and checks.
All tasks appear frozen / nothing runs
Immediate action
Enable debug mode and check for blocking calls
Commands
PYTHONASYNCIODEBUG=1 python app.py
Check logs for 'Executing <Task ...> took N seconds'
Fix now
Replace blocking functions (time.sleep, requests.get) with async alternatives (asyncio.sleep, httpx.AsyncClient) or offload them with asyncio.to_thread().
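A minimal sketch of what debug mode catches. asyncio.run(..., debug=True) enables debug mode for one run, and lowering loop.slow_callback_duration (default 0.1 s) makes the warning fire sooner; handler is a hypothetical request handler containing the classic mistake.

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.WARNING)  # asyncio logs slow steps at WARNING


async def handler() -> None:
    time.sleep(0.2)  # BAD: blocks the whole event loop for 200 ms


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # default is 0.1 s
    await handler()


if __name__ == "__main__":
    # debug=True turns on debug mode for this run, like PYTHONASYNCIODEBUG=1.
    asyncio.run(main(), debug=True)
```

Expect a WARNING from the asyncio logger of the form Executing <Task ...> took N seconds.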
Unclosed session or never-awaited coroutine warning on exit
Immediate action
Ensure all async context managers are used with `async with`
Commands
grep -rn 'client\.get\|session\.get' src/ | grep -v 'async with'
Check for missing `await` before async function calls (e.g. `asyncio.gather(...)` without await)
Fix now
Add await to the call or wrap in a Task. For background fire-and-forget, use asyncio.create_task() and keep a reference until completion.
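The keep-a-reference advice follows the pattern in the asyncio documentation, since the event loop only keeps weak references to tasks. fire_and_forget and send_metric are hypothetical names for this sketch.

```python
import asyncio
from typing import Any, Coroutine

background_tasks: set[asyncio.Task] = set()


def fire_and_forget(coro: Coroutine[Any, Any, None]) -> asyncio.Task:
    """Schedule `coro` without awaiting it, keeping a strong reference until done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)  # the loop itself only holds a weak reference
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task


async def send_metric(results: list[str], name: str) -> None:
    await asyncio.sleep(0.01)  # stand-in for a network call
    results.append(name)


async def main() -> tuple[list[str], int]:
    results: list[str] = []
    for name in ("a", "b", "c"):
        fire_and_forget(send_metric(results, name))
    # In a real app you would wait for outstanding background tasks at shutdown.
    await asyncio.gather(*background_tasks)
    return sorted(results), len(background_tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['a', 'b', 'c'], 0)
```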
Exception not propagated from gather()
Immediate action
Add return_exceptions=True to gather and inspect results
Commands
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    # BaseException, not Exception: cancelled tasks come back as CancelledError
    if isinstance(r, BaseException):
        handle_error(r)
Use TaskGroup for structured concurrency (Python 3.11+)
Fix now
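Without return_exceptions, gather() re-raises the first exception to the caller but does not cancel the other awaitables: they keep running, and their results and any later exceptions are lost to the caller. Switch to TaskGroup (which cancels the remaining tasks on failure) or add the parameter and inspect each result.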
App is taking too long on startup / first request
Immediate action
Check if you're accidentally running sync code inside event loop startup
Commands
Run with PYTHONASYNCIODEBUG=1 and look for slow-callback warnings during startup events
Time each step inside `@app.on_event('startup')` (e.g. with time.perf_counter()) to find the slow one
Fix now
Move heavy startup tasks (e.g. creating aiohttp sessions) to lazy initialisation, or add await asyncio.sleep(0) inside long-running startup loops so the event loop can serve other tasks between iterations.
Comparing asyncio, Threading, and Multiprocessing
| Aspect | asyncio (Coroutines) | Threading (Thread per task) | Multiprocessing |
|---|---|---|---|
| Best use case | I/O-bound: APIs, DBs, sockets | I/O-bound + legacy sync libs | CPU-bound: parsing, ML, crypto |
| Concurrency model | Cooperative (yields voluntarily at await) | Preemptive (OS switches threads) | True parallelism (multiple processes) |
| Memory per unit | ~1-2 KB per coroutine frame | ~1-8 MB reserved per OS thread stack | High: a full interpreter per process |
| GIL impact | No GIL contention (single thread) | GIL limits CPU parallelism | Each process has its own GIL: full CPU parallelism |
| Max concurrency | 10,000s of tasks easily | ~100-1,000 threads typical | Useful parallelism limited by CPU cores |
| Shared state safety | No preemption between awaits, but state can change across an await | Unsafe: need locks/queues | No shared memory by default: use IPC |
| Blocking call impact | Freezes ALL other tasks | Only blocks that one thread | Only blocks that one process |
| Library requirements | Must use async-native libs | Any synchronous library works | Arguments and results must be picklable |
| Error propagation | Exceptions and CancelledError propagate through await; TaskGroup raises ExceptionGroup | Not propagated to the caller (printed via threading.excepthook) unless you use concurrent.futures | Re-raised from Pool/ProcessPoolExecutor results; otherwise pass via queue/pipe |
| Python version | 3.7+ for modern API, 3.11+ for TaskGroup | All versions | All versions |

Key takeaways

1
asyncio runs a single thread with cooperative multitasking: coroutines yield at each await.
2
Coroutine frames cost ~1-2 KB vs 1-8 MB for threads, enabling 10,000s of concurrent connections.
3
Never block the event loop with synchronous I/O or CPU-heavy code; offload blocking I/O with asyncio.to_thread() and CPU-heavy work with loop.run_in_executor() and a ProcessPoolExecutor.
4
Always re-raise CancelledError; swallowing it breaks shutdown.
5
Prefer asyncio.gather with return_exceptions=True or use TaskGroup (3.11+) for structured concurrency.
6
Enable debug mode (PYTHONASYNCIODEBUG=1) during development to catch blocking calls early.

Common mistakes to avoid


Using await on each coroutine inside a loop (sequentialisation)

Symptom
Your async API performs no better than synchronous code: wall time is the sum of all operations instead of roughly the slowest one.
Fix
Collect coroutines into a list and pass to asyncio.gather(), or create tasks with asyncio.create_task() before awaiting.
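A small timing sketch of the difference, with asyncio.sleep standing in for a 100 ms network call; fetch, one_by_one and all_at_once are illustrative names.

```python
import asyncio
import time
from typing import Any, Callable, Coroutine


async def fetch(i: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for a 100 ms network call
    return i


async def one_by_one() -> list[int]:
    # Each await finishes before the next request starts: ~0.1 s * 5
    return [await fetch(i) for i in range(5)]


async def all_at_once() -> list[int]:
    # All five requests are in flight together: ~0.1 s total
    return list(await asyncio.gather(*(fetch(i) for i in range(5))))


def timed(coro_fn: Callable[[], Coroutine[Any, Any, list[int]]]) -> tuple[list[int], float]:
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


if __name__ == "__main__":
    print(timed(one_by_one))
    print(timed(all_at_once))
```

Creating tasks first with asyncio.create_task() and awaiting them afterwards gives the same overlap as gather().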

Silently swallowing CancelledError

Symptom
Application hangs on shutdown. Pod takes >30 seconds to terminate. Cleanup never runs.
Fix
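Re-raise CancelledError in any handler that catches it. On Python 3.8+, except Exception: does not catch CancelledError while a bare except: or except BaseException: does, so prefer the former; on Python 3.7 and earlier CancelledError is an Exception subclass, so except Exception: swallows it too. Always use finally for cleanup that must run regardless.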
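A minimal sketch of the fix, assuming Python 3.8+ so that the inner except Exception lets cancellation through; worker and cleanup_log are hypothetical.

```python
import asyncio

cleanup_log: list[str] = []


async def worker() -> None:
    """Hypothetical worker: retries ordinary errors, cleans up and re-raises on cancel."""
    try:
        while True:
            try:
                await asyncio.sleep(0.01)  # stand-in for one unit of real I/O
            except Exception as exc:  # ordinary failures only; cancellation passes through
                cleanup_log.append(f"retrying after {exc!r}")
    except asyncio.CancelledError:
        cleanup_log.append("cancelled")  # log or record the cancellation...
        raise  # ...then re-raise so the task really ends as cancelled
    finally:
        cleanup_log.append("cleanup")  # runs on cancel, error, or normal exit


async def main() -> bool:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.05)  # let it do some work
    task.cancel()
    await asyncio.wait({task})  # wait for it to finish without re-raising
    return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))  # True
    print(cleanup_log)  # ['cancelled', 'cleanup']
```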

Using time.sleep() instead of asyncio.sleep() inside a coroutine

Symptom
Entire application freezes for the duration of sleep. Other requests time out.
Fix
Replace time.sleep() with await asyncio.sleep(). If a synchronous function you must call sleeps or blocks internally, run it in a thread with asyncio.to_thread().

Calling synchronous I/O directly (requests.get, file read) inside coroutine

Symptom
Performance degradation under load. Slow-callback warnings if debug mode is enabled.
Fix
Use async libraries (aiohttp, httpx.AsyncClient, aiofiles) or offload to thread pool with asyncio.to_thread().
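A sketch comparing an inline blocking call with asyncio.to_thread() (Python 3.9+). blocking_read is a hypothetical stand-in for requests.get() or a large file read, and the heartbeat task makes the freeze visible by counting how often it gets to run.

```python
import asyncio
import time


def blocking_read() -> str:
    """Stand-in for synchronous I/O such as requests.get() or a large file read."""
    time.sleep(0.2)
    return "payload"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp roughly every 20 ms; missing ticks mean the loop was blocked.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def run(offload: bool) -> tuple[str, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat start
    if offload:
        data = await asyncio.to_thread(blocking_read)  # runs in the default thread pool
    else:
        data = blocking_read()  # BAD: freezes every task on the loop for 200 ms
    stop.set()
    await hb
    return data, len(ticks)


if __name__ == "__main__":
    print("inline:", asyncio.run(run(offload=False)))
    print("to_thread:", asyncio.run(run(offload=True)))
```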

Interview Questions on This Topic

Q01 (Senior): Explain the difference between asyncio.gather with return_exceptions=False and True. When would you use each?
Q02 (Senior): What happens if you catch CancelledError and don't re-raise it?
Q03 (Senior): How does the event loop detect blocking calls, and how can you debug the...

Answer (Q01)
With return_exceptions=False (the default), the first exception raised by any awaitable propagates immediately to the code awaiting gather(). The other awaitables are not cancelled: they keep running, but their results (and any later exceptions) are no longer delivered through gather(). If any failure should abort the whole batch, gather() alone will not do that; use TaskGroup (3.11+), which cancels the remaining tasks, or cancel them yourself. With return_exceptions=True, exceptions are returned as values in the results list and no tasks are cancelled. Use this for fault-tolerant operations where partial success is valuable, e.g. fetching data from multiple caches where some may fail.
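A small sketch of both modes; slow and failing are hypothetical coroutines. It shows that with the default the sibling is not cancelled, and that with return_exceptions=True the exception arrives as a list element.

```python
import asyncio


async def slow(log: list[str]) -> str:
    await asyncio.sleep(0.05)
    log.append("slow finished")  # still runs: gather() did not cancel it
    return "slow"


async def failing() -> str:
    await asyncio.sleep(0.01)
    raise ValueError("bad")


async def default_mode() -> list[str]:
    log: list[str] = []
    slow_task = asyncio.create_task(slow(log))
    try:
        await asyncio.gather(slow_task, failing())
    except ValueError as exc:
        log.append(f"gather raised {exc!r}")
    await slow_task  # the sibling keeps running and completes normally
    return log


async def collect_mode() -> list[object]:
    log: list[str] = []
    return await asyncio.gather(slow(log), failing(), return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(default_mode()))
    print(asyncio.run(collect_mode()))
```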

Frequently Asked Questions

01
Why does my async code run slower than sync code?
02
What's the difference between asyncio.run() and creating an event loop manually?
03
How do I handle timeouts properly in asyncio?