FastAPI WebSockets — Real-time Communication

⚡ Quick Answer
Declare a persistent bidirectional stream using @app.websocket('/ws'). Establish the handshake with await websocket.accept(). Use an asynchronous loop with await websocket.receive_text() and await websocket.send_text() to manage data flow. Crucially, always wrap your logic in a try/except block for WebSocketDisconnect to ensure proper resource cleanup on the server.

Stateful Connection Management

In a production environment, you rarely handle a single socket in isolation. A centralized 'Manager' tracks active sessions, routes messages by ID, and manages connection life-cycles. Keeping that bookkeeping in one place makes it easier to drop references to closed sockets (avoiding leaks) and to contain the failure of a single connection so it does not affect the others.

io/thecodeforge/realtime/connection_manager.py · PYTHON
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict

app = FastAPI()

class ForgeSocketManager:
    """Manages active WebSocket connections with unique identifiers."""
    def __init__(self):
        # Map user IDs to their respective WebSocket objects
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        # Note: a second connection with the same user_id replaces the first entry
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def send_personal_message(self, message: str, user_id: str):
        if user_id in self.active_connections:
            await self.active_connections[user_id].send_text(message)

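    async def broadcast(self, message: str):
        # Iterate over a snapshot: a disconnect during an await would otherwise
        # change the dict mid-iteration and raise RuntimeError
        for connection in list(self.active_connections.values()):
            await connection.send_text(message)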

manager = ForgeSocketManager()

@app.websocket('/ws/{user_id}')
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            # Listen for incoming messages
            data = await websocket.receive_text()
            await manager.broadcast(f'User {user_id} sent: {data}')
    except WebSocketDisconnect:
        manager.disconnect(user_id)
        await manager.broadcast(f'User {user_id} has left the forge.')
▶ Output
Broadcasting enabled: Messages are routed to all connected users.
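
The broadcast loop above awaits each send in turn, so one failed send stops delivery to everyone after it. A possible variation, sketched here under the assumption that any failed send means the peer is gone, sends concurrently and prunes the connections that raised. The class name ResilientBroadcaster is hypothetical; it relies only on each connection having an async send_text method, as FastAPI's WebSocket does.

```python
import asyncio
from typing import Dict, List, Protocol


class TextSender(Protocol):
    """Anything with an async send_text, e.g. fastapi.WebSocket."""

    async def send_text(self, data: str) -> None: ...


class ResilientBroadcaster:
    """Hypothetical helper: a broadcast that survives dead connections."""

    def __init__(self) -> None:
        self.active_connections: Dict[str, TextSender] = {}

    async def broadcast(self, message: str) -> List[str]:
        """Send to everyone; return the user IDs whose send failed."""
        # Snapshot the mapping so concurrent connects/disconnects
        # cannot change it while we iterate.
        targets = list(self.active_connections.items())
        results = await asyncio.gather(
            *(conn.send_text(message) for _, conn in targets),
            return_exceptions=True,  # collect failures instead of raising
        )
        failed: List[str] = []
        for (user_id, conn), result in zip(targets, results):
            if isinstance(result, Exception):
                failed.append(user_id)
                # Remove only if the entry still points at the failed socket.
                if self.active_connections.get(user_id) is conn:
                    del self.active_connections[user_id]
        return failed
```

Whether to send concurrently or one at a time is a design choice: gather keeps one slow client from delaying the rest, at the cost of less predictable ordering between clients.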

Securing the Handshake

The browser WebSocket API does not let you set custom HTTP headers (like 'Authorization: Bearer...') on the handshake request. Browser clients therefore usually pass credentials as signed query parameters or cookies. Below is an approach that checks a token from the query string before the socket is accepted.

io/thecodeforge/security/socket_auth.py · PYTHON
import secrets

from fastapi import FastAPI, WebSocket, Query, status

app = FastAPI()

async def get_token_header(token: str) -> bool:
    # Placeholder check: in production, verify a signed token (for example
    # a JWT) with your auth provider instead of comparing to a constant.
    return secrets.compare_digest(token.encode(), b"secret-forge-token")

@app.websocket('/secure-ws')
async def secure_socket(websocket: WebSocket, token: str = Query(...)):
    is_valid = await get_token_header(token)
    if not is_valid:
        # Closing before accept() rejects the handshake. Per the ASGI spec the
        # client typically sees an HTTP 403, so code 1008 may not reach it.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    
    await websocket.accept()
    await websocket.send_text('Access Granted. Welcome to the Forge.')
    await websocket.close()
▶ Output
Unauthorized connections are dropped before the 'accept' handshake.
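
The 'signed query parameters' option mentioned above can be sketched with the standard library alone. The token layout (user_id.expiry.signature), the SECRET_KEY value, and the helper names make_token and verify_token are assumptions for illustration; a real deployment would more likely use an established format such as JWT.

```python
import hashlib
import hmac
import time
from typing import Optional

# Assumption: in practice this secret is loaded from server configuration.
SECRET_KEY = b"change-me"


def _sign(payload: str) -> str:
    return hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()


def make_token(user_id: str, ttl_seconds: int = 60,
               now: Optional[float] = None) -> str:
    """Build 'user_id.expiry.signature' for use as ?token=... in the URL."""
    issued = time.time() if now is None else now
    payload = f"{user_id}.{int(issued) + ttl_seconds}"
    return f"{payload}.{_sign(payload)}"


def verify_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the user_id if the token is authentic and unexpired, else None."""
    try:
        user_id, expiry, signature = token.rsplit(".", 2)
        expires_at = int(expiry)
    except ValueError:
        return None  # wrong number of fields or non-numeric expiry
    expected = _sign(f"{user_id}.{expiry}")
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return None
    current = time.time() if now is None else now
    return user_id if current < expires_at else None
```

Inside an endpoint, verify_token(token) would be called before websocket.accept(), closing the socket when it returns None. Query strings are often written to access logs, so a short expiry limits the value of a leaked token.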

🎯 Key Takeaways

  • Always call await websocket.accept() before sending or receiving.
  • Wrap the receive loop in try/except WebSocketDisconnect to handle client disconnections cleanly.
  • WebSocket connections stay open — the while True loop is the correct pattern.
  • For scaling across multiple server instances, use a pub/sub broker (Redis) to broadcast across nodes.
  • WebSocket endpoints support path parameters and query parameters the same way HTTP endpoints do.

Interview Questions on This Topic

  • What is the difference between WSGI and ASGI, and why is the latter required for FastAPI WebSockets?
  • Explain the 'C10k problem' and how Python's 'asyncio' library helps FastAPI handle thousands of concurrent WebSocket connections.
  • How would you implement a distributed 'Presence' system (showing who is online) using FastAPI and Redis?
  • Describe the lifecycle of a WebSocket handshake. What happens at the HTTP level during the 'Upgrade' request?
  • In a LeetCode System Design context: How would you design a scalable notification service using WebSockets for 10 million users?

Frequently Asked Questions

How do I authenticate a WebSocket connection?

Browsers cannot attach custom headers to the WebSocket handshake, so a query parameter is the most common approach: @app.websocket('/ws') async def ws(websocket: WebSocket, token: str). Validate the token before calling websocket.accept(), and reject an invalid one with websocket.close(code=status.WS_1008_POLICY_VIOLATION), or with an application-defined code in the 4000-4999 range such as 4001.

How do WebSockets scale across multiple FastAPI instances?

A WebSocket connection is tied to a specific server instance. Broadcasting only reaches clients on the same instance. To broadcast across instances, use a Redis pub/sub channel: each instance subscribes to a channel and publishes messages to it, then all instances forward the message to their local connections.
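
The fan-out described above can be sketched without a running Redis server. In the sketch below an in-memory InMemoryBroker stands in for a Redis pub/sub channel, and each Instance object stands in for one FastAPI process with its own local connections. All class and method names are hypothetical, and a real deployment would replace the broker with a Redis client.

```python
import asyncio
from typing import List


class InMemoryBroker:
    """Stand-in for a Redis pub/sub channel (an assumption, not Redis itself)."""

    def __init__(self) -> None:
        self._subscribers: List[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    async def publish(self, message: str) -> None:
        # Every subscriber gets its own copy of the message.
        for queue in self._subscribers:
            await queue.put(message)


class Instance:
    """Stand-in for one server process and its local WebSocket clients."""

    def __init__(self, broker: InMemoryBroker) -> None:
        self.broker = broker
        self.inbox = broker.subscribe()
        self.local_connections: list = []  # objects with async send_text()

    async def broadcast(self, message: str) -> None:
        # Publish instead of sending directly; delivery happens in
        # forward_once on every instance, including this one.
        await self.broker.publish(message)

    async def forward_once(self) -> None:
        # Take one message from the channel and relay it to local clients.
        message = await self.inbox.get()
        for connection in list(self.local_connections):
            await connection.send_text(message)
```

In a real service, forward_once would run in a loop as a background task in each process, and subscribe/publish would map onto the Redis client's pub/sub calls.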

Can I use FastAPI Middleware with WebSockets?

Traditional HTTP middleware (using @app.middleware('http')) does not run for WebSocket connections: it only handles ASGI scopes of type 'http', while a WebSocket connection arrives with scope type 'websocket'. If you need global logic, implement it in a custom wrapper, such as a pure ASGI middleware that inspects the scope type, or via class-based dependencies.
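
One form of custom wrapper is a pure ASGI middleware, which receives every connection scope and can branch on scope['type']. The sketch below is framework-independent; the class name WebSocketGateMiddleware and the allowed_prefixes rule are hypothetical, chosen only to show where global WebSocket logic could live.

```python
class WebSocketGateMiddleware:
    """Hypothetical pure ASGI middleware admitting only listed WebSocket paths."""

    def __init__(self, app, allowed_prefixes=("/ws",)):
        self.app = app
        self.allowed_prefixes = tuple(allowed_prefixes)

    async def __call__(self, scope, receive, send):
        if (scope["type"] == "websocket"
                and not scope["path"].startswith(self.allowed_prefixes)):
            # Consume the connect event, then close without accepting,
            # which rejects the handshake.
            await receive()
            await send({"type": "websocket.close", "code": 1008})
            return
        # HTTP, lifespan, and permitted WebSocket scopes pass through untouched.
        await self.app(scope, receive, send)
```

With FastAPI, a class of this shape can be registered through app.add_middleware(WebSocketGateMiddleware).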

Naren Founder & Author

Developer and founder of TheCodeForge. I built this site because I was tired of tutorials that explain what to type without explaining why it works. Every article here is written to make concepts actually click.
