Senior 11 min · March 17, 2026

Database Sharding — Why Sequential IDs Cause Hot Shards

One shard hit 95% CPU while others idled at 15% during a flash sale.

Naren Founder & Principal Engineer

20+ years shipping high-throughput database systems. Drawn from code that ran under real load.

Production
production tested
May 24, 2026
last updated
1,554
articles · all by Naren
Quick Answer
  • Sharding splits data across multiple independent database instances (shards) to scale beyond a single server.
  • A shard key determines which shard stores each row; queries must include it to avoid broadcasting.
  • Hash sharding distributes writes evenly; range sharding supports efficient range queries but risks hot shards.
  • Cross-shard queries are slow and distributed transactions are hard — design the shard key to keep related data together.
  • Most apps should not shard — optimize indexes, add replicas, and scale vertically first.
Definition
What is Database Sharding?

Database sharding is a horizontal partitioning pattern that physically splits a single logical database across multiple independent database instances, each called a shard. Unlike replication (which copies data) or vertical scaling (which makes a single server bigger), sharding distributes different rows of the same table across separate machines.

This is the only scalability pattern that actually reduces the load per server by dividing both data volume and write throughput. Companies like Instagram, Uber, and Discord have all adopted sharding when their databases exceeded single-node capacity — typically in the hundreds of gigabytes to low terabytes range, or when write throughput surpasses what a single master can handle (e.g., 10,000+ writes/second on a typical PostgreSQL instance).

Sharding works by applying a sharding function to a chosen shard key (usually a column like user_id or order_id) to determine which shard stores each row. The two dominant strategies are hash sharding and range sharding. Hash sharding applies a hash function followed by a modulo (e.g., hash(user_id) % N) to distribute rows uniformly across shards, but makes range queries expensive. Plain modulo is not consistent hashing, which is a separate technique that limits data movement when the shard count changes.

Range sharding splits data by key ranges (e.g., user_id 1-1M on shard A, 1M-2M on shard B), which supports efficient range scans but creates hot shards when sequential IDs cluster writes on a single shard. This is why sequential IDs (auto-increment integers, timestamps, or monotonically increasing UUIDs) are dangerous with range sharding: all new writes hit the last shard, turning it into a bottleneck while other shards sit idle.
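
The effect is easy to reproduce in a few lines. The sketch below is only an illustration: the shard count, the 1,000,000-ID range size, and the function names are assumptions made for the example, not part of any particular database. It routes a burst of new sequential IDs with a range rule and with a hash rule, then counts writes per shard.

```python
import hashlib
from collections import Counter

N_SHARDS = 4
RANGE_SIZE = 1_000_000  # assumed: each shard owns one contiguous block of IDs


def range_route(user_id: int) -> int:
    """Range sharding: contiguous ID blocks; the last shard takes the overflow."""
    return min(user_id // RANGE_SIZE, N_SHARDS - 1)


def hash_route(user_id: int) -> int:
    """Hash sharding: hash the ID first, then take it modulo the shard count."""
    digest = hashlib.sha256(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % N_SHARDS


# 10,000 brand-new rows with sequential IDs just past the 3,000,000 mark.
new_ids = range(3_000_000, 3_010_000)

range_load = Counter(range_route(i) for i in new_ids)
hash_load = Counter(hash_route(i) for i in new_ids)

print("range:", dict(range_load))                 # every insert lands on shard 3
print("hash: ", dict(sorted(hash_load.items())))  # spread across all four shards
```

Under the range rule the other three shards receive no new writes at all, which is the hot-shard pattern in miniature.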

Choosing the right shard key is the hardest part of sharding — it directly determines whether your system scales or collapses under skewed load. A good shard key has high cardinality (many distinct values), uniform access patterns (no single value dominates writes), and supports your query patterns (ideally making cross-shard queries rare).

Common mistakes include sharding on created_at (hot shard on current time), country (few values, uneven distribution), or user_id without considering that power users generate disproportionate load. The trade-off matrix between hash and range sharding typically favors hash for write-heavy workloads and range for read-heavy workloads with predictable access patterns.

When you get it wrong, you'll see shard imbalance — one shard at 90% CPU while others idle at 10% — requiring costly rebalancing operations that can take days on large datasets.
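
One way to catch imbalance before it reaches production is to replay a sample of writes through a candidate routing rule and compare the busiest shard against the average. The helper below is a minimal sketch; the function name and the sample data are hypothetical, and a real check would use write logs from your own workload.

```python
from collections import Counter


def imbalance_ratio(shard_for_each_write: list[int], n_shards: int) -> float:
    """Return busiest-shard load divided by the mean load (1.0 = perfectly even)."""
    if not shard_for_each_write:
        return 1.0
    counts = Counter(shard_for_each_write)
    mean_load = len(shard_for_each_write) / n_shards
    return max(counts.values()) / mean_load


# Hypothetical sample: 4 shards, shard 3 receives 70 of 100 writes.
sample = [3] * 70 + [0] * 10 + [1] * 10 + [2] * 10
print(imbalance_ratio(sample, n_shards=4))  # 2.8
```

A ratio near 1.0 suggests the key spreads load well; a ratio of 2.8 means the busiest shard carries almost three times its fair share.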

Plain-English First

Imagine a filing cabinet (one database) that gets too full. Sharding is like buying multiple cabinets and splitting your files across them. You need a rule (the shard key) to decide which file goes where. Looking up a file is fast if you know the rule. But if you need to search across all cabinets — like finding all files from a certain date — it's slow because you have to open every drawer.

Most applications never need to shard. A properly indexed PostgreSQL or MySQL instance can handle tens of millions of rows and thousands of queries per second. But when you hit the limits of vertical scaling — larger instances give diminishing returns — sharding becomes the path forward.

The tradeoff is real: sharding gives you horizontal scale but introduces significant operational complexity. Cross-shard queries are slow, transactions spanning shards are hard, and resharding (changing the number of shards) requires careful migration. Get the shard key wrong, and you'll trade one bottleneck for another.

Database Sharding — The Only Scalability Pattern That Actually Splits Data

Database sharding is a horizontal partitioning strategy where you split a single logical dataset across multiple independent database instances (shards). Each shard holds a subset of the data, determined by a shard key — a column or hash of a column that maps every row to exactly one shard. The core mechanic is that no single database node holds the full dataset; instead, the application or a proxy routes each query to the correct shard based on the shard key.

In practice, sharding trades consistency and query flexibility for write throughput and storage capacity. A well-chosen shard key distributes writes evenly across shards — ideally O(1) routing per operation. But if the shard key is poorly chosen (e.g., a monotonically increasing sequential ID), writes concentrate on a single shard, creating a "hot shard" that becomes a bottleneck. This defeats the purpose of sharding: instead of scaling horizontally, you end up with one overloaded node and many idle ones.

Use sharding when your dataset exceeds a single node's storage capacity (e.g., >1 TB) or when write throughput surpasses a single node's IOPS (e.g., >10k writes/second). It's not a default choice — it adds operational complexity (backups, resharding, cross-shard joins). But for systems like social feeds, IoT event ingestion, or multi-tenant SaaS, sharding is the difference between a system that scales and one that falls over at 2x growth.

Hot Shards Are Silent Killers
A monotonically increasing shard key (like auto-increment IDs or timestamps) under range sharding guarantees that all new writes hit the same shard. You don't have a distributed system; you have a single-node bottleneck with followers.
Production Insight
A social media company sharded by user ID (sequential) — writes for new users all hit shard 0, causing 50ms write latency spikes and connection pool exhaustion within 3 hours of launch.
Symptom: one shard's CPU at 95% while others sit at 10%; write timeouts on the hot shard cascade into retries that amplify load.
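Rule of thumb: if your shard key is monotonic, hash it before routing (e.g., hash(user_id) % N) so consecutive values land on different shards. Time-ordered IDs such as Snowflake IDs or UUID v7 are still roughly monotonic, so under range sharding they need the same treatment.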
Key Takeaway
Sharding is about distributing writes, not just storage — a bad shard key creates a single point of failure.
Sequential IDs are the #1 cause of hot shards in production — always hash or use a non-monotonic key.
Sharding adds operational cost; only adopt it when a single node can't handle your write throughput or dataset size.
Figure: Database Sharding: Sequential ID Hot Shards (thecodeforge.io). Why sequential keys cause uneven data distribution in sharded databases. A sequential ID generator (auto-increment or timestamp-based IDs) feeds range sharding (IDs map to shards by contiguous ranges), so all new writes hit the last shard, the hot shard. Hash sharding distributes writes evenly across shards.

How Sharding Works

The shard key is the column used to route data to a specific shard. Every query must include the shard key to avoid broadcasting to all shards. The application or a proxy layer uses a routing function (e.g., hash modulo) to determine the target shard. Each shard is an independent database instance with its own compute and storage.

In production, the shard key is usually the primary dimension of access — something you filter on in every query. User ID, tenant ID, or region ID are common choices. The key must be present in all queries; otherwise the system either fails or performs a full scan across all shards.

io/thecodeforge/sharding/router.py (Python)
# Package: io.thecodeforge.sharding.router

import hashlib

class ShardRouter:
    def __init__(self, shard_dsn_list: list[str]):
        self.shards = shard_dsn_list
        self.n = len(self.shards)

    def _hash(self, key: str) -> int:
        # Plain modulo for brevity; use consistent hashing in production
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % self.n

    def get_shard_for_key(self, key: str) -> str:
        return self.shards[self._hash(key)]

    def _query_shard(self, shard_dsn: str, sql: str, params: dict) -> list:
        # Placeholder: connect to shard_dsn with your database driver and run sql
        raise NotImplementedError

    def execute_query(self, key: str, sql: str, params: dict):
        # Crude guard: a query with no WHERE clause cannot filter on the shard key
        if "WHERE" not in sql.upper():
            raise ValueError("Query must include shard key filter")
        shard_dsn = self.get_shard_for_key(key)
        # Execute on the single shard that owns this key
        return self._query_shard(shard_dsn, sql, params)

    def execute_cross_shard(self, sql: str, params: dict):
        # Broadcast to all shards and merge
        results = []
        for shard in self.shards:
            results.extend(self._query_shard(shard, sql, params))
        return results
Think of Sharding Like a Dictionary
  • A dictionary is effectively sharded by the first letter: 'apple' belongs to 'A', 'banana' to 'B'.
  • Queries that don't specify a letter (e.g., 'find all fruits with 5 letters') require scanning every page — that's a cross-shard query.
  • Hot shards happen when one letter (like 'S') contains vastly more words than others.
  • Hash sharding is like assigning words to pages randomly — even distribution but you lose the ability to easily browse consecutive words.
Production Insight
If you add a second shard and change the routing function without migrating existing rows, about half your reads will be routed to a shard that does not hold the row yet, so they miss or return stale data.
The fix is to use a versioned routing table so that reads are always consistent during migration.
Rule: Never change the number of shards without a versioned routing strategy.
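A versioned routing table can be sketched as follows. This is one possible shape, not a standard API: the class names, the modulo routing, and the "try the new location, then fall back to the old one" read policy are all assumptions made for illustration.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional


def stable_hash(key: str) -> int:
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


@dataclass(frozen=True)
class RoutingVersion:
    version: int
    shards: tuple

    def shard_for(self, key: str) -> str:
        return self.shards[stable_hash(key) % len(self.shards)]


class VersionedRouter:
    """Keeps the previous routing version until every key has been migrated."""

    def __init__(self, current: RoutingVersion):
        self.current = current
        self.previous: Optional[RoutingVersion] = None

    def begin_migration(self, new_shards: tuple) -> None:
        self.previous = self.current
        self.current = RoutingVersion(self.current.version + 1, new_shards)

    def finish_migration(self) -> None:
        self.previous = None

    def read_candidates(self, key: str) -> list:
        """Shards to try, in order: new location first, old location as fallback."""
        candidates = [self.current.shard_for(key)]
        if self.previous is not None:
            old = self.previous.shard_for(key)
            if old not in candidates:
                candidates.append(old)
        return candidates


router = VersionedRouter(RoutingVersion(1, ("shard-a",)))
router.begin_migration(("shard-a", "shard-b"))
print(router.read_candidates("user:42"))
```

While a migration is in flight, a read that misses on the first candidate retries on the second, so a row that has not been copied yet is still found at its old location.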
Key Takeaway
The shard key is the most important decision in sharding.
Every query must include it.
Pick it based on your primary access pattern, then confirm it also distributes load evenly.
Choosing a Shard Key
If: All queries include a user_id or tenant_id filter
Use: That ID as the shard key. Hash it for even distribution.
If: You need fast range queries on a date column
Use: Range sharding on date, but expect hot shards at boundaries (e.g., month end).
If: No single query filter covers 95% of traffic
Use: A composite shard key or a secondary index layer (e.g., Elasticsearch for cross-shard searches).
If: You expect to add shards in the future
Use: Consistent hashing. Avoid modulo-based routing unless you plan to migrate all data on each change.

Scalability Decision Tree: Sharding vs Vertical Scaling

Before deciding to shard, consider whether vertical scaling (upgrading the database server) or read replicas can solve your problem. Sharding adds operational complexity that is rarely justified for data sets under 10TB or write throughput under 10k writes/second. The following decision tree helps evaluate your options.

When to use vertical scaling: If your database fits on a single server and you can increase CPU/RAM/IOPS to meet demand, vertical scaling is simpler. Modern cloud databases (like RDS, Cloud SQL) offer instances with up to 128 vCPUs and 4TB RAM – enough for most workloads. Always benchmark your query patterns first.

When to consider read replicas: If your bottleneck is read queries (e.g., reporting, dashboards), adding replicas can absorb read traffic without sharding. This works well when writes are manageable on a single primary.

When sharding becomes necessary: Your data grows beyond what a single server can store (e.g., > 10TB). Your write throughput exceeds the I/O capacity of the largest instance. Read latency is unacceptable even with replicas because the working set doesn't fit in memory.

  • Is data size > 10TB? If no, use vertical scaling.
  • If yes, is write throughput > single server capacity? If no, use read replicas + vertical scaling.
  • If yes, is read latency acceptable with replicas? If yes, use read replicas. If no, consider sharding.
  • If sharding, can related data be co-located on one shard? If yes, use hash sharding by primary access key. If no, evaluate denormalization or a search engine.
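io/thecodeforge/scalability/decision.py (Python)
def scalability_decision(data_size_gb, writes_per_sec, single_server_writes_capacity,
                         read_latency_ok_with_replicas=False):
    # Mirrors the decision tree above; 10_000 GB = 10 TB
    if data_size_gb <= 10_000:
        return 'vertical scaling'
    if writes_per_sec <= single_server_writes_capacity:
        return 'read replicas + vertical scaling'
    if read_latency_ok_with_replicas:
        return 'read replicas'
    return 'consider sharding'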
Sharding is a last resort
Most applications never need sharding. Optimize indexes, add caching, and scale vertically first. Sharding introduces operational complexity that can overshadow scalability benefits.
Production Insight
A startup sharded their database when they had only 5GB of data, expecting explosive growth. Growth didn't come, but they were stuck with cross-shard joins and complex migrations. They eventually migrated back to a single server – a painful process that cost weeks.
Key Takeaway
Evaluate vertical scaling and read replicas before sharding. Sharding is warranted only when a single server cannot handle data volume or write throughput, and your access patterns allow efficient shard key design.
Figure: Sharding Decision Tree. Data > 10TB? No → use vertical scaling. Yes → write throughput > single server capacity? No → use read replicas + vertical scaling. Yes → read latency acceptable with replicas? Yes → use read replicas. No → consider sharding → can related data be co-located? Yes → hash sharding by primary access key. No → evaluate denormalization or search engine.

Hash vs Range Sharding

Hash sharding applies a hash function to the shard key and takes modulo the number of shards. It distributes rows evenly across shards, preventing hot shards. The trade-off: range queries (e.g., 'users joined in January') require broadcasting to all shards because the hash destroys order.

Range sharding assigns contiguous ranges of the shard key to each shard. Range queries can be routed to a single shard if the range fits within one boundary. The downside: hot shards appear when new data falls disproportionately on one range — for example, sequential user IDs always go to the last shard.

Consistent hashing (used by Cassandra, DynamoDB) maps data to a ring of hash values. When a shard is added or removed, only the data from the adjacent shard moves — no full rehash. This minimizes migration during resharding.

io/thecodeforge/sharding/strategies.py (Python)
# Package: io.thecodeforge.sharding.strategies

import hashlib
from bisect import bisect_left
from typing import Optional

class HashShard:
    def __init__(self, n_shards: int):
        self.n = n_shards

    def route(self, user_id: int) -> int:
        # Hash first so the result does not depend on how IDs are issued
        digest = hashlib.md5(str(user_id).encode()).hexdigest()
        return int(digest, 16) % self.n

class RangeShard:
    def __init__(self, boundaries: list[int]):
        # boundaries like [0, 1_000_000, 2_000_000]
        self.boundaries = boundaries

    def route(self, user_id: int) -> int:
        for i in range(1, len(self.boundaries)):
            if user_id < self.boundaries[i]:
                return i - 1
        # Everything at or above the last boundary lands on the last shard
        return len(self.boundaries) - 1

class ConsistentHashRing:
    def __init__(self, nodes: list[str], replicas: int = 150):
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            # Place each node on the ring `replicas` times (virtual nodes)
            for i in range(replicas):
                key = hashlib.md5(f"{node}:{i}".encode()).hexdigest()
                hash_val = int(key, 16)
                self.ring[hash_val] = node
        # Sort once after all nodes are placed
        self.sorted_keys = sorted(self.ring.keys())

    def get_node(self, key: str) -> Optional[str]:
        if not self.ring:
            return None
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        # First ring position at or after the key's hash, wrapping around
        idx = bisect_left(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]
The hot shard trap with range sharding
If your shard key is a monotonically increasing value (e.g., auto-increment ID, timestamp), range sharding will slam all new writes onto the last shard. This creates a write hotspot that throttles throughput and makes vertical scaling pointless. Hash sharding avoids this, but at the cost of range query performance.
Production Insight
A SaaS company sharded by customer signup date (YYYY-MM). January's shard got 10x the writes of February's because a marketing campaign in January brought 80% of new customers.
They had to migrate to hash sharding, which took three weeks.
Rule: If your data shows any temporal skew, do not use range sharding on that field.

Sharding Strategy Comparison Matrix

Beyond the basic hash vs range dichotomy, there are several sharding strategies used in production. This section compares the most common ones: Hash, Range, Directory-based, and Consistent Hashing.

Strategy | Write Distribution | Range Query Support | Resharding Cost | Complexity | Use Case
Hash | Even | Poor | High (full rehash) | Low | High write volume, key-based lookups
Range | Skewed | Good | Medium (boundary split) | Medium | Read-heavy, predictable range queries
Directory | Configurable | Depends on key | Low (update lookup) | High | Multi-tenant dynamic allocation, frequent rebalancing
Consistent Hashing | Even | Poor | Low (minimal move) | Medium | Elastic scaling, frequent resharding

Directory-based sharding uses a lookup table that maps each entity to a shard. This decouples the shard key from the physical shard, allowing you to reassign shards without touching the data. The lookup table must be replicated or distributed to avoid a single point of failure. This approach is common in multi-tenant SaaS platforms where tenants have different sizes and can be moved between shards.

Consistent hashing, used by Cassandra and DynamoDB, maps data to a ring of hash values. When a shard is added or removed, only data from adjacent shards moves, minimizing migration. However, it requires virtual nodes to handle load imbalance and adds complexity in failure scenarios.

When choosing a strategy, consider your write/read ratio, need for range queries, and how often you plan to add or remove shards. No single strategy fits all – the right choice depends on your specific workload patterns and operational constraints.

io/thecodeforge/sharding/directory_router.py (Python)
# Package: io.thecodeforge.sharding.directory_router

import redis

class DirectoryRouter:
    def __init__(self, redis_host: str, n_shards: int = 8):
        self.redis = redis.Redis(host=redis_host, port=6379, decode_responses=True)
        self.n_shards = n_shards

    def get_shard(self, entity_id: str) -> str:
        shard = self.redis.get(f"shard:{entity_id}")
        if shard is None:
            # First lookup for this entity: assign it to the least loaded shard
            shard = self.assign_shard(entity_id)
        return shard

    def assign_shard(self, entity_id: str) -> str:
        # Simplistic: pick shard with fewest entities
        shard_counts = {}
        for i in range(self.n_shards):
            shard_counts[i] = self.redis.scard(f"shard_entities:{i}")
        target = min(shard_counts, key=shard_counts.get)
        self.redis.sadd(f"shard_entities:{target}", entity_id)
        self.redis.set(f"shard:{entity_id}", target)
        # Return a string so the value matches what redis.get() returns later
        return str(target)

    def rebalance(self):
        # Move entities from overloaded to underloaded shards
        pass
Directory-based sharding is ideal when shard allocation changes frequently
Use it when tenants can be moved to different shards for capacity planning, or when you need to migrate individual entities without full resharding. The lookup table must be highly available and low-latency – Redis cluster or a replicated in-memory cache works well.
Production Insight
A multi-tenant SaaS platform used directory-based sharding to place large tenants on dedicated shards and small tenants on pooled shards. The lookup table was replicated across shards to avoid a single point of failure. When a tenant outgrew its shard, they updated the lookup and performed a double-write migration. This worked well but required a robust caching layer to avoid Redis bottleneck.
Key Takeaway
Choose a sharding strategy based on your workload's write/read ratio, range query needs, and how often you expect to add or remove shards. Directory-based offers flexibility at the cost of a single point of coordination; consistent hashing minimizes migration pain.

Shard Key Selection and Trade-offs

Choosing the wrong shard key is the most common sharding mistake. A good shard key has three properties: high cardinality (many distinct values), even distribution, and inclusion in most queries. Common choices are user_id, tenant_id, order_id, or a hash of email.

If you need both even distribution and efficient range queries over time, consider a composite shard key (e.g., hash(user_id) + month). The primary key includes the hash, and a secondary index supports the time range scan. This adds storage and complexity but may be worthwhile.
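
A composite key of this kind might look like the sketch below. The tuple layout, the shard count, and the function names are assumptions for illustration: the hash bucket alone decides the shard, while the month component keeps a user's rows ordered by time inside that shard.

```python
import hashlib
from datetime import date

N_SHARDS = 8  # assumed shard count for illustration


def user_bucket(user_id: int) -> int:
    digest = hashlib.sha256(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % N_SHARDS


def composite_key(user_id: int, created: date) -> tuple:
    """(shard bucket, YYYY-MM, user_id): the first element routes, the rest sort."""
    return (user_bucket(user_id), created.strftime("%Y-%m"), user_id)


def shard_for(key: tuple) -> int:
    return key[0]


k1 = composite_key(42, date(2026, 1, 15))
k2 = composite_key(42, date(2026, 3, 2))
print(shard_for(k1) == shard_for(k2))  # True: one user always maps to one shard
print(k1[1], k2[1])                    # 2026-01 2026-03
```

Because the month is not part of the routing decision, a time-range scan for one user stays on a single shard, while a time-range scan across all users still has to visit every shard.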

Another approach: use a lookup table that maps each shard key to a shard ID. This decouples routing from the key value, allowing you to change shards without modifying the key. However, the lookup becomes a potential bottleneck and single point of failure.

io/thecodeforge/sharding/shard_map.sql (SQL)
-- Schema for shard map lookup table (MySQL syntax)
CREATE TABLE io_thecodeforge_sharding.shard_map (
    shard_key_hash BIGINT PRIMARY KEY,
    shard_id INT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (shard_id) REFERENCES io_thecodeforge_sharding.shard_config(shard_id)
);

-- MD5() returns a hex string, so its first 15 hex digits are converted to an
-- integer before the modulo. Each row maps one of 1,000,000 hash buckets,
-- so many users share a bucket.

-- When a new entity is created, map its bucket unless it is already mapped
INSERT INTO io_thecodeforge_sharding.shard_map (shard_key_hash, shard_id)
VALUES (CAST(CONV(LEFT(MD5(CONCAT('user_', :user_id)), 15), 16, 10) AS UNSIGNED) MOD 1000000, :target_shard)
ON DUPLICATE KEY UPDATE shard_id = shard_id;

-- Query to find which shard holds a given user
SELECT shard_id FROM io_thecodeforge_sharding.shard_map
WHERE shard_key_hash = CAST(CONV(LEFT(MD5(CONCAT('user_', :user_id)), 15), 16, 10) AS UNSIGNED) MOD 1000000;
Production Insight
A lookup table adds ~1ms per query and must be replicated or sharded itself. If it goes down, you cannot route any queries.
We use it only for tenants that explicitly need data locality flexibility.
Rule: Default to hash sharding. Only add a lookup table when you have dynamic shard allocation requirements.

Cross-Shard Queries and Transactions

When a query doesn't include the shard key, the system must broadcast it to all shards and merge results — this is slow and resource-intensive. For range queries across shards, consider a secondary index or a search engine (Elasticsearch) that pre-indexes the data.
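
The broadcast-and-merge step is often called scatter-gather. The sketch below shows the merge half under simplifying assumptions: the per-shard data is an in-memory dictionary, `query_shard` is a hypothetical stand-in for a real `ORDER BY ... LIMIT` query, and every shard returns rows already sorted by the same column.

```python
import heapq
from itertools import islice

# Hypothetical per-shard results, each already sorted by created_at (ISO strings).
SHARD_ROWS = {
    "shard-0": [("2026-01-03", "order-7"), ("2026-01-09", "order-2")],
    "shard-1": [("2026-01-01", "order-4"), ("2026-01-10", "order-9")],
    "shard-2": [("2026-01-05", "order-1")],
}


def query_shard(shard: str, limit: int) -> list:
    """Stand-in for a real per-shard query with ORDER BY created_at LIMIT :limit."""
    return SHARD_ROWS[shard][:limit]


def scatter_gather(limit: int) -> list:
    """Ask every shard for its first `limit` rows, then merge the sorted streams."""
    partials = [query_shard(shard, limit) for shard in SHARD_ROWS]
    return list(islice(heapq.merge(*partials), limit))


print(scatter_gather(limit=3))
# [('2026-01-01', 'order-4'), ('2026-01-03', 'order-7'), ('2026-01-05', 'order-1')]
```

Each shard has to return up to `limit` rows even though most are discarded after the merge, which is part of why these queries cost more as the shard count grows.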

Distributed transactions (e.g., transfer money between users on different shards) require coordination, typically two-phase commit (2PC). 2PC is slow, blocks on failure, and reduces availability. Most production systems avoid cross-shard transactions by designing the data model so that related data lives on the same shard. For example, put all orders for a customer on the same shard as the customer record. If a cross-shard operation is unavoidable, use a saga with compensating actions and accept eventual consistency rather than strict atomicity.

io/thecodeforge/sharding/cross_shard.py (Python)
# Package: io.thecodeforge.sharding.cross_shard

# Example of a saga that transfers funds across shards.
# get_shard, prepare_*, commit_*, rollback_debit and compensate are left
# for the application to implement against its own shards.
import asyncio

class SagaCoordinator:
    async def transfer(self, from_account, to_account, amount) -> bool:
        # Step 1: Prepare both shards
        from_shard = self.get_shard(from_account.user_id)
        to_shard = self.get_shard(to_account.user_id)

        from_ok = await self.prepare_debit(from_shard, from_account, amount)
        if not from_ok:
            return False  # aborted: nothing to undo yet
        to_ok = await self.prepare_credit(to_shard, to_account, amount)
        if not to_ok:
            # Undo the reserved debit before giving up
            await self.rollback_debit(from_shard, from_account, amount)
            return False

        # Step 2: Commit
        try:
            await self.commit_debit(from_shard, from_account, amount)
            await self.commit_credit(to_shard, to_account, amount)
        except Exception:
            # Compensating transaction
            await self.compensate(from_shard, to_shard)
            raise
        return True
Same-shard locality is the single most important design constraint
  • A user's profile, orders, and payments should all land on the same shard via user_id shard key.
  • If you need to join user and product data, either keep products on every shard (replicated) or accept cross-shard joins via materialized views.
  • Cross-shard transactions are always slower and less reliable. Prefer eventual consistency with idempotent operations.
Production Insight
A finance app tried 2PC for cross-shard transfers. When one shard was under maintenance, all transactions failed. Then they moved to a saga pattern with retries and manual reconciliation.
Rule: Design your shard key to make cross-shard operations the exception, not the norm.

Distributed Transactions in Sharded Databases

When a transaction spans multiple shards, atomicity becomes a distributed systems problem. Two-Phase Commit (2PC) is the classic solution: a coordinator sends a prepare request to each participant; if all agree, it sends a commit. 2PC has well-known drawbacks. The coordinator becomes a single point of failure. If the coordinator crashes after prepare but before commit, participants remain locked until recovery. This can cause cascading failures and timeouts. The protocol also adds latency due to the two-round-trip overhead.
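
The two rounds are easier to see in code. The following is a deliberately simplified in-memory sketch: the `Participant` class is hypothetical, there is no durable log, no timeout handling, and no coordinator recovery, which are exactly the parts that make real 2PC hard.

```python
class Participant:
    """In-memory stand-in for one shard taking part in a transaction."""

    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = "idle"

    def prepare(self) -> bool:
        # A real shard would acquire locks and write a durable prepare record here.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"


def two_phase_commit(participants: list) -> bool:
    # Phase 1: collect votes. Any "no" vote aborts the whole transaction.
    votes = [p.prepare() for p in participants]
    if all(votes):
        # Phase 2: everyone voted yes, so tell every participant to commit.
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.abort()
    return False


shards = [Participant("shard-a"), Participant("shard-b", can_commit=False)]
print(two_phase_commit(shards))    # False
print([p.state for p in shards])   # ['aborted', 'aborted']
```

Between `prepare()` and `commit()` a real participant holds its locks, so a coordinator that disappears at that point leaves every prepared shard waiting.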

Given these issues, most production systems avoid distributed transactions by designing for data locality. If cross-shard transactions are unavoidable, the saga pattern is the preferred alternative. A saga breaks a transaction into a sequence of local transactions, each with a compensating action. If a step fails, the saga runs the compensating actions for all completed steps, rolling back to a consistent state. Sagas can be choreographed (each step triggers the next via events) or orchestrated (a central coordinator manages the flow).

When evaluating whether to use distributed transactions, consider the CAP trade-off. 2PC ensures strong consistency but sacrifices availability under partition (CP). Sagas provide availability and eventual consistency (AP, backed by human reconciliation). In practice, AP is often acceptable for financial transfers with reconciliation, while CP is required where overselling must never happen, such as inventory deduction in critical systems.

If you must use 2PC, minimize the number of participants and keep their prepare phases fast. Use low timeouts and have a manual override process for stuck transactions.

io/thecodeforge/sharding/saga_orchestrator.py (Python)
# Package: io.thecodeforge.sharding.saga_orchestrator

class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []

    def add_step(self, action, compensation):
        self.steps.append(action)
        self.compensations.append(compensation)

    async def execute(self):
        executed = []
        for i, step in enumerate(self.steps):
            try:
                await step()
                executed.append(i)
            except Exception:
                # Rollback executed steps in reverse order
                for j in reversed(executed):
                    await self.compensations[j]()
                raise

# Usage: orchestrator.add_step(debit_from_shard1, reverse_debit_from_shard1)
#        orchestrator.add_step(credit_to_shard2, reverse_credit_to_shard2)
#        await orchestrator.execute()
2PC blocks on coordinator failure
If the coordinator crashes after sending prepare but before commit, all participants remain locked. This can cause cascading failures and requires manual intervention. Always have a timeout and a recovery daemon.
Production Insight
A payments company initially used 2PC for cross-shard transfers. During a network partition, one shard was unreachable and all prepare locks held for 30 seconds, causing timeouts and failed transactions. They switched to a saga pattern with idempotent operations and eventual reconciliation, reducing failures by 99% and eliminating manual intervention.
Key Takeaway
Distributed transactions in sharded systems are fragile and slow. Prefer to design data locality to avoid cross-shard operations. When unavoidable, use the saga pattern with compensating transactions and idempotent handlers.
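
An idempotent handler is what makes saga retries safe. A minimal sketch, assuming each saga step carries a caller-supplied operation ID (the class name, the ID format, and the in-memory set are illustrative; a production version would store applied IDs durably in the same local transaction as the balance change):

```python
class IdempotentLedger:
    """Applies each credit at most once, keyed by a caller-supplied operation ID."""

    def __init__(self):
        self.balances: dict = {}
        self.applied: set = set()  # in production this would be durable

    def credit(self, op_id: str, account: str, amount_cents: int) -> bool:
        """Return True if applied, False if this op_id was already processed."""
        if op_id in self.applied:
            return False
        self.balances[account] = self.balances.get(account, 0) + amount_cents
        self.applied.add(op_id)
        return True


ledger = IdempotentLedger()
ledger.credit("transfer-123:credit", "acct-b", 500)
ledger.credit("transfer-123:credit", "acct-b", 500)  # retry is ignored
print(ledger.balances["acct-b"])  # 500
```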

Resharding and Consistent Hashing

Resharding, or changing the number of shards, is the most painful operation in a sharded system. With simple modulo hashing, changing N to N+1 remaps about N/(N+1) of all rows (80% for 4->5), and even doubling from 4 to 8 moves half of them. The system must read the affected data, rehash it, and write it to new locations. This requires downtime or a complex double-write migration.
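
The share of rows that a modulo change relocates can be counted directly. The sketch below assumes integer keys routed with `key % n` and shards that keep their index across the change; the function name is hypothetical.

```python
def moved_fraction(old_n: int, new_n: int, sample_size: int = 40_000) -> float:
    """Share of integer keys whose shard changes when key % old_n becomes key % new_n."""
    moved = sum(1 for key in range(sample_size) if key % old_n != key % new_n)
    return moved / sample_size


print(moved_fraction(4, 5))  # 0.8
print(moved_fraction(4, 8))  # 0.5
```

A key stays put only when both moduli agree, which for 4 to 5 happens for 4 keys in every 20, and for 4 to 8 happens for 4 keys in every 8.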

Consistent hashing dramatically reduces the amount of data that moves: only the data in the immediate neighbor of a new node is affected. This is why Cassandra and DynamoDB use it. However, consistent hashing adds complexity in handling node failures and virtual nodes (vnodes).

Production resharding strategies include:
  • Pre-splitting: Start with many virtual shards (e.g., 1024) that map to physical shards. To add a physical shard, reassign some virtual shards.
  • Mirroring: Write all new data to both old and new shards simultaneously, then backfill old data.
  • Read-repair: After migration, inconsistent reads are fixed lazily during read operations (the same idea as Cassandra's read repair, which reconciles replicas at read time).
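Pre-splitting can be sketched with a plain dictionary. This is an illustration under assumptions: 1024 virtual shards as in the bullet above, hypothetical shard names, and a simple "take an equal share from each owner" policy. The key-to-virtual-shard mapping never changes; only the virtual-to-physical assignment does.

```python
import hashlib

N_VIRTUAL = 1024


def virtual_shard(key: str) -> int:
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % N_VIRTUAL


def initial_assignment(physical: list) -> dict:
    """Spread virtual shards round-robin over the starting physical shards."""
    return {v: physical[v % len(physical)] for v in range(N_VIRTUAL)}


def add_physical_shard(assignment: dict, new_shard: str) -> list:
    """Hand the new shard an equal share of virtual shards; return the ones moved."""
    owners = sorted(set(assignment.values()))
    target = N_VIRTUAL // (len(owners) + 1)
    per_owner = {o: [v for v, p in assignment.items() if p == o] for o in owners}
    moved: list = []
    # Take one virtual shard from each current owner in turn until the target is met.
    while len(moved) < target:
        for owner in owners:
            if len(moved) < target and per_owner[owner]:
                v = per_owner[owner].pop()
                assignment[v] = new_shard
                moved.append(v)
    return moved


assignment = initial_assignment(["db-0", "db-1", "db-2", "db-3"])
moved = add_physical_shard(assignment, "db-4")
print(len(moved))                                # 204
print(assignment[virtual_shard("user:42")])      # physical shard for this key
```

Only the rows in the 204 reassigned virtual shards need to be copied; everything else stays where it is.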
io/thecodeforge/sharding/consistent_hash.py (Python)
# Package: io.thecodeforge.sharding.consistent_hash

import hashlib
from bisect import bisect_left
from typing import Optional

class ConsistentHash:
    def __init__(self, nodes: list[str], vnodes: int = 150):
        self.ring = {}          # ring position -> node name
        self.sorted_keys = []   # ring positions in ascending order
        for node in nodes:
            self.add_node(node, vnodes)

    def get_node(self, key: str) -> Optional[str]:
        if not self.ring:
            return None
        h = int(hashlib.md5(key.encode()).hexdigest(), 16)
        # First ring position at or after the key's hash, wrapping around
        idx = bisect_left(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def add_node(self, new_node: str, vnodes: int = 150):
        # Adding only affects keys just before the new ring positions
        for i in range(vnodes):
            key = hashlib.md5(f"{new_node}:{i}".encode()).hexdigest()
            h = int(key, 16)
            self.ring[h] = new_node
            self.sorted_keys.append(h)
        self.sorted_keys.sort()
Production Insight
A team using modulo sharding needed to go from 4 to 8 shards. They scheduled 18 hours of read-only downtime. Halfway through, the migration script hit primary key conflicts and they had to restart from a backup. They lost 2 hours of writes.
Rule: Always test migration under realistic load. Use consistent hashing if you anticipate adding shards.

Resharding Architectural Flow

Resharding involves three phases: preparation, migration, and cutover. The exact steps depend on the strategy and whether you use consistent hashing or modulo. Below is a typical resharding flow using a double-write pattern, which is the safest approach for production.

Phase 1: Preparation – Deploy the new shards as empty targets and set up their replication. Instrument your application with a feature flag to enable double-write. Create a migration coordination table to track progress.

Phase 2: Migration – Start writing new data to both old and new shards simultaneously. Then backfill existing data from old shards to new shards in batches. Use checkpointing to allow resumption after failures. Validate data integrity by comparing row counts and checksums.

Phase 3: Cutover – Once migration is complete and validated, point all reads to the new shards. Keep double-write active for a grace period in case of rollback. Finally, disable writes to old shards and decommission them.

The flow diagram at the end of this section illustrates the same steps with a double-write strategy.

io/thecodeforge/sharding/resharding_migration.py (Python)
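# Package: io.thecodeforge.sharding.resharding_migration

import asyncio


class ReshardingMigration:
    def __init__(self, old_router, new_router):
        self.old = old_router
        self.new = new_router
        self.migration_lock = asyncio.Lock()

    async def write(self, key, data):
        # Double-write. The lock only serializes writes against the backfill
        # inside this process; the two writes are NOT an atomic transaction.
        # If the second write fails, the caller must retry or reconcile.
        async with self.migration_lock:
            await self.old.write(key, data)
            await self.new.write(key, data)

    async def migrate_all(self):
        # Backfill old data to new shards, one key at a time
        for key in await self.old.scan_keys():
            # Hold the lock so a concurrent write() cannot land between the
            # read and the copy and then be overwritten with stale data
            async with self.migration_lock:
                data = await self.old.read(key)
                await self.new.write(key, data)

    async def cutover(self):
        # Activate the new shards before retiring the old ones
        # (activate/shutdown are assumed to be synchronous router methods)
        self.new.activate()
        self.old.shutdown()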
Production Insight
During a resharding from 4 to 8 shards, the double-write increased write latency by 15% because each write hit two databases. The team optimized by batching writes and using async writes with a job queue. They also ran the migration in chunks with checkpointing to resume on failure. The entire migration took 14 hours with zero downtime for reads.
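The chunking and checkpointing mentioned above can be sketched in a few lines. This is an illustration under loud assumptions: shards are plain dicts, the checkpoint is an in-memory dict standing in for a row in the migration coordination table, and `crash_after` exists only to simulate a failure mid-run.

```python
import hashlib
from typing import Optional


def checksum(rows: dict[str, str]) -> str:
    """Digest of a shard's contents, independent of insertion order."""
    digest = hashlib.sha256()
    for key in sorted(rows):
        digest.update(f"{key}={rows[key]};".encode())
    return digest.hexdigest()


def backfill(old: dict[str, str], new: dict[str, str], checkpoint: dict,
             batch_size: int = 100, crash_after: Optional[int] = None) -> None:
    """Copy rows in key order, saving the last copied key after every batch."""
    last = checkpoint.get("last_key")
    pending = [k for k in sorted(old) if last is None or k > last]
    for batch_no, start in enumerate(range(0, len(pending), batch_size), start=1):
        batch = pending[start:start + batch_size]
        for key in batch:
            # setdefault: never clobber a row the double-write already delivered
            new.setdefault(key, old[key])
        checkpoint["last_key"] = batch[-1]  # persist this durably in a real system
        if crash_after is not None and batch_no == crash_after:
            raise RuntimeError("simulated crash")


old_shard = {f"user-{i:04d}": f"profile-{i}" for i in range(1000)}
new_shard: dict[str, str] = {}
checkpoint: dict = {}

try:
    backfill(old_shard, new_shard, checkpoint, crash_after=3)
except RuntimeError:
    rows_before_resume = len(new_shard)  # three batches made it across

backfill(old_shard, new_shard, checkpoint)  # resumes after the checkpoint
print(rows_before_resume, len(new_shard), checksum(old_shard) == checksum(new_shard))
# prints: 300 1000 True
```

The second call skips everything at or before the saved key, so a crash costs at most one batch of repeated work. Comparing checksums after the run is the validation step from Phase 2; on real shards you would compute them per key range rather than over the whole dataset.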
Key Takeaway
Resharding is complex and risky. Use consistent hashing to minimize future resharding needs, and always plan for rollback. Double-write with validation and gradual cutover reduces risk but adds latency.
Resharding Flow with Double-Write
Start Resharding → Deploy New Shards → Enable Double-Write (write to both old and new) → Migrate Existing Data from Old to New → Compare Data Integrity → Integrity ok?
  • Yes: Cutover (point reads to new shards) → Disable Old Shards → Decommission Old Shards
  • No: Rollback and investigate

Why Most Teams Get Shard Key Selection Wrong (And How to Fix It)

The shard key is the single most important decision in your sharding architecture. Pick wrong, and you'll rebuild everything. Most teams default to user_id or customer_id because it's obvious — but that's not always right.

The real question: what access pattern drives 90% of your reads? In a SaaS product, maybe it's tenant_id. In a messaging app, maybe it's conversation_id. Your shard key must align with your hottest query path. If every query crosses shards, you've built a distributed system that acts like a single database — except slower.

There's a second trap: uneven distribution. A range-based shard on "created_at" sounds smart until all new data lands on one shard. That's not sharding. That's a hot partition with extra complexity.

Choose a key with high cardinality and uniform distribution. Hash it. Route by the hash. Then design your shard key so that 95% of your queries resolve to one shard. That's not theory — that's the difference between a system that works at 10x scale and one that collapses at 2x.
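The uneven-distribution trap is easy to reproduce before it reaches production. The sketch below replays a burst of orders from the newest, sequentially numbered customers against a range layout and a hash layout. The shard count, the boundaries, and the ID range are hypothetical numbers picked for the demonstration.

```python
import hashlib
from bisect import bisect_right
from collections import Counter

SHARDS = 8
# Range boundaries sized for the first 800,000 customers (hypothetical numbers).
UPPER_BOUNDS = [100_000 * (i + 1) for i in range(SHARDS - 1)]


def range_shard(customer_id: int) -> int:
    """Shard i holds IDs in [100_000 * i, 100_000 * (i + 1)); the last shard is open-ended."""
    return bisect_right(UPPER_BOUNDS, customer_id)


def hash_shard(customer_id: int) -> int:
    digest = hashlib.sha256(str(customer_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % SHARDS


# Flash sale: 10,000 orders from the newest sequentially numbered customers.
new_customers = range(790_000, 800_000)
range_load = Counter(range_shard(c) for c in new_customers)
hash_load = Counter(hash_shard(c) for c in new_customers)

print("range:", dict(range_load))
print("hash: ", dict(sorted(hash_load.items())))
```

Under the range layout every one of those writes lands on the last shard. Under the hash layout each shard should see close to an eighth of them. Running a replay like this against real signup data is a cheap way to simulate write patterns before committing to a key.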

ShardRouter.java (Java)
// io.thecodeforge.shardrouter
// Never expose raw shard keys to clients
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardRouter {
    private final int shardCount;

    public ShardRouter(int shards) {
        if (shards <= 0) {
            throw new IllegalArgumentException("shards must be positive");
        }
        this.shardCount = shards;
    }

    public int resolveShard(String userId) {
        // Hash the key so sequential or clustered IDs spread across shards
        byte[] hash = hashKey(userId);
        // Signum 1 treats the digest as unsigned, so the result is never negative
        return new BigInteger(1, hash)
            .mod(BigInteger.valueOf(shardCount))
            .intValue();
    }

    private byte[] hashKey(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return md.digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every compliant JVM ships SHA-256, so this should not happen
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
Output
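// resolveShard("alice") and resolveShard("bob") each return a fixed index in 0..7 for an 8-shard router
// The same key maps to the same shard on every call and on every JVM
// Across many distinct keys, the indices spread close to uniformly over the 8 shards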
Production Trap:
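Don't use Java's hashCode() for shard routing. String.hashCode() is deterministic, but it is a weak 32-bit hash that can be negative, and the default Object.hashCode() can differ between JVM runs. Use a well-distributed hash with a fixed specification (SHA-256 here, or a fast non-cryptographic one such as MurmurHash3) or a consistent hashing library.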
Key Takeaway
One shard key to rule them all: align with your hottest read path, hash it, and guarantee uniform distribution.

Read-Through Caches: The Band-Aid That Became a Production Standard

Sharding solves write scaling, but reads still hurt when every shard handles its own cache. The fix: read-through caching with a distributed cache layer like Redis or Memcached positioned in front of your sharded database.

Here's the pattern: the application queries the cache first. On a cache miss, it fetches from the correct shard, populates the cache, and returns the result. This keeps reads of hot data off your database shards. For read-heavy workloads (80/20 read/write split), this can reduce database load by 60-80%, depending on the hit rate.

But there's a catch — cache invalidation across shards is brutal. When a write updates a user's profile on shard 4, you must invalidate that user's cached record everywhere. Implement a TTL-based strategy (5-15 minutes) and use cache-aside with explicit invalidation on writes.

Don't try perfect consistency. Accept eventual consistency. Your shards have enough work to do without coordinating cache purges at transaction commit time. In production, we've seen teams burn weeks building distributed invalidation systems that a 10-minute TTL would have solved.

ReadThroughCache.java (Java)
// io.thecodeforge.cache
// Read-through pattern for sharded databases
// UserProfile, ShardClient, serialize() and deserialize() are application-specific and not shown
import redis.clients.jedis.Jedis;

public class ReadThroughCache {
    private static final int TTL_SECONDS = 600; // 10 min

    // A single Jedis instance is not thread-safe; use a connection pool under concurrency
    private final Jedis cache;
    private final ShardRouter router;
    private final ShardClient[] shards;

    public ReadThroughCache(Jedis cache, ShardRouter router, ShardClient[] shards) {
        this.cache = cache;
        this.router = router;
        this.shards = shards;
    }

    public UserProfile getUser(String userId) {
        String cacheKey = "user:" + userId;

        // 1. Check cache first
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return deserialize(cached);
        }

        // 2. Miss: fetch from correct shard
        int shard = router.resolveShard(userId);
        UserProfile profile = shards[shard].fetchUser(userId);

        // 3. Populate cache with TTL (skip unknown users so null is never serialized)
        if (profile != null) {
            cache.setex(cacheKey, TTL_SECONDS, serialize(profile));
        }

        return profile;
    }

    public void updateUser(UserProfile profile) {
        // Update the owning shard first
        int shard = router.resolveShard(profile.getId());
        shards[shard].updateUser(profile);

        // Then invalidate the cached copy; the next read repopulates it
        cache.del("user:" + profile.getId());
    }
}
Output
// Cache hit latency: ~2ms (redis)
// Cache miss latency: ~50ms (database shard)
// 95% cache hit rate expected
Production Trap:
Never skip the TTL on the assumption that manual invalidation will catch everything. One missed invalidation leaves stale data in the cache indefinitely, and users WILL see it. Always set a TTL, even if it's 24 hours.
Key Takeaway
Read-through caching can cut shard load by 60-80% on read-heavy workloads. Accept eventual consistency. Use TTLs. Move on.
● Production incident · POST-MORTEM · severity: high

The Hot Shard That Took Down Checkout at Midnight

Symptom
Order creation latency spiked from 20ms to 12s. The checkout API returned 504 Gateway Timeout errors. Monitoring showed one shard at 95% CPU while others idled at 15%.
Assumption
The team assumed user IDs were uniformly distributed and that range sharding by customer_id would spread load evenly across shards.
Root cause
The signup flow generated sequential IDs — new customers all landed in the last shard. During a flash sale, most orders came from new users, all hitting the same shard.
Fix
Switched to hash sharding on customer_id (target: 8 shards, hash modulo 8). Migrated existing data using a double-write strategy: write to both old and new shards for a week, then cut over.
Key lesson
  • Always simulate write patterns before choosing a shard key — sequential IDs kill range sharding.
  • Use hash sharding or consistent hashing for write-heavy workloads.
  • Monitor per-shard CPU and throughput; set alerts for imbalance.
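The imbalance alert from the last lesson can be as small as the sketch below. The 1.5x-of-average threshold, the shard names, and the eight-shard fleet are assumptions for illustration; the CPU readings mirror the incident (one shard at 95%, the rest at 15%).

```python
from statistics import mean


def skewed_shards(cpu_by_shard: dict[str, float], ratio: float = 1.5) -> dict[str, float]:
    """Return shards whose load exceeds `ratio` times the fleet average."""
    average = mean(cpu_by_shard.values())
    if average == 0:
        return {}
    return {
        shard: round(load / average, 2)
        for shard, load in cpu_by_shard.items()
        if load > ratio * average
    }


# Readings shaped like the incident: one shard at 95% CPU, seven at 15%.
readings = {f"shard-{i}": 15.0 for i in range(7)}
readings["shard-7"] = 95.0
print(skewed_shards(readings))  # {'shard-7': 3.8}
```

Comparing each shard against the fleet average, rather than against a fixed CPU ceiling, catches skew while the hot shard is still far from saturation. The same function works for throughput or disk usage.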
Production debug guide
Common symptoms and the actions you take to diagnose them (4 entries)
Symptom · 01
A query that used to run under 10ms now takes 500ms
Fix
Check if the query includes the shard key. If not, the router broadcasts to all shards. Add the key to the WHERE clause.
Symptom · 02
One shard's disk fills up much faster than others
Fix
Compute the shard key distribution: SELECT shard, COUNT(*) FROM metadata GROUP BY shard. If the counts are heavily skewed, your shard key is bad or you need resharding.
Symptom · 03
Distributed transactions fail with 'unrecoverable' errors
Fix
Check the transaction coordinator logs. 2PC prepare phase may be timing out. Consider redesigning the data model to keep transaction rows on the same shard.
Symptom · 04
After adding a shard, queries return inconsistent results
Fix
Verify that the routing logic has been updated. For modulo-based sharding, you cannot just add a shard without a full data migration. Use consistent hashing to avoid this.
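Symptom 01 is easier to diagnose when the router itself reports broadcasts. Below is a minimal sketch of that idea using an in-memory router: shards are dicts, `user_id` is assumed to be the shard key, and the `Router` class with its `find` signature is hypothetical rather than any real library's API.

```python
import hashlib
import logging

logger = logging.getLogger("shard_router")


class Router:
    def __init__(self, shards: list[dict[str, dict]]):
        self.shards = shards

    def _index(self, user_id: str) -> int:
        digest = hashlib.sha256(user_id.encode()).digest()
        return int.from_bytes(digest[:8], "big") % len(self.shards)

    def put(self, row: dict) -> None:
        self.shards[self._index(row["user_id"])][row["user_id"]] = row

    def find(self, **filters) -> tuple[list[dict], int]:
        """Return (matching rows, number of shards touched)."""
        if "user_id" in filters:
            targets = [self.shards[self._index(filters["user_id"])]]
        else:
            # No shard key: every shard has to be asked
            logger.warning("query without shard key, broadcasting: %s", filters)
            targets = self.shards
        rows = [
            row for shard in targets for row in shard.values()
            if all(row.get(col) == val for col, val in filters.items())
        ]
        return rows, len(targets)


router = Router([{} for _ in range(8)])
for i in range(100):
    router.put({"user_id": f"user-{i}", "email": f"user-{i}@example.com"})

by_key, touched_by_key = router.find(user_id="user-7")
by_email, touched_by_email = router.find(email="user-7@example.com")
print(touched_by_key, touched_by_email)  # 1 8
```

Both calls return the same row, but the second one touches all eight shards and leaves a warning in the log. Counting those warnings per query shape shows which code paths need the shard key added to their WHERE clause.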
★ Sharding Quick Debug Cheat Sheet
Immediate commands and fixes for the most common sharding failures.
Query not returning results for a known user
Immediate action
Compute which shard should hold that user: shard_index = hash(user_id) % num_shards.
Commands
SELECT shard FROM shard_map WHERE user_id = ?
Check the router logs: tail -100 /var/log/shard_router.log | grep 'user_id=123'
Fix now
If the user's data is on the wrong shard due to a routing logic bug, run a one-time migration script to move that record.
Cross-shard query timeout (e.g., aggregate across shards)
Immediate action
Cancel the query. Check if the query can be rewritten to use shard key. If not, consider materialized aggregates on each shard.
Commands
SHOW PROCESSLIST on each shard → kill the offending queries.
Run EXPLAIN on the original query to see if it can be optimized with shard key filtering.
Fix now
If it's a one-off, temporarily increase query timeout. Long-term, redesign the query to avoid full scan.
Resharding in progress, data inconsistent
Immediate action
Stop application writes. Check which rows have been migrated and which haven't.
Commands
SELECT COUNT(*) FROM migration_audit WHERE status = 'PENDING'
Restart migration from last checkpoint: migrate_resume --last-checkpoint <timestamp>
Fix now
If migration is stuck due to conflicts, use a force flag to re-sync the batch.
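When you compute `shard_index = hash(user_id) % num_shards` by hand during an incident, the hash has to be the one the router uses. The sketch below assumes the router follows the SHA-256 scheme of the ShardRouter class shown earlier (the digest read as an unsigned big-endian integer, then modulo). If your router hashes differently, substitute its function.

```python
import hashlib


def shard_index(user_id: str, num_shards: int) -> int:
    """Stable shard lookup; must match the hash function the router itself uses."""
    if num_shards <= 0:
        raise ValueError("num_shards must be positive")
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    # Same interpretation as Java's new BigInteger(1, digest).mod(n)
    return int.from_bytes(digest, "big") % num_shards


print(shard_index("123", 8))
```

Do not reach for Python's built-in `hash()` here. For strings it is salted per process unless `PYTHONHASHSEED` is fixed, so two shells can give two different answers for the same user.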
Hash vs Range Sharding Comparison

| Property | Hash Sharding | Range Sharding |
| --- | --- | --- |
| Distribution | Even, no hot shards | Can be skewed, hot shard risk near boundaries |
| Range Queries | Must scan all shards | Fast if range fits within one shard |
| Resharding | Full rehash required | Adds new boundaries, still moves data |
| Write Performance | Uniform across shards | Bursts concentrate on active range |
| Complexity | Low | Medium, needs boundary management |
| Use Case | High write volume, any access pattern | Read-heavy with predictable range queries |
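The "Range Queries" row is the one that most often decides the choice, so here is a small sketch of it. It asks which shards a query for order IDs 1200 through 1800 has to touch under each scheme. The boundaries, the four-shard layout, and the function names are hypothetical.

```python
import hashlib
from bisect import bisect_right

# Hypothetical layout: shard 0 = [0, 1000), 1 = [1000, 2000), 2 = [2000, 3000), 3 = [3000, ...)
BOUNDARIES = [1000, 2000, 3000]
NUM_SHARDS = len(BOUNDARIES) + 1


def range_shards_for(lo: int, hi: int) -> set[int]:
    """Range sharding: only shards whose interval overlaps [lo, hi] are queried."""
    first = bisect_right(BOUNDARIES, lo)
    last = bisect_right(BOUNDARIES, hi)
    return set(range(first, last + 1))


def hash_shard(order_id: int) -> int:
    digest = hashlib.sha256(str(order_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


def hash_shards_for(lo: int, hi: int) -> set[int]:
    """Hash sharding: adjacent IDs scatter, so collect where each ID lands."""
    return {hash_shard(i) for i in range(lo, hi + 1)}


print(sorted(range_shards_for(1200, 1800)))  # [1]
print(sorted(hash_shards_for(1200, 1800)))   # [0, 1, 2, 3]
```

The range layout answers from a single shard. The hash layout scatters those 601 consecutive IDs across every shard, which is why a router under hash sharding simply broadcasts range queries instead of enumerating keys.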

Key takeaways

1. Sharding enables horizontal scale: add more servers as data grows.
2. The shard key must be included in all queries; otherwise all shards must be scanned.
3. Hash sharding distributes evenly; range sharding allows range queries but risks hot shards.
4. Cross-shard joins and transactions are hard: design your shard key to minimise them.
5. Most applications should NOT shard: optimise indexes, add read replicas, and scale vertically first.

Common mistakes to avoid

4 patterns

Choosing a low-cardinality shard key

Symptom
Some shards hold 90% of the data and handle 90% of queries, while others are nearly empty.
Fix
Choose a shard key with at least 100 times more distinct values than the number of shards. Use a hash of a high-cardinality column like user_id or email.

Forgetting to include the shard key in every query

Symptom
Queries that filter on other columns broadcast to all shards and become 10–100x slower. The database CPU spikes and response times degrade.
Fix
Add a NOTICE or WARNING log whenever a query does not include the shard key. Enforce it with a query parser that rejects unfiltered queries or routes them to a dedicated read-only shard pool.

Using modulo sharding with a future expectation to add shards

Symptom
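When a shard is added, the team discovers that most of the data needs to be moved (about N/(N+1) of all rows when growing from N to N+1 shards, for example 87.5% going from 7 to 8), and there is no efficient migration plan.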
Fix
Use consistent hashing from the start. If already on modulo, plan for eventual migration by pre-splitting into many virtual shards (e.g., 1024) that map to physical shards.

Not monitoring per-shard load and skew

Symptom
One shard runs out of disk or CPU while others are at 20% utilization, but no alerts exist for skew.
Fix
Set up monitoring for each shard: CPU, disk, query latency, connection count. Alert if any shard exceeds 150% of the average.
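The virtual-shard fix from the modulo mistake above deserves a concrete shape. In this sketch, keys hash to one of 1024 virtual shards forever, and only the virtual-to-physical assignment table changes when hardware is added. The node names and the rebalancing policy (take an equal slice from each existing owner) are assumptions; real systems often weight by load instead.

```python
import hashlib

VIRTUAL_SHARDS = 1024


def virtual_shard(key: str) -> int:
    """Key -> virtual shard. This mapping never changes after launch."""
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % VIRTUAL_SHARDS


def initial_assignment(physical: list[str]) -> dict[int, str]:
    return {v: physical[v % len(physical)] for v in range(VIRTUAL_SHARDS)}


def add_physical(assignment: dict[int, str], new_node: str) -> dict[int, str]:
    """Hand the new node an equal share of virtual shards, taken evenly from existing owners."""
    updated = dict(assignment)
    nodes = sorted(set(assignment.values()))
    target = VIRTUAL_SHARDS // (len(nodes) + 1)
    per_donor, extra = divmod(target, len(nodes))
    for i, node in enumerate(nodes):
        owned = [v for v, owner in assignment.items() if owner == node]
        take = per_donor + (1 if i < extra else 0)
        for v in owned[:take]:
            updated[v] = new_node
    return updated


before = initial_assignment(["db-0", "db-1", "db-2", "db-3"])
after = add_physical(before, "db-4")
moved = [v for v in range(VIRTUAL_SHARDS) if before[v] != after[v]]
print(len(moved), "of", VIRTUAL_SHARDS, "virtual shards move")  # 204 of 1024 virtual shards move

# Routing a key is two lookups: key -> virtual shard -> physical node
print(after[virtual_shard("user-42")])
```

Going from four to five physical nodes moves about a fifth of the virtual shards, each as a whole unit that can be copied, verified, and switched independently. Compare that with re-hashing every row under a new modulus.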
INTERVIEW PREP · PRACTICE MODE

Interview Questions on This Topic

Q01 · SENIOR
What is a shard key and how do you choose a good one?
Q02 · SENIOR
What is the hot shard problem and how do you prevent it?
Q03 · SENIOR
How does consistent hashing help with resharding?
Q04 · SENIOR
How would you design a system that supports cross-shard joins without sa...
Q01 of 04 · SENIOR

What is a shard key and how do you choose a good one?

ANSWER
A shard key is the column or set of columns used to determine which shard stores a row. A good shard key has high cardinality, even distribution, and is present in most queries. Prefer natural keys like user_id or tenant_id over auto-increment IDs. If you need range queries, consider a composite key or use a hash with a secondary index.
FAQ · 4 QUESTIONS

Frequently Asked Questions

01
How do you handle transactions that span multiple shards?
02
What is resharding and why is it painful?
03
Can I shard on a non-unique column like status or country?
04
What's the difference between sharding and partitioning?