
FastAPI Database Integration with SQLAlchemy

📍 Part of: Python Libraries → Topic 43 of 51
Architect a robust database layer in FastAPI using SQLAlchemy 2.
⚙️ Intermediate — basic Python knowledge assumed
In this tutorial, you'll learn
  • The get_db() generator is a 'Unit of Work' pattern implementation—ensuring atomic transactions per request.
  • Async Drivers: Always use asyncpg or aiosqlite with FastAPI to avoid stalling the event loop on database waits.
  • SQLAlchemy 2.0 Syntax: Prefer select(Model) over the legacy session.query(Model) for better type safety and future-proofing.
Quick Answer
  • FastAPI uses Depends() to inject a database session per request
  • The get_db() generator yields a session and guarantees cleanup in finally
  • Always use AsyncSession with asyncpg to avoid blocking the event loop
  • Session pool size must match your worker concurrency — too many connections can starve PostgreSQL
  • Biggest mistake: reusing a session across multiple handlers causes stale data and transaction leaks
🚨 START HERE
Quick AsyncSession Debug Cheat Sheet
Commands to diagnose session leaks, pool exhaustion, and greenlet errors
🟡 'Pool exhausted' error
Immediate Action: Check the number of active connections.
Commands
docker exec <pg_container> psql -U user -c "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
docker logs <fastapi_container> | grep 'connection'
Fix Now: Reduce max_overflow to 0 and increase pool_size, or kill idle sessions with pg_terminate_backend().
🟡 Greenlet error spamming logs
Immediate Action: Identify which endpoint triggered it.
Commands
grep 'greenlet_spawn' /var/log/app.log | tail -20
Review code for missing await before db methods.
Fix Now: Add await to all async database calls — use a linter like flake8-async to catch missing awaits.
🟠 Slow queries under load
Immediate Action: Enable SQLAlchemy echo=True temporarily.
Commands
Set environment variable SQLALCHEMY_ECHO=1
Log all queries to find slow ones.
Fix Now: Add database indexes or use selectinload to avoid N+1 queries.
Production Incident: AsyncSession Not Closed – Connection Pool Exhaustion
A background task forgot to close its session, causing the pool to drain and all new requests to hang.
Symptom: After 2 hours of steady traffic, all endpoints start returning 503 or timing out. No errors in logs until connections reach max.
Assumption: The session is automatically closed at the end of the request via Depends() — it should be fine.
Root cause: A developer used async_sessionmaker() directly in a background task without the context manager, so session.close() was never called.
Fix: Wrap all session usage in async with AsyncSessionLocal() as session: or ensure a finally block calls await session.close().
Key Lesson
  • Every session must be scoped to a unit of work — never create sessions without a guaranteed close.
  • Monitor connection pool usage with pg_stat_activity.
  • Set a statement_timeout and idle_in_transaction_session_timeout on PostgreSQL as a safety net.
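The lesson can be demonstrated without a database. FakeSession and session_scope below are stand-ins, not SQLAlchemy APIs; they only mirror the shape of async with AsyncSessionLocal() as session: to show that the finally-based close runs even when a background task crashes:

```python
import asyncio
from contextlib import asynccontextmanager

class FakeSession:
    """Stand-in for an AsyncSession, so the close guarantee is observable."""
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

@asynccontextmanager
async def session_scope():
    # Same shape as: async with AsyncSessionLocal() as session: ...
    session = FakeSession()
    try:
        yield session
    finally:
        await session.close()  # runs on success and on exception alike

last_session = None  # handle kept only so we can inspect it afterwards

async def background_task():
    global last_session
    async with session_scope() as session:
        last_session = session
        raise RuntimeError("background task crashed mid-work")

async def main():
    try:
        await background_task()
    except RuntimeError:
        pass
    print("session closed despite the crash:", last_session.closed)

asyncio.run(main())
# prints: session closed despite the crash: True
```

In real code the same guarantee comes from using AsyncSessionLocal() as an async context manager inside the background task itself.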
Production Debug Guide: Diagnose session lifecycle issues fast
Greenlet error 'greenlet_spawn has not been called': Check whether you called await on db.commit() or db.execute(). A missing await leaves the coroutine unexecuted.
Connection refused after many requests: Verify pool_size and max_overflow in create_async_engine. Run SHOW max_connections on PostgreSQL.
Transactions appear to hang or roll back unexpectedly: Ensure each session has try-except with await session.rollback() on exception. Use logging to trace transaction boundaries.
Data returned from the database is stale or expired: Set expire_on_commit=False in async_sessionmaker. Without it, attributes are expired after commit and need a refresh.

Async Database Configuration and Session Lifecycle

Modern FastAPI apps run on asyncio, so this configuration uses the postgresql+asyncpg driver. The get_db generator is the heart of the integration: it provides a session to your endpoint and guarantees the session is closed, even if the request crashes.

io/thecodeforge/db/session.py · PYTHON
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase
import os

# Production-grade PostgreSQL async connection string
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/forge_db")

# create_async_engine prevents the event loop from blocking during I/O
engine = create_async_engine(DATABASE_URL, echo=False, future=True)

# Factory for creating session objects
AsyncSessionLocal = async_sessionmaker(
    bind=engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)

class Base(DeclarativeBase):
    """Base class for all SQLAlchemy models"""
    pass

# Dependency: The 'Session-per-Request' provider
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            # 'async with' guarantees the session is closed; commits remain explicit
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
▶ Output
Async connection pool initialized. get_db yields non-blocking sessions.
📊 Production Insight
The async engine uses a connection pool — default size is 5.
If you have 8 Uvicorn workers, you'll exhaust the pool quickly.
Set pool_size = worker_count + 1 and max_overflow = 0.
Setting echo=True in production logs every query — use only in dev.
expire_on_commit=False is critical: without it, accessing model attributes after commit in async context raises greenlet errors.
🎯 Key Takeaway
Configure pool size based on worker count — oversubscription kills performance.
Always set expire_on_commit=False when using AsyncSession.
Never enable SQLAlchemy echo in production.
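Applying the sizing rule above, a hardened engine configuration might look like the sketch below. pool_pre_ping and pool_recycle are common extra safeguards not discussed above; treat the numbers as placeholders to tune against your worker count and PostgreSQL's max_connections.

```python
from sqlalchemy.ext.asyncio import create_async_engine

WORKER_COUNT = 8  # e.g. started with: uvicorn app:app --workers 8

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/forge_db",
    pool_size=WORKER_COUNT + 1,  # the sizing rule: worker_count + 1
    max_overflow=0,              # fail fast rather than piling up connections
    pool_pre_ping=True,          # detect dead connections before handing them out
    pool_recycle=1800,           # retire connections after 30 minutes
    echo=False,                  # never True in production
)
```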

Models and Async CRUD Operations

In SQLAlchemy 2.0, we use the select() statement approach. This is more explicit and aligns better with static analysis tools like Mypy. Note how we keep the SQLAlchemy Model separate from the Pydantic schema.

io/thecodeforge/db/models.py · PYTHON
from sqlalchemy import String, Integer, select
from sqlalchemy.orm import Mapped, mapped_column
from fastapi import FastAPI, Depends, HTTPException, status
from io.thecodeforge.db.session import Base, get_db, AsyncSession
from pydantic import BaseModel, EmailStr

# 1. SQLAlchemy Model (Database Identity)
class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(unique=True)

# 2. Pydantic Schema (API Contract)
class UserCreate(BaseModel):
    username: str
    email: EmailStr

app = FastAPI()

@app.post('/forge/users', status_code=status.HTTP_201_CREATED)
async def create_user(payload: UserCreate, db: AsyncSession = Depends(get_db)):
    new_user = User(username=payload.username, email=payload.email)
    db.add(new_user)
    await db.commit()
    await db.refresh(new_user) # Reloads auto-generated ID from DB
    return new_user

@app.get('/forge/users/{user_id}')
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
▶ Output
HTTP 201 Created: {'id': 1, 'username': 'forge_dev', 'email': 'dev@thecodeforge.io'}
📊 Production Insight
Mixing sync and async sessions in the same request causes deadlocks — ensure all database operations use the same AsyncSession.
The select() approach is safer than the legacy session.query() because every query must run through await db.execute(...), so you can't accidentally execute it synchronously, and it cooperates better with type checkers.
refresh() re-selects the row so database-generated values (auto-increment IDs, server defaults) are loaded into the object; with the default expire_on_commit=True, touching an expired attribute after commit would instead trigger a lazy load and a greenlet error.
🎯 Key Takeaway
Use select() not session.query() — it forces await and works with async.
After commit, call db.refresh() to load generated fields.
Never mix sync and async sessions in the same request.

Repository Pattern for Data Access

The Repository pattern abstracts database operations behind an interface. Instead of calling db.execute(select(...)) directly in your endpoint, you define a repository class (e.g., UserRepository) that contains all query logic. The endpoint depends on the repository interface, not the session directly. This separation makes unit testing possible — you can mock the repository. It also keeps your business logic clean when multiple endpoints or background tasks need the same queries.

io/thecodeforge/db/repositories.py · PYTHON
from io.thecodeforge.db.session import AsyncSession
from io.thecodeforge.db.models import User
from sqlalchemy import select

class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def create(self, username: str, email: str) -> User:
        user = User(username=username, email=email)
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user

# In endpoint:
@app.get('/forge/users/{user_id}')
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(404)
    return user
Mental Model
Why the Repository Pattern Matters
Think of the repository as the 'database adapter' — it isolates your domain from the persistence mechanism.
  • Centralizes query logic — no SQL leaks into endpoint handlers
  • Makes unit testing trivial: mock the repository, not the database
  • Enables swapping storage (e.g., from PostgreSQL to MongoDB) without changing business code
  • Avoids duplication when multiple consumers need the same query
📊 Production Insight
Without repository pattern, business logic leaks into endpoints.
When a second consumer (e.g., background worker) needs the same query, you end up duplicating SQL.
Repository classes centralize query logic and make unit testing possible by mocking the repository.
However, they add abstraction overhead — avoid deep inheritance hierarchies.
🎯 Key Takeaway
Repository pattern isolates database logic from endpoint code.
Use it when you have multiple consumers of the same data.
Don't over-abstract for simple CRUD — it adds cost without benefit.
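The unit-testing benefit is easy to show with only the standard library: the endpoint logic can be exercised against an AsyncMock standing in for UserRepository, with no database at all. read_user_logic and the SimpleNamespace user below are illustrative stand-ins, not code from the sections above:

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock

async def read_user_logic(repo, user_id: int):
    # Same shape as the read_user endpoint, minus the FastAPI wiring
    user = await repo.get_by_id(user_id)
    if user is None:
        raise LookupError("User not found")  # the endpoint raises HTTPException(404)
    return user

async def main():
    repo = AsyncMock()
    repo.get_by_id.return_value = SimpleNamespace(id=1, username="forge_dev")

    user = await read_user_logic(repo, 1)
    print(user.username)  # forge_dev
    repo.get_by_id.assert_awaited_once_with(1)
    return user

result = asyncio.run(main())
```

Because the endpoint depends on the repository rather than on AsyncSession, swapping in the mock requires no dependency_overrides and no test database.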

Managing Migrations with Alembic

Alembic is the standard migration tool for SQLAlchemy. Install it with pip install alembic, then run alembic init alembic. Configure env.py to use your async engine and import your Base models. Use alembic revision --autogenerate -m "description" to generate migration scripts and alembic upgrade head to apply them. For async databases, env.py must run the migrations through connection.run_sync() on an async engine, as shown below.

alembic/env.py · PYTHON
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from io.thecodeforge.db.session import Base
from io.thecodeforge.db.models import User  # noqa: F401

config = context.config
fileConfig(config.config_file_name)

target_metadata = Base.metadata

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    connectable = create_async_engine(config.get_main_option("sqlalchemy.url"), poolclass=pool.NullPool)
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online():
    import asyncio
    asyncio.run(run_async_migrations())

# Standard Alembic entrypoint: dispatch on offline vs online mode
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
📊 Production Insight
Alembic autogenerate doesn't detect all changes — rename columns, constraint modifications often require manual editing.
Always review generated migration scripts before applying to production.
Downgrade scripts are rarely tested, so practice rollbacks in staging.
Running alembic upgrade head during deployment should be part of CI/CD, but beware of long-running migrations locking tables.
🎯 Key Takeaway
Never run autogenerated migrations blindly — review and test.
Use transactional DDL where possible (PostgreSQL supports it).
Always have a rollback plan: test downgrade scripts in staging.

Testing Async Database Operations

Use pytest with async fixtures to manage a test database. Build a test client from httpx.AsyncClient with ASGITransport pointed at the FastAPI app. For each test, start a transaction and roll it back afterwards to ensure isolation. Use a dedicated test PostgreSQL database with the same schema (use Alembic to set it up). Install pytest-asyncio to support async test functions and fixtures.

io/thecodeforge/tests/test_users.py · PYTHON
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from io.thecodeforge.db.session import Base, get_db
from io.thecodeforge.main import app

# Use a separate test database
TEST_DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/test_forge_db"

@pytest_asyncio.fixture
async def async_engine():
    engine = create_async_engine(TEST_DATABASE_URL, echo=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()

@pytest_asyncio.fixture
async def db_session(async_engine):
    # Async generator fixture: yields a session, guarantees close
    session = async_sessionmaker(bind=async_engine, class_=AsyncSession, expire_on_commit=False)()
    try:
        yield session
    finally:
        await session.close()

@pytest_asyncio.fixture
async def client(db_session):
    async def override_get_db():
        yield db_session
    app.dependency_overrides[get_db] = override_get_db
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://testserver") as ac:
        yield ac
    app.dependency_overrides.clear()  # don't leak the override into other tests

@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
    response = await client.post("/forge/users", json={"username": "tester", "email": "tester@test.com"})
    assert response.status_code == 201
    data = response.json()
    assert data["username"] == "tester"
📊 Production Insight
Tests that share a database session can leak state between tests.
Use a transaction-per-test pattern: start a transaction before each test, roll it back after.
For async endpoints, use httpx.AsyncClient with ASGITransport instead of FastAPI's synchronous TestClient.
Avoid hitting production database — use a dedicated test database with same schema.
🎯 Key Takeaway
Test with transactions to isolate tests and avoid cleanup code.
Use httpx.AsyncClient for testing async endpoints.
Never test against production — always use a separate test database.
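The transaction-per-test pattern recommended above isn't shown in the fixture code, so here is one common shape, sketched under the assumption of SQLAlchemy 2.0's join_transaction_mode="create_savepoint": the test's session commits land on a savepoint, and the outer transaction is rolled back at teardown, so no cleanup code is needed.

```python
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession

@pytest_asyncio.fixture
async def db_session(async_engine):
    # One connection and one outer transaction for the whole test
    async with async_engine.connect() as conn:
        trans = await conn.begin()
        session = AsyncSession(
            bind=conn,
            join_transaction_mode="create_savepoint",  # session.commit() hits a SAVEPOINT
            expire_on_commit=False,
        )
        try:
            yield session
        finally:
            await session.close()
            await trans.rollback()  # discard everything the test wrote
```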

Handling Transactions and Rollbacks in Production

In production, every request should be treated as a unit of work. Use the async with session.begin() context manager to automatically commit on success and rollback on exception. If you need partial rollback within a transaction, use savepoints via await session.begin_nested(). Always keep transactions short — avoid network calls or long computations inside a transaction block. Set statement_timeout in PostgreSQL to prevent runaway queries from holding locks.

io/thecodeforge/services/order_service.py · PYTHON
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from io.thecodeforge.db.models import User
# Order, Inventory, and OrderCreate are assumed to be defined alongside the User model

async def create_order(db: AsyncSession, order_data: OrderCreate) -> Order:
    async with db.begin():
        # Automatically commits or rolls back on exception
        user = await db.execute(select(User).where(User.id == order_data.user_id))
        user = user.scalar_one_or_none()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        order = Order(user_id=user.id, total=order_data.total)
        db.add(order)
        # Use nested transaction for partial rollback
        async with db.begin_nested():
            for item in order_data.items:
                inventory_item = await db.execute(select(Inventory).where(Inventory.product_id == item.product_id))
                inventory_item = inventory_item.scalar_one_or_none()
                if not inventory_item or inventory_item.quantity < item.quantity:
                    raise HTTPException(status_code=400, detail="Insufficient stock")
                inventory_item.quantity -= item.quantity
    await db.refresh(order)
    return order
⚠ Long Transactions Will Starve Your Pool
Every open transaction holds a connection from the pool. If your transaction does I/O (e.g., calling an external API), that connection is idle but not returned. At 5 connections and 8 workers, you'll exhaust the pool in seconds. Keep transactions as lean as possible.
📊 Production Insight
A common bug is catching an exception and committing partial data.
Always rollback in the except block before re-raising.
Nested transactions via savepoints allow partial rollback within a larger transaction.
In high-concurrency apps, keep transactions short — long transactions sit in the 'idle in transaction' state, blocking vacuum and consuming connections.
🎯 Key Takeaway
Always rollback on exception — never commit a broken unit of work.
Use savepoints for partial rollback within a transaction.
Keep transactions as short as possible to avoid connection starvation.

🎯 Key Takeaways

  • The get_db() generator is a 'Unit of Work' pattern implementation—ensuring atomic transactions per request.
  • Async Drivers: Always use asyncpg or aiosqlite with FastAPI to avoid stalling the event loop on database waits.
  • SQLAlchemy 2.0 Syntax: Prefer select(Model) over the legacy session.query(Model) for better type safety and future-proofing.
  • Model Separation: Pydantic handles the 'JSON-to-Object' conversion; SQLAlchemy handles the 'Object-to-SQL' conversion.
  • Expiration: Set expire_on_commit=False in your session factory to prevent 'greenlet' errors when accessing attributes after a commit in async contexts.
  • Repository Pattern: Abstract data access behind a repository to isolate persistence logic and simplify testing.
  • Alembic Migrations: Always review autogenerated scripts and test downgrade paths before production deployment.

⚠ Common Mistakes to Avoid

    Using synchronous SQLAlchemy engine with FastAPI
    Symptom

    The app becomes unresponsive under load; event loop blocks on database queries.

    Fix

    Use create_async_engine and AsyncSession from sqlalchemy.ext.asyncio. Switch to asyncpg driver.

    Forgetting to await db.commit()
    Symptom

    No error raised, but data is never persisted. The transaction remains open.

    Fix

    Always use await db.commit() after adding objects. Use a linter or MyPy with async plugin to catch missing awaits.

    Not setting expire_on_commit=False
    Symptom

    Trying to access object attributes after commit raises AttributeError or greenlet error.

    Fix

    Set expire_on_commit=False in async_sessionmaker. If you need fresh data, call await db.refresh() explicitly.

    Using the same session across multiple requests (global session)
    Symptom

    Stale data, thread safety issues, transaction leaks.

    Fix

    Always use the Depends(get_db) pattern to get a fresh session per request. Never store the session as a global variable.

    Not rolling back on exception
    Symptom

    A failed request leaves the database in an inconsistent state; subsequent requests are blocked by idle transaction.

    Fix

    Wrap all session operations in try-except and call await session.rollback() on any exception. The async context manager does this automatically if you use 'async with'.

Interview Questions on This Topic

  • Q (Senior): What is the 'N+1 Problem' in SQLAlchemy, and how do you use joinedload or selectinload in FastAPI to solve it?
    The N+1 problem occurs when lazy loading triggers N additional queries for each parent row. In FastAPI with AsyncSession, use selectinload(Relationship) on the query to eagerly load related objects in one additional query. joinedload works too but can cause cartesian product issues with many-to-many relationships. For performance, prefer selectinload because it issues a separate SELECT with an IN clause, which is efficient and avoids the large result set blow-up of joins.
  • Q (Mid-level): Explain the internal mechanics of how Depends(get_db) ensures the database session is closed even if the endpoint raises an unhandled exception.
    FastAPI's Depends accepts a generator function and wraps it in a context manager on an async exit stack. When the generator yields, the request proceeds with the session. After the response is sent, or when an unhandled exception propagates, FastAPI unwinds the exit stack, which resumes the generator so its finally block runs and the session is closed. The cleanup is explicit rather than left to garbage collection, so the session is released regardless of success or failure.
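The teardown guarantee can be observed with plain Python, no FastAPI required: contextlib.asynccontextmanager drives a generator dependency the same way FastAPI's exit stack does. This is a simplified model, and the names are illustrative:

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def get_db():
    events.append("session opened")
    try:
        yield "session"
    finally:
        events.append("session closed")  # runs even when the endpoint raises

async def crashing_endpoint():
    async with get_db() as session:  # FastAPI's exit stack plays this role
        raise ValueError("unhandled error in endpoint")

async def main():
    try:
        await crashing_endpoint()
    except ValueError:
        pass
    print(events)

asyncio.run(main())
# prints: ['session opened', 'session closed']
```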
  • Q (Senior): In a high-concurrency FastAPI app, what are the dangers of using a synchronous SQLAlchemy engine? How does it affect the Uvicorn worker?
    A synchronous engine blocks the thread when executing database queries. Since Uvicorn uses an event loop in each worker, blocking the thread prevents that worker from handling other requests. This effectively serializes database work, drastically reducing throughput and increasing latency. Under high concurrency, the event loop starves, leading to timeouts and poor user experience. Always use the async engine with an async driver like asyncpg to release the event loop during I/O.
  • Q (Senior): How would you implement a Global Transaction Middleware so that every request is automatically wrapped in a transaction that rolls back on any 4xx or 5xx error?
    Implement a middleware class (using BaseHTTPMiddleware) that obtains a session from the async session factory, begins a transaction with async with session.begin(), and passes the request through. On exception, session.begin() automatically rolls back. However, middleware lacks access to the dependency injection scope, so you must create the session manually. An alternative is to use a custom dependency that wraps the entire endpoint logic in a transaction, but that requires changing every endpoint. The cleanest approach is to use a @contextmanager dependency that yields the session within a transaction and FastAPI's Depends to inject it into each route.
  • Q (Senior): Describe the difference between expire_on_commit=True vs False. Why is False often preferred in asynchronous FastAPI applications?
    expire_on_commit=True (the default) marks all loaded objects as expired after commit. The next attribute access triggers a lazy load to refresh the object. In async contexts, that lazy load happens synchronously (inside a property getter), which cannot await the event loop. This results in a greenlet_spawn error. Setting expire_on_commit=False keeps the object data intact after commit, avoiding accidental synchronous loads. You can still get fresh data by explicitly calling await db.refresh(object). Therefore, False is the standard setting for async apps.

Frequently Asked Questions

Why do I need db.refresh() after db.commit()?

When you call commit(), SQLAlchemy marks the local object as 'expired' because the database state is now the ultimate source of truth (it might have applied triggers, auto-increments, or default timestamps). refresh() tells SQLAlchemy to perform a SELECT immediately to pull those database-generated values back into your Python object so you can use them in your API response.

Should I use SQLAlchemy ORM or Core with FastAPI?

At TheCodeForge, we use the ORM for 90% of tasks because of its productivity and relationship mapping. However, for high-performance bulk inserts or complex reporting queries involving heavy window functions, we drop down to SQLAlchemy Core (SQL Expressions) to bypass the overhead of object instantiation.

How do I handle database migrations in FastAPI?

FastAPI and SQLAlchemy do not handle migrations automatically. The industry standard is Alembic. You use Alembic to track changes to your Base models and generate migration scripts (each containing upgrade() and downgrade() functions) to keep your production schema in sync with your code.

What is the best way to structure my FastAPI project with SQLAlchemy?

A common production structure is: app/ with subdirectories db/ (session, models, repositories), schemas/ (Pydantic models), services/ (business logic), and routers/ (endpoints). Separate concerns: database models know nothing about Pydantic, and repositories know nothing about HTTP. Use dependency injection to wire everything together.

Naren · Founder & Author

Developer and founder of TheCodeForge. I built this site because I was tired of tutorials that explain what to type without explaining why it works. Every article here is written to make concepts actually click.

Forged with 🔥 at TheCodeForge.io — Where Developers Are Forged