Senior · 4 min · June 25, 2026

Design a Distributed Cache: Consistent Hashing, Replication, and Failure Modes

Design a distributed cache for production: consistent hashing, replication, failure modes, and real-world gotchas from 15 years of building caches.

Naren Founder & Principal Engineer

20+ years shipping large-scale distributed systems. Written from production experience, not tutorials.

Production tested · Last updated June 25, 2026 · 1,663 articles, all by Naren
Quick Answer

Design a distributed cache by partitioning data across nodes (e.g., consistent hashing), replicating for fault tolerance, and handling failures with automatic failover. Key choices: cache eviction policy (LRU, TTL), consistency model (eventual vs strong), and cluster membership (gossip vs central coordinator).

✦ Definition (~90s read)
What is a distributed cache?

A distributed cache is a system that stores data across multiple machines to provide low-latency access, reduce database load, and scale horizontally. It uses partitioning and replication to handle failures and high throughput.

Plain-English First

Imagine a library where books are spread across multiple rooms. A distributed cache is like having a librarian who knows exactly which room holds each book, and if one room catches fire, there's a backup copy in another room. You don't need to search the whole library—just ask the librarian, and you get the book fast.

You've scaled your database to 10 replicas and it's still melting under read load. The classic fix? A distributed cache. But slap a Redis cluster in front without understanding the internals, and you'll trade one fire for another: I've seen a misconfigured cache take down a payments service at 3am because of a thundering herd. Here's what everyone gets wrong: a distributed cache isn't just a faster hashmap. It's a distributed system with all the attendant failure modes (partitioning, replication lag, split-brain, and cascading failures). By the end of this, you'll design a cache that survives node failures, handles hot keys, and doesn't amplify load on your database. You'll know when to use consistent hashing vs. a lookup service, how to pick a replication factor, and what to do when your cache cluster goes split-brain.

Why You Can't Just Use a Hashmap: Partitioning Strategies

A single-node cache is trivial: a hashmap with eviction. But when your dataset exceeds one machine's RAM, you must partition. The naive approach is hash(key) % N. It works until you add or remove a node; then almost every key remaps, causing a cache stampede. I've seen this bring down a social media feed service when they scaled from 5 to 6 nodes. The fix is consistent hashing: hash both keys and nodes onto a ring of hash values and assign each key to the first node clockwise from it, so each node owns the range just before it. Adding a node only remaps the keys in the range it takes over, roughly 1/N of them. But consistent hashing has its own gotcha: load is uneven when there are only a few nodes. The solution is virtual nodes: each real node appears many times on the ring, which spreads keys more evenly. Here's a minimal implementation in Go:

consistent_hash.go
// io.thecodeforge — System Design tutorial

package cache

import (
	"hash/crc32"
	"sort"
	"strconv"
)

// ConsistentHash maps keys to nodes on a hash ring with virtual nodes.
// It is not safe for concurrent use; guard it with a sync.RWMutex if
// nodes can be added while lookups are in flight.
type ConsistentHash struct {
	keys     []int          // sorted list of virtual-node hashes
	hashMap  map[int]string // virtual-node hash -> real node ID
	replicas int            // virtual nodes per real node
}

func NewConsistentHash(replicas int) *ConsistentHash {
	return &ConsistentHash{
		hashMap:  make(map[int]string),
		replicas: replicas,
	}
}

func (c *ConsistentHash) AddNode(nodeID string) {
	for i := 0; i < c.replicas; i++ {
		// Create virtual node key: "nodeID:i"
		vKey := nodeID + ":" + strconv.Itoa(i)
		hash := int(crc32.ChecksumIEEE([]byte(vKey)))
		if _, taken := c.hashMap[hash]; taken {
			continue // rare CRC32 collision: keep the existing owner, avoid duplicate ring entries
		}
		c.keys = append(c.keys, hash)
		c.hashMap[hash] = nodeID
	}
	sort.Ints(c.keys)
}

func (c *ConsistentHash) GetNode(key string) string {
	if len(c.keys) == 0 {
		return ""
	}
	hash := int(crc32.ChecksumIEEE([]byte(key)))
	// Binary search for the first virtual node whose hash is >= the key's hash
	idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })
	if idx == len(c.keys) {
		idx = 0 // wrap around the ring
	}
	return c.hashMap[c.keys[idx]]
}
Output
ConsistentHash with 3 nodes and 100 virtual nodes each: keys spread across all three nodes, close to uniform but not exactly (see the trap below). Adding a node moves only about 1/N of keys, where N is the new node count.
Production Trap: Uneven Load with Few Nodes
With only 2-3 nodes and 100 replicas, you'll still see 20% variance. Use 1000 replicas for <5% variance. Also, CRC32 is fast but not cryptographic—fine for internal caches, not for security-sensitive hashing.
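A quick way to check the remapping claims is to count moved keys directly. This sketch is self-contained, so it re-implements a compact CRC32 ring instead of importing the ConsistentHash type above. It places 10,000 synthetic keys on 5 nodes, adds a sixth, and counts how many keys change owner under hash % N and under a ring with 100 virtual nodes per node. Expect modulo hashing to move roughly 5/6 of the keys and the ring roughly 1/6; exact counts depend on the hash and key set. The key names (user:N) and node names (node-N) are made up for the demo.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring is a compact consistent-hash ring with virtual nodes, using CRC32
// like the ConsistentHash example.
type ring struct {
	hashes []uint32          // sorted virtual-node hashes
	owner  map[uint32]string // virtual-node hash -> node name
}

func newRing(nodes []string, vnodes int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		for i := 0; i < vnodes; i++ {
			h := crc32.ChecksumIEEE([]byte(n + ":" + strconv.Itoa(i)))
			if _, taken := r.owner[h]; taken {
				continue // skip rare collisions so each hash has one owner
			}
			r.hashes = append(r.hashes, h)
			r.owner[h] = n
		}
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

// get returns the owner of key: the first virtual node clockwise from its hash.
func (r *ring) get(key string) string {
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0 // wrap around the ring
	}
	return r.owner[r.hashes[i]]
}

func nodeNames(n int) []string {
	names := make([]string, n)
	for i := range names {
		names[i] = "node-" + strconv.Itoa(i)
	}
	return names
}

// moved counts how many of numKeys keys change owner when a cluster grows
// from 5 to 6 nodes, under modulo hashing and under the ring.
func moved(numKeys int) (modulo, consistent int) {
	before, after := newRing(nodeNames(5), 100), newRing(nodeNames(6), 100)
	for k := 0; k < numKeys; k++ {
		key := "user:" + strconv.Itoa(k)
		h := crc32.ChecksumIEEE([]byte(key))
		if h%5 != h%6 {
			modulo++
		}
		if before.get(key) != after.get(key) {
			consistent++
		}
	}
	return modulo, consistent
}

func main() {
	m, c := moved(10000)
	fmt.Printf("keys moved out of 10000: modulo=%d consistent=%d\n", m, c)
}
```

Because the 6-node ring is a superset of the 5-node ring, every key that moves under consistent hashing moves to the new node; under modulo hashing, keys shuffle between old nodes too.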
Figure: Distributed Cache Design: Consistent Hashing & Replication (consistent hashing with virtual nodes; replication with quorum, W+R > N; gossip-based failure detection; LRU and TTL eviction with jitter against cache stampedes; eventual vs strong consistency).
Figure: Consistent Hashing Node Addition. A new node is inserted between two existing nodes on the hash ring; only the neighboring keys move to it, so most keys stay cached. Without consistent hashing, adding a node remaps almost every key.

Replication: How to Survive a Node Failure Without a Stampede

Partitioning alone is fragile. If a node dies, all its keys are gone and the database gets hammered. The naive fix is to replicate each key to several nodes. But how? Write-through: write to all replicas synchronously, which is slower but consistent. Write-behind: write to one replica and replicate asynchronously, which is faster but risks data loss. In production, I use a quorum-based approach: with a replication factor of N, write to W replicas and read from R replicas, with W + R > N so every read set overlaps every write set in at least one replica. This gives tunable consistency. For a cache, eventual consistency is usually fine: set a TTL and accept stale reads. But watch out: if you read from a replica that hasn't received the write yet, you get stale data. The fix is read repair: on read, if a replica returns an older version, update it with the newest one. Or just accept staleness within the TTL. Here's a simple replication layer (asynchronous, write-behind style):

replicated_cache.go
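// io.thecodeforge — System Design tutorial

package cache

import (
	"sync"
	"time"
)

type CacheEntry struct {
	Value     string
	ExpiresAt time.Time // absolute expiry, so replicas agree on it (assuming roughly synced clocks)
	Version   uint64    // lets replicas drop async writes that arrive out of order
}

type ReplicatedCache struct {
	mu       sync.RWMutex
	data     map[string]CacheEntry
	replicas []*ReplicatedCache // other nodes (simplified: in-process pointers, not RPC)
	version  uint64             // write counter; assumes each key is written only via its owner node
}

func NewReplicatedCache() *ReplicatedCache {
	return &ReplicatedCache{data: make(map[string]CacheEntry)}
}

func (c *ReplicatedCache) Get(key string) (string, bool) {
	c.mu.RLock()
	entry, ok := c.data[key]
	c.mu.RUnlock()
	if !ok || time.Now().After(entry.ExpiresAt) {
		return "", false
	}
	return entry.Value, true
}

func (c *ReplicatedCache) Set(key, value string, ttl time.Duration) {
	c.mu.Lock()
	c.version++
	entry := CacheEntry{Value: value, ExpiresAt: time.Now().Add(ttl), Version: c.version}
	c.data[key] = entry
	c.mu.Unlock()
	// Async replicate to all replicas; Set returns without waiting for them
	for _, r := range c.replicas {
		go r.setRemote(key, entry)
	}
}

// setRemote applies a replicated write unless this replica already holds a newer version.
func (c *ReplicatedCache) setRemote(key string, entry CacheEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cur, ok := c.data[key]; ok && cur.Version >= entry.Version {
		return
	}
	c.data[key] = entry
}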
Output
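Set on node A replicates to B and C asynchronously, so a Get on B shortly afterwards returns the new value. If B hasn't received the write yet, it returns the previous value (or a miss) until the replicated write arrives; only if that write is lost does the old value linger until its TTL expires.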
Senior Shortcut: Replication Factor = 2 for Most Caches
Replication factor 2 is enough for most caches—if one node fails, the other still serves. Factor 3 is for when you can't tolerate even a brief miss. More replicas increase write amplification and network traffic.
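The quorum approach and read repair described earlier are not part of the replication layer above, which replicates asynchronously to everyone. Here is a small in-process sketch of both, under stated assumptions: replicas are plain structs rather than network nodes, a failed node is simulated with a `down` flag, and every write carries a version assigned by the caller (for example, by the key's owner node), starting at 1. The names `replica`, `quorumWrite`, and `quorumRead` are illustrative, not from any library.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// versioned is a cached value; versions start at 1, so version 0 means "absent".
type versioned struct {
	value   string
	version int64
}

// replica is an in-process stand-in for one cache node; down simulates a failure.
type replica struct {
	mu   sync.Mutex
	down bool
	data map[string]versioned
}

func newReplica() *replica { return &replica{data: make(map[string]versioned)} }

// put acks the write unless the replica is down; it never replaces a newer version.
func (r *replica) put(key string, v versioned) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.down {
		return false
	}
	if v.version > r.data[key].version {
		r.data[key] = v
	}
	return true
}

// get returns the stored entry (zero value if absent) and whether the replica answered.
func (r *replica) get(key string) (versioned, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.down {
		return versioned{}, false
	}
	return r.data[key], true
}

var errNoQuorum = errors.New("quorum not reached")

// quorumWrite succeeds once w replicas ack. Real systems send to all replicas in
// parallel and return after w acks; this loop is sequential for clarity.
func quorumWrite(rs []*replica, w int, key string, v versioned) error {
	acks := 0
	for _, r := range rs {
		if r.put(key, v) {
			acks++
		}
	}
	if acks < w {
		return errNoQuorum
	}
	return nil
}

// quorumRead returns the newest entry among the first r replicas that answer
// (version 0 means a miss) and read-repairs any of them holding an older copy.
func quorumRead(rs []*replica, r int, key string) (versioned, error) {
	var newest versioned
	var answered []*replica
	for _, rep := range rs {
		if len(answered) == r {
			break
		}
		if v, ok := rep.get(key); ok {
			answered = append(answered, rep)
			if v.version > newest.version {
				newest = v
			}
		}
	}
	if len(answered) < r {
		return versioned{}, errNoQuorum
	}
	if newest.version > 0 {
		for _, rep := range answered {
			rep.put(key, newest) // read repair; a no-op for replicas already current
		}
	}
	return newest, nil
}

func main() {
	rs := []*replica{newReplica(), newReplica(), newReplica()} // N = 3
	_ = quorumWrite(rs, 2, "k", versioned{"v1", 1})
	rs[0].down = true // simulated failure (safe here: single goroutine)
	_ = quorumWrite(rs, 2, "k", versioned{"v2", 2}) // W = 2: succeeds without rs[0]
	rs[0].down = false
	v, err := quorumRead(rs, 2, "k") // R = 2 overlaps the write quorum
	fmt.Println(v.value, err)        // v2 <nil>
}
```

With N = 3, W = 2, and R = 2, any two replicas that acknowledge a write overlap any two that answer a read. That is why the read still returns v2 although rs[0] missed that write, and read repair then brings rs[0] up to date.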
Figure: Replication: Write-Through vs Write-Behind. Write-through writes to all replicas synchronously (strong consistency on reads, higher write latency, simpler failure recovery); write-behind writes to one replica and propagates asynchronously (eventual consistency, lower write latency, risk of data loss on a crash). Choose write-through for critical data and write-behind for high throughput.

Failure Detection and Cluster Membership: Gossip vs. Central Coordinator

How does a node know another node is dead? Relying on a central coordinator (like ZooKeeper) is simple, but the coordinator can become a single point of failure and a bottleneck. Gossip-style protocols (like SWIM) are decentralized: each node periodically pings a random peer. If the peer doesn't respond, the node asks a few other members to ping it on its behalf (indirect probes). Only when those probes also fail is the peer marked dead. I've used both. For caches under 50 nodes, a coordinator is fine; just make sure it's replicated. For larger clusters, gossip scales better. But gossip has a gotcha: false positives due to network hiccups. Use a suspicion mechanism: before declaring a node dead, mark it suspect and wait several rounds for it to respond. Here's a minimal gossip failure detector:

gossip_failure.go
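// io.thecodeforge — System Design tutorial

package cache

import (
	"math/rand"
	"sync"
	"time"
)

type Node struct {
	ID      string
	mu      sync.Mutex
	Alive   bool             // guarded by mu
	members map[string]*Node // guarded by mu; may include this node itself
}

// StartGossip runs one probe round per interval until done is closed.
func (n *Node) StartGossip(interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				n.gossipRound()
			}
		}
	}()
}

// peers snapshots the member list so no lock is held during (slow) pings.
func (n *Node) peers() []*Node {
	n.mu.Lock()
	defer n.mu.Unlock()
	out := make([]*Node, 0, len(n.members))
	for _, m := range n.members {
		if m.ID != n.ID {
			out = append(out, m)
		}
	}
	return out
}

func (n *Node) gossipRound() {
	peers := n.peers()
	if len(peers) == 0 {
		return
	}
	target := peers[rand.Intn(len(peers))] // pick a random peer
	if target.ping() {
		return
	}
	// Indirect probes: ask other reachable peers to ping the target for us.
	// In a real system this is an RPC to m; here it is simulated in-process.
	confirmations := 0
	for _, m := range peers {
		if m.ID == target.ID || !m.ping() {
			continue // skip the target itself and relays we cannot reach
		}
		if !target.ping() { // m's probe of the target (simulated)
			confirmations++
		}
	}
	if confirmations >= 2 { // quorum: two independent confirmations
		target.mu.Lock()
		target.Alive = false
		target.mu.Unlock()
	}
}

func (n *Node) ping() bool {
	time.Sleep(10 * time.Millisecond) // simulate network round trip
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.Alive
}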
Output
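With a 1-second interval, a dead node is marked once a round happens to probe it and at least two other peers also fail to reach it; since targets are chosen at random, that can take several rounds. Two confirmations make false positives less likely, but this detector has no suspicion phase.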
Never Do This: Single Coordinator Without Redundancy
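I've seen a 3-node ZooKeeper ensemble lose quorum during a network partition, taking the entire cache with it. Never run a single coordinator, and size the ensemble for the failures you expect: a 3-node ensemble tolerates one failure, a 5-node ensemble tolerates two. Always run an odd number of coordinator nodes (3 or 5) so a majority can still elect a leader.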
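The suspicion mechanism recommended earlier can be layered on top of probes like the ones in the gossip detector. In this sketch, a member moves to suspect on its first failed round, any successful ack resets it to alive, and it is declared dead only after `suspectRounds` consecutive failures. The `detector` type and the round-counting rule are simplifying assumptions; SWIM itself uses a suspicion timeout and lets the suspected member refute the suspicion with an incarnation number.

```go
package main

import "fmt"

type state int

const (
	alive state = iota
	suspect
	dead
)

func (s state) String() string {
	return [...]string{"alive", "suspect", "dead"}[s]
}

// detector counts consecutive failed probe rounds per member.
type detector struct {
	suspectRounds int              // consecutive failures before a member is declared dead
	failures      map[string]int   // member ID -> consecutive failed rounds
	states        map[string]state // missing entries read as alive (the zero value)
}

func newDetector(suspectRounds int) *detector {
	return &detector{
		suspectRounds: suspectRounds,
		failures:      make(map[string]int),
		states:        make(map[string]state),
	}
}

// recordFailure is called when a round's direct and indirect probes all go unanswered.
func (d *detector) recordFailure(id string) state {
	if d.states[id] == dead {
		return dead // terminal here; rejoining would need a fresh identity or incarnation
	}
	d.failures[id]++
	if d.failures[id] >= d.suspectRounds {
		d.states[id] = dead
	} else {
		d.states[id] = suspect
	}
	return d.states[id]
}

// recordSuccess refutes a suspicion: any ack resets the failure count.
func (d *detector) recordSuccess(id string) state {
	if d.states[id] != dead {
		d.failures[id] = 0
		d.states[id] = alive
	}
	return d.states[id]
}

func main() {
	d := newDetector(3)
	fmt.Println(d.recordFailure("b"), d.recordSuccess("b")) // a blip: suspect alive
	for i := 0; i < 3; i++ {
		fmt.Println(d.recordFailure("c")) // suspect, suspect, dead
	}
}
```

A single dropped probe now costs only a "suspect" state, which is exactly the network-hiccup false positive the section warns about.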

Cache Eviction Policies: LRU, TTL, and the Thundering Herd

When memory fills, something must go. LRU evicts the least recently used key. But under a thundering herd (many requests for the same missing key), LRU can evict other hot keys, causing cascading misses. The fix: TTL-based expiry with random jitter, so keys written together don't all expire together. Set a TTL and a jitter (e.g., TTL ± 10%). This spreads re-fetches over time. Also, put a per-key lock around the fetch (request coalescing): when a key is missing, only one request fetches from the database; the others wait for its result. Here's a Go implementation:

thundering_herd.go
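// io.thecodeforge — System Design tutorial

package cache

import (
	"errors"
	"math/rand"
	"sync"
	"time"
)

// ErrFetchFailed is returned to waiters when the goroutine doing the fetch failed.
var ErrFetchFailed = errors.New("cache: concurrent fetch failed")

type cachedValue struct {
	val       string
	expiresAt time.Time
}

// Cache expires entries lazily on read; a background sweeper (omitted) would
// reclaim keys that are never read again.
type Cache struct {
	mu    sync.Mutex
	data  map[string]cachedValue
	ttl   time.Duration
	locks map[string]chan struct{} // per-key lock: closed when the fetch finishes
}

func NewCache(ttl time.Duration) *Cache {
	return &Cache{
		data:  make(map[string]cachedValue),
		ttl:   ttl,
		locks: make(map[string]chan struct{}),
	}
}

// jitteredTTL returns a TTL of roughly ttl ± 10% so entries written together expire apart.
func (c *Cache) jitteredTTL() time.Duration {
	window := int64(c.ttl / 5) // width of the ±10% window
	if window <= 0 {
		return c.ttl
	}
	return c.ttl - c.ttl/10 + time.Duration(rand.Int63n(window))
}

func (c *Cache) GetOrFetch(key string, fetch func() (string, error)) (string, error) {
	c.mu.Lock()
	if e, ok := c.data[key]; ok && time.Now().Before(e.expiresAt) {
		c.mu.Unlock()
		return e.val, nil
	}
	// Key missing or expired: check if another goroutine is already fetching
	if ch, ok := c.locks[key]; ok {
		c.mu.Unlock()
		<-ch // wait for the fetch to finish (success or failure)
		c.mu.Lock()
		e, ok := c.data[key]
		c.mu.Unlock()
		if !ok || time.Now().After(e.expiresAt) {
			return "", ErrFetchFailed // the leader's fetch failed; caller may retry
		}
		return e.val, nil
	}
	// This goroutine will fetch
	ch := make(chan struct{})
	c.locks[key] = ch
	c.mu.Unlock()

	val, err := fetch() // runs without holding c.mu

	c.mu.Lock()
	if err == nil {
		c.data[key] = cachedValue{val: val, expiresAt: time.Now().Add(c.jitteredTTL())}
	}
	delete(c.locks, key)
	close(ch) // wake all waiters, even on failure, so none block forever
	c.mu.Unlock()
	if err != nil {
		return "", err
	}
	return val, nil
}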
Output
Under 100 concurrent requests for the same missing key, only 1 hits the database, provided the others arrive while that fetch is still in flight. The rest wait on the channel and then read the freshly cached value.
Interview Gold: How to Prevent a Thundering Herd
Use a per-key mutex (or a semaphore) so only one goroutine fetches from the database. Others wait on a channel. This is also called 'coalescing' or 'request collapsing'.
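LRU itself, the eviction policy this section opens with, is usually built from a hash map plus a doubly linked list kept in recency order, which makes both lookups and evictions O(1). The sketch below uses the standard library's `container/list`. It bounds the cache by entry count and is single-goroutine, both simplifications (production caches usually bound memory in bytes and add locking). The `lru` type is illustrative.

```go
package main

import (
	"container/list"
	"fmt"
)

type lruEntry struct {
	key, value string
}

// lru evicts the least recently used entry once it holds more than capacity entries.
// Not safe for concurrent use; wrap it with a sync.Mutex for that.
type lru struct {
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element in order
}

func newLRU(capacity int) *lru {
	return &lru{capacity: capacity, order: list.New(), items: make(map[string]*list.Element)}
}

func (c *lru) Get(key string) (string, bool) {
	el, ok := c.items[key]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el) // a hit makes the key most recently used
	return el.Value.(*lruEntry).value, true
}

func (c *lru) Set(key, value string) {
	if el, ok := c.items[key]; ok {
		el.Value.(*lruEntry).value = value
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&lruEntry{key, value})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back() // least recently used
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*lruEntry).key)
	}
}

func main() {
	c := newLRU(2)
	c.Set("a", "1")
	c.Set("b", "2")
	c.Get("a")      // "a" is now most recently used
	c.Set("c", "3") // evicts "b", the least recently used
	_, okB := c.Get("b")
	fmt.Println(okB) // false
}
```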

Consistency Models: When Stale Data Is Fine (and When It's Not)

Caches are inherently stale; that's the trade-off for speed. But how stale? Eventual consistency: after a write, all replicas converge eventually. Strong consistency: every read reflects the latest completed write. For a cache, strong consistency kills performance, since you'd need synchronous replication and quorum reads. In practice, most caches use eventual consistency with a TTL. But if you're caching user sessions or inventory counts, stale data can cause real bugs. The fix: attach a version number or timestamp to each entry. On read, accept the cached entry only if its version is at least the minimum the caller needs (for example, the version returned by the user's last write); otherwise fetch from the source. Here's a versioned cache:

versioned_cache.go
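// io.thecodeforge — System Design tutorial

package cache

import "sync"

type VersionedEntry struct {
	Value   string
	Version int64
}

type VersionedCache struct {
	mu   sync.RWMutex
	data map[string]VersionedEntry
}

func NewVersionedCache() *VersionedCache {
	return &VersionedCache{data: make(map[string]VersionedEntry)}
}

// Get returns the value only if the cached version is at least minVersion;
// otherwise the caller should treat it as a miss and read from the source.
func (c *VersionedCache) Get(key string, minVersion int64) (string, bool) {
	c.mu.RLock()
	entry, ok := c.data[key]
	c.mu.RUnlock()
	if !ok || entry.Version < minVersion {
		return "", false
	}
	return entry.Value, true
}

// Set ignores writes older than what is already cached, so a delayed
// write cannot overwrite a newer value.
func (c *VersionedCache) Set(key, value string, version int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cur, ok := c.data[key]; ok && cur.Version > version {
		return
	}
	c.data[key] = VersionedEntry{Value: value, Version: version}
}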
Output
Get returns the value only if its version >= minVersion. Otherwise, caller fetches from source. This ensures no stale data beyond a version threshold.
Production Trap: Clock Skew with Timestamps
Using wall-clock timestamps for versioning is fragile—clocks drift. Use a monotonically increasing counter from a central source (e.g., database sequence) or logical clocks (Lamport).
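Of the alternatives named in this trap, a Lamport clock is the smallest to sketch. This assumes each node keeps one counter, increments it on every local write, and, when it receives a version from another node, jumps to `max(local, received) + 1`. Ties between nodes would need a tie-breaker such as node ID, which is omitted. The `lamportClock` type is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// lamportClock produces versions that never go backwards on one node and
// always move past any version the node has observed from elsewhere.
type lamportClock struct {
	mu sync.Mutex
	t  int64
}

// Tick is called for a local event, such as a cache write; it returns the new version.
func (c *lamportClock) Tick() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.t++
	return c.t
}

// Observe merges a version received from another node: t = max(t, remote) + 1.
func (c *lamportClock) Observe(remote int64) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if remote > c.t {
		c.t = remote
	}
	c.t++
	return c.t
}

func main() {
	var a, b lamportClock
	v1 := a.Tick()                    // 1
	v2 := a.Tick()                    // 2
	v3 := b.Observe(v2)               // 3: ordered after a's write, regardless of wall clocks
	fmt.Println(v1, v2, v3, b.Tick()) // 1 2 3 4
}
```

Versions like these can feed the `version` argument of `VersionedCache.Set` without depending on clock synchronization.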

When Not to Use a Distributed Cache

Distributed caches add complexity: network latency, consistency headaches, operational overhead. If your dataset fits in one machine's RAM, use an in-process cache (e.g., sync.Map in Go, ConcurrentHashMap in Java). If your read rate is low, a database with proper indexing is simpler. If you need strong consistency, a cache is the wrong tool: read from the database itself (the primary, or replicas that are replicated synchronously), because asynchronous read replicas can serve stale data too. I've seen teams add Redis to a 2-server setup and then spend weeks debugging split-brain. Don't be that team. Start simple, measure, then add a cache only when you have a proven bottleneck.

Never Do This: Cache Everything
Caching everything is a recipe for stale data and memory waste. Cache only the hot paths: user sessions, product catalog, API responses that are expensive to compute. Profile first.

Interview Questions That Actually Get Asked

In system design interviews, you'll be asked to design a cache like Redis or Memcached. Expect these: 'How does consistent hashing handle node additions?' 'How would you replicate data across data centers?' 'What happens when a cache node fails and all requests go to the database?' 'How do you prevent a thundering herd?' 'When would you use a write-through vs write-behind cache?' The key is to discuss trade-offs: consistency vs. availability, latency vs. throughput, simplicity vs. scalability. Show you've thought about failure modes.

Interview Gold: The 'Cache Stampede' Question
When asked about cache stampede, mention request coalescing (per-key lock), early expiration with jitter, and circuit breakers to protect the database. Show you know the exact mechanisms.
● Production incident · POST-MORTEM · severity: high

The 4GB Container That Kept Dying

Symptom
Cache hit rate dropped from 95% to 30% over 10 minutes. Database CPU hit 100%. Users saw 5-second page loads.
Assumption
Assumed a traffic spike. Added more cache nodes.
Root cause
A single hot key (a celebrity tweet) was hashed to one node. That node's memory filled, evicting all other keys. The eviction caused cascading misses, hammering the database. The node's 4GB heap was too small for the hot key's value (3.5GB).
Fix
Split the hot key into multiple shards (key_1, key_2, ...) and distribute them. Also increased node memory to 16GB and added replication for that shard.
Key lesson
  • Hot keys are silent killers.
  • Always monitor per-key size and access frequency.
  • Use consistent hashing with virtual nodes to spread load, but also handle oversized values explicitly.
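The first fix in this incident, splitting the hot key into key_1, key_2, ..., can be read as keeping several copies of the value under different keys so consistent hashing places them on different nodes. This sketch assumes that reading: writes fan out to every copy, and reads pick one copy at random. The shard count, the `_N` suffix, and the `store` interface are illustrative assumptions. It spreads read load; it does not shrink an oversized value, which is why the incident also needed more memory.

```go
package main

import (
	"fmt"
	"math/rand"
	"strconv"
)

// store stands in for a real cache client; mapStore is an in-memory fake.
type store interface {
	Set(key, value string)
	Get(key string) (string, bool)
}

type mapStore map[string]string

func (m mapStore) Set(key, value string) { m[key] = value }

func (m mapStore) Get(key string) (string, bool) {
	v, ok := m[key]
	return v, ok
}

// shardKey names copy i of a hot key, e.g. "tweet:42_3".
func shardKey(key string, i int) string { return key + "_" + strconv.Itoa(i) }

// setHot writes the value under every shard key; each key hashes independently,
// so the copies can land on different nodes.
func setHot(s store, key, value string, shards int) {
	for i := 1; i <= shards; i++ {
		s.Set(shardKey(key, i), value)
	}
}

// getHot reads one randomly chosen copy, spreading read load across the shards.
func getHot(s store, key string, shards int) (string, bool) {
	return s.Get(shardKey(key, 1+rand.Intn(shards)))
}

func main() {
	s := mapStore{}
	setHot(s, "tweet:42", "celebrity tweet", 8)
	v, ok := getHot(s, "tweet:42", 8)
	fmt.Println(v, ok, len(s)) // celebrity tweet true 8
}
```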
Production debug guide: systematic recovery paths for the failure modes engineers actually hit.
Symptom · 01
Cache miss rate >50% after node addition
Fix
1. Check if using modulo hashing. 2. If yes, switch to consistent hashing. 3. Verify virtual node count (should be >=100 per node). 4. Restart cluster gradually.
Symptom · 02
Database overloaded after cache node failure
Fix
1. Enable read replicas for the cache. 2. Add circuit breaker on database fallback (e.g., Hystrix). 3. Increase replication factor to 2. 4. Consider request coalescing.
Symptom · 03
Cache returns stale data after write
Fix
1. Check if write-through or write-behind is used. 2. If write-behind, reduce async replication delay. 3. Add version numbers. 4. Reduce TTL.
★ Distributed Cache Triage Cheat Sheet: first-response commands for when things go wrong, copy-paste ready.
High cache miss rate `cache_miss_rate > 0.5`
Immediate action
Check hashing algorithm and node count
Commands
curl localhost:8080/debug/hashring
curl localhost:8080/debug/nodes
Fix now
Switch to consistent hashing with 1000 virtual nodes
Database CPU 100% after cache node failure
Immediate action
Enable circuit breaker on database calls
Commands
curl -X POST 'localhost:8080/admin/circuitbreaker?enabled=true'
curl 'localhost:8080/admin/replication?factor=2'
Fix now
Add replication factor 2 and request coalescing
Stale data returned `version mismatch`
Immediate action
Check version numbers in cache vs source
Commands
curl 'localhost:8080/debug/cache?key=product_123'
curl 'localhost:8080/debug/source?key=product_123'
Fix now
Implement versioned cache with monotonic counter
Split-brain: two nodes think they are primary
Immediate action
Check cluster membership logs
Commands
tail -100 /var/log/cache/gossip.log
curl localhost:8080/debug/members
Fix now
Increase gossip suspicion rounds to 3, ensure odd number of coordinator nodes
Feature / Aspect | Consistent Hashing | Modulo Hashing
Key remapping on node change | Only ~1/N keys remap | Almost all keys remap
Implementation complexity | Moderate (ring, virtual nodes) | Trivial (hash % N)
Load balancing | Good with virtual nodes | Good if the hash is uniform
Use case | Dynamic clusters, production | Static clusters, prototyping

Key takeaways

1. Consistent hashing with virtual nodes is the only sane way to partition a dynamic cache cluster.
2. Replication factor 2 is enough for most caches; use factor 3 for critical data. More replicas amplify writes.
3. Prevent thundering herds with per-key locks and TTL jitter. Never let all requests hit the database.
4. Caches are for hot data only. Profile before caching. Start simple, and add a cache when you have a proven bottleneck.

Interview Questions on This Topic

Q01 · SENIOR: How does consistent hashing handle the addition of a new node without ca...
Q02 · SENIOR: When would you choose a write-through cache over a write-behind cache in...
Q03 · SENIOR: What happens when a cache node fails and all requests fall through to th...
Q04 · JUNIOR: What is a cache stampede and how do you prevent it?
Q05 · SENIOR: You notice cache hit rate dropped from 95% to 40% after a node failure. ...
Q06 · SENIOR: Design a distributed cache that spans two data centers. What trade-offs ...
Q01 of 06 · SENIOR

How does consistent hashing handle the addition of a new node without causing a cache stampede?

ANSWER
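Consistent hashing maps keys and nodes onto a ring. Adding a node only remaps the keys in the ranges the new node takes over (between each of its virtual nodes and its predecessor on the ring), roughly 1/N of all keys, where N is the new node count. The rest stay where they are and remain cached, so there is no stampede. Virtual nodes spread the load evenly.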
FAQ · 4 QUESTIONS

Frequently Asked Questions

01. How do I design a distributed cache for high availability?
02. What's the difference between Redis and Memcached for distributed caching?
03. How do I prevent a cache stampede when a popular key expires?
04. What happens to a distributed cache during a network partition?