Home Python FastAPI Async SQLAlchemy: Alembic Migrations That Survive Production
Advanced 3 min · July 05, 2026

FastAPI Async SQLAlchemy: Alembic Migrations That Survive Production

FastAPI async SQLAlchemy Alembic migrations: avoid connection pool exhaustion, deadlocks, and schema drift with battle-tested production patterns..

N
Naren Founder & Principal Engineer

20+ years shipping production Python across data and backend systems. Written from production experience, not tutorials.

Follow
Production
production tested
July 05, 2026
last updated
141
articles · all by Naren
Before you start⏱ 30 min
  • Python async/await fundamentals
  • SQLAlchemy ORM basics (sync)
  • Basic FastAPI app structure
  • Alembic migration basics
 ● Production Incident 🔎 Debug Guide ⚙ Triage Commands
Quick Answer

Use async def endpoints with AsyncSession from sqlalchemy.ext.asyncio. Create an async engine with create_async_engine, and use async_sessionmaker for sessions. Alembic migrations run synchronously but can use the same async engine via run_async if needed. Always close sessions in finally blocks or use context managers.

✦ Definition~90s read
What is FastAPI Async SQLAlchemy?

FastAPI async SQLAlchemy combines FastAPI's async endpoints with SQLAlchemy's async engine to handle database operations without blocking the event loop. Alembic migrations manage schema changes in this async context, requiring careful handling of engine and session lifecycle to avoid connection leaks and deadlocks.

Imagine a busy restaurant kitchen with one chef (the event loop).
Plain-English First

Imagine a busy restaurant kitchen with one chef (the event loop). Every time a waiter (an endpoint) needs to check the fridge (the database), they normally block the chef until they return. Async SQLAlchemy lets the chef prep other orders while the waiter is at the fridge. Alembic migrations are like the kitchen renovation plan — they change the fridge layout without shutting down the whole restaurant, but you need to make sure no waiter is mid-reach when you move the shelves.

You've built a FastAPI app with async endpoints. It's fast. Then you add SQLAlchemy. Suddenly your 3am pager goes off: connection pool exhausted. The classic rookie mistake: using sync SQLAlchemy in async endpoints. The event loop blocks, threads pile up, and your service dies. I've seen this bring down a payments service when the thread pool was exhausted at 3am during a flash sale. The fix isn't just 'use async' — it's understanding how to wire async SQLAlchemy correctly with Alembic so your migrations don't corrupt data under load. This article gives you the exact patterns I've used in production to keep schema changes safe and connections healthy. By the end, you'll be able to set up async SQLAlchemy with Alembic, avoid the six most common production failures, and debug session leaks like a senior engineer.

Why Sync SQLAlchemy Kills Async Performance

The naive approach: use sync SQLAlchemy in async endpoints. FastAPI runs sync endpoints in a thread pool, but that defeats async's purpose. Each sync DB call blocks a thread, and if your DB is slow, threads pile up. I've seen a service with 10 workers and 20 DB connections handle 50 req/s before the thread pool exhausted. The fix: use SQLAlchemy's async extension. It uses asyncio to await DB calls, freeing the event loop for other requests. The performance difference is stark: async can handle 10x the concurrent requests with the same resources. But you must wire it correctly — one mistake and you're back to blocking.

AsyncEngineSetup.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# io.thecodeforge — Python tutorial

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase

# Single engine per app — created once at startup
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,          # Max connections in pool
    max_overflow=10,       # Extra connections if pool exhausted
    pool_pre_ping=True,    # Verify connections before use
    echo=False,            # Set True for debugging SQL
)

# Session factory — thread-safe, creates new sessions per request
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Avoid expired attribute access after commit
)

class Base(DeclarativeBase):
    pass
Output
No output — this is setup code.
Production Trap: Engine per Request
Never create an AsyncEngine inside a request handler. Each engine creates its own connection pool. After enough requests, you'll hit OS file descriptor limits and memory exhaustion. One engine per app lifetime.

Session Lifecycle: The Most Common Leak

