FastAPI Database Integration with SQLAlchemy
- FastAPI uses Depends() to inject a database session per request
- The get_db() generator yields a session and guarantees cleanup in finally
- Always use AsyncSession with asyncpg to avoid blocking the event loop
- Session pool size must match your worker concurrency — too many connections can starve PostgreSQL
- Biggest mistake: reusing a session across multiple handlers causes stale data and transaction leaks
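The pool-sizing bullet above can be sketched as engine configuration. The `pool_size`, `max_overflow`, `pool_timeout`, and `pool_pre_ping` parameters are real `create_async_engine` options; the specific numbers and the worker-count arithmetic are illustrative assumptions for this sketch.

```python
# Illustrative sketch: sizing the connection pool against worker concurrency.
# Each Uvicorn worker process gets its own pool, so the worst case is
# workers * (pool_size + max_overflow) connections hitting PostgreSQL.
WORKERS = 4          # assumed Uvicorn worker count
POOL_SIZE = 5        # persistent connections kept open per worker
MAX_OVERFLOW = 10    # extra burst connections allowed per worker

def make_engine():
    # Requires sqlalchemy + asyncpg; isolated in a function so the
    # arithmetic below runs even without a database driver installed.
    from sqlalchemy.ext.asyncio import create_async_engine
    return create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/forge_db",
        pool_size=POOL_SIZE,
        max_overflow=MAX_OVERFLOW,
        pool_timeout=30,     # seconds to wait for a free connection
        pool_pre_ping=True,  # validate connections before handing them out
    )

# Compare this against PostgreSQL's max_connections (default 100):
worst_case_connections = WORKERS * (POOL_SIZE + MAX_OVERFLOW)
print(worst_case_connections)  # 60
```

Keeping the worst case comfortably below PostgreSQL's `max_connections` leaves headroom for migrations, monitoring, and ad-hoc psql sessions.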
Production Debug Guide: diagnose session lifecycle issues fast

Pool exhausted errors
- Count active connections: docker exec <pg_container> psql -U user -c "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
- Check application logs for connection churn: docker logs <fastapi_container> | grep 'connection'

Greenlet errors spamming the logs
- Inspect recent occurrences: grep 'greenlet_spawn' /var/log/app.log | tail -20
- Review code for a missing await before db methods such as db.commit() or db.execute(); a missing await leaves the coroutine unexecuted.

Slow queries under load
- Set the environment variable SQLALCHEMY_ECHO=1 to log all queries and find the slow ones.
- Confirm session.rollback() runs on exception, and use logging to trace transaction boundaries.

Production incident example
- Symptom: the endpoint used Depends(), so "it should be fine", yet connections kept leaking.
- Root cause: a background task called async_sessionmaker() directly without the context manager, so session.close() was never called.
- Fix: use async with AsyncSessionLocal() as session: or ensure a finally block calls await session.close().

Async Database Configuration and Session Lifecycle
Modern FastAPI apps are built on asyncio, so the configuration below uses the postgresql+asyncpg driver. The get_db generator is the heart of the integration: it provides a session to your endpoint and guarantees the session is closed, even if the request crashes.
```python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase
import os

# Production-grade PostgreSQL async connection string
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/forge_db")

# create_async_engine prevents the event loop from blocking during I/O
engine = create_async_engine(DATABASE_URL, echo=False, future=True)

# Factory for creating session objects
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

class Base(DeclarativeBase):
    """Base class for all SQLAlchemy models"""
    pass

# Dependency: the 'session-per-request' provider
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        # The 'async with' block closes the session automatically on exit,
        # even when the endpoint raises. Commits remain explicit.
```
Models and Async CRUD Operations
In SQLAlchemy 2.0, we use the select() statement approach. This is more explicit and aligns better with static analysis tools like Mypy. Note how we keep the SQLAlchemy Model separate from the Pydantic schema.
```python
from sqlalchemy import String, select
from sqlalchemy.orm import Mapped, mapped_column
from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr

from io.thecodeforge.db.session import Base, get_db, AsyncSession

# 1. SQLAlchemy model (database identity)
class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(unique=True)

# 2. Pydantic schema (API contract)
class UserCreate(BaseModel):
    username: str
    email: EmailStr

app = FastAPI()

@app.post('/forge/users', status_code=status.HTTP_201_CREATED)
async def create_user(payload: UserCreate, db: AsyncSession = Depends(get_db)):
    new_user = User(username=payload.username, email=payload.email)
    db.add(new_user)
    await db.commit()
    await db.refresh(new_user)  # Reload the auto-generated ID from the DB
    return new_user

@app.get('/forge/users/{user_id}')
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
```
- Use select(), not session.query(): it is the SQLAlchemy 2.0 style and the form that works with async execution.
- Note the distinction: select() builds a statement object, while db.execute() returns a coroutine that must be awaited, so a forgotten await surfaces immediately as an error instead of silently returning stale data.
- Call await db.refresh() after commit to load database-generated fields such as auto-increment IDs.
Repository Pattern for Data Access
The Repository pattern abstracts database operations behind an interface. Instead of calling db.execute(select(...)) directly in your endpoint, you define a repository class (e.g., UserRepository) that contains all query logic. The endpoint depends on the repository interface, not the session directly. This separation makes unit testing possible — you can mock the repository. It also keeps your business logic clean when multiple endpoints or background tasks need the same queries.
```python
from sqlalchemy import select

from io.thecodeforge.db.session import AsyncSession
from io.thecodeforge.db.models import User

class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def create(self, username: str, email: str) -> User:
        user = User(username=username, email=email)
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user

# In an endpoint:
@app.get('/forge/users/{user_id}')
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
```
- Centralizes query logic — no SQL leaks into endpoint handlers
- Makes unit testing trivial: mock the repository, not the database
- Enables swapping storage (e.g., from PostgreSQL to MongoDB) without changing business code
- Avoids duplication when multiple consumers need the same query
Managing Migrations with Alembic
Alembic is the standard migration tool for SQLAlchemy. Install it with pip install alembic, then run alembic init alembic. Configure env.py to use your async engine and import your Base models. Use alembic revision --autogenerate -m "description" to generate migration scripts and alembic upgrade head to apply them. For async databases, you must bridge back into sync territory: connect with the async engine and run the migration function through connection.run_sync(), as the env.py below does.
```python
import asyncio
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context

from io.thecodeforge.db.session import Base
from io.thecodeforge.db.models import User  # noqa: F401  (imported so autogenerate sees the model)

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url"), poolclass=pool.NullPool
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```
Testing Async Database Operations
Use pytest with async fixtures to manage a test database. Build the test client with httpx.AsyncClient over an ASGITransport pointed at your FastAPI app. For each test, start a transaction and roll it back afterwards to ensure isolation. Use a dedicated test PostgreSQL database with the same schema (use Alembic to set it up). Install pytest-asyncio to support async test functions.
```python
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

from io.thecodeforge.db.session import Base, get_db
from io.thecodeforge.main import app

# Use a separate test database
TEST_DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/test_forge_db"

@pytest.fixture
async def async_engine():
    engine = create_async_engine(TEST_DATABASE_URL, echo=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()

@pytest.fixture
async def db_session(async_engine):
    session = async_sessionmaker(
        bind=async_engine, class_=AsyncSession, expire_on_commit=False
    )()
    try:
        yield session
    finally:
        await session.close()

@pytest.fixture
async def client(db_session):
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://testserver"
    ) as ac:
        yield ac
    app.dependency_overrides.clear()  # don't leak the override into other tests

@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
    response = await client.post(
        "/forge/users", json={"username": "tester", "email": "tester@test.com"}
    )
    assert response.status_code == 201
    assert response.json()["username"] == "tester"
```
Handling Transactions and Rollbacks in Production
In production, every request should be treated as a unit of work. Use the async with context manager to commit automatically on success and roll back on exception. If you need a partial rollback within a transaction, use savepoints via await session.begin_nested(). Always keep transactions short: avoid network calls or long computations inside a transaction block. Set statement_timeout in PostgreSQL to prevent runaway queries from holding locks.
```python
async def create_order(db: AsyncSession, order_data: OrderCreate) -> Order:
    async with db.begin():  # Commits on success, rolls back on exception
        result = await db.execute(select(User).where(User.id == order_data.user_id))
        user = result.scalar_one_or_none()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        order = Order(user_id=user.id, total=order_data.total)
        db.add(order)

        # Nested transaction (SAVEPOINT) for partial rollback
        async with db.begin_nested():
            for item in order_data.items:
                result = await db.execute(
                    select(Inventory).where(Inventory.product_id == item.product_id)
                )
                inventory_item = result.scalar_one_or_none()
                if not inventory_item or inventory_item.quantity < item.quantity:
                    raise HTTPException(status_code=400, detail="Insufficient stock")
                inventory_item.quantity -= item.quantity

        # Flush so database-generated fields (e.g. order.id) are populated
        # before the outer block commits
        await db.flush()
    return order
```
🎯 Key Takeaways
- The get_db() generator is a 'Unit of Work' pattern implementation, ensuring atomic transactions per request.
- Async Drivers: Always use asyncpg or aiosqlite with FastAPI to avoid stalling the event loop on database waits.
- SQLAlchemy 2.0 Syntax: Prefer select(Model) over the legacy session.query(Model) for better type safety and future-proofing.
- Model Separation: Pydantic handles the 'JSON-to-Object' conversion; SQLAlchemy handles the 'Object-to-SQL' conversion.
- Expiration: Set expire_on_commit=False in your session factory to prevent 'greenlet' errors when accessing attributes after a commit in async contexts.
- Repository Pattern: Abstract data access behind a repository to isolate persistence logic and simplify testing.
- Alembic Migrations: Always review autogenerated scripts and test downgrade paths before production deployment.
Interview Questions on This Topic
- Q (Senior): What is the 'N+1 Problem' in SQLAlchemy, and how do you use joinedload or selectinload in FastAPI to solve it?
- Q (Mid-level): Explain the internal mechanics of how Depends(get_db) ensures the database session is closed even if the endpoint raises an unhandled exception.
- Q (Senior): In a high-concurrency FastAPI app, what are the dangers of using a synchronous SQLAlchemy engine? How does it affect the Uvicorn worker?
- Q (Senior): How would you implement a Global Transaction Middleware so that every request is automatically wrapped in a transaction that rolls back on any 4xx or 5xx error?
- Q (Senior): Describe the difference between expire_on_commit=True vs False. Why is False often preferred in asynchronous FastAPI applications?
Frequently Asked Questions
Why do I need db.refresh() after db.commit()?
When you call commit(), SQLAlchemy marks the local object as 'expired' because the database state is now the ultimate source of truth (it might have applied triggers, auto-increments, or default timestamps). refresh() tells SQLAlchemy to perform a SELECT immediately to pull those database-generated values back into your Python object so you can use them in your API response.
Should I use SQLAlchemy ORM or Core with FastAPI?
At TheCodeForge, we use the ORM for 90% of tasks because of its productivity and relationship mapping. However, for high-performance bulk inserts or complex reporting queries involving heavy window functions, we drop down to SQLAlchemy Core (SQL Expressions) to bypass the overhead of object instantiation.
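The "drop down to Core" idea can be sketched as a bulk insert. insert() with a list of parameter dicts is SQLAlchemy's real executemany path; the table definition and the in-memory SQLite engine below are illustrative stand-ins so the sketch stays self-contained.

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table, create_engine, func, insert, select,
)

metadata = MetaData()
# Illustrative Core table; in the article this would be the table behind the User model
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50)),
)

# In-memory SQLite keeps the sketch runnable; production would use the async engine
engine = create_engine("sqlite://")
metadata.create_all(engine)

rows = [{"username": f"user_{i}"} for i in range(1000)]
with engine.begin() as conn:
    # One executemany-style INSERT: no per-row ORM object instantiation
    conn.execute(insert(users), rows)
    count = conn.execute(select(func.count()).select_from(users)).scalar_one()

print(count)  # 1000
```

Passing the list of dicts as the second argument to execute() is what triggers the executemany path, which is the main source of the speedup over adding ORM instances one by one.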
How do I handle database migrations in FastAPI?
FastAPI/SQLAlchemy do not handle migrations automatically. The industry standard is Alembic. You use Alembic to track changes to your Base models and generate migration scripts (like upgrade and downgrade) to keep your production schema in sync with your code.
What is the best way to structure my FastAPI project with SQLAlchemy?
A common production structure is: app/ with subdirectories db/ (session, models, repositories), schemas/ (Pydantic models), services/ (business logic), and routers/ (endpoints). Separate concerns: database models know nothing about Pydantic, and repositories know nothing about HTTP. Use dependency injection to wire everything together.
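On disk, the structure described above might look like this (the individual file names are illustrative):

```
app/
├── main.py              # FastAPI instance and router wiring
├── db/
│   ├── session.py       # engine, AsyncSessionLocal, get_db
│   ├── models.py        # SQLAlchemy models
│   └── repositories.py  # UserRepository and friends
├── schemas/
│   └── user.py          # Pydantic models (UserCreate, ...)
├── services/
│   └── user_service.py  # business logic
└── routers/
    └── users.py         # HTTP endpoints
```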
Developer and founder of TheCodeForge. I built this site because I was tired of tutorials that explain what to type without explaining why it works. Every article here is written to make concepts actually click.