FastAPI WebSockets — Real-time Communication
Stateful Connection Management
In a production environment, you rarely handle a single socket in isolation. You need a centralized 'Manager' to track active sessions, route messages by ID, and manage connection life-cycles. Removing each socket from the registry as soon as it disconnects keeps the process from holding references to dead connections, and gives you one place to handle a failing socket so that it does not disrupt the others.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict

app = FastAPI()


class ForgeSocketManager:
    """Manages active WebSocket connections with unique identifiers."""

    def __init__(self):
        # Map user IDs to their respective WebSocket objects
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        # The handshake must be accepted before any send or receive
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        # pop() with a default is a no-op if the user is already gone
        self.active_connections.pop(user_id, None)

    async def send_personal_message(self, message: str, user_id: str):
        websocket = self.active_connections.get(user_id)
        if websocket is not None:
            await websocket.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a snapshot: other coroutines may connect or disconnect
        # while this one is suspended on an await, and mutating a dict during
        # iteration raises RuntimeError.
        for connection in list(self.active_connections.values()):
            await connection.send_text(message)


manager = ForgeSocketManager()


@app.websocket('/ws/{user_id}')
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            # Listen for incoming messages
            data = await websocket.receive_text()
            await manager.broadcast(f'User {user_id} sent: {data}')
    except WebSocketDisconnect:
        # Remove the socket first so the farewell is not sent to it
        manager.disconnect(user_id)
        await manager.broadcast(f'User {user_id} has left the forge.')
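The broadcast loop above stops at the first socket whose send raises, so one dead connection can block delivery to everyone after it. Here is a minimal sketch of a more defensive broadcast, assuming only that each connection exposes an async send_text method as WebSocket does. FakeSocket and safe_broadcast are hypothetical names, used here so the sketch runs without a server.

```python
import asyncio
from typing import Dict, List


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; only send_text is modelled."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.sent: List[str] = []

    async def send_text(self, message: str) -> None:
        if self.fail:
            raise RuntimeError("connection already closed")
        self.sent.append(message)


async def safe_broadcast(connections: Dict[str, FakeSocket], message: str) -> List[str]:
    """Send to every connection, drop the ones that fail, return the dropped IDs."""
    # Snapshot the items so later removals do not mutate the dict mid-iteration.
    targets = list(connections.items())
    # return_exceptions=True collects failures instead of aborting the whole batch.
    results = await asyncio.gather(
        *(sock.send_text(message) for _, sock in targets),
        return_exceptions=True,
    )
    dead = [uid for (uid, _), res in zip(targets, results) if isinstance(res, Exception)]
    for uid in dead:
        connections.pop(uid, None)
    return dead
```

In the manager, the same idea would replace the body of broadcast, with self.active_connections passed in place of the fake registry. Whether to send concurrently or one by one is a design choice; gather is used here so a slow client does not delay the rest.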
Securing the Handshake
The browser WebSocket API does not let you set custom HTTP headers (such as 'Authorization: Bearer...') on the initial handshake request. For browser clients you therefore pass credentials through signed query parameters or cookies. Query strings often end up in access logs, so keep such tokens short-lived. Below is a minimal approach that uses a dependency-injected token check and rejects the connection before the socket is accepted.
from fastapi import Depends, FastAPI, Query, WebSocket, status

app = FastAPI()


async def get_token_header(token: str = Query(...)) -> bool:
    # Despite the name, the token is read from the query string.
    # In production, verify this JWT against your auth provider.
    return token == "secret-forge-token"


@app.websocket('/secure-ws')
async def secure_socket(
    websocket: WebSocket,
    is_valid: bool = Depends(get_token_header),
):
    if not is_valid:
        # Closing before accept() rejects the handshake. ASGI servers answer
        # with HTTP 403, so the 1008 code may not reach the client.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await websocket.accept()
    await websocket.send_text('Access Granted. Welcome to the Forge.')
    await websocket.close()
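The "signed query parameter" mentioned above can be built with the standard library alone. The sketch below is one possible scheme and not a replacement for a vetted JWT library: SECRET_KEY, make_token, and verify_token are hypothetical names, and the user_id.expiry.signature layout is an assumption made for illustration.

```python
import hashlib
import hmac
import time
from typing import Optional

SECRET_KEY = b"change-me"  # hypothetical; load from configuration in practice


def _sign(payload: str) -> str:
    """Hex-encoded HMAC-SHA256 of the payload."""
    return hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()


def make_token(user_id: str, ttl_seconds: int = 60, now: Optional[float] = None) -> str:
    """Build 'user_id.expiry.signature' with a short lifetime."""
    issued_at = now if now is not None else time.time()
    payload = f"{user_id}.{int(issued_at + ttl_seconds)}"
    return f"{payload}.{_sign(payload)}"


def verify_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the user ID if the token is authentic and unexpired, else None."""
    try:
        payload, signature = token.rsplit(".", 1)
        user_id, expires = payload.rsplit(".", 1)
        expires_at = int(expires)
    except ValueError:
        return None  # malformed token
    # Constant-time comparison on bytes avoids leaking how many characters matched.
    if not hmac.compare_digest(signature.encode(), _sign(payload).encode()):
        return None
    current = now if now is not None else time.time()
    if current > expires_at:
        return None
    return user_id
```

In the endpoint, the token check would call verify_token(token) and close the socket when it returns None. The short lifetime limits the damage if a token leaks through a logged URL.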
🎯 Key Takeaways
- Always call await websocket.accept() before sending or receiving.
- Wrap the receive loop in try/except WebSocketDisconnect to handle client disconnections cleanly.
- WebSocket connections stay open — the while True loop is the correct pattern.
- For scaling across multiple server instances, use a pub/sub broker (Redis) to broadcast across nodes.
- WebSocket endpoints support path parameters and query parameters the same way HTTP endpoints do.
Interview Questions on This Topic
- What is the difference between WSGI and ASGI, and why is the latter required for FastAPI WebSockets?
- Explain the 'C10k problem' and how Python's 'asyncio' library helps FastAPI handle thousands of concurrent WebSocket connections.
- How would you implement a distributed 'Presence' system (showing who is online) using FastAPI and Redis?
- Describe the lifecycle of a WebSocket handshake. What happens at the HTTP level during the 'Upgrade' request?
- In a LeetCode System Design context: How would you design a scalable notification service using WebSockets for 10 million users?
Frequently Asked Questions
How do I authenticate a WebSocket connection?
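Browsers cannot attach custom headers to the WebSocket handshake, so a query parameter is the most common approach. Declare it on the endpoint, for example @app.websocket('/ws') with the signature async def ws(websocket: WebSocket, token: str). Validate the token before calling websocket.accept(). If it is invalid, close the socket with websocket.close(code=1008) (policy violation) or with an application-defined code in the 4000-4999 range, such as 4001.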
How do WebSockets scale across multiple FastAPI instances?
A WebSocket connection is tied to a specific server instance. Broadcasting only reaches clients on the same instance. To broadcast across instances, use a Redis pub/sub channel: each instance subscribes to a channel and publishes messages to it, then all instances forward the message to their local connections.
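The fan-out described above can be sketched without a running Redis server. In this illustration InMemoryBroker is a hypothetical stand-in for a Redis pub/sub channel and Instance stands in for one FastAPI process. Both names and the relay_once helper are assumptions, and a real deployment would swap in a Redis client.

```python
import asyncio
from typing import Dict, List


class InMemoryBroker:
    """Hypothetical stand-in for a Redis pub/sub channel."""

    def __init__(self) -> None:
        self._subscribers: List[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    async def publish(self, message: str) -> None:
        # Every subscriber gets its own copy, as with pub/sub fan-out.
        for queue in self._subscribers:
            await queue.put(message)


class Instance:
    """One server process: holds local clients and relays broker messages to them."""

    def __init__(self, broker: InMemoryBroker) -> None:
        self.broker = broker
        self.inbox = broker.subscribe()
        # client_id -> messages delivered to that client (stands in for send_text)
        self.local_clients: Dict[str, List[str]] = {}

    async def broadcast(self, message: str) -> None:
        # Publish instead of sending directly, so other instances see it too.
        await self.broker.publish(message)

    async def relay_once(self) -> None:
        # A real service would run this in a background task loop.
        message = await self.inbox.get()
        for delivered in self.local_clients.values():
            delivered.append(message)


async def demo() -> Dict[str, List[str]]:
    broker = InMemoryBroker()
    a, b = Instance(broker), Instance(broker)
    a.local_clients["alice"] = []
    b.local_clients["bob"] = []
    await a.broadcast("hello from alice")
    await asyncio.gather(a.relay_once(), b.relay_once())
    return {"alice": a.local_clients["alice"], "bob": b.local_clients["bob"]}
```

The originating instance also receives its own message through the broker, so it delivers to local clients from the relay path only. That avoids sending the same message twice.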
Can I use FastAPI Middleware with WebSockets?
Traditional HTTP middleware (registered with @app.middleware('http')) does not run for WebSocket connections. After the initial HTTP upgrade request, the connection is handled as a WebSocket (WS/WSS) connection, and HTTP middleware only processes connections of type 'http'. If you need global logic, implement it in a custom wrapper such as a pure ASGI middleware, which sees every connection type, or in class-based dependencies.
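One form of custom wrapper is a pure ASGI middleware, which is called for every connection scope, WebSocket ones included. Below is a minimal sketch, assuming the standard ASGI callable signature (scope, receive, send). WebSocketOriginGate, its allowed_origins parameter, and the origin check itself are hypothetical, chosen only to show where global logic could live.

```python
import asyncio
from typing import Iterable, List


class WebSocketOriginGate:
    """Hypothetical pure ASGI middleware: reject WebSocket handshakes from unknown origins."""

    def __init__(self, app, allowed_origins: Iterable[str]) -> None:
        self.app = app
        self.allowed = {origin.encode() for origin in allowed_origins}

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] == "websocket":
            # ASGI headers are a list of (name, value) byte pairs with lowercase names.
            headers = dict(scope.get("headers", []))
            if headers.get(b"origin") not in self.allowed:
                # Closing before the app accepts rejects the handshake.
                await send({"type": "websocket.close", "code": 1008})
                return
        # HTTP requests and allowed WebSocket connections pass through untouched.
        await self.app(scope, receive, send)


async def demo() -> dict:
    reached: List[str] = []
    sent: List[dict] = []

    async def inner_app(scope, receive, send) -> None:
        reached.append(scope["type"])

    async def receive() -> dict:
        return {"type": "websocket.connect"}

    async def send(message: dict) -> None:
        sent.append(message)

    gate = WebSocketOriginGate(inner_app, ["https://example.com"])
    await gate({"type": "websocket", "headers": [(b"origin", b"https://evil.test")]}, receive, send)
    await gate({"type": "websocket", "headers": [(b"origin", b"https://example.com")]}, receive, send)
    await gate({"type": "http", "headers": []}, receive, send)
    return {"reached": reached, "sent": sent}
```

With FastAPI, a class like this would typically be registered with app.add_middleware(WebSocketOriginGate, allowed_origins=[...]), which passes the keyword arguments to the constructor.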