The session is your transactional boundary. Every request gets one session, uses it, and closes it. Leak a session and the connection stays in the pool, eventually exhausting it. The pattern: use a dependency that yields an AsyncSession and closes it in finally. FastAPI's Depends with yield handles cleanup automatically. But watch out: if you catch exceptions and don't re-raise, the session might not close. I've debugged a service where a try/except swallowed an error, the session never closed, and connections leaked at 2 per minute. After an hour, the pool was dead.

SessionDependency.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# io.thecodeforge — Python tutorial

from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

async def get_session() -> AsyncSession:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()  # Commit if no exception
        except Exception:
            await session.rollback()  # Rollback on error
            raise  # Re-raise so FastAPI handles it
        # Session closed automatically by async with

@app.post("/orders/")
async def create_order(session: AsyncSession = Depends(get_session)):
    # Use session — no need to close manually
    result = await session.execute(...)
    return {"status": "ok"}
Output
No output — this is a pattern.
Senior Shortcut: Use `async with`
Always use async with async_session_factory() as session. It guarantees session.close() is called even if an exception occurs. Never manually call session.close() — use context managers.

Alembic Migrations in an Async World

Alembic migrations run synchronously by default. That's fine — migrations are rare and usually run offline or during maintenance windows. But if you need to run migrations against an async engine (e.g., in a CI pipeline that also runs async tests), you can use alembic run_async. The key: Alembic's env.py needs to create a sync connection from the async engine. The pattern: use run_async with a function that gets a sync connection via engine.sync_engine. This avoids creating a second engine. I've seen teams create a separate sync engine for migrations, which works but doubles connection overhead. Stick to one engine.

alembic/env.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# io.thecodeforge — Python tutorial

from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from alembic import context

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

from app.models import Base  # your declarative base
target_metadata = Base.metadata

# Use the same async engine as your app
async_engine: AsyncEngine = create_async_engine(
    config.get_main_option("sqlalchemy.url"),
    poolclass=pool.NullPool,  # No pooling for migrations
)

