Senior 7 min · March 06, 2026

CQRS Outbox Polling Failures — Connection Pool Exhaustion

One broken connection stalled 50K+ events.

Naren · Founder
Plain-English first. Then code. Then the interview question.
Quick Answer
  • CQRS splits your database into a write-optimized command model and read-optimized query models.
  • The write side uses normalized tables with full constraints for correctness.
  • The read side uses denormalized projections built from domain events.
  • Synchronization via the Outbox Pattern prevents dual-write failures.
  • Typical replication lag: 50–500ms under normal load; up to seconds during backpressure.
  • Biggest mistake: sharing the same database instance for both models, defeating the purpose.
Plain-English First

Imagine a busy library. One desk handles all book returns and new arrivals: it carefully updates every record, checks for duplicates, and logs everything precisely. A completely separate desk handles 'what books do you have?' questions, and it uses a big, pre-sorted poster on the wall for instant answers instead of digging through the filing cabinet. CQRS does exactly this for your database: one path writes data carefully and correctly, a completely separate path reads it fast. They don't share the same tables or the same model, and they don't even have to share the same database engine. That's the whole idea.

Every system eventually hits the same wall: reads and writes want completely different things. Your write path needs strict consistency, complex validation, and transactional safety. Your read path needs denormalized, pre-joined data that returns in milliseconds without locking a single row. Shoving both responsibilities into one data model is like making your head librarian personally answer every 'where is the fiction section?' question between processing every book return. Something always suffers. In high-traffic production systems, it's usually reads — or worse, reads start killing writes through lock contention on shared tables.

Command Query Responsibility Segregation (CQRS) solves this by treating reads and writes as fundamentally different concerns at the database level, not just the application layer. It gives each side its own optimized data model, its own storage engine if needed, and its own scaling strategy. This isn't just an architectural buzzword: it responds to the consistency-versus-availability trade-offs described by the CAP theorem, to the mismatch between OLTP and OLAP workloads, and to the practical reality that read-to-write ratios in many production systems fall somewhere between 10:1 and 1000:1.

By the end of this article you'll understand exactly how to design the write-side (command) and read-side (query) database models, how to build the synchronization layer between them without data loss, how to handle the eventual consistency window safely, and where CQRS genuinely helps versus where it adds complexity you don't need. You'll leave with runnable code, real schema designs, and the mental model to defend these decisions in a production architecture review.

The Write Side: Designing a Command Model That Protects Invariants

The command side of your database has one job: accept a change, validate it completely, and persist it durably. That means your write model is normalized, constraint-heavy, and optimized for correctness — not speed. You're not trying to return data fast here. You're trying to make sure that when you say 'this order is confirmed', it actually is, with no race conditions and no partial writes.

In practice, the write model typically lives in a relational database (PostgreSQL is a popular choice) with proper foreign keys, unique constraints, check constraints, and row-level locking where needed. The schema reflects your domain's invariants, not your UI's data needs. An 'Order' table doesn't have a denormalized customer name column — it has a foreign key to a Customers table because that constraint matters at write time.

The critical design decision is what you persist. Many teams using CQRS go one step further and adopt Event Sourcing on the write side: instead of storing the current state of an order, you store the sequence of events that produced it (OrderPlaced, PaymentReceived, OrderShipped). This makes the command model an append-only event log, which has profound implications: you get a full audit trail for free, replaying events rebuilds any read model, and you avoid in-place updates, which removes most update contention (concurrent commands on the same aggregate still need a version check). Even if you don't use Event Sourcing, your write model should emit domain events after each successful command. These events are the bridge to your read side.

write_side_schema.sql (SQL)
-- ============================================================
-- WRITE SIDE (Command Model) — PostgreSQL
-- Normalized, constraint-heavy, optimized for correctness.
-- This schema is NEVER queried by the UI directly.
-- ============================================================

-- Core customer record — source of truth for identity
CREATE TABLE customers (
    customer_id     UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email           TEXT NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Enforce unique emails at the DB level, not just the app layer
    CONSTRAINT uq_customers_email UNIQUE (email)
);

-- Orders table — normalized, references customers by FK
CREATE TABLE orders (
    order_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id     UUID NOT NULL REFERENCES customers(customer_id),
    status          TEXT NOT NULL DEFAULT 'pending',
    total_cents     INTEGER NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Prevent nonsense states at the DB level
    CONSTRAINT chk_orders_status CHECK (status IN ('pending','confirmed','shipped','cancelled')),
    CONSTRAINT chk_orders_total  CHECK (total_cents >= 0)
);

-- Order line items — fully normalized, no denormalized product names here
CREATE TABLE order_items (
    item_id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id        UUID NOT NULL REFERENCES orders(order_id) ON DELETE CASCADE,
    product_id      UUID NOT NULL,
    quantity        INTEGER NOT NULL,
    unit_price_cents INTEGER NOT NULL,
    CONSTRAINT chk_order_items_qty   CHECK (quantity > 0),
    CONSTRAINT chk_order_items_price CHECK (unit_price_cents > 0)
);

-- ============================================================
-- DOMAIN EVENTS TABLE — the outbox that feeds the read side
-- Every successful command inserts a row here atomically
-- in the same transaction as the state change.
-- ============================================================
CREATE TABLE domain_events (
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  TEXT NOT NULL,          -- e.g. 'Order'
    aggregate_id    UUID NOT NULL,          -- e.g. the order_id
    event_type      TEXT NOT NULL,          -- e.g. 'OrderConfirmed'
    payload         JSONB NOT NULL,         -- full event data
    occurred_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    published       BOOLEAN NOT NULL DEFAULT FALSE  -- outbox pattern flag
);

-- Index for the outbox poller — only scans unpublished events
CREATE INDEX idx_domain_events_unpublished
    ON domain_events (occurred_at)
    WHERE published = FALSE;

-- ============================================================
-- Example: confirming an order (run as a single transaction)
-- Both the state change AND the event are written atomically.
-- If either fails, both roll back — no lost events.
-- ============================================================
BEGIN;

    -- 1. Update the write-side state
    UPDATE orders
    SET    status     = 'confirmed',
           updated_at = now()
    WHERE  order_id   = 'a1b2c3d4-0000-0000-0000-000000000001'
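    AND    status     = 'pending';   -- optimistic guard
    -- If this reports UPDATE 0, the guard did not match: the application
    -- must ROLLBACK instead of inserting the event below.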

    -- 2. Write the domain event in the SAME transaction (Outbox Pattern)
    INSERT INTO domain_events (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'Order',
        'a1b2c3d4-0000-0000-0000-000000000001',
        'OrderConfirmed',
        '{"order_id": "a1b2c3d4-0000-0000-0000-000000000001", "confirmed_at": "2024-01-15T10:30:00Z"}'
    );

COMMIT;
-- Both rows land or neither does. The read side cannot miss this event.
Output
UPDATE 1
INSERT 0 1
COMMIT
Watch Out: The Dual-Write Trap
Never update the write-side state AND publish to a message broker (Kafka, RabbitMQ) in two separate operations. If the app crashes between them, your read model diverges silently. The Outbox Pattern shown above — writing the event to a DB table in the same transaction, then having a separate poller publish it — is the production-safe solution. Debezium with PostgreSQL logical replication is the zero-polling alternative.
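The same single-transaction idea can be sketched from application code. The example below is a minimal illustration, not the article's production code: it uses Python's built-in sqlite3 as a stand-in for PostgreSQL so it runs anywhere, and the names `make_demo_db` and `confirm_order` plus the trimmed-down tables are assumptions made for the sketch. It also checks the affected row count, so no event is written when the status guard does not match.

```python
import json
import sqlite3
import uuid


def make_demo_db() -> sqlite3.Connection:
    """In-memory stand-in for the write database (hypothetical, trimmed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT NOT NULL);
        CREATE TABLE domain_events (
            event_id     TEXT PRIMARY KEY,
            aggregate_id TEXT NOT NULL,
            event_type   TEXT NOT NULL,
            payload      TEXT NOT NULL,
            published    INTEGER NOT NULL DEFAULT 0
        );
    """)
    return conn


def confirm_order(conn: sqlite3.Connection, order_id: str) -> bool:
    """Confirm a pending order and record the event in ONE transaction.
    Returns False (and writes nothing) if the order was not pending."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE orders SET status = 'confirmed' "
            "WHERE order_id = ? AND status = 'pending'",
            (order_id,),
        )
        if cur.rowcount != 1:
            return False  # guard did not match: no state change, so no event
        conn.execute(
            "INSERT INTO domain_events (event_id, aggregate_id, event_type, payload) "
            "VALUES (?, ?, 'OrderConfirmed', ?)",
            (str(uuid.uuid4()), order_id, json.dumps({"order_id": order_id})),
        )
    return True
```

Calling `confirm_order` twice for the same order changes state and emits an event only the first time; the second call finds no pending row and returns False.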
Production Insight
If the Outbox poller fails silently, events pile up and read models diverge.
Monitor domain_events table for unpublished rows and set up alerts.
Add dead-letter handling for events that repeatedly fail to process.
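One way to turn that monitoring advice into a check is sketched here. It assumes a scheduled job has already fetched the `occurred_at` values of unpublished rows (for example with `SELECT occurred_at FROM domain_events WHERE published = FALSE`); the names `BacklogStats`, `backlog_stats`, and `should_alert`, and both thresholds, are illustrative placeholders rather than recommended values.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


@dataclass
class BacklogStats:
    unpublished: int
    oldest_age: Optional[timedelta]  # None when nothing is pending


def backlog_stats(pending_occurred_at: Iterable[datetime], now: datetime) -> BacklogStats:
    """Summarise unpublished outbox rows given their occurred_at timestamps."""
    timestamps = list(pending_occurred_at)
    if not timestamps:
        return BacklogStats(unpublished=0, oldest_age=None)
    return BacklogStats(unpublished=len(timestamps), oldest_age=now - min(timestamps))


def should_alert(stats: BacklogStats,
                 max_rows: int = 1000,
                 max_age: timedelta = timedelta(seconds=30)) -> bool:
    """Alert when the backlog is too large OR the oldest event is too old."""
    if stats.unpublished > max_rows:
        return True
    return stats.oldest_age is not None and stats.oldest_age > max_age
```

Alerting on the age of the oldest unpublished event, not only the row count, catches a stalled poller even when traffic is low.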
Key Takeaway
The write model enforces invariants with constraints.
The Outbox Pattern guarantees read side never misses a change.
Dual-writes to DB and broker are a production antipattern.
Choose Your Event Transport
If: You have a single database and low event volume
Use: A simple Outbox poller with a sleep interval.
If: You need sub-100ms lag and can operate Debezium
Use: PostgreSQL logical replication + Debezium CDC.
If: You need high throughput and multiple consumers
Use: An Outbox poller that publishes to Kafka; consumers read from Kafka.

The Read Side: Denormalized Query Models Built for Your UI

The read side exists for one reason: return exactly the data a consumer needs in a single, cheap query. No joins. No aggregations at read time. No shared locks with the write side. The read model is a pre-computed view of your data, shaped specifically around how it will be consumed.

This is where CQRS pays its maintenance cost back with interest. Instead of every UI component issuing a 5-table join query, each feature has its own denormalized projection stored in a read-optimized store. An order summary page gets a flat 'order_summaries' view with customer name, item count, and total pre-joined. A customer history page gets a 'customer_order_history' projection sorted and paginated. Each projection is rebuilt by consuming the domain events emitted from the write side.

You're free to use different storage engines per projection. Order summaries might live in PostgreSQL with a covering index. A full-text product search projection might live in Elasticsearch. A real-time analytics projection might live in Redis sorted sets. This is called polyglot persistence — using the right tool for each specific read pattern instead of forcing one database to be everything to everyone.

The read model is also entirely disposable. Because it's derived from events, you can delete any projection and rebuild it by replaying the event log from the beginning. This is a superpower — it means schema migrations on the read side are never scary. Drop the old projection, deploy the new schema, replay events, cut over. No risky ALTER TABLE on a live system.
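
The drop-and-rebuild idea can be illustrated with a small in-memory sketch. The event shapes and the names `apply_event` and `rebuild` are assumptions for this example; a real rebuild would read the stored events in order and write to the projection table instead of a dict.

```python
from typing import Any, Dict, Iterable

Event = Dict[str, Any]
Projection = Dict[str, Dict[str, Any]]  # order_id -> summary row


def apply_event(projection: Projection, event: Event) -> None:
    """Apply one event to the in-memory order summary projection."""
    data = event["payload"]
    if event["event_type"] == "OrderPlaced":
        # setdefault keeps a replayed OrderPlaced from overwriting later state
        projection.setdefault(data["order_id"], {
            "status": "pending",
            "total_cents": data["total_cents"],
        })
    elif event["event_type"] == "OrderConfirmed":
        row = projection.get(data["order_id"])
        if row is not None:
            row["status"] = "confirmed"
    # Event types this projection does not care about are ignored.


def rebuild(events: Iterable[Event]) -> Projection:
    """Drop-and-rebuild: start from an empty projection and replay the log in order."""
    projection: Projection = {}
    for event in events:
        apply_event(projection, event)
    return projection
```

Because the projection is a pure function of the event log, a new schema only needs a new `apply_event` and one replay.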

read_side_projections.sql (SQL)
-- ============================================================
-- READ SIDE (Query Models): separate, writable PostgreSQL database
-- (a streaming read replica is read-only and could not hold these tables)
-- Denormalized. No foreign keys enforced. No joins needed at query time.
-- These tables are populated by the event consumer, NOT by the app.
-- ============================================================

-- Projection 1: Order Summary List
-- Powers the "My Orders" page — returns everything needed in ONE query
CREATE TABLE order_summary_projection (
    order_id            UUID PRIMARY KEY,
    customer_id         UUID NOT NULL,
    customer_email      TEXT NOT NULL,    -- denormalized from Customers
    customer_name       TEXT NOT NULL,    -- denormalized from Customers
    status              TEXT NOT NULL,
    item_count          INTEGER NOT NULL, -- pre-aggregated
    total_cents         INTEGER NOT NULL,
    first_item_name     TEXT,            -- denormalized first item for display
    created_at          TIMESTAMPTZ NOT NULL,
    last_updated_at     TIMESTAMPTZ NOT NULL
);

-- Covering index: every column the list query selects is in the index,
-- so Postgres can use an index-only scan and skip the heap for pages
-- that the visibility map marks all-visible (keep the table vacuumed).
CREATE INDEX idx_order_summary_customer_created
    ON order_summary_projection (customer_id, created_at DESC)
    INCLUDE (order_id, status, item_count, total_cents, first_item_name);

-- Projection 2: Customer Lifetime Value
-- Powers analytics — pre-aggregated so no GROUP BY at query time
CREATE TABLE customer_ltv_projection (
    customer_id         UUID PRIMARY KEY,
    customer_email      TEXT NOT NULL,
    total_orders        INTEGER NOT NULL DEFAULT 0,
    total_spent_cents   BIGINT  NOT NULL DEFAULT 0,
    last_order_at       TIMESTAMPTZ,
    first_order_at      TIMESTAMPTZ
);

-- ============================================================
-- Event consumer: applies domain events to keep projections current
-- This runs in a background worker that polls the domain_events outbox
-- (or listens to Kafka/RabbitMQ if you use a broker)
-- ============================================================

-- Handler for 'OrderConfirmed' event
-- Called by your event consumer with the raw event payload
CREATE OR REPLACE FUNCTION apply_order_confirmed_event(
    p_order_id      UUID,
    p_customer_id   UUID,
    p_confirmed_at  TIMESTAMPTZ
) RETURNS VOID AS $$
BEGIN
    -- Update the order summary projection status
    UPDATE order_summary_projection
    SET    status          = 'confirmed',
           last_updated_at = p_confirmed_at
    WHERE  order_id        = p_order_id;

    -- No need to touch order_items or customers — they're already denormalized
END;
$$ LANGUAGE plpgsql;

-- Handler for 'OrderPlaced' event — builds the initial projection row
CREATE OR REPLACE FUNCTION apply_order_placed_event(
    p_order_id          UUID,
    p_customer_id       UUID,
    p_customer_email    TEXT,
    p_customer_name     TEXT,
    p_item_count        INTEGER,
    p_total_cents       INTEGER,
    p_first_item_name   TEXT,
    p_placed_at         TIMESTAMPTZ
) RETURNS VOID AS $$
BEGIN
    -- INSERT ... ON CONFLICT makes this handler idempotent:
    -- replaying the same event twice won't create duplicate rows
    INSERT INTO order_summary_projection (
        order_id, customer_id, customer_email, customer_name,
        status, item_count, total_cents, first_item_name,
        created_at, last_updated_at
    ) VALUES (
        p_order_id, p_customer_id, p_customer_email, p_customer_name,
        'pending', p_item_count, p_total_cents, p_first_item_name,
        p_placed_at, p_placed_at
    )
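    ON CONFLICT (order_id) DO NOTHING;  -- idempotent: safe to replay

    -- FOUND is false when the insert above hit a conflict, i.e. this event
    -- was already applied. Return early so the cumulative LTV totals below
    -- are not double-counted on replay.
    IF NOT FOUND THEN
        RETURN;
    END IF;

    -- Also update or insert the customer LTV projection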
    INSERT INTO customer_ltv_projection (
        customer_id, customer_email,
        total_orders, total_spent_cents,
        last_order_at, first_order_at
    ) VALUES (
        p_customer_id, p_customer_email,
        1, p_total_cents,
        p_placed_at, p_placed_at
    )
    ON CONFLICT (customer_id) DO UPDATE SET
        total_orders      = customer_ltv_projection.total_orders + 1,
        total_spent_cents = customer_ltv_projection.total_spent_cents + EXCLUDED.total_spent_cents,
        last_order_at     = GREATEST(customer_ltv_projection.last_order_at, EXCLUDED.last_order_at);
END;
$$ LANGUAGE plpgsql;

-- ============================================================
-- The actual query — no joins, no aggregations, blazing fast
-- ============================================================
SELECT
    order_id,
    status,
    item_count,
    total_cents,
    first_item_name,
    created_at
FROM  order_summary_projection
WHERE customer_id = '00000000-0000-0000-0000-000000000000'  -- placeholder: bind the real customer UUID
ORDER BY created_at DESC
LIMIT 20;
Output
order_id                             | status    | item_count | total_cents | first_item_name     | created_at
-------------------------------------+-----------+------------+-------------+---------------------+------------------------
a1b2c3d4-0000-0000-0000-000000000001 | confirmed |          3 |        4599 | Mechanical Keyboard | 2024-01-15 10:28:00+00
b2c3d4e5-0000-0000-0000-000000000002 | pending   |          1 |        1299 | USB-C Hub           | 2024-01-14 08:15:00+00
(2 rows)
Time: 0.842 ms  <-- single index scan, no joins
Pro Tip: Make Every Event Handler Idempotent
Your event consumer WILL process the same event twice eventually — network retries, consumer restarts, at-least-once delivery guarantees. Every handler must produce the same result whether it runs once or ten times. The ON CONFLICT DO NOTHING / DO UPDATE pattern shown above is your best friend. Store a processed_event_ids table as a second safeguard for events that don't map cleanly to upsert semantics.
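A processed_event_ids safeguard could look roughly like the sketch below. It uses sqlite3 as a runnable stand-in for the read database; the `customer_ltv` table and the function names are assumptions for illustration. The important detail is that the dedupe marker and the projection change commit in the same transaction.

```python
import sqlite3


def make_read_db() -> sqlite3.Connection:
    """In-memory stand-in for the read database (hypothetical tables)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE processed_event_ids (event_id TEXT PRIMARY KEY);
        CREATE TABLE customer_ltv (
            customer_id  TEXT PRIMARY KEY,
            total_orders INTEGER NOT NULL
        );
    """)
    return conn


def apply_once(conn: sqlite3.Connection, event_id: str, customer_id: str) -> bool:
    """Increment the customer's order count at most once per event_id.
    Returns True if applied, False if the event was a duplicate."""
    with conn:  # marker row and projection update commit (or roll back) together
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_event_ids (event_id) VALUES (?)",
            (event_id,),
        )
        if cur.rowcount == 0:
            return False  # already processed: skip the non-idempotent increment
        cur = conn.execute(
            "UPDATE customer_ltv SET total_orders = total_orders + 1 "
            "WHERE customer_id = ?",
            (customer_id,),
        )
        if cur.rowcount == 0:
            conn.execute(
                "INSERT INTO customer_ltv (customer_id, total_orders) VALUES (?, 1)",
                (customer_id,),
            )
    return True
```

This matters most for counters and running totals, where a plain upsert would add the same amount again on every replay.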
Production Insight
Covering indexes can eliminate heap lookups for known query patterns when index-only scans apply.
Each projection is disposable, so read-side schema migrations are safe and cheap.
Idempotent handlers prevent double-counting in cumulative projections.
Key Takeaway
Read projections are flat, pre-computed views of event-sourced data.
They are disposable — drop and rebuild without affecting writes.
Polyglot persistence: choose the right engine per read pattern.
Choose Storage Engine for Read Projection
If: Low latency, strong consistency needed
Use: A dedicated PostgreSQL read database with covering indexes.
If: Full-text search required
Use: Elasticsearch projection updated from events.
If: Real-time leaderboard or sorted set
Use: Redis sorted sets with event consumers.

Synchronization, Eventual Consistency, and the Replication Lag Problem

Here's the uncomfortable truth that many CQRS tutorials gloss over: after a command succeeds, there is a window — however brief — where your read model is stale. A user places an order, the write side commits, then they immediately refresh their order list and see nothing. This isn't a bug — it's eventual consistency by design. But if you don't handle it explicitly, your users will think your system is broken.

The synchronization pipeline has several moving parts, each with its own failure mode. In the full pipeline, the Outbox poller (or Debezium CDC process) reads unpublished events, publishes them to a broker, and marks them published; the consumer then reads from the broker and applies events to projections. (The simplified poller in this article skips the broker and applies events directly.) Each step adds latency. Under normal conditions the total is typically 50-500ms. Under load, broker backpressure, or a consumer restart, it can stretch to seconds or minutes.

You need an explicit production strategy for this lag window. The simplest is 'read-your-writes consistency': after a successful command, the API response includes the current state of the relevant data (pulled from the write side, just this once) or a version number the client can poll against. On the client, the complementary strategy is to update the UI optimistically as soon as the command succeeds and reconcile when the read model catches up, a pattern most modern frontend frameworks support.
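
The version-polling variant can be sketched as a small helper. This assumes the command response carries a version number and that the read model exposes the version it has applied; neither column exists in the schemas in this article, so `fetch_version` and `wait_for_version` are hypothetical names for illustration.

```python
import time
from typing import Callable, Optional


def wait_for_version(fetch_version: Callable[[], Optional[int]],
                     expected_version: int,
                     timeout_s: float = 2.0,
                     interval_s: float = 0.05,
                     sleep: Callable[[float], None] = time.sleep,
                     clock: Callable[[], float] = time.monotonic) -> bool:
    """Poll the read model until it reaches expected_version or the timeout expires.
    fetch_version returns the version the read model currently holds (None if absent)."""
    deadline = clock() + timeout_s
    while True:
        current = fetch_version()
        if current is not None and current >= expected_version:
            return True
        if clock() >= deadline:
            return False  # caller falls back, e.g. shows the write-side response
        sleep(interval_s)
```

A bounded timeout is the key design choice: if the read model is badly behind, the caller gets a quick False and can fall back instead of hanging the request.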

For the synchronization infrastructure itself, you have two main patterns: the polling Outbox (simple, works with any DB) and Change Data Capture with Debezium (zero-polling, sub-100ms lag, but adds operational complexity). In both cases, the event consumer must track its position (a cursor or Kafka offset) durably so it can resume after a crash without reprocessing from the beginning or missing events.
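
Durable position tracking might be sketched as follows. The example assumes each event carries a monotonically increasing integer position (similar to a Kafka offset), which the UUID-keyed outbox table in this article does not have; the tables and function names are illustrative, and sqlite3 stands in for the read database.

```python
import sqlite3
from typing import Iterable, Tuple


def make_consumer_db() -> sqlite3.Connection:
    """In-memory stand-in holding the projection and the consumer checkpoint."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE consumer_checkpoint (
            consumer TEXT PRIMARY KEY,
            position INTEGER NOT NULL
        );
        CREATE TABLE applied_log (
            position   INTEGER PRIMARY KEY,
            event_type TEXT NOT NULL
        );
    """)
    return conn


def load_position(conn: sqlite3.Connection, consumer: str) -> int:
    """Return the last committed position, or 0 if the consumer has never run."""
    row = conn.execute(
        "SELECT position FROM consumer_checkpoint WHERE consumer = ?", (consumer,)
    ).fetchone()
    return row[0] if row else 0


def consume(conn: sqlite3.Connection, consumer: str,
            events: Iterable[Tuple[int, str]]) -> int:
    """Apply events whose position is beyond the stored checkpoint.
    Each event and its checkpoint update commit in the same transaction."""
    applied = 0
    position = load_position(conn, consumer)
    for event_position, event_type in events:
        if event_position <= position:
            continue  # already applied before a restart
        with conn:
            conn.execute("INSERT INTO applied_log VALUES (?, ?)",
                         (event_position, event_type))
            conn.execute("INSERT OR REPLACE INTO consumer_checkpoint VALUES (?, ?)",
                         (consumer, event_position))
        position = event_position
        applied += 1
    return applied
```

Storing the checkpoint in the same database and transaction as the projection means a crash can never leave the two disagreeing about which events were applied.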

outbox_poller_and_event_consumer.py (Python)
# ============================================================
# Outbox Poller + Event Consumer: Python with psycopg2
# Runs as a separate background service alongside your main app.
# Polls the domain_events outbox and applies each event directly
# to the read-side projections (no broker in this simplified version).
# ============================================================

import psycopg2
import psycopg2.extras
import json
import time
import logging
from datetime import datetime, timezone
from typing import Dict, Callable, Any

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

# ── Database connection settings ──────────────────────────────
WRITE_DB_DSN = "postgresql://app_user:secret@write-db:5432/orders_write"
READ_DB_DSN  = "postgresql://app_user:secret@read-db:5432/orders_read"

# ── How many events to process in one polling batch ──────────
BATCH_SIZE = 100

# ── Pause between polls when no events are pending (seconds) ─
IDLE_SLEEP_SECONDS = 0.5


def fetch_unpublished_events(write_conn, batch_size: int) -> list:
    """Pull a batch of unpublished events from the outbox, ordered by time.
    SELECT ... FOR UPDATE SKIP LOCKED lets multiple poller instances run without
    claiming the same rows while the claiming transaction is open. Note that with
    several pollers, events for one aggregate can be applied out of order."""
    with write_conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute("""
            SELECT event_id, aggregate_type, aggregate_id,
                   event_type, payload, occurred_at
            FROM   domain_events
            WHERE  published = FALSE
            ORDER  BY occurred_at ASC
            LIMIT  %s
            FOR UPDATE SKIP LOCKED
        """, (batch_size,))
        return cursor.fetchall()


def mark_events_published(write_conn, event_ids: list) -> None:
    """Mark the batch as published only AFTER the read side has committed.
    If the read-side apply fails, these stay unpublished and will be retried."""
    with write_conn.cursor() as cursor:
        cursor.execute("""
            UPDATE domain_events
            SET    published = TRUE
            WHERE  event_id  = ANY(%s::uuid[])
        """, (event_ids,))
    write_conn.commit()  # commit the mark-published update


def apply_order_placed(read_conn, payload: Dict[str, Any]) -> None:
    """Update the order_summary_projection and customer_ltv_projection.
    Uses ON CONFLICT to be idempotent — safe to replay."""
    with read_conn.cursor() as cursor:
        cursor.execute("""
            SELECT apply_order_placed_event(
                %s::uuid, %s::uuid, %s, %s, %s, %s, %s, %s::timestamptz
            )
        """, (
            payload['order_id'],
            payload['customer_id'],
            payload['customer_email'],
            payload['customer_name'],
            payload['item_count'],
            payload['total_cents'],
            payload.get('first_item_name'),
            payload['placed_at']
        ))
    read_conn.commit()
    logger.info(f"Applied OrderPlaced for order {payload['order_id']}")


def apply_order_confirmed(read_conn, payload: Dict[str, Any]) -> None:
    """Update status in the order summary projection."""
    with read_conn.cursor() as cursor:
        cursor.execute("""
            SELECT apply_order_confirmed_event(%s::uuid, %s::uuid, %s::timestamptz)
        """, (
            payload['order_id'],
            payload['customer_id'],
            payload['confirmed_at']
        ))
    read_conn.commit()
    logger.info(f"Applied OrderConfirmed for order {payload['order_id']}")


# ── Registry mapping event_type strings to handler functions ─
# Adding a new event type is one line here + one handler function.
EVENT_HANDLERS: Dict[str, Callable] = {
    'OrderPlaced':     apply_order_placed,
    'OrderConfirmed':  apply_order_confirmed,
    # 'OrderShipped':  apply_order_shipped,   # add as needed
    # 'OrderCancelled': apply_order_cancelled,
}


def connect_both():
    """Open fresh connections to the write and read databases."""
    write_conn = psycopg2.connect(WRITE_DB_DSN)
    read_conn  = psycopg2.connect(READ_DB_DSN)
    write_conn.autocommit = False  # we control transactions manually
    read_conn.autocommit  = False
    return write_conn, read_conn


def run_outbox_poller():
    """Main loop: poll → apply → mark published. Runs forever."""
    write_conn, read_conn = connect_both()
    logger.info("Outbox poller started — watching for domain events...")

    while True:
        try:
            # Opens a transaction on the write side; the batch stays locked
            # until that transaction commits or rolls back.
            events = fetch_unpublished_events(write_conn, BATCH_SIZE)

            if not events:
                write_conn.rollback()  # end the idle transaction
                time.sleep(IDLE_SLEEP_SECONDS)
                continue

            logger.info(f"Processing batch of {len(events)} event(s)")
            processed_ids = []
            batch_failed = False

            for event in events:
                event_type = event['event_type']
                payload    = event['payload']  # already a dict from JSONB
                handler = EVENT_HANDLERS.get(event_type)

                if handler is None:
                    # Unknown event type: log and skip (don't crash the poller)
                    logger.warning(f"No handler for event type '{event_type}', skipping")
                    processed_ids.append(event['event_id'])
                    continue

                try:
                    handler(read_conn, payload)  # handler commits on the read side
                    processed_ids.append(event['event_id'])
                except Exception as apply_error:
                    # Roll back the read side and stop the batch. Keep the write
                    # transaction open so the rows stay locked until the events
                    # that did succeed have been marked published.
                    read_conn.rollback()
                    logger.error(
                        f"Failed to apply event {event['event_id']} "
                        f"({event_type}): {apply_error}"
                    )
                    batch_failed = True
                    break

            if processed_ids:
                # Commits the write transaction, which also releases the locks
                mark_events_published(write_conn, processed_ids)
                logger.info(f"Marked {len(processed_ids)} event(s) as published")
            else:
                write_conn.rollback()  # nothing succeeded: just release the locks

            if batch_failed:
                time.sleep(2)  # back off so a failing event is not retried in a hot loop

        except (psycopg2.OperationalError, psycopg2.InterfaceError) as conn_error:
            # A dead connection never heals on its own, and rollback() on it
            # fails too. Close both connections and open new ones.
            logger.error(f"Connection error, reconnecting: {conn_error}")
            for conn in (write_conn, read_conn):
                try:
                    conn.close()
                except Exception:
                    pass
            time.sleep(2)
            try:
                write_conn, read_conn = connect_both()
            except psycopg2.OperationalError as reconnect_error:
                logger.error(f"Reconnect failed, will retry: {reconnect_error}")

        except Exception as poll_error:
            logger.error(f"Poller error: {poll_error}")
            try:
                write_conn.rollback()
                read_conn.rollback()
            except Exception:
                pass
            time.sleep(2)  # back off before retrying


if __name__ == '__main__':
    run_outbox_poller()
Output
2024-01-15 10:30:00,001 INFO Outbox poller started — watching for domain events...
2024-01-15 10:30:00,542 INFO Processing batch of 2 event(s)
2024-01-15 10:30:00,581 INFO Applied OrderPlaced for order a1b2c3d4-0000-0000-0000-000000000001
2024-01-15 10:30:00,612 INFO Applied OrderConfirmed for order a1b2c3d4-0000-0000-0000-000000000001
2024-01-15 10:30:00,643 INFO Marked 2 event(s) as published
-- (no events pending: sleeps 0.5s, then polls again)
2024-01-15 10:30:01,645 INFO Processing batch of 1 event(s)
2024-01-15 10:30:01,678 INFO Applied OrderPlaced for order b2c3d4e5-0000-0000-0000-000000000002
2024-01-15 10:30:01,699 INFO Marked 1 event(s) as published
Interview Gold: Debezium vs. Outbox Polling
Interviewers love this trade-off. The Outbox Poller is simple, DB-agnostic, and needs no extra infrastructure, but polling adds query load to the write database and each event can wait up to one sleep interval before it is picked up. Debezium with PostgreSQL logical replication reads the WAL directly, which can bring lag under 100ms with no polling queries, but it requires a replication slot, careful WAL retention settings, and another service to operate. For most teams, start with the Outbox Poller. Graduate to Debezium when you can measure that you need it.
Production Insight
Poller connection validation prevents silent failures.
Read-your-writes consistency solves the refresh-after-command problem.
Client optimistic updates make lag invisible to users.
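Connection validation for a poller can be sketched as a thin wrapper that runs a cheap health check between batches and replaces the connection when the check fails. `ValidatedConnection` is a hypothetical helper, not part of psycopg2; `connect` is any zero-argument callable returning a DB-API connection, for example a lambda around `psycopg2.connect`.

```python
from typing import Any, Callable


class ValidatedConnection:
    """Holds one DB-API connection and replaces it when a health check fails.
    Intended to be called between batches, never in the middle of a transaction."""

    def __init__(self, connect: Callable[[], Any]):
        self._connect = connect
        self._conn = connect()
        self.reconnects = 0

    def get(self) -> Any:
        """Return a connection that answered SELECT 1, reconnecting if needed."""
        try:
            cur = self._conn.cursor()
            cur.execute("SELECT 1")
            cur.fetchone()
            cur.close()
            self._conn.rollback()  # do not leave the health check's transaction open
        except Exception:
            # The old connection is unusable: discard it and open a new one.
            try:
                self._conn.close()
            except Exception:
                pass
            self._conn = self._connect()
            self.reconnects += 1
        return self._conn
```

Exposing a `reconnects` counter gives monitoring something to alert on, so a flapping database link shows up as a metric instead of a silent stall.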
Key Takeaway
Eventual consistency is not a bug — it's a design choice.
Handle the lag window with read-your-writes or optimistic UI.
Outbox Poller is production-proven; Debezium for sub-100ms lag.
Manage Consistency Window
If: User-facing feature with immediate feedback need
Use: Return write-side state in command response, plus optimistic UI update.
If: Background process (e.g., reporting)
Use: Accept eventual consistency; measure lag and alert on threshold.
If: Financial transactions requiring strong read consistency
Use: Read from write side for critical checks; use read model for non-critical queries.

Event Sourcing Integration: When Events Become the Source of Truth

Pure CQRS doesn't require Event Sourcing, but the two patterns form a natural pair. Event Sourcing means storing the stream of events that led to the current state, rather than the current state itself. The command side becomes an append-only event store. The read side still uses projections, but now they are built entirely from the event stream.

Why go this route? You get a complete audit log: every state change is recorded. You can rebuild any read projection from scratch at any time, even years later. You can debug production issues by replaying the exact sequence of events that led to a bug. And because the event store is append-only, commands never update existing rows in place, which removes most write contention. Concurrent commands against the same stream still have to be serialized, usually with an optimistic version check.

But there's a practical cost. Your queries for current state now require replaying events or maintaining snapshots. Event store schemas are less familiar to most developers. And you must version your events — adding a field to OrderPlaced after it's in production means handling old events that lack that field. Schema evolution on events is a real skill.
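
Handling old events that lack a newer field is often done with an "upcaster" that converts stored events to the latest shape as they are read. The sketch below is illustrative only: the `schema_version` key, the `currency` field, and its "USD" default are assumptions invented for the example, not part of the event format used in this article.

```python
from typing import Any, Dict

Event = Dict[str, Any]


def upcast_order_placed(event: Event) -> Event:
    """Return the event in its latest shape without mutating the stored one.
    Hypothetical history: v1 had no 'currency' field; v2 added it."""
    version = event.get("schema_version", 1)  # events written before versioning count as v1
    data = dict(event["data"])  # copy so the original event stays untouched
    if version < 2:
        data.setdefault("currency", "USD")  # assumed default for legacy events
        version = 2
    return {**event, "schema_version": version, "data": data}
```

Upcasting on read keeps the stored log immutable while letting handlers and projections deal with a single, current event shape.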

The common compromise: use a traditional normalized database on the write side (as shown earlier), but still emit domain events and store them durably. You get the replayability and audit trail without the full Event Sourcing operational burden. You can migrate to full Event Sourcing later if the need arises.

event_store_client.py (Python)
# ============================================================
# Minimal Event Store Client — io.thecodeforge.eventstore
# Append-only stream for Event Sourcing.
# Uses PostgreSQL as the store with snapshot support.
# ============================================================

import psycopg2
import psycopg2.extras
import json
import uuid
from datetime import datetime, timezone
from typing import Optional, Dict, List, Any


class EventStore:
    def __init__(self, dsn: str):
        self.dsn = dsn

    def append_event(self, stream_id: str, event_type: str,
                     data: Dict[str, Any], expected_version: Optional[int] = None) -> int:
        """Append an event to the stream. If expected_version is provided,
        the operation will fail if the stream version doesn't match (optimistic concurrency)."""
        conn = psycopg2.connect(self.dsn)
        try:
            with conn.cursor() as cur:
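                # Lock the stream row (if it exists) and read its current version.
                # For a brand-new stream there is no row to lock yet, so this sketch
                # assumes a UNIQUE (stream_id, version) constraint on the events table
                # to reject two concurrent first appends.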
                cur.execute("""
                    SELECT version FROM io_thecodeforge.event_streams
                    WHERE stream_id = %s
                    FOR UPDATE
                """, (stream_id,))
                row = cur.fetchone()
                current_version = row[0] if row else 0

                if expected_version is not None and current_version != expected_version:
                    raise ValueError(
                        f"Concurrency error: expected version {expected_version}, "
                        f"current version {current_version}"
                    )

                new_version = current_version + 1

                # Upsert the stream row with new version
                cur.execute("""
                    INSERT INTO io_thecodeforge.event_streams (stream_id, version, last_updated)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (stream_id) DO UPDATE SET
                        version = EXCLUDED.version,
                        last_updated = EXCLUDED.last_updated
                """, (stream_id, new_version, datetime.now(timezone.utc)))

                # Insert the event
                cur.execute("""
                    INSERT INTO io_thecodeforge.events (event_id, stream_id, version, event_type, data, occurred_at)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (
                    str(uuid.uuid4()), stream_id, new_version,
                    event_type, json.dumps(data), datetime.now(timezone.utc)
                ))
                conn.commit()
                return new_version
        finally:
            conn.close()

    def read_events(self, stream_id: str, from_version: int = 1) -> List[Dict[str, Any]]:
        """Return all events for a stream starting from a given version."""
        conn = psycopg2.connect(self.dsn)
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("""
                    SELECT event_id, version, event_type, data, occurred_at
                    FROM io_thecodeforge.events
                    WHERE stream_id = %s
                    AND version >= %s
                    ORDER BY version ASC
                """, (stream_id, from_version))
                return cur.fetchall()
        finally:
            conn.close()

    def create_snapshot(self, stream_id: str, snapshot_data: Dict[str, Any], version: int) -> None:
        """Store a snapshot for efficient rebuilding of current state."""
        conn = psycopg2.connect(self.dsn)
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO io_thecodeforge.event_snapshots (stream_id, version, snapshot_data, created_at)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (stream_id, version) DO NOTHING
                """, (stream_id, version, json.dumps(snapshot_data), datetime.now(timezone.utc)))
                conn.commit()
        finally:
            conn.close()
Output
Event appended with version 3.
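Rebuilding current state from a snapshot plus the events recorded after it can be sketched as a simple fold. This is a minimal in-memory illustration, not the article's implementation: the order event types and the `apply_event` function are hypothetical, and the event and snapshot dicts are assumed to carry `version`, `event_type`, `data`, and `snapshot_data` keys already decoded from JSON.

```python
from typing import Any, Dict, List, Optional


def apply_event(state: Dict[str, Any], event: Dict[str, Any]) -> Dict[str, Any]:
    """Pure function: return the next state after one (hypothetical) order event."""
    data = event["data"]
    if event["event_type"] == "OrderPlaced":
        return {"order_id": data["order_id"], "status": "placed", "items": []}
    if event["event_type"] == "ItemAdded":
        return {**state, "items": state["items"] + [data["sku"]]}
    if event["event_type"] == "OrderShipped":
        return {**state, "status": "shipped"}
    return state  # unknown event types leave the state unchanged


def rebuild_state(events: List[Dict[str, Any]],
                  snapshot: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Start from the snapshot (if any) and replay only the events after it."""
    state: Dict[str, Any] = dict(snapshot["snapshot_data"]) if snapshot else {}
    from_version = snapshot["version"] + 1 if snapshot else 1
    for event in events:
        if event["version"] >= from_version:
            state = apply_event(state, event)
    return state


events = [
    {"version": 1, "event_type": "OrderPlaced", "data": {"order_id": "o-1"}},
    {"version": 2, "event_type": "ItemAdded", "data": {"sku": "A"}},
    {"version": 3, "event_type": "ItemAdded", "data": {"sku": "B"}},
    {"version": 4, "event_type": "OrderShipped", "data": {}},
]

full = rebuild_state(events)                      # replay from version 1
snapshot = {"version": 2, "snapshot_data": rebuild_state(events[:2])}
fast = rebuild_state(events, snapshot)            # replay versions 3 and 4 only
print(full == fast)
```

Both paths should produce the same state; the snapshot only changes how many events have to be replayed to get there.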
Event Sourcing vs State-Based Write Model
  • Event Sourcing: append-only, full audit trail, can rebuild any point in time.
  • State-based: simpler, lower storage, but no history and potential for lost updates.
  • Snapshotting is essential for performance — replaying 10 million events on read is not practical.
  • Event versioning is mandatory: never change or remove fields of an event type that is already in production; add new optional fields or introduce a new event version.
  • CQRS works without Event Sourcing, but Event Sourcing amplifies CQRS's benefits.
Production Insight
Event Sourcing greatly reduces update contention: appends to different streams never block each other, so write deadlocks become rare.
Snapshot regularly: replaying from event 0 on every restart kills startup time.
Version events using a schema registry to handle backward compatibility.
Key Takeaway
Event Sourcing pairs naturally with CQRS but adds operational complexity.
Snapshots are non-negotiable for performant state rebuilds.
Schema evolution on events is a skill — plan for backward compatibility.
State-Based vs Event-Sourced Write Model
If: Need full audit trail and time-travel queries
Use: Event Sourcing for the write model.
If: Simple CRUD, low write contention, team new to CQRS
Use: State-based write model with Outbox is sufficient.
If: High write throughput, need to avoid locks
Use: Event Sourcing append-only store is ideal.

Polyglot Persistence: Choosing the Right Database for Each Read Model

One of CQRS's biggest advantages is that you're no longer forced to use the same database for everything. Each read projection can use the storage engine that best matches its access pattern. This is polyglot persistence — and it's a direct reason to adopt CQRS.

Let's say you have three distinct read consumers: an order summary list (needs low latency, consistent sort), a full-text search for products (needs inverted indexes), and a real-time analytics dashboard (needs fast aggregations on time windows). With CQRS, you can serve these from three different databases: PostgreSQL for the order summaries (covering index), Elasticsearch for search, and Redis with Sorted Sets + Timeseries for the dashboard. Each projection is updated by the same event stream but stored and queried independently.

This freedom comes with a cost: you now have three databases to operate, three connection pools, three backup strategies. And data consistency is entirely your responsibility — there's no cross-database transaction. The event stream is your single source of truth, and each projection must handle its own failure and retry logic.

A pragmatic rule: start with a single read replica for all projections. Split into specialized engines only when you have measured the performance gap. Premature polyglot is just over-engineering.

multi_store_event_consumer.py (Python)
# ============================================================
# Multi-store event consumer — io.thecodeforge.eventconsumer
# Applies the same event to different projections in different stores.
# Each projection is idempotent and can fail independently.
# ============================================================

import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)

class ProjectionHandler:
    """Base class for a projection that updates a specific store."""
    def handle(self, event_type: str, payload: Dict[str, Any]) -> None:
        raise NotImplementedError

class PostgresOrderSummaryProjection(ProjectionHandler):
    def handle(self, event_type: str, payload: Dict[str, Any]) -> None:
        # ... PostgreSQL upsert logic
        pass

class ElasticsearchProductSearchProjection(ProjectionHandler):
    def handle(self, event_type: str, payload: Dict[str, Any]) -> None:
        # ... Elasticsearch index update logic
        pass

class RedisAnalyticsProjection(ProjectionHandler):
    def handle(self, event_type: str, payload: Dict[str, Any]) -> None:
        # ... Redis sorted set increments
        pass


def process_event(event: Dict[str, Any], handlers: Dict[str, ProjectionHandler]):
    """Apply event to all registered projections.
    Each projection runs independently; failures are logged but don't block others."""
    event_type = event['event_type']
    payload = event['payload']
    for name, handler in handlers.items():
        try:
            handler.handle(event_type, payload)
            logger.info(f"Projection '{name}' updated for event {event['event_id']}")
        except Exception as e:
            logger.error(f"Projection '{name}' failed for event {event['event_id']}: {e}")
            # Optionally send to dead-letter queue

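# Example usage
handlers = {
    'order_summary': PostgresOrderSummaryProjection(),
    'product_search': ElasticsearchProductSearchProjection(),
    'analytics': RedisAnalyticsProjection(),
}

# Illustrative batch; in production this comes from the broker or the Outbox poller.
event_batch = [
    {'event_id': 'evt-001', 'event_type': 'OrderPlaced', 'payload': {'order_id': 'ord-1'}},
]

for event in event_batch:
    process_event(event, handlers)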
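The "dead-letter queue" comment in the consumer above can be fleshed out in several ways. One possible shape is a bounded retry with exponential backoff that parks the event after the last failed attempt. In this sketch the `handle_with_retry` helper, the in-memory `dead_letters` list, and the default of three attempts are all assumptions for illustration; a real system would persist dead letters in a table or broker topic.

```python
import time
from typing import Any, Callable, Dict, List


def handle_with_retry(handle: Callable[[str, Dict[str, Any]], None],
                      event: Dict[str, Any],
                      dead_letters: List[Dict[str, Any]],
                      max_attempts: int = 3,
                      base_delay: float = 0.0) -> bool:
    """Call one projection handler up to max_attempts times (must be >= 1).

    Returns True on success. After the final failure the event is parked
    in dead_letters and False is returned, so other events keep flowing.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handle(event["event_type"], event["payload"])
            return True
        except Exception as exc:
            if attempt == max_attempts:
                dead_letters.append(
                    {"event": event, "error": str(exc), "attempts": attempt}
                )
                return False
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    return False


calls = {"n": 0}


def flaky(event_type: str, payload: Dict[str, Any]) -> None:
    """Fails twice, then succeeds: stands in for a transient store outage."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("store unavailable")


def broken(event_type: str, payload: Dict[str, Any]) -> None:
    """Always fails: stands in for a poison event."""
    raise ValueError("bad payload")


dead: List[Dict[str, Any]] = []
event = {"event_id": "e-1", "event_type": "OrderPlaced", "payload": {"order_id": "o-1"}}
ok_flaky = handle_with_retry(flaky, event, dead)
ok_broken = handle_with_retry(broken, event, dead)
print(ok_flaky, ok_broken, len(dead))
```

Retrying is only safe because each projection handler is expected to be idempotent; without that, a retry after a partial write could double-apply the event.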
Warning: Polyglot Adds Operational Complexity
Each additional database is another system to patch, monitor, back up, and scale. If your team is not experienced with Elasticsearch or Redis, the learning curve will slow you down. Start with a single read replica for all projections. Split only when you can measure the performance benefit and have the operational capacity to support it.
Production Insight
Event stream is the only cross-database consistency guarantee.
Each projection store fails independently — handle per-store errors gracefully.
Start with one read replica; split into specialized stores only after measurement.
Key Takeaway
Polyglot persistence is a benefit, not a requirement.
Each projection store can fail independently without affecting others.
Let measured performance drive the decision, not architecture hype.
When to Split into Specialized Stores
If: Measurements show read queries take > 10ms despite covering indexes
Use: Move that projection to a dedicated store (e.g., Elasticsearch for search).
If: One projection consumes 80% of read replica IOPS
Use: Move that projection to its own dedicated replica or engine.
If: Team is unfamiliar with the target database (e.g., Neo4j)
Use: Defer until trained; use existing store with optimized schema.
● Production incident · POST-MORTEM · severity: high

The Midnight Order Disappearance

Symptom
Users reported 'order placed successfully' confirmation but the order never appeared in their order history. Internal monitoring showed the domain_events outbox growing unboundedly.
Assumption
The team assumed the Outbox poller was stateless and could restart safely, and that the database had enough connections to handle retries.
Root cause
The Outbox poller used a single database connection that was not released on failure. After a transient network blip, the poller's connection entered a broken state, but the connection pool kept it alive. The poller loop consumed all available connections waiting for the broken connection to recover, causing connection exhaustion. Meanwhile, new events kept piling up unread.
Fix
Implemented connection validation before each poll: test the connection with SELECT 1 and reconnect if broken. Added a maximum retry limit per event to prevent infinite loops. Deployed two poller instances with FOR UPDATE SKIP LOCKED to allow zero-downtime failover.
Key lesson
  • Always validate database connections before using them in background loops.
  • Add dead-letter queues for events that fail repeatedly.
  • Monitor the outbox table size and poller lag as first-class metrics.
Production debug guide
Quick reference for common read-model lag and divergence issues in production · 5 entries
Symptom · 01
Read model misses a recently processed command
Fix
Check the domain_events table for unpublished events: SELECT count(*) FROM domain_events WHERE published = FALSE; Then verify the Outbox poller is running and not stuck.
Symptom · 02
Read model shows stale data more than 10 seconds old
Fix
Measure poller lag as the age of the oldest unpublished event: SELECT now() - min(occurred_at) FROM domain_events WHERE published = FALSE; Then investigate consumer bottlenecks (broker queue depth, DB connection pool).
Symptom · 03
Duplicate events in read model (idempotency broken)
Fix
Check event handler logic: look for missing ON CONFLICT clauses or non-idempotent UPDATE statements. Verify processed_event_ids tracking table.
Symptom · 04
Event consumer crashes with OOM
Fix
Review batch size – large batches with complex projections cause memory pressure. Reduce BATCH_SIZE and add per-event commit with checkpointing.
Symptom · 05
Read model out of sync after DB restore
Fix
Event log is source of truth; replay events from the last known good position. If no snapshot, rebuild projections from scratch using event log.
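The `processed_event_ids` tracking table mentioned under Symptom 03 can guard a non-idempotent update such as a running total. The sketch below is one possible shape, using the standard-library `sqlite3` module so it runs standalone; the `customer_ltv` table, its column names, and the `apply_order_placed` function are assumptions for the example. The key point is that the guard row and the counter update commit in the same transaction.

```python
import sqlite3
from typing import Any, Dict


def create_schema(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE processed_event_ids (event_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE customer_ltv ("
        "customer_id TEXT PRIMARY KEY, total_spend INTEGER NOT NULL)"
    )
    conn.commit()


def apply_order_placed(conn: sqlite3.Connection, event: Dict[str, Any]) -> bool:
    """Add the order amount to the customer's total exactly once per event_id.

    Returns True if the event was applied, False if it was a duplicate.
    """
    with conn:  # one transaction: commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_event_ids (event_id) VALUES (?)",
            (event["event_id"],),
        )
        if cur.rowcount == 0:
            return False  # guard row already existed: skip the replayed event
        conn.execute(
            "INSERT OR IGNORE INTO customer_ltv (customer_id, total_spend) "
            "VALUES (?, 0)",
            (event["customer_id"],),
        )
        conn.execute(
            "UPDATE customer_ltv SET total_spend = total_spend + ? "
            "WHERE customer_id = ?",
            (event["amount"], event["customer_id"]),
        )
        return True


conn = sqlite3.connect(":memory:")
create_schema(conn)

event = {"event_id": "evt-42", "customer_id": "c-1", "amount": 40}
first = apply_order_placed(conn, event)
second = apply_order_placed(conn, event)  # simulated redelivery

total = conn.execute(
    "SELECT total_spend FROM customer_ltv WHERE customer_id = ?", ("c-1",)
).fetchone()[0]
print(first, second, total)
```

In PostgreSQL the same idea is usually written with `INSERT ... ON CONFLICT DO NOTHING` on the tracking table, checking the affected row count before touching the projection.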
★ CQRS Sync Troubleshooting Quick Reference
Five commands to diagnose the most common CQRS data pipeline failures in production.
Read model not reflecting new writes
Immediate action
Check domain_events outbox growth
Commands
SELECT count(*) FROM io_thecodeforge.domain_events WHERE published = FALSE;
SELECT event_type, count(*) FROM io_thecodeforge.domain_events WHERE published = FALSE GROUP BY event_type;
Fix now
Restart the Outbox poller service after verifying database connectivity.
Event consumer stuck on a single event
Immediate action
Identify the problematic event ID and payload
Commands
SELECT event_id, event_type, payload FROM io_thecodeforge.domain_events WHERE published = FALSE ORDER BY occurred_at LIMIT 1;
Investigate consumer logs for last processed event ID and error stack.
Fix now
Skip the bad event by marking it published (after copying payload to dead-letter queue).
Projection shows duplicate rows
Immediate action
Verify idempotency of event handlers
Commands
SELECT order_id, count(*) FROM order_summary_projection GROUP BY order_id HAVING count(*) > 1;
Check event replay count in consumer logs.
Fix now
Add a UNIQUE constraint on the projection key and write with INSERT ... ON CONFLICT (DO NOTHING or DO UPDATE), which relies on that constraint.
Poller lag exceeds 1 minute
Immediate action
Check broker queue depth (if using Kafka/RabbitMQ) or DB connection pool status
Commands
SELECT count(*) FROM domain_events WHERE published = FALSE AND occurred_at < now() - interval '1 minute';
SHOW POOLS; -- run in the PgBouncer admin console
Fix now
Increase BATCH_SIZE, scale consumer horizontally, or add indexing on occurred_at.
Write Side vs Read Side
| Aspect | Write Side (Command Model) | Read Side (Query Model) |
|---|---|---|
| Primary goal | Correctness and consistency | Speed and consumer convenience |
| Schema shape | Normalized (3NF or higher) | Denormalized (flat projections) |
| Constraints | Foreign keys, check constraints, unique indexes | Minimal or none; data integrity guaranteed by events |
| Typical DB engine | PostgreSQL (OLTP, strong ACID) | PostgreSQL read replica, Redis, Elasticsearch, or DynamoDB |
| Indexing strategy | Indexes on write-path lookup keys | Covering indexes for exact read patterns; search indexes |
| Who writes to it | Application (via commands) | Event consumer / projection updater only |
| Who reads from it | Only for consistency checks in commands | API layer, dashboards, reporting tools |
| Schema migrations | Risky: ALTER TABLE on live write traffic | Safe: drop projection, replay events, swap in new schema |
| Scaling strategy | Vertical + connection pooling (PgBouncer) | Horizontal: multiple read replicas or sharded per projection |
| Consistency model | Strong (synchronous, ACID) | Eventual (asynchronous, updated via events) |
| Data freshness | Always current | Delayed by replication lag (typically 50ms–5s) |
| Rebuild possible? | No, it is the source of truth | Yes, replay all events to rebuild any projection from scratch |

Key takeaways

1
The write model enforces invariants; the read model serves consumers. They have opposite optimization goals and should never share the same table structure or storage engine.
2
The Outbox Pattern (writing domain events in the same transaction as state changes) is the only reliable way to prevent the dual-write problem and guarantee your read model never silently diverges.
3
Every event handler in your consumer must be idempotent by design, not by hope. At-least-once delivery is a guarantee from every serious message broker, so double processing is not a theoretical edge case.
4
Read projections are disposable by design. The ability to drop and rebuild any projection from the event log is CQRS's most underrated superpower: it turns read-side schema migrations from a production risk into a routine operation.
5
Polyglot persistence is a benefit, not a requirement. Start with one read replica and split only when measured performance demands it.

Common mistakes to avoid

4 patterns

Querying the write-side database from the read path

Symptom
Your read queries cause lock contention on write tables. SELECT COUNT(*) takes 4 seconds during order processing. VACUUM struggles.
Fix
Enforce strict separation at the connection string level — your read API service should have credentials that only have SELECT on read-side tables and zero access to write-side tables. Use two separate database users enforced at the PostgreSQL role level, not just a naming convention.

Building one giant 'universal' read projection that serves every query

Symptom
Projection table has 60 columns, half NULL for most rows. Queries still need 4-way joins to assemble a response.
Fix
Projections should be one-to-one with query patterns, not one-to-many. The order list page gets its own projection. The admin analytics dashboard gets its own. Build as many projections as you have distinct query shapes — storage is cheap, query performance isn't.

Not handling idempotency in event consumers

Symptom
During Kafka consumer rebalance or pod restart, events get processed twice. Customer LTV shows double the actual spend. Totals are wrong.
Fix
Every projection update function must be idempotent. Use INSERT ... ON CONFLICT DO UPDATE for projections that overwrite state, since replaying the same event simply writes the same row again. For additive counters, store the last processed event_id per projection and skip events you've already seen. Never use UPDATE ... SET total = total + 1 without an idempotency guard: that statement is not safe to replay.

Ignoring the Outbox poller connection health

Symptom
Poller silently fails after a network blip, events accumulate, read model diverges. Connection pool exhausts.
Fix
Validate database connection before each poll cycle. Use SELECT 1 and reconnect if broken. Add max retry per event with dead-letter queue.
INTERVIEW PREP · PRACTICE MODE

Interview Questions on This Topic

Q01 · SENIOR
How does CQRS handle the consistency problem when a user submits a comma...
Q02 · SENIOR
If your read-side projection gets corrupted or falls behind by several t...
Q03 · SENIOR
A colleague says 'we should use CQRS for our user authentication service...
Q01 of 03 · SENIOR

How does CQRS handle the consistency problem when a user submits a command and immediately queries the read model — what patterns exist to bridge that gap, and what are their trade-offs?

ANSWER
Three main patterns: (1) Read-your-writes consistency — after a successful command, the API returns the current state from the write side, and the client uses that as immediate feedback. (2) Optimistic UI update — the client assumes success and updates the UI immediately, reconciling when the read model eventually reflects the change. (3) Version token — the command returns a version number, and the client polls the read model until it sees that version. Each trades complexity for user experience. The first two are most common; polling adds latency and wasted requests.
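The version-token pattern from the answer above can be sketched as a bounded polling loop. This is an illustration under assumptions: `wait_for_version`, its timeout and interval defaults, and the `LaggingProjection` stand-in are hypothetical names, and the read model is assumed to expose the last version it has applied.

```python
import time
from typing import Callable, Optional


def wait_for_version(read_version: Callable[[], Optional[int]],
                     min_version: int,
                     timeout: float = 2.0,
                     interval: float = 0.05) -> bool:
    """Poll the read model until it has applied at least min_version.

    Returns True once caught up, or False if the timeout expires first,
    in which case the caller can fall back to the write side or show
    a 'processing' state.
    """
    deadline = time.monotonic() + timeout
    while True:
        current = read_version()
        if current is not None and current >= min_version:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class LaggingProjection:
    """Stand-in for a read model whose applied version advances on each read."""

    def __init__(self, start: int):
        self.version = start

    def read_version(self) -> int:
        current = self.version
        self.version += 1  # simulate the consumer catching up in the background
        return current


# The command handler returned version 3 as the token for this write.
projection = LaggingProjection(start=1)
caught_up = wait_for_version(projection.read_version, min_version=3, interval=0.001)

# A projection that never advances hits the timeout instead of looping forever.
stuck = wait_for_version(lambda: 1, min_version=3, timeout=0.02, interval=0.001)
print(caught_up, stuck)
```

The timeout is what keeps this pattern safe: without it, a stalled poller on the read side would turn every write into a hung request.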
FAQ · 5 QUESTIONS

Frequently Asked Questions

01
Do I need Event Sourcing to implement CQRS?
02
How much eventual consistency lag is acceptable in a CQRS system?
03
When should I NOT use CQRS?
04
What database indexing strategy should I use for the domain_events outbox table?
05
Can I use CQRS with a single database instance?