Python asyncio Deep Dive: Event Loops, Coroutines & Real-World Concurrency
Every production Python service that handles real traffic eventually hits the same wall: threads are expensive, processes are heavier, and yet your app spends most of its time waiting — waiting for a database to respond, an API to reply, a file to finish reading. The traditional synchronous model burns a whole OS thread just to wait. At modest scale that's fine. At thousands of concurrent connections, it's a memory and context-switching disaster. This is the problem asyncio was designed to solve, and it solves it elegantly by never blocking a thread when it doesn't have to.
asyncio gives Python a cooperative multitasking model built on coroutines, an event loop, and non-blocking I/O. It doesn't use multiple threads or processes — it uses a single thread that is always doing something useful. When one coroutine pauses to wait for I/O, the event loop immediately hands control to another coroutine that's ready to run. The result is software that can handle thousands of concurrent I/O operations with the resource footprint of a single thread.
By the end of this article you'll understand how the event loop actually works under the hood, the difference between coroutines, tasks, and futures, how to write production-grade async code that doesn't silently swallow exceptions or deadlock, and the exact scenarios where asyncio wins and where it's the wrong tool entirely.
The Event Loop Internals: What Actually Runs Your Coroutines
The event loop is the engine of asyncio. It's a single-threaded scheduler that maintains two core structures: a queue of ready callbacks and a set of I/O watchers registered with the OS — via the selectors module (epoll on Linux, kqueue on macOS) or, on Windows, IOCP through the proactor event loop. On every iteration — called a 'tick' — the loop does three things: runs all callbacks that are ready right now, asks the OS 'which I/O operations finished?' via a select/poll syscall, and converts those completions into new callbacks. That's it. Everything else is built on top of this loop.
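That ready-callback queue is directly observable through the loop's own scheduling API. A minimal sketch (the callback labels are illustrative): `call_soon` puts a callback straight onto the ready queue, while `call_later` registers a timer that the loop converts into a ready callback once it expires.

```python
import asyncio


async def main() -> list:
    loop = asyncio.get_running_loop()
    ran = []
    # call_soon puts a callback straight onto the ready queue for the next tick
    loop.call_soon(ran.append, "ready-callback")
    # call_later registers a timer; when it expires, the loop converts the
    # expiry into an ordinary ready callback on a later tick
    loop.call_later(0.05, ran.append, "timer-callback")
    await asyncio.sleep(0.1)  # yield so the loop can run both callbacks
    return ran


print(asyncio.run(main()))  # → ['ready-callback', 'timer-callback']
```

Both mechanisms bottom out in the same place: an entry on the ready queue, drained once per tick.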
A coroutine function is declared with async def, and under the hood it reuses the same suspend-and-resume machinery as Python generators. Calling it doesn't run it — it returns a coroutine object. The event loop drives that object by calling .send(None) repeatedly. Every time the coroutine hits an await on an operation that isn't ready yet, it yields a Future object back to the loop. The loop registers interest in that future's completion, files it away, and moves on to the next ready coroutine. When the future resolves (I/O completed, timer fired, another coroutine finished), the loop resumes the original coroutine by calling .send(result) — injecting the result directly into the point where await paused.
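You can watch this driving mechanism without any event loop at all — a toy sketch that relies on nothing beyond the coroutine protocol itself. A coroutine that never awaits completes on its very first .send(None), delivering its return value inside StopIteration:

```python
async def add(a: int, b: int) -> int:
    return a + b  # no await: finishes on the first send()


coro = add(2, 3)       # calling it only creates the coroutine object
try:
    coro.send(None)    # drive it one step, exactly as the loop would
except StopIteration as stop:
    print(stop.value)  # → 5  (return values travel inside StopIteration)
```

The event loop is, at its core, a scheduler that keeps calling .send() on whichever coroutine is ready next.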
This means await is not magic sleep — it's a structured yield point. Your coroutine cooperatively hands control back to the loop. If you write CPU-heavy code between two await points, you block the entire event loop for every other coroutine during that time. This is the single most important internals fact about asyncio: there is no preemption. One badly written coroutine can freeze your entire application.
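The effect is easy to measure. In this sketch (a contrived CPU hog, not production code), a heartbeat expects to wake after 50 ms, but a computation loop with no await point holds the thread for roughly 300 ms, so the heartbeat's timer callback cannot fire until the hog finishes:

```python
import asyncio
import time


async def cpu_hog() -> None:
    # Pure computation with no await: the loop cannot run anything else
    deadline = time.perf_counter() + 0.3
    while time.perf_counter() < deadline:
        pass


async def heartbeat() -> float:
    t0 = time.perf_counter()
    await asyncio.sleep(0.05)        # asks to wake after 50 ms...
    return time.perf_counter() - t0  # ...but measures when it actually woke


async def main() -> float:
    beat = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # yield once so the heartbeat starts its timer
    await cpu_hog()         # monopolise the loop for ~300 ms
    return await beat


delay = asyncio.run(main())
print(f"heartbeat woke after {delay:.2f}s instead of 0.05s")
```

Every other coroutine on the loop experiences that same ~250 ms of added latency — which is exactly how one hot code path stalls an entire service.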
```python
import asyncio
import time

# -------------------------------------------------------------------
# Demonstrates the event loop tick mechanism and cooperative yielding.
# Run this and watch how two coroutines interleave on a SINGLE thread.
# -------------------------------------------------------------------

START = time.perf_counter()  # reference point so printed timestamps start at 0


def elapsed() -> float:
    return time.perf_counter() - START


async def fetch_user_profile(user_id: int) -> dict:
    """Simulates a database round-trip with asyncio.sleep."""
    print(f"[{elapsed():.3f}s] Starting fetch for user {user_id}")
    # 'await' yields control back to the event loop here.
    # The loop immediately runs other ready coroutines while we 'wait'.
    await asyncio.sleep(1.0)  # represents a 1-second DB query
    print(f"[{elapsed():.3f}s] Finished fetch for user {user_id}")
    return {"id": user_id, "name": f"User_{user_id}"}


async def send_welcome_email(user_id: int) -> None:
    """Simulates an outbound SMTP call."""
    print(f"[{elapsed():.3f}s] Sending email to user {user_id}")
    await asyncio.sleep(0.5)  # represents a 500ms SMTP handshake
    print(f"[{elapsed():.3f}s] Email sent to user {user_id}")


async def main() -> None:
    start = time.perf_counter()
    # asyncio.gather schedules BOTH coroutines as Tasks immediately.
    # The event loop runs them concurrently on the single thread.
    profile, _ = await asyncio.gather(
        fetch_user_profile(user_id=42),
        send_welcome_email(user_id=42),
    )
    total = time.perf_counter() - start
    print(f"\nAll done in {total:.3f}s — not 1.5s, because they ran concurrently.")
    print(f"Profile retrieved: {profile}")


if __name__ == "__main__":
    # asyncio.run() creates a NEW event loop, runs main(), then closes the loop.
    # Never call this inside an already-running loop (e.g. Jupyter) — use await directly.
    asyncio.run(main())
```
[0.000s] Starting fetch for user 42
[0.000s] Sending email to user 42
[0.501s] Email sent to user 42
[1.001s] Finished fetch for user 42
All done in 1.001s — not 1.5s, because they ran concurrently.
Profile retrieved: {'id': 42, 'name': 'User_42'}
Coroutines vs Tasks vs Futures: The Hierarchy You Must Know
These three concepts confuse almost everyone at first because they're related but distinct, and the Python docs don't make the hierarchy obvious. Let's be precise.
A coroutine is a Python object representing a paused computation. It does nothing on its own — it needs something to drive it. It has no scheduling, no concurrency, no timeout. If you await a coroutine directly, you're running it sequentially in the current coroutine. That's fine for sequential steps but defeats the purpose of concurrency.
A Task wraps a coroutine and schedules it to run on the event loop. When you call asyncio.create_task(some_coroutine()), the loop immediately queues that coroutine for execution — it doesn't wait for you to await it. Tasks run concurrently. This is the mechanism behind actual concurrent execution. Tasks also hold the running coroutine's state, capture exceptions, and allow cancellation.
A Future is the lower-level primitive. It's a promise: a container for a value that doesn't exist yet. When I/O completes, the event loop resolves a Future and any coroutine awaiting that Future is resumed. Tasks are a subclass of Future. You rarely create raw Futures in application code — they're mostly used when wrapping callback-based APIs or writing protocol implementations. Understanding that Task IS-A Future explains why you can await task just like you await asyncio.sleep().
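To make that concrete, here is a minimal sketch of the one legitimate raw-Future use case — adapting a callback-style API so it can be awaited. The `timer_api` function below is a stand-in for a real callback-based library, not an actual asyncio API:

```python
import asyncio


def timer_api(loop: asyncio.AbstractEventLoop, delay: float, on_done) -> None:
    """Stand-in for a callback-based library: invokes on_done(result) later."""
    loop.call_later(delay, on_done, "timer fired")


async def await_the_callback() -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()             # a bare promise: no value yet
    timer_api(loop, 0.05, fut.set_result)  # the callback resolves the Future
    return await fut                       # suspends until set_result() runs


print(asyncio.run(await_the_callback()))  # → timer fired
```

The coroutine suspends on `await fut` exactly as it would on a Task — because a Task is just a Future that drives a coroutine to produce its value.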
The practical rule: use asyncio.create_task() when you want true concurrent execution. Use bare await coroutine() only when you want sequential execution inside your current coroutine.
```python
import asyncio
import time

# -------------------------------------------------------------------
# Side-by-side comparison: sequential coroutine awaiting vs concurrent Tasks.
# This is the most common performance mistake in async Python code.
# -------------------------------------------------------------------


async def query_database(query_name: str, latency_seconds: float) -> str:
    """Simulates a database query with configurable latency."""
    await asyncio.sleep(latency_seconds)
    return f"Result of '{query_name}'"


async def sequential_approach() -> None:
    """BAD: awaiting coroutines directly runs them one after another."""
    start = time.perf_counter()
    # Each await BLOCKS here until the previous one finishes.
    # Total time = 0.3 + 0.4 + 0.2 = 0.9 seconds
    result_orders = await query_database("orders", latency_seconds=0.3)
    result_users = await query_database("users", latency_seconds=0.4)
    result_prefs = await query_database("prefs", latency_seconds=0.2)
    elapsed = time.perf_counter() - start
    print(f"Sequential: {elapsed:.3f}s — {result_orders}, {result_users}, {result_prefs}")


async def concurrent_approach() -> None:
    """GOOD: create_task() schedules all three immediately."""
    start = time.perf_counter()
    # All three tasks are scheduled NOW and run concurrently.
    # The event loop interleaves them — total time ≈ max(0.3, 0.4, 0.2) = 0.4 seconds
    task_orders = asyncio.create_task(query_database("orders", latency_seconds=0.3))
    task_users = asyncio.create_task(query_database("users", latency_seconds=0.4))
    task_prefs = asyncio.create_task(query_database("prefs", latency_seconds=0.2))
    # Now we collect the results — tasks were ALREADY running while we did this
    result_orders = await task_orders
    result_users = await task_users
    result_prefs = await task_prefs
    elapsed = time.perf_counter() - start
    print(f"Concurrent: {elapsed:.3f}s — {result_orders}, {result_users}, {result_prefs}")


async def gather_approach() -> None:
    """BEST for fire-and-collect: asyncio.gather is cleaner than manual tasks."""
    start = time.perf_counter()
    # gather() wraps each coroutine in a Task automatically and collects results
    # in the SAME ORDER as the arguments, regardless of completion order.
    results = await asyncio.gather(
        query_database("orders", latency_seconds=0.3),
        query_database("users", latency_seconds=0.4),
        query_database("prefs", latency_seconds=0.2),
    )
    elapsed = time.perf_counter() - start
    print(f"Gather: {elapsed:.3f}s — {results}")


async def main() -> None:
    await sequential_approach()
    await concurrent_approach()
    await gather_approach()


if __name__ == "__main__":
    asyncio.run(main())
```
Sequential: 0.902s — Result of 'orders', Result of 'users', Result of 'prefs'
Concurrent: 0.401s — Result of 'orders', Result of 'users', Result of 'prefs'
Gather: 0.401s — ["Result of 'orders'", "Result of 'users'", "Result of 'prefs'"]
Production Patterns: Timeouts, Cancellation, and Exception Handling
Concurrent code that works in development breaks in production for one consistent reason: nobody handled the failure cases. In asyncio, there are three failure modes you must design for explicitly: tasks that take too long, tasks that need to be stopped, and tasks that die silently.
Timeouts are handled with asyncio.wait_for(coroutine, timeout=N). If the coroutine doesn't complete within N seconds, it raises asyncio.TimeoutError AND cancels the inner coroutine. This is important: the cancellation happens automatically. If your coroutine holds a database connection or a lock, you need to handle asyncio.CancelledError with a try/finally to clean up.
Cancellation is how the event loop communicates to a task that it should stop. When you call task.cancel(), the loop injects a CancelledError at the next await point inside that task. Your coroutine can catch it with try/except asyncio.CancelledError — but you MUST re-raise it after cleanup. Swallowing CancelledError is a hard anti-pattern that causes resource leaks and hangs at shutdown.
Silent task failure is the most insidious bug. If you create a task with create_task() but never await it and it raises an exception, that exception is silently stored in the Task object and only surfaces as a warning when the Task is garbage-collected. Always keep a reference to created tasks and attach a done callback to handle exceptions immediately.
```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# -------------------------------------------------------------------
# Production patterns: timeouts, cancellation, and silent failure prevention.
# These patterns separate hobby code from production-grade async services.
# -------------------------------------------------------------------


async def slow_third_party_api(endpoint: str) -> dict:
    """Simulates an API that sometimes takes way too long."""
    await asyncio.sleep(5.0)  # this would block us for 5 seconds
    return {"endpoint": endpoint, "data": "some payload"}


async def fetch_with_timeout(endpoint: str, timeout_seconds: float) -> dict | None:
    """
    Wraps any coroutine with a hard timeout.
    Returns None on timeout instead of crashing the whole request.
    """
    try:
        # wait_for cancels the inner coroutine automatically on timeout
        result = await asyncio.wait_for(
            slow_third_party_api(endpoint), timeout=timeout_seconds
        )
        return result
    except asyncio.TimeoutError:
        logger.warning("API call to '%s' timed out after %ss", endpoint, timeout_seconds)
        return None  # caller gets None, not a crash


async def resource_holding_task(resource_name: str) -> str:
    """
    Demonstrates CORRECT cancellation handling.
    Cleans up resources even when cancelled mid-execution.
    """
    logger.info("Acquired resource: %s", resource_name)
    try:
        await asyncio.sleep(10.0)  # long operation
        return f"Finished with {resource_name}"
    except asyncio.CancelledError:
        # Clean up before propagating — this is MANDATORY
        logger.info("Cancelled! Releasing resource: %s", resource_name)
        raise  # ALWAYS re-raise CancelledError — never swallow it
    finally:
        # finally runs whether cancelled OR completed normally
        logger.info("Cleanup complete for: %s", resource_name)


def log_task_exception(task: asyncio.Task) -> None:
    """
    Done callback that surfaces exceptions from fire-and-forget tasks.
    Attach this to any task you don't plan to await directly.
    """
    if task.cancelled():
        return  # cancellation is intentional, not an error
    exc = task.exception()
    if exc is not None:
        # Without this, the exception would be silently swallowed
        logger.error("Task '%s' failed with exception: %r", task.get_name(), exc)


async def main() -> None:
    # --- Pattern 1: Timeout ---
    logger.info("=== Pattern 1: Timeout ===")
    result = await fetch_with_timeout("/api/recommendations", timeout_seconds=2.0)
    logger.info("Result: %s", result)  # None — timed out

    # --- Pattern 2: Cancellation with cleanup ---
    logger.info("\n=== Pattern 2: Cancellation ===")
    db_task = asyncio.create_task(
        resource_holding_task("postgres_connection"), name="db-task"
    )
    await asyncio.sleep(0.1)  # let the task start
    db_task.cancel()  # inject CancelledError at the next await inside db_task
    try:
        await db_task  # wait for cancellation to complete fully
    except asyncio.CancelledError:
        logger.info("db_task cancellation confirmed")

    # --- Pattern 3: Fire-and-forget with exception surfacing ---
    logger.info("\n=== Pattern 3: Fire-and-forget safety ===")

    async def buggy_background_job() -> None:
        await asyncio.sleep(0.1)
        raise ValueError("Something went wrong in the background")

    background_task = asyncio.create_task(
        buggy_background_job(), name="background-job"
    )
    # Attach callback — fires when task finishes (success OR failure)
    background_task.add_done_callback(log_task_exception)

    await asyncio.sleep(0.5)  # give the background task time to run and fail
    logger.info("Main coroutine continued unaffected")


if __name__ == "__main__":
    asyncio.run(main())
```
2024-01-15 10:00:00,001 INFO === Pattern 1: Timeout ===
2024-01-15 10:00:02,003 WARNING API call to '/api/recommendations' timed out after 2.0s
2024-01-15 10:00:02,003 INFO Result: None
2024-01-15 10:00:02,003 INFO === Pattern 2: Cancellation ===
2024-01-15 10:00:02,003 INFO Acquired resource: postgres_connection
2024-01-15 10:00:02,103 INFO Cancelled! Releasing resource: postgres_connection
2024-01-15 10:00:02,103 INFO Cleanup complete for: postgres_connection
2024-01-15 10:00:02,103 INFO db_task cancellation confirmed
2024-01-15 10:00:02,103 INFO === Pattern 3: Fire-and-forget safety ===
2024-01-15 10:00:02,204 ERROR Task 'background-job' failed with exception: ValueError('Something went wrong in the background')
2024-01-15 10:00:02,504 INFO Main coroutine continued unaffected
When asyncio Is the Wrong Tool — and How to Mix It With Threads and Processes
asyncio is not a universal concurrency hammer. It's purpose-built for I/O-bound concurrency — situations where your code spends most of its time waiting for external systems. It's the wrong tool for CPU-bound work like image processing, ML inference, cryptography, or heavy data transformation. In those cases, asyncio doesn't help because the event loop is still blocked by the running computation — there's no I/O to yield on.
For CPU-bound work you need a ProcessPoolExecutor, which spreads the computation across multiple cores; a ThreadPoolExecutor is the right choice for I/O-bound work stuck with blocking libraries like requests or psycopg2 that have no async version. The bridge between the async world and these executors is loop.run_in_executor() — a method on the running event loop that submits a callable to a thread or process pool and wraps the result in a Future your coroutine can await. The event loop remains free while the work happens in another thread or process.
In large production systems you'll often have a hybrid architecture: an asyncio event loop handling thousands of concurrent HTTP connections, with run_in_executor offloading CPU-intensive operations to a ProcessPoolExecutor. FastAPI, for example, is built exactly this way — async endpoints for I/O, while endpoint functions defined without async def are automatically run in a thread pool.
The performance rule of thumb: if a task has more than a few milliseconds of pure CPU work with no await in it, move it to an executor. A 50ms CPU-bound operation blocks 50ms of event loop time, stalling every other concurrent connection.
```python
import asyncio
import hashlib
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# -------------------------------------------------------------------
# Bridging asyncio with thread and process pools.
# Use this when you need to call blocking or CPU-heavy code
# without freezing your entire event loop.
# -------------------------------------------------------------------


def compute_password_hash(password: str, iterations: int) -> str:
    """
    CPU-bound work: PBKDF2 with high iteration count.
    This is PURE computation — no I/O — so it belongs in a process pool.
    Running this directly in a coroutine would block the event loop.
    """
    salt = b"static_salt_for_demo"
    key = hashlib.pbkdf2_hmac(
        hash_name="sha256", password=password.encode(), salt=salt, iterations=iterations
    )
    return key.hex()


def blocking_legacy_db_query(user_id: int) -> dict:
    """
    I/O-bound but uses a BLOCKING library (no async driver available).
    Thread pool is appropriate here — threads release the GIL during I/O.
    """
    time.sleep(0.3)  # simulate blocking DB call via synchronous driver
    return {"user_id": user_id, "email": f"user{user_id}@example.com"}


async def handle_user_registration(username: str, raw_password: str) -> dict:
    """
    Realistic registration handler that needs both:
    - a blocking legacy DB query (thread pool)
    - CPU-intensive password hashing (process pool)
    """
    loop = asyncio.get_running_loop()

    # Thread pool: right tool for blocking I/O with sync libraries
    with ThreadPoolExecutor(max_workers=4) as thread_pool:
        user_record = await loop.run_in_executor(
            thread_pool, blocking_legacy_db_query, 42  # user_id
        )

    # Process pool: right tool for CPU-bound work (bypasses the GIL)
    with ProcessPoolExecutor(max_workers=2) as process_pool:
        password_hash = await loop.run_in_executor(
            process_pool,
            compute_password_hash,
            raw_password,
            200_000,  # high iteration count for security
        )

    return {
        "username": username,
        "email": user_record["email"],
        "password_hash": password_hash[:16] + "...",  # truncated for display
        "status": "registered",
    }


async def main() -> None:
    start = time.perf_counter()
    # While these run, the event loop is FREE to handle other requests
    result = await handle_user_registration(
        username="alice", raw_password="super_secret_passphrase_123"
    )
    elapsed = time.perf_counter() - start
    print(f"Registration complete in {elapsed:.3f}s")
    print(f"Result: {result}")


if __name__ == "__main__":
    # ProcessPoolExecutor requires if __name__ == '__main__' guard on Windows/macOS
    asyncio.run(main())
```
Result: {'username': 'alice', 'email': 'user42@example.com', 'password_hash': '3f7a9c2b1e4d8f0a...', 'status': 'registered'}
| Aspect | asyncio (single-threaded) | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|---|
| Best for | I/O-bound with async libraries | I/O-bound with blocking libraries | CPU-bound computation |
| Concurrency model | Cooperative (yield-based) | Preemptive (OS threads) | True parallelism (OS processes) |
| GIL impact | N/A — single thread | GIL released during I/O waits | Fully bypasses GIL |
| Memory overhead | Very low (~few KB per coroutine) | High (~8MB per thread stack) | Very high (~25MB per process) |
| Startup cost | Near-zero | Low | High (process fork/spawn) |
| Shared state | Safe — no data races | Unsafe — needs locks | Unsafe — needs IPC/queues |
| Exception visibility | Silently stored if task not awaited | Future.exception() or callback | Future.exception() or callback |
| Max concurrent ops | Tens of thousands | Hundreds (OS thread limit) | Tens (CPU core count) |
| Production use case | Web servers, microservices, scrapers | Sync DB drivers, legacy code | ML inference, image processing |
🎯 Key Takeaways
- asyncio is cooperative — there is zero preemption. Any coroutine that runs CPU-heavy code between two await points blocks every other concurrent operation. Profile your hot paths.
- Coroutine objects do nothing until driven. `await coro()` runs sequentially; `create_task(coro())` schedules concurrent execution immediately — this distinction is the single most common asyncio performance mistake.
- Always re-raise CancelledError after cleanup. Swallowing it silently breaks timeouts, graceful shutdown, and task groups — resulting in hangs that are nearly impossible to diagnose under load.
- asyncio.gather() returns results in argument order, not completion order, and raises the first exception by default. Use return_exceptions=True when you want partial results even if some coroutines fail.
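A quick sketch of that last point, with one deliberately failing downstream call (the service names are illustrative):

```python
import asyncio


async def call_service(name: str, fail: bool = False) -> str:
    await asyncio.sleep(0.01)
    if fail:
        raise ConnectionError(f"{name} unreachable")
    return f"{name}: ok"


async def main() -> list:
    # return_exceptions=True: failures come back IN PLACE of results,
    # in argument order, instead of aborting the whole gather
    return await asyncio.gather(
        call_service("orders"),
        call_service("users", fail=True),
        call_service("prefs"),
        return_exceptions=True,
    )


results = asyncio.run(main())
print(results)  # ['orders: ok', ConnectionError('users unreachable'), 'prefs: ok']
```

Without `return_exceptions=True`, the ConnectionError would propagate out of the await and the other two results would be discarded.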
⚠ Common Mistakes to Avoid
- ✕ Mistake 1: Awaiting coroutines sequentially instead of creating Tasks — calling `result1 = await coro1()` then `result2 = await coro2()` runs them one-by-one, wasting all concurrency benefit. Symptom: your async service is no faster than synchronous code. Fix: use `asyncio.gather(coro1(), coro2())`, or `asyncio.create_task()` for each, then await the tasks separately.
- ✕ Mistake 2: Swallowing CancelledError in exception handlers — on Python versions before 3.8, CancelledError inherited from Exception, so a broad `except Exception` caught it; since 3.8 it inherits from BaseException, but a bare `except:` or `except BaseException` still swallows it. Symptom: asyncio.wait_for() hangs forever; graceful shutdown stalls indefinitely. Fix: always catch `asyncio.CancelledError` separately, do your cleanup, then `raise` it unconditionally.
- ✕ Mistake 3: Calling asyncio.run() inside an already-running event loop — common in Jupyter notebooks or when nesting async frameworks. Symptom: `RuntimeError: This event loop is already running`. Fix: in Jupyter, `await main()` directly or use `nest_asyncio.apply()`. In production code, restructure so asyncio.run() is only called once at the top level, and everything else uses `await` or `create_task()`.
Interview Questions on This Topic
- Q: Explain what happens step-by-step when a coroutine hits an `await asyncio.sleep(1)` call — what does the event loop do during that 1 second, and how does it know to resume your coroutine afterwards?
- Q: What's the difference between asyncio.gather() and asyncio.wait()? When would you prefer one over the other in a production service that calls 10 downstream APIs?
- Q: You have an async web service that starts becoming unresponsive under load even though it's using asyncio. CPU and memory look normal. What's your debugging approach, and what's the most likely cause?
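For the second question, the core distinction can be sketched in a few lines: `asyncio.wait()` returns unordered `(done, pending)` sets and supports `return_when`, while `gather()` returns an ordered list of results. The endpoint names below are illustrative:

```python
import asyncio


async def downstream(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def main() -> str:
    tasks = [
        asyncio.create_task(downstream("fast-api", 0.01)),
        asyncio.create_task(downstream("slow-api", 0.5)),
    ]
    # Unlike gather(), wait() can return as soon as ONE task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:  # don't leak the stragglers
        task.cancel()
    return done.pop().result()


print(asyncio.run(main()))  # → fast-api
```

This "take the first response, cancel the rest" shape is a common hedged-request pattern against redundant replicas; with 10 downstream APIs whose results you all need, gather() is usually the better fit.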
Frequently Asked Questions
Is Python asyncio actually faster than using threads?
For I/O-bound workloads at high concurrency, yes — dramatically so. A coroutine costs a few kilobytes of memory and has near-zero context-switch overhead compared to an OS thread's 8MB stack and expensive kernel scheduling. However, for CPU-bound work, asyncio provides zero speedup — you need processes for that. And for low-concurrency workloads (say, under 50 concurrent operations), threads and asyncio perform comparably.
Why can't I just use asyncio.run() inside a Jupyter notebook?
Jupyter already runs its own event loop internally to keep the kernel responsive. asyncio.run() tries to create and start a brand-new event loop, which raises RuntimeError because you can't nest two running loops in the same thread. The fix is either to await your coroutine directly in a notebook cell, or install nest_asyncio and call nest_asyncio.apply() once at the top of your notebook.
What does it mean when Python logs 'Task was destroyed but it is pending'?
This warning means you created a Task with create_task() but the event loop was shut down (via asyncio.run() completing) before the task had a chance to run or finish. It's a sign of a resource leak — the task was garbage-collected without ever resolving. Fix it by ensuring all tasks you create are either awaited, gathered, or explicitly cancelled and awaited during your shutdown sequence before the event loop closes.
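A minimal sketch of a clean shutdown sequence that avoids the warning: cancel the task, then await it, so the loop sees it resolve before closing.

```python
import asyncio


async def background_job() -> None:
    await asyncio.sleep(10)  # long-lived background work


async def main() -> bool:
    task = asyncio.create_task(background_job())
    await asyncio.sleep(0)   # let the job actually start
    task.cancel()            # shutdown: request cancellation...
    try:
        await task           # ...and WAIT for the task to finish cancelling
    except asyncio.CancelledError:
        pass                 # raised by awaiting the cancelled task, not by us
    return task.cancelled()  # True: the task resolved before the loop closed


print(asyncio.run(main()))  # → True
```

If main() returned without the cancel-and-await step, the loop would close with the task still pending and Python would emit the warning.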
Written and reviewed by senior developers with real-world experience across enterprise, startup and open-source projects. Every article on TheCodeForge is written to be clear, accurate and genuinely useful — not just SEO filler.