def run_migrations_offline() -> None:
    context.configure(url=async_engine.url, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    async with async_engine.connect() as conn:
        await conn.run_sync(do_run_migrations)
    await async_engine.dispose()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    import asyncio
    asyncio.run(run_async_migrations())

# Choose mode based on context
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
Output
No output — this is configuration.
Never Do This: Sync Engine for Migrations
Don't create a separate sync engine for Alembic. Use run_sync on your async engine. Two engines mean two connection pools, twice the memory, and potential deadlocks if both try to access the same schema simultaneously.

Handling Concurrent Migrations Safely

Running migrations while the app is live is risky. Alembic acquires a lock on the migration table, but DDL statements like ALTER TABLE can lock tables and block reads/writes. The pattern: use op.execute('SET lock_timeout = 5000') at the start of a migration to abort if the lock can't be acquired within 5 seconds. Also, use op.create_index with postgresql_concurrently=True for indexes to avoid table locks. I've seen a migration that added a column to a 10M-row table lock the table for 30 seconds, causing a 5xx spike. The fix: add columns with ALTER TABLE ... ALTER COLUMN ... SET DEFAULT first, then backfill data in batches.

migrations/versions/xxxx_add_column.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# io.thecodeforge — Python tutorial

"""Add is_active column to users table."""
from alembic import op
import sqlalchemy as sa

revision = 'xxxx'
down_revision = 'yyyy'

def upgrade():
    # Set lock timeout to avoid blocking production
    op.execute('SET lock_timeout = 5000')
    
    # Add column as nullable first
    op.add_column('users', sa.Column('is_active', sa.Boolean(), nullable=True))
    
    # Backfill data in batches (run separately in low traffic)
    # This is a placeholder — actual backfill should be a separate script
    
    # Then set NOT NULL if needed
    op.alter_column('users', 'is_active', nullable=False, server_default=sa.text('true'))

def downgrade():
    op.drop_column('users', 'is_active')
Output
No output — migration script.
The Classic Bug: Long-Running Migration
A migration that runs for minutes will block all table writes. Always set lock_timeout and run migrations during maintenance windows. For large tables, use batching or online schema change tools like pgroll.

Testing Migrations in CI

Migrations that work on your laptop fail in production because of different data, indexes, or permissions. The fix: test migrations against a copy of production data. In CI, spin up a temporary PostgreSQL instance, restore a sanitized dump, run migrations, and verify the schema. Use alembic upgrade head and then alembic downgrade -1 to test rollback. I've caught a migration that dropped a column still referenced by a trigger — the test failed because the trigger existed in the dump. Without the test, that would have been a production outage.

ci_test_migrations.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# io.thecodeforge — Python tutorial

import subprocess
import psycopg2
from sqlalchemy import create_engine

def test_migrations():
    # Assume a test DB is already created and populated
    # Run migrations
    result = subprocess.run(
        ["alembic", "upgrade", "head"],
        capture_output=True, text=True
    )
    assert result.returncode == 0, f"Migration failed: {result.stderr}"
    
    # Verify schema by inspecting tables
    engine = create_engine("postgresql://user:pass@localhost/testdb")
    with engine.connect() as conn:
        # Check that expected columns exist
        result = conn.execute("SELECT column_name FROM information_schema.columns WHERE table_name='users'")
        columns = [row[0] for row in result]
        assert 'is_active' in columns, "Column is_active missing after migration"
    
    # Test rollback
    result = subprocess.run(
        ["alembic", "downgrade", "-1"],
        capture_output=True, text=True
    )
    assert result.returncode == 0, f"Rollback failed: {result.stderr}"
    print("All migration tests passed.")

if __name__ == "__main__":
    test_migrations()
Output
All migration tests passed.
Interview Gold: Rollback Testing
Always test alembic downgrade -1. Many developers only test upgrades. A failed rollback means you can't undo a bad migration without manual SQL — a nightmare during an incident.

Connection Pool Tuning for Production

Default pool size is 5. That's fine for development but will choke under load. The right size depends on your DB's max connections and your app's concurrency. Rule of thumb: pool_size = (max_db_connections / number_of_app_instances) * 0.8. For a DB with 200 connections and 10 app instances, pool_size = 16. Add max_overflow=10 for bursts. Also set pool_pre_ping=True to avoid stale connections. I've seen a service where pool_recycle was set to 3600 seconds, but the DB's wait_timeout was 300 seconds — every connection after 5 minutes was dead. Match pool_recycle to your DB's timeout minus 60 seconds.

PoolTuning.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
# io.thecodeforge — Python tutorial

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=16,          # Adjust based on DB limits
    max_overflow=10,       # Allow bursts
    pool_pre_ping=True,    # Check connection health before use
    pool_recycle=240,      # Recycle connections every 4 minutes
    pool_use_lifo=True,    # Use LIFO to reduce connection churn
)
Output
No output — configuration.
Production Trap: pool_recycle Mismatch
If pool_recycle is longer than your DB's wait_timeout or idle_in_transaction_session_timeout, connections will be dropped. Set pool_recycle to 80% of the DB timeout. For PostgreSQL, check SHOW idle_in_transaction_session_timeout;.

When Not to Use Async SQLAlchemy

Async SQLAlchemy adds complexity. If your app is I/O-bound but mostly CPU-bound (e.g., image processing), async won't help much. Also, if you're using a sync-only library (e.g., some Redis clients), you'll need to run it in a thread pool anyway, negating the benefit. For simple CRUD apps with low concurrency (< 100 req/s), sync SQLAlchemy with FastAPI's thread pool is fine. I've seen teams over-engineer with async for a 10 req/s admin panel — wasted effort. Use async when you have many concurrent DB calls (e.g., chat, real-time dashboards) or when you're integrating with other async services.

WhenToUseAsync.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# io.thecodeforge — Python tutorial

# Use async SQLAlchemy when:
# - High concurrency (> 1000 concurrent requests)
# - Many I/O calls per request (DB + external APIs)
# - Real-time features (WebSockets, streaming)
#
# Use sync SQLAlchemy when:
# - Low concurrency (< 100 req/s)
# - Simple CRUD with minimal I/O
# - Team is not comfortable with async debugging
#
# Example: a dashboard that fetches 10 DB queries per request
# Async can run them concurrently, cutting response time from 1s to 200ms.
Output
No output — decision guide.
Senior Shortcut: Profile First
Before rewriting everything async, profile your endpoints. If the bottleneck is CPU (e.g., JSON serialization), async won't help. Use py-spy or cProfile to see where time is spent.

Debugging Session Leaks in Production

Session leaks manifest as `TimeoutError: QueuePool limit of size X overflow Y reached. The fix: add logging to track session creation and closure. Use a custom AsyncSession subclass that logs when it's created and closed. Or use async_session_factory with a context manager that logs. I've also used gc.get_objects()` to find unreleased sessions. The pattern: in a health endpoint, check the number of active sessions. If it's growing, you have a leak.

DebugSessionLeaks.pyPYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# io.thecodeforge — Python tutorial

import logging
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)

class LoggedAsyncSession(AsyncSession):
    async def close(self):
        logger.debug(f"Closing session {id(self)}")
        await super().close()

# Use this factory to track sessions
async_session_factory = async_sessionmaker(
    engine,
    class_=LoggedAsyncSession,
    expire_on_commit=False,
)

# In a health endpoint:
@app.get("/health")
async def health():
    # Check pool status
    pool = engine.pool
    return {
        "pool_size": pool.size(),
        "checked_in_connections": pool.checkedin(),
        "overflow": pool.overflow(),
    }
Output
{"pool_size": 16, "checked_in_connections": 10, "overflow": 2}
Never Do This: Silent Exception Handling
If you catch an exception in a session and don't re-raise, the session won't be closed. Always re-raise or use async with which guarantees cleanup.
● Production incidentPOST-MORTEMseverity: high

The 4GB Container That Kept Dying

Symptom
A FastAPI microservice handling checkout would OOM-kill every 2 hours under moderate load. Memory usage grew linearly with uptime.
Assumption
Memory leak in business logic — maybe a cache that wasn't evicting.
Root cause
Every request created a new AsyncEngine via create_async_engine in a dependency. The engine's connection pool never got disposed, and each engine held 5 connections open. After 1000 requests, 5000 idle connections consumed all memory.
Fix
Moved engine creation to app startup: @app.on_event('startup') with a single engine = create_async_engine(...). Used async_sessionmaker as a dependency. Added engine.dispose() on shutdown.
Key lesson
  • One engine per app lifetime.
  • Never create engines in request handlers — that's a memory bomb.
Production debug guideSystematic recovery paths for the failure modes engineers actually hit.3 entries
Symptom · 01
sqlalchemy.exc.TimeoutError: QueuePool limit of size X overflow Y reached
Fix
1. Check if sessions are being closed: add logging to session close. 2. Check pool_size and max_overflow settings. 3. Restart the app to clear pool. 4. Fix session leaks by using context managers.
Symptom · 02
RuntimeError: Task <Task pending ...> got Future <Future pending ...> attached to a different loop
Fix
1. Ensure engine is created once per event loop (e.g., in lifespan). 2. Use poolclass=NullPool if engine must be shared. 3. Restart the app.
Symptom · 03
Alembic migration hangs or deadlocks
Fix
1. Connect to DB and run SELECT * FROM pg_locks WHERE NOT granted; to see blocked locks. 2. Kill blocking queries. 3. Set lock_timeout in migration. 4. Run migration during maintenance window.
★ FastAPI Async SQLAlchemy — Alembic Migrations and Production Patterns Triage Cheat SheetFirst-response commands for when things go wrong — copy-paste ready.
Connection pool exhausted: `TimeoutError: QueuePool limit of size 16 overflow 10 reached`
Immediate action
Check if sessions are leaking
Commands
`docker logs <container> | grep 'Closing session'`
`curl http://localhost:8000/health | jq '.checked_in_connections'`
Fix now
Restart app and fix session lifecycle: use async with in dependencies.
Different event loop error: `RuntimeError: Task <Task pending ...> got Future <Future pending ...> attached to a different loop`+
Immediate action
Check engine creation location
Commands
`grep -r 'create_async_engine' app/`
`grep -r 'lifespan' app/`
Fix now
Move engine creation to lifespan or @app.on_event('startup').
Migration deadlock: `ERROR: deadlock detected`+
Immediate action
Check blocked queries
Commands
`SELECT * FROM pg_locks WHERE NOT granted;`
`SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE state = 'idle in transaction';`
Fix now
Add op.execute('SET lock_timeout = 5000') at start of migration.
Slow queries: `DatabaseError: server closed the connection unexpectedly`+
Immediate action
Check `pool_recycle` vs DB timeout
Commands
`SHOW idle_in_transaction_session_timeout;`
`SHOW statement_timeout;`
Fix now
Set pool_recycle to 80% of the smallest timeout value.
Feature / AspectSync SQLAlchemy in FastAPIAsync SQLAlchemy in FastAPI
Concurrency modelThread pool (blocking)Asyncio (non-blocking)
Max throughput (same resources)~500 req/s (10 workers)~5000 req/s (single worker)
ComplexityLowMedium (async/await, session lifecycle)
Connection pool usageOne pool per workerOne pool shared across event loop
Best forLow concurrency, simple CRUDHigh concurrency, many I/O calls
⚙ Quick Reference
7 commands from this guide
FileCommand / CodePurpose
AsyncEngineSetup.pyfrom sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, Asyn...Why Sync SQLAlchemy Kills Async Performance
SessionDependency.pyfrom fastapi import Depends, FastAPISession Lifecycle
alembicenv.pyfrom logging.config import fileConfigAlembic Migrations in an Async World
migrationsversionsxxxx_add_column.py"""Add is_active column to users table."""Handling Concurrent Migrations Safely
ci_test_migrations.pyfrom sqlalchemy import create_engineTesting Migrations in CI
PoolTuning.pyfrom sqlalchemy.ext.asyncio import create_async_engineConnection Pool Tuning for Production
DebugSessionLeaks.pyfrom sqlalchemy.ext.asyncio import AsyncSession, async_sessionmakerDebugging Session Leaks in Production

Key takeaways

1
One engine per app lifetime
never create engines in request handlers.
2
Always use async with for sessions
it guarantees cleanup even on exceptions.
3
Set pool_recycle to 80% of your DB's idle timeout to avoid stale connections.
4
Test migrations against a production data copy in CI, including rollback.
INTERVIEW PREP · PRACTICE MODE

Interview Questions on This Topic

Q01SENIOR
How does async SQLAlchemy handle connection pooling differently from syn...
Q02SENIOR
When would you choose async SQLAlchemy over sync SQLAlchemy in a FastAPI...
Q03SENIOR
What happens when you create an `AsyncEngine` inside a request handler, ...
Q04JUNIOR
What is the purpose of `expire_on_commit=False` in `async_sessionmaker`?
Q05SENIOR
You see `TimeoutError: QueuePool limit of size X overflow Y reached` in ...
Q06SENIOR
How would you design a migration strategy for a high-traffic FastAPI ser...
Q01 of 06SENIOR

How does async SQLAlchemy handle connection pooling differently from sync SQLAlchemy in a FastAPI app?

ANSWER
Async SQLAlchemy uses a single connection pool shared across the event loop, whereas sync SQLAlchemy creates one pool per thread (worker). This means async can handle more concurrent connections with fewer total connections, but requires careful session lifecycle management to avoid leaks.
FAQ · 4 QUESTIONS

Frequently Asked Questions

01
How do I use Alembic with async SQLAlchemy in FastAPI?
02
What's the difference between `AsyncSession` and `Session` in SQLAlchemy?
03
How do I avoid connection pool exhaustion in FastAPI with async SQLAlchemy?
04
Can I run Alembic migrations while the FastAPI app is running?
COMPLETE GUIDE
FastAPI Complete Guide — Interactive Tutorial for Production APIs →

Every FastAPI concept with runnable in-browser examples — params, Pydantic, dependency injection, JWT auth, async, SQLAlchemy, testing, WebSockets, and Docker deployment. The interactive reference for production engineers.

N
Naren Founder & Principal Engineer

20+ years shipping production Python across data and backend systems. Written from production experience, not tutorials.

Follow
Verified
production tested
July 05, 2026
last updated
141
articles · all by Naren
🔥

That's Python Libraries. Mark it forged?

3 min read · try the examples if you haven't

Previous
FastAPI Streaming Responses and File Responses
56 / 57 · Python Libraries
Next
FastAPI Cloud — Official Deployment Platform