Medium 14 min · May 28, 2026

Distributed Training: Data vs. Model Parallelism in Production

Master data and model parallelism for distributed ML training.

Naren Founder & Principal Engineer

20+ years shipping production Java in banking & fintech. Every example here is drawn from a real system.

Production tested · Last updated: June 02, 2026
Quick Answer
  • Data parallelism replicates the model across devices, splitting batches for faster training.
  • Model parallelism partitions a model across devices, essential for huge models that don't fit on one GPU.
  • Pipeline parallelism is a variant of model parallelism that reduces idle time by staggering micro-batches.
  • Tensor parallelism splits individual operations (e.g., matrix multiplies) across devices.
  • Hybrid parallelism combines data and model parallelism for the largest models, such as frontier-scale LLMs.
  • Communication overhead (all-reduce, all-to-all) is usually the main bottleneck in distributed training at scale.
What is Distributed Training?

Distributed training uses multiple devices (GPUs/TPUs) to accelerate model training. Data parallelism replicates the model on each device and splits the data batch; model parallelism partitions the model itself across devices. Hybrid approaches combine both to handle models that exceed single-device memory while maximizing throughput.

Plain-English First

Imagine a team of chefs baking a giant cake. Data parallelism is like each chef baking a whole cake from a different batch of batter—they all work independently and combine results. Model parallelism is like dividing the recipe: one chef mixes dry ingredients, another adds wet, and a third bakes—each handles a different part of the process. For really huge cakes, you need both: multiple teams each handling a slice of the recipe.

Training a state-of-the-art language model or vision transformer is no longer a single-GPU affair. Models with hundreds of billions of parameters demand distributed training across clusters of GPUs, often spanning multiple nodes. The choice between data parallelism and model parallelism—and their hybrids—determines whether your training job finishes in days or never converges due to communication bottlenecks.

Data parallelism remains the dominant pattern for most workloads: replicate the model, split the batch, sync gradients. But when the model itself exceeds GPU memory, you must slice it across devices using model parallelism. The naive approach—simply placing layers on different GPUs—leads to severe underutilization as devices wait for each other.

Pipeline parallelism and tensor parallelism emerged as production-grade solutions. Pipeline parallelism staggers micro-batches to keep devices busy, while tensor parallelism splits individual operations, reducing memory per device at the cost of more communication. The real art lies in choosing the right combination for your model architecture and hardware topology.

This article gives you actionable patterns, common failure modes, and debugging strategies. Whether you're scaling a 7B parameter model or a massive recommendation system, understanding these parallelism strategies is non-negotiable for production ML engineering.

Why Distributed Training? The Scale Imperative

Frontier models are now reported to reach into the trillions of parameters. A single NVIDIA B200 GPU offers 192 GB of HBM3e at roughly 8 TB/s of memory bandwidth: enough to hold the FP16 weights (2 bytes per parameter) of a model just under 100B parameters, but nowhere near a 1T+ model, and training also needs memory for gradients, optimizer states, and activations. Compute is the bigger problem: training a 1T model on a single GPU would take centuries. Distributed training is now table stakes; it is the only path to feasible training timelines. The imperative is simple: split the work across hundreds or thousands of accelerators to bring time-to-train down from centuries to weeks.

The economics are stark. Training GPT-4 (reportedly around 1.8T parameters) is estimated to have cost on the order of $100M in compute. Without distributed techniques, that cost would be prohibitive even for hyperscalers. Distributed training lets throughput scale nearly linearly with accelerator count, up to the point where communication overhead dominates. At that point the bottleneck is no longer compute but interconnect bandwidth: NVLink 5.0 provides 1.8 TB/s per GPU within a node, while a cross-node InfiniBand NDR link carries 400 Gb/s (about 50 GB/s). This order-of-magnitude asymmetry forces careful parallelism strategy selection.

Three primary paradigms dominate: data parallelism (DP), model parallelism (MP), and pipeline parallelism (PP). Each addresses a different constraint. DP replicates the model across devices and splits the batch; MP partitions the model itself across devices; PP stages layers across devices and streams micro-batches through them. Hybrid approaches that combine all three are standard in production. The key metric is Model FLOPS Utilization (MFU): the model FLOPS per second actually achieved, divided by the hardware's peak FLOPS. Well-tuned runs on clusters of thousands of GPUs typically report MFU in roughly the 40-50% range.
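The MFU definition can be turned into a back-of-the-envelope check. This sketch assumes the common approximation that training a dense transformer costs about 6 FLOPs per parameter per token (forward plus backward) and ignores attention-score FLOPs. The model size, GPU count, throughput, and the 989 TFLOPS dense BF16 peak per GPU (an H100 SXM figure) are illustrative assumptions, not numbers from a real run.

```python
def model_flops_utilization(params, tokens_per_sec, num_gpus, peak_flops_per_gpu):
    """Estimate MFU with the ~6 * params FLOPs-per-token rule for dense transformers.

    Attention-score FLOPs and activation recomputation are ignored, so this
    counts the useful model work, not everything the hardware executed.
    """
    achieved_flops_per_sec = 6 * params * tokens_per_sec
    peak_flops_per_sec = num_gpus * peak_flops_per_gpu
    return achieved_flops_per_sec / peak_flops_per_sec


# Hypothetical run: 70B-parameter model on 1,024 GPUs processing 1.2M tokens/s.
mfu = model_flops_utilization(params=70e9, tokens_per_sec=1.2e6,
                              num_gpus=1024, peak_flops_per_gpu=989e12)
print(f"MFU: {mfu:.1%}")  # MFU: 49.8%
```

Because recomputation is left out of the numerator, MFU stays comparable across runs that trade compute for memory differently.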

The decision tree is practical: if your model fits on one GPU but you need faster training, use DP. If it doesn't fit, use MP or PP. If you have many GPUs, combine them. The rest of this article dissects each technique with concrete math and production code.

Scale Reality Check
Training a 1T-parameter model on 10,000 H100 GPUs at 50% MFU still takes roughly 30 days for about 2T training tokens. Distributed training is an engineering discipline, not a magic wand.
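The ~30-day figure can be reproduced with the same 6 × params × tokens rule. In this sketch the ~2T-token budget and the 989 TFLOPS dense BF16 peak per H100 are assumptions; real runs also lose time to restarts, evaluation, and checkpointing, so treat the result as a lower bound.

```python
SECONDS_PER_DAY = 86_400


def training_days(params, tokens, num_gpus, peak_flops_per_gpu, mfu):
    """Days to train, using total training FLOPs ~= 6 * params * tokens."""
    total_flops = 6 * params * tokens
    effective_flops_per_sec = num_gpus * peak_flops_per_gpu * mfu
    return total_flops / effective_flops_per_sec / SECONDS_PER_DAY


days = training_days(params=1e12, tokens=2e12, num_gpus=10_000,
                     peak_flops_per_gpu=989e12, mfu=0.5)
print(f"~{days:.0f} days")  # ~28 days
```

Time scales linearly with the token budget and inversely with MFU, so a drop from 50% to 40% MFU adds about a week here.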
Production Insight
Always benchmark communication overhead before scaling. A 10% communication-to-compute ratio is acceptable; above 30%, rethink your parallelism strategy or invest in faster interconnects.
Key Takeaway
Distributed training is mandatory for models >100B parameters. Choose parallelism based on model size vs. GPU memory, and always optimize for MFU.
Figure: Distributed Training: Parallelism Strategies. Data parallelism (replicate the model, all-reduce gradients), model parallelism (split layers across devices), pipeline parallelism (micro-batches with a 1F1B schedule), tensor parallelism (split operations within a layer), and hybrid parallelism (combine strategies for massive scale). Communication bottlenecks dominate at scale: profile all-reduce and overlap it with computation.

Data Parallelism: Replication, All-Reduce, and Scaling Laws

Data parallelism (DP) is the simplest form of distributed training. Each GPU holds a complete copy of the model and processes a different subset of the global batch. After the forward and backward passes, gradients are averaged across all replicas with an all-reduce collective. Every replica then applies the same optimizer update to identical weights, so the copies stay in sync without a separate weight broadcast. The math is straightforward: with N GPUs, the effective batch size becomes N × local_batch_size, and, ignoring communication, throughput (samples per second) ideally grows by a factor of N while the time per step stays the same as on one GPU.

The all-reduce operation is the critical performance bottleneck. A naive implementation would send every gradient to a central node, sum there, and broadcast the result back; that node then handles O(N × M) bytes for N GPUs and M parameters, so it becomes the bottleneck as N grows. Modern frameworks use ring all-reduce (NCCL also uses tree algorithms), in which each GPU sends a roughly constant amount of data regardless of N by pipelining chunks around a ring: a reduce-scatter phase followed by an all-gather phase. For a model with M parameters, ring all-reduce sends 2(N-1)/N × M × bytes_per_element bytes per GPU per step. For a 7B-parameter model with FP16 gradients and large N, that approaches 2 × 7e9 × 2 bytes = 28 GB per GPU per step, which is significant even on NVLink.
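The two phases of ring all-reduce are easier to see in a single-process simulation. This is a teaching sketch, not how NCCL is implemented: plain Python lists stand in for GPU buffers, each of the N simulated ranks splits its vector into N chunks, and the code counts how many elements every rank sends so you can check the 2(N-1)/N × M figure.

```python
def ring_all_reduce(buffers):
    """Simulate a ring all-reduce (sum) across len(buffers) ranks.

    buffers: one equal-length list per rank; the length must be divisible by the
    number of ranks so each rank owns one chunk. Returns the reduced buffers and
    the number of elements each rank sent.
    """
    n, size = len(buffers), len(buffers[0])
    assert size % n == 0, "buffer length must be divisible by the rank count"
    chunk = size // n
    data = [list(b) for b in buffers]
    sent = [0] * n

    def span(c):
        return range(c * chunk, (c + 1) * chunk)

    def ring_step(chunk_of, accumulate):
        # Snapshot every outgoing message first, as if all ranks send at once.
        msgs = [(r, chunk_of(r), [data[r][i] for i in span(chunk_of(r))]) for r in range(n)]
        for r, c, payload in msgs:
            dst = (r + 1) % n  # each rank only talks to its right-hand neighbor
            for i, v in zip(span(c), payload):
                data[dst][i] = data[dst][i] + v if accumulate else v
            sent[r] += chunk

    # Phase 1, reduce-scatter: after n - 1 steps rank r holds the full sum of
    # chunk (r + 1) % n.
    for s in range(n - 1):
        ring_step(lambda r, s=s: (r - s) % n, accumulate=True)
    # Phase 2, all-gather: circulate the finished chunks so every rank has all.
    for s in range(n - 1):
        ring_step(lambda r, s=s: (r + 1 - s) % n, accumulate=False)
    return data, sent


grads = [[float(10 * r + i) for i in range(8)] for r in range(4)]  # 4 ranks
reduced, sent = ring_all_reduce(grads)
expected = [sum(g[i] for g in grads) for i in range(8)]
print(all(buf == expected for buf in reduced))  # True
print(sent)  # [12, 12, 12, 12] = 2 * (4 - 1) / 4 * 8 elements per rank
```

Each rank sends 2(N-1) chunks of size M/N, which is where the per-GPU traffic formula comes from; no rank ever handles more than its share.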

Scaling behavior for DP is well understood. The scaling efficiency η = throughput_N / (N × throughput_1) degrades as N increases because of communication overhead. For compute-heavy models (e.g., large transformers, which do a lot of math per byte of gradient), η can stay above 90% up to hundreds of GPUs; models with little compute per parameter see larger overhead. The critical batch size also limits DP scaling: beyond it, larger batches no longer cut the number of optimizer steps proportionally, so extra replicas buy little. For large transformer language models, global batches of roughly 2-8 million tokens are common; beyond that, returns diminish.
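The arithmetic linking DP size, micro-batch size, and global batch is sketched below, together with the efficiency formula. All numbers are hypothetical: 64 replicas, 4 sequences of 4,096 tokens each per replica, an assumed target of about 4M tokens per optimizer step, and made-up throughput measurements. Gradient accumulation covers the gap between what one forward/backward pass processes and the target global batch.

```python
import math


def grad_accum_steps(target_tokens, dp_size, micro_batch, seq_len):
    """Accumulation steps so dp_size * micro_batch * seq_len * steps >= target_tokens."""
    tokens_per_pass = dp_size * micro_batch * seq_len
    return max(1, math.ceil(target_tokens / tokens_per_pass))


def scaling_efficiency(throughput_n, throughput_1, n):
    """eta = throughput on n GPUs / (n * single-GPU throughput)."""
    return throughput_n / (n * throughput_1)


steps = grad_accum_steps(target_tokens=4_000_000, dp_size=64, micro_batch=4, seq_len=4096)
global_tokens = 64 * 4 * 4096 * steps
print(steps, global_tokens)  # 4 4194304

# Hypothetical measurements: 30 samples/s on 1 GPU, 3,500 samples/s on 128 GPUs.
print(f"{scaling_efficiency(3500, 30, 128):.1%}")  # 91.1%
```

Doubling the DP size here would halve the accumulation steps while keeping the global batch, and therefore the optimization dynamics, unchanged.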

In practice, DP is implemented with PyTorch DDP (DistributedDataParallel) or FSDP (Fully Sharded Data Parallel). DDP replicates the entire model on each GPU; FSDP shards parameters, gradients, and optimizer states across GPUs, cutting model-state memory per GPU by up to a factor of N. FSDP has become a common default for large models because it can train models whose full training state would not fit on a single GPU. The trade-off is more communication: with full sharding, FSDP all-gathers parameters in both the forward and backward passes and reduce-scatters gradients, about 1.5× the traffic of DDP's all-reduce (roughly 3 × M versus 2 × M per step).
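To compare the memory footprint of the two approaches, here is a rough estimator of per-GPU model-state memory. It assumes mixed-precision Adam at 16 bytes per parameter (2 for 16-bit weights, 2 for gradients, 12 for FP32 master weights and two Adam moments) and excludes activations. The strategy labels borrow the names of FSDP's ShardingStrategy options, but the byte accounting is a simplified ZeRO-style estimate of peak usage, and the 13B-parameter, 64-GPU setup is hypothetical.

```python
GB = 1e9


def per_gpu_model_state_gb(params, num_gpus, strategy):
    """Approximate peak per-GPU memory for weights, gradients, and Adam state."""
    weight_b, grad_b, optim_b = 2, 2, 12  # bytes per parameter
    if strategy == "ddp":  # everything replicated on every GPU
        total = params * (weight_b + grad_b + optim_b)
    elif strategy == "shard_grad_op":  # gradients + optimizer sharded (ZeRO-2-like)
        total = params * weight_b + params * (grad_b + optim_b) / num_gpus
    elif strategy == "full_shard":  # weights, gradients, optimizer sharded (ZeRO-3-like)
        total = params * (weight_b + grad_b + optim_b) / num_gpus
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return total / GB


for name in ("ddp", "shard_grad_op", "full_shard"):
    print(f"{name}: {per_gpu_model_state_gb(13e9, 64, name):.2f} GB")
# ddp: 208.00 GB
# shard_grad_op: 28.84 GB
# full_shard: 3.25 GB
```

Under DDP the 13B model's training state cannot fit on any single current GPU, while full sharding brings it down to a few gigabytes per GPU, paid for with the extra all-gathers.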

io/thecodeforge/distributed_training/data_parallelism.py (Python)
import os

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# Launch with: torchrun --nproc_per_node=4 data_parallelism.py
dist.init_process_group(backend='nccl')
rank = dist.get_rank()
# torchrun sets LOCAL_RANK (the GPU index on this node); the global rank
# only equals it on a single node.
local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(local_rank)

# Simple model; DDP broadcasts rank 0's weights so all replicas start identical
model = nn.Linear(1024, 1024).cuda(local_rank)
ddp_model = DDP(model, device_ids=[local_rank])
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)

# Data: each rank draws a different shard (seeded by rank)
torch.manual_seed(rank)
data = torch.randn(64, 1024).cuda(local_rank)
target = torch.randn(64, 1024).cuda(local_rank)

# Forward + backward: DDP all-reduces (averages) gradients during backward
optimizer.zero_grad()
loss = nn.MSELoss()(ddp_model(data), target)
loss.backward()

# Optimizer step: every rank applies the same averaged gradients
optimizer.step()

if rank == 0:
    print(f"Loss: {loss.item():.4f}")

dist.destroy_process_group()
Output
Loss: (value varies run to run; inputs and weights are random)
Gradient Sync Overhead
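DDP overlaps its bucketed gradient all-reduces with the backward pass, but every step still waits for all ranks to contribute their gradients. If one GPU is slower (a straggler), the whole job runs at its pace. Gradient accumulation with DDP's no_sync() context reduces how often the all-reduce runs, which amortizes its cost, but it does not cure a straggler: find and fix the slow rank.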
Production Insight
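For models above roughly 10B parameters, use FSDP with mixed precision. ShardingStrategy.SHARD_GRAD_OP (shard gradients and optimizer states) is a reasonable starting point for the memory-throughput trade-off when the unsharded parameters fit; switch to FULL_SHARD when they do not. Monitor NCCL timeouts: if you see 'NCCL timeout' errors, look for hung or straggling ranks and network faults first, and raise the timeout argument of init_process_group only for legitimately long operations.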
Key Takeaway
Data parallelism scales training by replicating the model across GPUs. Ring all-reduce keeps per-GPU traffic roughly constant, but communication overhead and the critical batch size eventually cap scaling. FSDP extends DP to larger models by sharding parameters, gradients, and optimizer states.

Model Parallelism: When Your Model Exceeds GPU Memory

Model parallelism (MP) partitions a single model across multiple GPUs, with each device holding a subset of layers or parameters. This is necessary when the model's memory footprint exceeds a single GPU's capacity. For example, a 175B-parameter model needs 350 GB just for its FP16 weights, far beyond the 80 GB of an H100. The layer-wise form of MP splits the model along the layer dimension: layers 1-10 on GPU 0, layers 11-20 on GPU 1, and so on. In the forward pass, activations flow sequentially from GPU 0 to GPU 1 and onward; the backward pass flows in reverse.

The memory equation is straightforward: if a model has L layers and each GPU holds L/N of them, per-GPU memory is approximately (L/N) × (per-layer parameter memory + per-layer activation memory). Activation memory can dominate for large batch sizes: for a transformer with hidden dimension d, sequence length s, and batch size b, activation memory per layer is O(b × s × d). With MP, activations must also be sent between GPUs at every stage boundary, so the added transfer latency grows with the number of GPUs (N - 1 boundaries), not with the number of layers.

The key challenge is load balancing. If layers differ in cost (e.g., a large embedding or output layer versus uniform Transformer blocks), the GPU holding the heavy layers becomes the bottleneck. In practice, MP is often combined with tensor parallelism (TP), which splits individual matrix multiplications across GPUs. Megatron-LM popularized this: each transformer layer's attention and MLP blocks are split across GPUs with column-wise and row-wise partitioning, so each block needs just one all-reduce in the forward pass.
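Load balancing can be framed as splitting a sequence of per-layer costs into contiguous stages so the slowest stage is as fast as possible. The sketch below uses a standard technique for this (binary search on the bottleneck cost plus a greedy feasibility check); it is not taken from Megatron-LM, and the per-layer costs are made-up relative numbers with a heavy embedding layer first and a heavy output layer last.

```python
def min_bottleneck_partition(costs, num_stages):
    """Split per-layer costs into at most num_stages contiguous stages so the
    most expensive stage is as cheap as possible.

    Returns (start, end) index pairs. Assumes positive integer costs. Can return
    fewer than num_stages stages if fewer already achieve the optimal bottleneck.
    """
    def stages_needed(limit):
        count, running = 1, 0
        for c in costs:
            if running + c > limit:
                count, running = count + 1, c
            else:
                running += c
        return count

    # Binary search for the smallest feasible bottleneck cost.
    lo, hi = max(costs), sum(costs)
    while lo < hi:
        mid = (lo + hi) // 2
        if stages_needed(mid) <= num_stages:
            hi = mid
        else:
            lo = mid + 1

    # Greedily cut whenever adding the next layer would exceed that bottleneck.
    bounds, start, running = [], 0, 0
    for i, c in enumerate(costs):
        if running + c > lo:
            bounds.append((start, i))
            start, running = i, c
        else:
            running += c
    bounds.append((start, len(costs)))
    return bounds


# Hypothetical relative costs: heavy embedding, 8 uniform blocks, heavy output layer.
costs = [6, 2, 2, 2, 2, 2, 2, 2, 2, 5]
stages = min_bottleneck_partition(costs, 4)
print(stages)                                # [(0, 2), (2, 6), (6, 9), (9, 10)]
print([sum(costs[a:b]) for a, b in stages])  # [8, 8, 6, 5]
```

For comparison, splitting purely by layer count and placing layers 0-2 on the first GPU would give that stage a cost of 10, making it the bottleneck for the whole pipeline.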

MP introduces a fundamental constraint: the pipeline depth equals the number of GPUs. For a 96-layer model on 8 GPUs, each GPU handles 12 layers. The sequential dependency means that at any given time, only one GPU is active — leading to poor utilization. This is why pure MP is rarely used alone; it is combined with pipeline parallelism to keep all GPUs busy.

io/thecodeforge/distributed_training/model_parallelism.py (Python)
import torch
import torch.nn as nn


class ModelParallelTransformer(nn.Module):
    """Naive layer-wise model parallelism: contiguous blocks of layers per GPU."""
    def __init__(self, num_layers, hidden_dim, num_heads, num_gpus):
        super().__init__()
        self.layers = nn.ModuleList()
        self.devices = []
        for i in range(num_layers):
            # Spread layers evenly, even when num_layers % num_gpus != 0
            device = torch.device(f'cuda:{i * num_gpus // num_layers}')
            layer = nn.TransformerEncoderLayer(hidden_dim, num_heads, batch_first=True)
            self.layers.append(layer.to(device))
            self.devices.append(device)

    def forward(self, x):
        for layer, device in zip(self.layers, self.devices):
            # Activations must be copied explicitly to the next layer's GPU;
            # this transfer is the inter-GPU cost of model parallelism.
            x = layer(x.to(device))
        return x


# Usage (requires 4 GPUs)
model = ModelParallelTransformer(num_layers=12, hidden_dim=768, num_heads=12, num_gpus=4)
x = torch.randn(32, 128, 768, device='cuda:0')
output = model(x)  # output ends up on cuda:3
print(f"Output shape: {output.shape}")
Output
Output shape: torch.Size([32, 128, 768])
Tensor Parallelism vs. Pipeline Parallelism
Tensor parallelism splits individual operations across GPUs, reducing memory per operation. Pipeline parallelism splits layers across GPUs. Use TP for intra-node (NVLink) and PP for inter-node (InfiniBand).
Production Insight
Avoid pure model parallelism beyond a handful of GPUs: because only one GPU works at a time, utilization drops to roughly 1/N. Combine it with pipeline parallelism to keep GPUs busy. For tensor parallelism, keep all GPUs in a TP group on the same node to use NVLink bandwidth.
Key Takeaway
Model parallelism partitions the model across GPUs to fit large models. It is memory-efficient but suffers from low GPU utilization due to sequential execution. Combine with pipeline parallelism for production use.

Pipeline Parallelism: Micro-Batches and the 1F1B Schedule

Pipeline parallelism (PP) addresses the utilization problem of pure model parallelism by dividing the global batch into micro-batches and streaming them through the pipeline. Instead of one GPU being active at a time, multiple GPUs process different micro-batches simultaneously. The key insight: while GPU 0 processes micro-batch 1 through layers 1-4, GPU 1 can process micro-batch 0 through layers 5-8, and so on. This overlaps computation across GPUs, dramatically improving throughput.

The most widely used schedule is 1F1B (one forward, one backward). After a warm-up phase, each GPU alternates between a forward pass on one micro-batch and a backward pass on an earlier one, so the pipeline stays full. (GPipe, the earlier schedule, runs all forwards and then all backwards; its bubble is the same size, but it holds more activations.) Let t_f and t_b be the forward and backward time of one micro-batch through the whole model. With P balanced stages and M micro-batches, a step takes (M + P - 1) × (t_f + t_b) / P, versus the ideal M × (t_f + t_b) / P. The bubble, the idle time during warm-up and cool-down, is therefore (P-1)/M relative to the ideal compute time, or (P-1)/(M+P-1) as a fraction of the whole step; both become negligible for M >> P.
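The bubble formulas are easy to check numerically. This sketch also models an interleaved schedule with v virtual stages per device by dividing the idle time by v, following the approximation reported for Megatron-LM's interleaved 1F1B schedule; the stage and micro-batch counts are arbitrary examples.

```python
def bubble_fraction(stages, micro_batches, virtual_stages=1):
    """Fraction of each pipeline step spent idle in fill and drain.

    virtual_stages=1 covers GPipe and plain 1F1B: (P - 1) / (M + P - 1).
    virtual_stages=v models an interleaved schedule, which cuts the idle
    time to roughly (P - 1) / v micro-batch slots.
    """
    idle = (stages - 1) / virtual_stages
    return idle / (micro_batches + idle)


def bubble_overhead(stages, micro_batches):
    """Idle time relative to the ideal compute time: (P - 1) / M."""
    return (stages - 1) / micro_batches


print(f"{bubble_fraction(8, 32):.1%}")                    # 17.9%
print(f"{bubble_overhead(8, 32):.1%}")                    # 21.9%
print(f"{bubble_fraction(8, 32, virtual_stages=2):.1%}")  # 9.9%
```

The two functions describe the same idle time with different denominators, which is why bubble figures quoted in different sources can disagree.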

Memory management is critical in PP. During forward pass, activations must be stored until the corresponding backward pass. With 1F1B, each GPU stores activations for at most P micro-batches simultaneously. This is a significant improvement over naive schedules that store all M micro-batches. However, for very deep pipelines (P > 32), activation memory can still be problematic. Techniques like activation recomputation (checkpointing) trade compute for memory by recomputing activations during backward.
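The 1F1B memory bound can be checked by generating the per-stage order of operations. This sketch produces only the ordering (no timing) for a non-interleaved schedule: stage s runs P - s - 1 warm-up forwards, then alternates one forward with one backward, then drains the remaining backwards. Counting micro-batches whose forward has run but whose backward has not gives the number of activation sets the stage must hold.

```python
def one_f_one_b(stage, num_stages, num_micro_batches):
    """Operation order for one pipeline stage under 1F1B, e.g. [("F", 0), ("B", 0), ...]."""
    warmup = min(num_stages - stage - 1, num_micro_batches)
    ops = [("F", i) for i in range(warmup)]
    next_f, next_b = warmup, 0
    while next_f < num_micro_batches:  # steady state: one forward, then one backward
        ops.append(("F", next_f))
        next_f += 1
        ops.append(("B", next_b))
        next_b += 1
    while next_b < num_micro_batches:  # cool-down: drain the remaining backwards
        ops.append(("B", next_b))
        next_b += 1
    return ops


def peak_in_flight(ops):
    """Max number of micro-batches whose activations are held at the same time."""
    live = peak = 0
    for kind, _ in ops:
        live += 1 if kind == "F" else -1
        peak = max(peak, live)
    return peak


P, M = 4, 8
for s in range(P):
    sched = one_f_one_b(s, P, M)
    print(f"stage {s}: peak in-flight={peak_in_flight(sched)}",
          " ".join(f"{kind}{mb}" for kind, mb in sched[:6]))
# Peak in-flight micro-batches: 4, 3, 2, 1 for stages 0..3
```

The first stage holds the most activations (P micro-batches), which is why it is usually the first to run out of memory.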

In production, PP is typically combined with data parallelism (DP) and tensor parallelism (TP) in a 3D parallelism setup. For example, a 1,024-GPU cluster might use TP=8 (within a node), PP=16 (across nodes), and DP=8 (data replicas), since 8 × 16 × 8 = 1,024. This configuration balances communication costs: TP uses fast NVLink, PP sends point-to-point activations over slower inter-node links, and DP uses all-reduce across replicas. The optimal configuration depends on model size, batch size, and hardware topology.
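Before weighing memory and bubble trade-offs, it helps to list which layouts are even possible. This sketch enumerates every (TP, PP, DP) triple whose product equals the world size while keeping each TP group inside one node; the 1,024-GPU, 8-GPUs-per-node cluster is an assumption, and real frameworks impose further constraints (for example, the layer count usually has to split evenly across PP stages).

```python
def parallel_configs(world_size, gpus_per_node):
    """All (tp, pp, dp) with tp * pp * dp == world_size, keeping TP within one node.

    TP must divide gpus_per_node so that a TP group never spans two nodes.
    """
    configs = []
    for tp in range(1, gpus_per_node + 1):
        if gpus_per_node % tp or world_size % tp:
            continue
        rest = world_size // tp
        for pp in range(1, rest + 1):
            if rest % pp == 0:
                configs.append((tp, pp, rest // pp))
    return configs


configs = parallel_configs(world_size=1024, gpus_per_node=8)
print(len(configs))            # 38
print((8, 16, 8) in configs)   # True
```

The candidate list is small enough to score each entry with memory and bubble estimates before committing cluster time.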

io/thecodeforge/distributed_training/pipeline_parallelism.py (Python)
import os

import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
# Pipe was removed in recent PyTorch releases; this needs an older release and 4 GPUs
from torch.distributed.pipeline.sync import Pipe

# Pipe depends on the RPC framework, even in a single process
os.environ.setdefault('MASTER_ADDR', 'localhost')
os.environ.setdefault('MASTER_PORT', '29500')
rpc.init_rpc('worker', rank=0, world_size=1)

# Pipe only accepts nn.Sequential; each stage lives on its own GPU
model = nn.Sequential(
    nn.Sequential(nn.Linear(1024, 1024), nn.ReLU()).cuda(0),
    nn.Sequential(nn.Linear(1024, 1024), nn.ReLU()).cuda(1),
    nn.Sequential(nn.Linear(1024, 1024), nn.ReLU()).cuda(2),
    nn.Sequential(nn.Linear(1024, 512), nn.ReLU()).cuda(3),
)

# chunks=8 splits each batch into 8 micro-batches (GPipe-style schedule, not 1F1B)
pipeline = Pipe(model, chunks=8)

# Input: batch of 64 on the first stage's GPU
x = torch.randn(64, 1024).cuda(0)
output = pipeline(x).local_value()  # Pipe returns an RRef; output lives on cuda:3
print(f"Output shape: {output.shape}")

rpc.shutdown()
Output
Output shape: torch.Size([64, 512])
Pipeline Bubble
Think of a pipeline as an assembly line. The first GPU starts working immediately, but the last GPU is idle until the first micro-batch reaches it. This idle time is the bubble. More micro-batches = smaller bubble.
Production Insight
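Set the number of micro-batches (chunks) to at least 4× the pipeline depth to keep bubble overhead below 25% of ideal compute time. Monitor GPU memory: if you hit OOM, reduce the micro-batch size or enable activation checkpointing. Note that torch.distributed.pipeline.sync.Pipe has been deprecated and removed from recent PyTorch releases; for new work, prefer torch.distributed.pipelining or the pipeline engines in Megatron-LM or DeepSpeed over hand-rolled schedules.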
Key Takeaway
Pipeline parallelism uses micro-batches to overlap computation across GPUs, approaching linear scaling when M >> P. The 1F1B schedule keeps activation memory bounded; the bubble shrinks only with more micro-batches or interleaved stages. Combine with DP and TP for optimal throughput on large clusters.

Tensor Parallelism: Splitting Operations Across Devices

Tensor parallelism (TP) partitions individual tensor operations, such as matrix multiplies or attention projections, across multiple devices. Unlike data parallelism, where each device holds a full copy of the model and processes a different shard of the batch, TP splits the weight matrices themselves. For a linear layer with weight W of shape [out_features, in_features], you can shard along the output dimension (column-wise) or the input dimension (row-wise). Each device holds only a slice and computes a partial result; an all-gather (after a column split) or an all-reduce (after a row split) combines the outputs. This is the core of Megatron-LM's approach for transformer layers: split the QKV projection column-wise, then split the output projection row-wise, so the attention block needs only a single all-reduce at its end. The MLP uses the same column-then-row pattern.
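Why the column-then-row split needs only one all-reduce can be verified with small integer matrices. This single-process sketch simulates world_size ranks in plain Python: the first weight is split by columns, an elementwise ReLU runs on each rank's slice without any communication, the second weight is split by rows, and summing the per-rank partial outputs stands in for the single all-reduce. Matrix shapes and values are arbitrary.

```python
def matmul(a, b):
    """Plain-Python (n x k) @ (k x m)."""
    return [[sum(a[i][t] * b[t][j] for t in range(len(b))) for j in range(len(b[0]))]
            for i in range(len(a))]


def relu(m):
    return [[max(0, v) for v in row] for row in m]


def add(a, b):
    return [[u + v for u, v in zip(ra, rb)] for ra, rb in zip(a, b)]


def tp_two_layer(x, w1, w2, world_size):
    """Megatron-style TP for y = relu(x @ w1) @ w2, simulated on world_size ranks."""
    shard = len(w1[0]) // world_size
    partials = []
    for r in range(world_size):
        lo, hi = r * shard, (r + 1) * shard
        w1_cols = [row[lo:hi] for row in w1]    # column-parallel shard of w1
        h_r = relu(matmul(x, w1_cols))          # elementwise op: no communication
        w2_rows = w2[lo:hi]                      # row-parallel shard of w2
        partials.append(matmul(h_r, w2_rows))   # this rank's partial output
    y = partials[0]
    for p in partials[1:]:                       # stands in for the single all-reduce
        y = add(y, p)
    return y


x = [[1, -2, 3], [0, 1, -1]]
w1 = [[1, 0, -1, 2], [2, 1, 0, -1], [0, -1, 1, 1]]
w2 = [[1, 2], [0, 1], [-1, 0], [2, -1]]

y = tp_two_layer(x, w1, w2, world_size=2)
assert y == matmul(relu(matmul(x, w1)), w2)  # same as the single-device result
print(y)  # [[12, -7], [2, 6]]
```

Splitting the first layer by rows instead would force a reduction before the ReLU, because the nonlinearity cannot be applied to partial sums; that is the reason for the column-then-row ordering.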

The communication cost of TP is high. In Megatron-style TP, each transformer layer performs two all-reduces on the full activation tensor in the forward pass (after attention and after the MLP) and two more in the backward pass. For batch size 1, sequence length 2048, and hidden size 4096 in FP16, each activation tensor is 2048 × 4096 × 2 bytes = 16 MiB (about 16.8 MB). A ring all-reduce over P GPUs sends 2(P-1)/P times the message size per GPU, so with P = 8 each GPU sends about 1.75 × 16.8 MB ≈ 29 MB per all-reduce. At 600 GB/s of NVLink bandwidth that takes about 49 µs; over a single 100 Gb/s (12.5 GB/s) InfiniBand link it takes about 2.3 ms, roughly 50× slower. This is why TP is typically used within a single node (8 GPUs) and not across nodes. In general, for hidden size H, sequence length s, micro-batch size b, dtype_bytes bytes per element, and per-GPU bandwidth B, one TP all-reduce takes approximately 2(P-1)/P × (b × s × H × dtype_bytes) / B, ignoring latency. Kernel launch and latency overheads add roughly 10-20 µs per call. For a 32-layer model with two forward all-reduces per layer, that is about 64 × 49 µs ≈ 3 ms of forward-pass communication on NVLink, which is acceptable when per-layer compute is large or the communication is overlapped.
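The bandwidth-only estimate can be written as two small functions. This sketch ignores latency and kernel-launch overhead, assumes two forward all-reduces per layer as in Megatron-style TP, and uses the same assumed bandwidths (600 GB/s NVLink, 12.5 GB/s InfiniBand).

```python
def ring_allreduce_seconds(message_bytes, world_size, bandwidth_bytes_per_s):
    """Bandwidth-only ring estimate: each GPU sends 2 * (P - 1) / P * message_bytes."""
    return 2 * (world_size - 1) / world_size * message_bytes / bandwidth_bytes_per_s


def tp_forward_comm_seconds(num_layers, batch, seq_len, hidden, tp, bandwidth, dtype_bytes=2):
    """Two activation all-reduces per layer in the forward pass (Megatron-style TP)."""
    message_bytes = batch * seq_len * hidden * dtype_bytes
    return 2 * num_layers * ring_allreduce_seconds(message_bytes, tp, bandwidth)


msg = 1 * 2048 * 4096 * 2  # one FP16 activation tensor: 16 MiB
nvlink = ring_allreduce_seconds(msg, 8, 600e9)
ib = ring_allreduce_seconds(msg, 8, 12.5e9)
fwd = tp_forward_comm_seconds(32, 1, 2048, 4096, tp=8, bandwidth=600e9)
print(f"NVLink: {nvlink * 1e6:.0f} us, IB: {ib * 1e3:.2f} ms, 32-layer forward: {fwd * 1e3:.1f} ms")
# NVLink: 49 us, IB: 2.35 ms, 32-layer forward: 3.1 ms
```

Because the estimate is linear in batch × sequence length, doubling the micro-batch or context length doubles TP communication time as well.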

TP is essential for models that exceed a single GPU's memory. For example, a 175B-parameter model in FP16 needs 350 GB for its weights alone. With TP=8, each GPU holds 350/8 = 43.75 GB of weights, plus gradients, optimizer states, and activations. Data parallelism alone cannot help here, because every replica would still need all 350 GB. TP reduces per-device memory proportionally, but for training it is not enough on its own: with mixed-precision Adam at about 16 bytes per parameter, the full training state is about 2.8 TB, or 350 GB per GPU at TP=8. That is why, for very large models (hundreds of billions to 1T+ parameters), TP is combined with pipeline parallelism and with data parallelism plus optimizer sharding (see Hybrid Parallelism). The trade-off: TP's communication volume grows with hidden size, sequence length, and layer count, so it is not free. TP trades communication for memory, and it works best when the hidden dimension is large enough to keep each device's compute efficient: a column-parallel matmul of shape [tokens, H] × [H, H/P] stays efficient for H/P >= 1024.

io/thecodeforge/tensor_parallel_linear.py (Python)
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F


class ColumnParallelLinear(nn.Module):
    """Column-wise split: each rank holds out_features / world_size output features."""
    def __init__(self, in_features, out_features, world_size, rank):
        super().__init__()
        assert out_features % world_size == 0, "out_features must divide evenly"
        self.world_size = world_size
        self.rank = rank
        self.out_features_per_rank = out_features // world_size
        # F.linear expects weight of shape [out, in], so each rank owns a
        # [out_features_per_rank, in_features] slice of the full weight
        self.weight = nn.Parameter(torch.randn(self.out_features_per_rank, in_features) * 0.02)
        self.bias = nn.Parameter(torch.zeros(self.out_features_per_rank))

    def forward(self, x):
        # x: [batch, seq, in_features], identical on every rank
        local_out = F.linear(x, self.weight, self.bias)  # [batch, seq, out_features_per_rank]
        # All-gather the shards to rebuild the full output. dist.all_gather does
        # not propagate gradients; for training use the autograd-aware
        # torch.distributed.nn.functional.all_gather, or keep the output sharded
        # and feed a row-parallel layer (Megatron-style)
        out = [torch.empty_like(local_out) for _ in range(self.world_size)]
        dist.all_gather(out, local_out)
        return torch.cat(out, dim=-1)  # [batch, seq, out_features]


# Usage (assumes dist.init_process_group(backend='nccl') was called and each
# rank set its CUDA device; world_size = 4, rank = 0..3)
# model = ColumnParallelLinear(4096, 4096, world_size=4, rank=dist.get_rank()).cuda()
# torch.manual_seed(0)  # same input on every rank
# x = torch.randn(2, 1024, 4096, device='cuda')
# y = model(x)  # all-gather happens internally
Output
Output shape: [2, 1024, 4096] after all-gather. Each rank computes 1024 columns.
TP Communication Overhead Is Not Free
Tensor parallelism adds all-reduces to every transformer layer (two in the forward pass and two in the backward pass in Megatron-style TP). For models with many layers (e.g., 96), the cumulative communication can dominate unless it runs over high-bandwidth intra-node interconnects (NVLink). Always profile all-reduce time against compute time.
Production Insight
In production, avoid TP across nodes; keep each TP group within a node of 8 NVLink-connected GPUs. Across nodes, use pipeline parallelism or data parallelism. Where the dependency graph allows, overlap communication with computation, for example by launching collectives with torch.distributed.all_reduce(..., async_op=True) and waiting on the returned handle only when the result is needed.
Key Takeaway
Tensor parallelism splits weight matrices across devices, reducing per-GPU memory proportionally. It requires high-bandwidth intra-node communication (NVLink) and adds an all-reduce per layer. Use it for models that don't fit on a single GPU, but combine with other parallelism for extreme scale.

Hybrid Parallelism: Combining Strategies for Massive Models

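Hybrid parallelism, also called 3D parallelism, combines data parallelism (DP), pipeline parallelism (PP), and tensor parallelism (TP) to train models with hundreds of billions of parameters or more. The standard recipe from Megatron-LM and DeepSpeed: use TP within a node (8 GPUs), PP across nodes (each node holds one stage of the pipeline), and DP across multiple pipeline replicas. For a 1T-parameter model, you might use TP=8, PP=64, DP=8, for 8 × 64 × 8 = 4,096 GPUs. The memory savings are multiplicative: weights and gradients are split over TP × PP, and optimizer states can additionally be split over DP. For 1T FP16 parameters (2 TB) with TP=8 and PP=64, each GPU holds 2 TB / 512 ≈ 3.9 GB of weights. Mixed-precision Adam needs about 16 bytes per parameter in total (2 for FP16 weights, 2 for FP16 gradients, 12 for FP32 master weights and the two Adam moments), so without optimizer sharding each GPU holds 16 TB / 512 ≈ 31 GB of model state before activations, which fits on 40 GB A100s with little headroom. Sharding the optimizer states across the 8 DP replicas (ZeRO-1 style) cuts this to about 7.8 GB of weights and gradients plus 12 TB / 4,096 ≈ 2.9 GB of optimizer state, roughly 10.7 GB per GPU.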
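The memory arithmetic can be captured in a short estimator. It is a sketch under the same assumptions as the text: 16 bytes per parameter for mixed-precision Adam, weights and gradients split over TP × PP, optimizer states optionally split over DP as well (ZeRO-1 style), and activations and buffers excluded.

```python
def per_gpu_model_state_gb(params, tp, pp, dp, shard_optimizer=False):
    """Weights + gradients + Adam state per GPU under 3D parallelism, in GB.

    Mixed-precision Adam: 2 B weights + 2 B gradients + 12 B FP32 master
    weights and moments per parameter. Weights and gradients are split over
    tp * pp; the optimizer part is also split over dp if shard_optimizer is True.
    Activations and communication buffers are not included.
    """
    model_shards = tp * pp
    weights_and_grads = params * 4 / model_shards
    optimizer_dp = dp if shard_optimizer else 1
    optimizer = params * 12 / (model_shards * optimizer_dp)
    return (weights_and_grads + optimizer) / 1e9


print(f"{per_gpu_model_state_gb(1e12, tp=8, pp=64, dp=8):.2f} GB")                        # 31.25 GB
print(f"{per_gpu_model_state_gb(1e12, tp=8, pp=64, dp=8, shard_optimizer=True):.2f} GB")  # 10.74 GB
```

Optimizer sharding frees about 20 GB per GPU here, room that can go to larger micro-batches or fewer recomputed layers.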

The communication pattern is hierarchical: TP all-reduces run on intra-node NVLink (fast), PP point-to-point sends cross nodes (slower, but they carry only stage-boundary activations, once per micro-batch), and DP all-reduces run across replicas (also across nodes, but only for gradients, once per step). The key challenge is keeping pipeline bubbles small. For a pipeline with P stages and M micro-batches, the bubble fraction is (P-1)/(M+P-1). With P=64 and M=32, that's 63/95 ≈ 66% idle time, which is unacceptable. Raising M to 128 gives 63/191 ≈ 33%, still high. The plain 1F1B schedule does not shrink the bubble compared with GPipe; it reduces activation memory. Megatron-LM's interleaved 1F1B schedule (virtual pipeline stages) gives each device v non-contiguous chunks of layers and divides the bubble by roughly v. The other lever is to reduce PP depth and increase DP: with PP=8, DP=64, and M=128, the bubble is 7/135 ≈ 5.2%.

Memory management in hybrid parallelism is complex. Each device must allocate memory for parameters (sharded by TP and PP), optimizer states (sharded by TP and PP, and optionally by DP), activations, and communication buffers. Activation memory is the biggest killer: for a transformer with H=4096, L=96, seq_len=2048, and micro-batch size 1, activation memory per layer is roughly seq_len × H × 34 bytes in mixed precision (34 is a rough multiplier for attention plus MLP, excluding the attention-score matrix). That's 2048 × 4096 × 34 ≈ 285 MB per layer, or about 27 GB for 96 layers. With TP=8 (and sequence parallelism, so every activation is split), it's 27/8 ≈ 3.4 GB per GPU. With PP=8, each device holds only L/PP = 12 layers, about 0.43 GB per micro-batch; but under 1F1B the first stage keeps up to PP = 8 micro-batches in flight, so its peak is back to about 3.4 GB. Activation recomputation (checkpointing) reduces this further by storing only some layers' activations and recomputing the rest during backward. The trade-off: full recomputation adds about 33% compute overhead (one extra forward pass).
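The activation estimate, including the 1F1B effect on the first pipeline stage, can be sketched as follows. The 34-bytes multiplier is the rough figure used in the text, the even split across TP ranks assumes sequence parallelism, and the number of in-flight micro-batches on stage 0 is taken to be PP, as under 1F1B.

```python
def activation_bytes_per_layer(seq_len, hidden, micro_batch=1, factor=34):
    """Roughly factor * s * b * h bytes of 16-bit activations per transformer layer.

    Omits the attention-score term, which grows with seq_len**2 unless a fused
    (flash-style) attention kernel avoids storing it.
    """
    return factor * seq_len * micro_batch * hidden


def stage_activation_gb(num_layers, seq_len, hidden, tp, pp, micro_batch=1):
    """Peak activations on the first pipeline stage under 1F1B, in GB."""
    per_layer = activation_bytes_per_layer(seq_len, hidden, micro_batch) / tp
    layers_per_stage = num_layers / pp
    in_flight = pp  # stage 0 holds up to pp micro-batches at once
    return per_layer * layers_per_stage * in_flight / 1e9


print(f"{activation_bytes_per_layer(2048, 4096) / 1e6:.0f} MB per layer")  # 285 MB per layer
print(f"{stage_activation_gb(96, 2048, 4096, tp=8, pp=8):.2f} GB on stage 0")  # 3.42 GB on stage 0
```

The PP factor cancels out for stage 0: fewer layers per stage, but proportionally more micro-batches in flight, so PP alone does not relieve first-stage activation memory.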

io/thecodeforge/hybrid_parallel_setup.py (Python)
# Compute the process-group layout for 3D parallelism (TP x DP x PP).
# Hypothetical cluster: 64 GPUs = 8 nodes x 8 GPUs, with TP=4, DP=4, PP=4.
# Rank order follows a Megatron-style convention (an assumption here):
# TP varies fastest, then DP, then PP, so every TP group sits inside one node.
WORLD_SIZE, TP, DP, PP = 64, 4, 4, 4
assert TP * DP * PP == WORLD_SIZE


def rank_of(tp, dp, pp):
    return tp + TP * (dp + DP * pp)


tp_groups = [[rank_of(t, d, p) for t in range(TP)] for p in range(PP) for d in range(DP)]
dp_groups = [[rank_of(t, d, p) for d in range(DP)] for p in range(PP) for t in range(TP)]
pp_groups = [[rank_of(t, d, p) for p in range(PP)] for d in range(DP) for t in range(TP)]

# In a real job (launched with torchrun), every rank must call new_group for
# every group, in the same order, including groups it does not belong to:
#
# import torch.distributed as dist
# dist.init_process_group(backend='nccl')
# rank = dist.get_rank()
# for ranks in tp_groups:
#     group = dist.new_group(ranks)
#     if rank in ranks:
#         tp_group = group
# ... repeat for dp_groups -> dp_group and pp_groups -> pp_group ...
#
# A framework such as Megatron-LM then uses tp_group for the per-layer
# all-reduces, pp_group for point-to-point activation transfers between
# stages, and dp_group for the gradient all-reduce, e.g.
# DDP(stage_module, process_group=dp_group).

print(f"TP groups: {len(tp_groups)}, first: {tp_groups[0]}")
print(f"DP groups: {len(dp_groups)}, first: {dp_groups[0]}")
print(f"PP groups: {len(pp_groups)}, first: {pp_groups[0]}")
Output
TP groups: 16, first: [0, 1, 2, 3]
DP groups: 16, first: [0, 4, 8, 12]
PP groups: 16, first: [0, 16, 32, 48]
Pipeline Bubble Math
For P pipeline stages and M microbatches, the bubble fraction is (P-1)/(M+P-1). To keep bubble < 10%, you need M >= 9*(P-1). For P=64, that's M >= 567 microbatches, which may exceed memory. Use interleaved scheduling or reduce P by increasing DP.
Production Insight
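Always profile the pipeline bubble with your actual model and batch size. In Megatron-LM, the --tensor-model-parallel-size and --pipeline-model-parallel-size arguments let you experiment with layouts (in DeepSpeed, the num_stages argument of PipelineModule sets the pipeline depth). Measure the bubble from a profiler timeline as the idle time on each stage per step. If it exceeds 20%, increase the number of micro-batches or reduce PP depth.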
Key Takeaway
Hybrid parallelism (3D) combines TP, PP, and DP to train models beyond single-GPU memory. It requires careful group creation and scheduling to minimize pipeline bubbles. The memory savings are multiplicative, but communication and scheduling complexity increase. Use frameworks like Megatron-LM or DeepSpeed that handle the orchestration.

Communication Bottlenecks: Profiling and Optimizing NCCL

NCCL (NVIDIA Collective Communications Library) underpins distributed training on GPUs. It implements all-reduce, all-gather, reduce-scatter, and broadcast over NVLink, PCIe, and network transports, choosing among ring and tree algorithms (and others on supporting hardware). For large messages the key bottleneck is bandwidth: in a ring all-reduce on P GPUs, each GPU sends 2(P-1)/P × message_size bytes, so the time is roughly 2(P-1)/P × message_size / bandwidth. For 8 GPUs on NVLink (600 GB/s), a 128 MB all-reduce takes about (2 × 7/8 × 128e6) / 600e9 = 224e6 / 600e9 ≈ 0.37 ms. Over InfiniBand at 12.5 GB/s per link (100 Gb/s), the same volume takes 224e6 / 12.5e9 ≈ 17.9 ms, roughly 50× slower. This is why TP is kept intra-node.

Profiling NCCL operations: use nsys profile (NVIDIA Nsight Systems) to trace NCCL kernels. Look for gaps between compute kernels and NCCL kernels; these indicate synchronization overhead. A useful metric is achieved bus bandwidth as a percentage of the theoretical peak; if it stays below about 50% for large messages, you have a bottleneck. Common causes: (1) Small message sizes: for messages under about 1 MB, per-call latency dominates rather than bandwidth. For small all-reduces (e.g., gradient norms), torch.distributed.all_reduce with op=dist.ReduceOp.SUM works, but consider fusing many small tensors into one large buffer. (2) Topology mismatch: NCCL chooses algorithms and paths from the topology it detects (PCIe, NVLink, NICs). On multi-node setups, make sure NCCL actually uses InfiniBand (NCCL_IB_DISABLE must not be set to 1) and that each GPU's network interface sits close to it in the PCIe/NUMA topology. (3) Missing overlap of communication and computation: run NCCL operations concurrently with compute, either on a separate CUDA stream (torch.cuda.Stream) or via async collectives. For example, in the backward pass you can start the gradient all-reduce for layer N while computing gradients for layer N-1.
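Fusing small tensors, as suggested in cause (1), means packing them into one flat buffer, running a single collective, and slicing the result back. This plain-Python sketch simulates two ranks and uses an element-wise sum in place of the all-reduce; in PyTorch, DDP already performs this kind of bucketing, controlled by its bucket_cap_mb argument.

```python
def flatten_buckets(tensors):
    """Pack many small gradient vectors into one flat buffer.

    Returns the buffer plus (offset, length) pairs for unpacking. One collective
    on the flat buffer replaces len(tensors) small ones, so the per-call latency
    is paid once instead of once per tensor.
    """
    flat, layout = [], []
    for t in tensors:
        layout.append((len(flat), len(t)))
        flat.extend(t)
    return flat, layout


def unflatten(flat, layout):
    return [flat[off:off + n] for off, n in layout]


# Two simulated ranks, each holding three small "gradients".
rank_grads = [
    [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]],
    [[10.0, 20.0], [30.0], [40.0, 50.0, 60.0]],
]
flats = [flatten_buckets(g) for g in rank_grads]
summed = [sum(vals) for vals in zip(*(flat for flat, _ in flats))]  # stands in for one all-reduce
print(unflatten(summed, flats[0][1]))  # [[11.0, 22.0], [33.0], [44.0, 55.0, 66.0]]
```

Every rank must use the same layout, which holds when all ranks flatten the same parameters in the same order.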

Optimization techniques: (1) Gradient accumulation: accumulate gradients over several micro-batches before each all-reduce, so it runs less often. (2) Gradient compression: use 1-bit or 2-bit compression (e.g., DeepSpeed's 1-bit Adam) to reduce message size. (3) Topology-aware grouping: build groups with torch.distributed.new_group from ranks that share the same switch to minimize cross-switch traffic. (4) NCCL tuning: NCCL chooses between the Ring and Tree algorithms automatically, and NCCL_ALGO overrides that choice. Ring is generally bandwidth-optimal for large messages, while tree has lower latency at large GPU counts, so override only after benchmarking (e.g., with nccl-tests). (5) Use torch.distributed.barrier sparingly: it forces every rank to wait for the slowest one and kills throughput.

Concrete profiling command: nsys profile -t cuda,nvtx -o trace -w true python train.py. NCCL kernels appear in the CUDA timeline. If NCCL kernels run long without overlapping compute, reduce message volume or increase overlap. If you see 'NCCL WARN' messages about network timeouts, check your InfiniBand configuration (e.g., with ibstatus).

io/thecodeforge/nccl_profiling_overlap.py (Python)
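import torch
import torch.distributed as dist


def attach_overlapped_allreduce(model, group=None):
    """Start an async all-reduce for each parameter as soon as its gradient is
    ready, so communication overlaps the rest of the backward pass.

    A simplified, unbucketed version of what DDP does internally. Requires
    PyTorch >= 2.1 (register_post_accumulate_grad_hook) and assumes every rank
    runs the same graph, so collectives are issued in the same order.
    """
    handles = []
    world_size = dist.get_world_size(group)

    def hook(param):
        param.grad.div_(world_size)  # pre-divide so the SUM yields the mean
        # NCCL runs the collective on its own stream; backward keeps computing
        handles.append(dist.all_reduce(param.grad, group=group, async_op=True))

    for p in model.parameters():
        if p.requires_grad:
            p.register_post_accumulate_grad_hook(hook)
    return handles


def wait_for_gradients(handles):
    """Block until all outstanding all-reduces finish, before optimizer.step()."""
    for work in handles:
        work.wait()
    handles.clear()


# Usage (after dist.init_process_group and moving the model to its GPU):
# handles = attach_overlapped_allreduce(model)
# for batch in dataloader:
#     optimizer.zero_grad()
#     loss = model(batch).mean()
#     loss.backward()              # all-reduces launch while backward continues
#     wait_for_gradients(handles)  # gradients are now averaged across ranks
#     optimizer.step()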
Output
No direct output; pattern for overlapping communication.
NCCL as a Network Calculus Problem
Think of NCCL all-reduce as a bandwidth-limited operation. For a ring all-reduce over $P$ GPUs with message size $S$ and per-GPU link bandwidth $B$, the bandwidth term is $t \approx \frac{2(P-1)}{P} \cdot \frac{S}{B}$. The factor $2(P-1)/P$ approaches 2 for large $P$, so with 128 GPUs each GPU sends (and receives) about 2x the message size. Optimizing NCCL is about reducing message size (gradient compression) or increasing effective bandwidth (topology).
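The bandwidth term turns into a quick back-of-envelope estimator. A sketch assuming an idealized ring with no latency term and no overlap; the function name is hypothetical and the numbers in the usage line are illustrative inputs, not measurements.

```python
def ring_all_reduce_seconds(message_bytes, num_gpus, bandwidth_bytes_per_s):
    """Hypothetical helper: bandwidth term 2(P-1)/P * S / B of a ring all-reduce.

    Ignores per-message latency, so it underestimates small messages.
    """
    if num_gpus < 2:
        return 0.0  # a single GPU has nothing to exchange
    factor = 2 * (num_gpus - 1) / num_gpus
    return factor * message_bytes / bandwidth_bytes_per_s


# Illustrative inputs: 1 GB of gradients, 128 GPUs, 100 GB/s per-GPU bandwidth.
t = ring_all_reduce_seconds(1e9, 128, 100e9)
print(f"{t * 1000:.2f} ms")  # factor 1.984 -> prints 19.84 ms
```

Comparing this estimate against the NCCL kernel durations in an nsys trace shows how far the real run is from the ideal.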
Production Insight
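NCCL_DEBUG=INFO logs NCCL initialization and the topology, algorithm, and transport it selected, which is invaluable when bringing up a cluster; for steady-state production runs, NCCL_DEBUG=WARN keeps logs quiet while still surfacing errors. Watch for 'NCCL WARN' messages mentioning 'NET/IB': they usually point to network congestion or a flaky link. On large clusters, raising NCCL_IB_TIMEOUT (an exponent for the InfiniBand retransmission timeout; its default depends on the NCCL version) can prevent spurious timeouts. Always call torch.cuda.set_device(local_rank) before initializing the NCCL process group so each process binds to the correct GPU.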
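The set_device-before-init ordering, as a sketch. It assumes launch via torchrun (which sets LOCAL_RANK, RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT); `init_distributed` is a hypothetical helper name. It falls back to the gloo backend on CPU-only machines so the same code can be smoke-tested anywhere.

```python
import datetime
import os

import torch
import torch.distributed as dist


def init_distributed(timeout_s=1800):
    """Hypothetical helper: bind this process to its GPU, then join the group."""
    local_rank = int(os.environ.get("LOCAL_RANK", "0"))
    if torch.cuda.is_available():
        # Bind first, so NCCL's communicator is created on the right device.
        torch.cuda.set_device(local_rank)
        backend = "nccl"
    else:
        backend = "gloo"  # CPU fallback for local testing
    # init_method defaults to env://, reading the variables torchrun exports.
    dist.init_process_group(
        backend=backend,
        timeout=datetime.timedelta(seconds=timeout_s),
    )
    return local_rank
```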
Key Takeaway
Large NCCL collectives are bandwidth-dominated; small ones are latency-dominated. Profile with nsys, overlap communication with compute using asynchronous collectives, and tune NCCL algorithms based on message size. Across nodes, use gradient accumulation and compression to reduce traffic. Monitor NCCL logs for timeouts and topology issues.

Production Patterns: Debugging, Monitoring, and Incident Response

Distributed training in production fails in predictable ways: NCCL timeouts, out-of-memory (OOM) errors, gradient explosion, and silent data corruption (SDC). The first line of defense is structured logging and monitoring. Use torch.distributed.barrier() only at genuine synchronization points (e.g., before checkpointing) and log the time spent in it. For NCCL, set NCCL_DEBUG=WARN to capture errors without flooding logs. Export per-step metrics to Prometheus (for example with the prometheus_client library) to track: (1) all-reduce time per step, (2) gradient norm, (3) loss value, (4) GPU memory usage. Alert when the gradient norm exceeds 1000 (explosion) or the loss becomes NaN.
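The two alert rules can be expressed as a small pure function that a metrics exporter or log scraper calls once per step. A sketch: the function name and the GPU-memory rule with its 95% threshold are hypothetical additions; only the grad-norm limit of 1000 and the NaN check come from the text.

```python
import math


def step_alerts(loss, grad_norm, gpu_mem_frac=None,
                grad_norm_limit=1000.0, mem_limit=0.95):
    """Hypothetical helper: alert strings for one step (empty list = healthy)."""
    alerts = []
    if not math.isfinite(loss):
        alerts.append(f"loss is {loss}")  # NaN or inf loss
    if not math.isfinite(grad_norm) or grad_norm > grad_norm_limit:
        alerts.append(f"gradient norm {grad_norm} exceeds {grad_norm_limit}")
    if gpu_mem_frac is not None and gpu_mem_frac > mem_limit:
        # Hypothetical extra rule: warn before the allocator runs out.
        alerts.append(f"GPU memory at {gpu_mem_frac:.0%}")
    return alerts
```

Keeping the rules in one pure function means the same thresholds can drive both a Prometheus alert and an in-process early abort.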

Debugging OOM: use torch.cuda.memory_summary() to see the caching allocator's statistics (allocated vs. reserved memory, fragmentation, allocation counts). Common culprits: activation memory (use activation checkpointing), optimizer states (use a higher ZeRO stage), and communication buffers (e.g., smaller DDP gradient buckets via bucket_cap_mb). For gradient explosion, add gradient clipping: torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0). For silent data corruption (bit flips), rely on ECC memory (enabled by default on data-center GPUs such as A100/H100) and add checksums for checkpoint files. For NCCL timeouts, pass a larger timeout (a datetime.timedelta) to torch.distributed.init_process_group, and make sure network interfaces are up (ibstatus).
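For the checkpoint-checksum advice, a minimal sketch using only the standard library: write a SHA-256 digest next to each checkpoint and verify it before resuming. The `.sha256` sidecar convention and the helper names are assumptions, not a PyTorch feature.

```python
import hashlib
from pathlib import Path


def file_sha256(path, chunk_size=1 << 20):
    """Hypothetical helper: stream the file in 1 MiB chunks to bound RAM use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def write_checksum(path):
    """Store the digest in a sidecar file, e.g. checkpoint_1000.pt.sha256."""
    sidecar = Path(str(path) + ".sha256")
    sidecar.write_text(file_sha256(path))
    return sidecar


def verify_checksum(path):
    """True if the checkpoint still matches the digest recorded at save time."""
    sidecar = Path(str(path) + ".sha256")
    return sidecar.exists() and sidecar.read_text().strip() == file_sha256(path)
```

Call write_checksum right after saving and verify_checksum before loading; a mismatch means the file was corrupted or truncated after it was written.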

Incident response playbook: (1) Loss spikes: check the learning rate schedule, the data pipeline (shuffle order), and the gradient norm. If the gradient norm is normal, the cause most likely lies in the data. (2) NCCL timeout: check dmesg for GPU errors, nvidia-smi for ECC errors, and ibstatus for link status, then restart from the last checkpoint. (3) OOM: reduce batch size, enable activation checkpointing, or increase the ZeRO stage. (4) Hang: find the stuck rank (TORCH_DISTRIBUTED_DEBUG=DETAIL reports mismatched collectives, and py-spy dump shows where each process is blocked), then kill the job and restart it with your training script's resume-from-checkpoint option. Always save checkpoints every N steps (e.g., N=1000 for large models), storing optimizer state alongside torch.save(model.state_dict(), f'checkpoint_{step}.pt').
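Checkpoint saving every N steps, as a sketch. It writes to a temporary file and renames it with os.replace, so a crash mid-write never leaves a truncated checkpoint under the final name. `maybe_save_checkpoint` and the dict keys are hypothetical; in a multi-rank job with a replicated (DDP) model, only rank 0 should call it.

```python
import os

import torch


def maybe_save_checkpoint(step, model, optimizer, ckpt_dir, every=1000):
    """Hypothetical helper: save model + optimizer state every `every` steps."""
    if step == 0 or step % every != 0:
        return None
    os.makedirs(ckpt_dir, exist_ok=True)
    final_path = os.path.join(ckpt_dir, f"checkpoint_{step}.pt")
    tmp_path = final_path + ".tmp"
    state = {
        "step": step,
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
    }
    torch.save(state, tmp_path)
    # Atomic rename on POSIX file systems (same directory, same file system).
    os.replace(tmp_path, final_path)
    return final_path
```

Restarting then means loading the newest checkpoint_*.pt, restoring both state dicts, and resuming from the saved step.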

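Monitoring infrastructure: use Weights & Biases or MLflow for loss curves, nvidia-smi dmon for GPU metrics (power, temperature, memory), and ibstat for InfiniBand port state and link rate (ib_write_bw measures actual bandwidth). On large clusters (1000+ GPUs), request whole nodes with SLURM's --exclusive so other jobs can't interfere. Set OMP_NUM_THREADS=1 to prevent CPU thread contention. For reproducibility, set torch.manual_seed(42) on every rank and torch.backends.cudnn.deterministic=True (slower but more deterministic); torch.use_deterministic_algorithms(True) goes further by raising an error on operations that have no deterministic implementation.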
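The reproducibility settings gathered into one function, as a sketch. `set_reproducible` is a hypothetical name. The per-rank seed offset is a design choice you may not want (identical seeds give identical dropout masks on every rank, offset seeds give distinct streams), and the `strict` path with torch.use_deterministic_algorithms is an optional, stricter mode.

```python
import os
import random

import torch


def set_reproducible(seed=42, rank=0, strict=False):
    """Hypothetical helper: seed every RNG this process uses; returns the seed."""
    seed = seed + rank  # design choice: a distinct stream per rank
    random.seed(seed)
    torch.manual_seed(seed)  # seeds the CPU generator and all CUDA devices
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # autotuning may pick different kernels
    if strict:
        # Some cuBLAS ops need this set before the first CUDA call.
        os.environ.setdefault("CUBLAS_WORKSPACE_CONFIG", ":4096:8")
        torch.use_deterministic_algorithms(True)
    return seed
```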

io/thecodeforge/production_monitoring.py (Python)
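import logging
import math
import time

import torch
import torch.distributed as dist

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _sync():
    # CUDA kernels run asynchronously; wait so wall-clock times reflect GPU work.
    if torch.cuda.is_available():
        torch.cuda.synchronize()


def train_step(model, batch, optimizer, grad_clip=1.0):
    # Assumes a DDP-wrapped model whose forward pass returns the loss.
    optimizer.zero_grad(set_to_none=True)
    start = time.perf_counter()
    loss = model(batch)
    _sync()

    # DDP all-reduces gradient buckets *during* backward, overlapped with
    # compute, so this interval is "backward incl. all-reduce", not pure comms.
    backward_start = time.perf_counter()
    loss.backward()
    _sync()
    backward_time = time.perf_counter() - backward_start

    # clip_grad_norm_ returns the total norm measured before clipping.
    total_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip).item()
    loss_value = loss.item()

    # Check for NaN/inf BEFORE optimizer.step(), while the weights are still clean.
    if not (math.isfinite(loss_value) and math.isfinite(total_norm)):
        logger.error("Non-finite loss or gradient norm; aborting before the update")
        raise RuntimeError("NaN/inf loss or gradients")
    if total_norm > 1000:
        logger.warning(f"Gradient norm spike: {total_norm:.1f}")

    optimizer.step()
    _sync()
    step_time = time.perf_counter() - start

    # Log from rank 0 only, so N GPUs don't write N copies of every line.
    if not dist.is_initialized() or dist.get_rank() == 0:
        logger.info(
            f"Step time: {step_time:.3f}s, Backward (incl. DDP all-reduce): "
            f"{backward_time:.3f}s, Loss: {loss_value:.4f}, Grad norm: {total_norm:.2f}"
        )
    return loss_value


# Usage: wrap the model in DDP, then call train_step once per batch.
Output
Example log format: 'Step time: <seconds>s, Backward (incl. DDP all-reduce): <seconds>s, Loss: <loss>, Grad norm: <norm>'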
Checkpointing Strategy for Large Models
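Save optimizer state and RNG state alongside model weights, e.g. torch.save({'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'step': step, 'rng_state': torch.get_rng_state(), 'cuda_rng_state': torch.cuda.get_rng_state_all()}, path). torch.get_rng_state() covers only the CPU generator; torch.cuda.get_rng_state_all() captures the GPU generators. For sharded setups (e.g., ZeRO or FSDP), each rank saves its own shard.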
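A sketch of capturing and restoring every RNG a typical training process touches, so a resumed run continues the same random streams (dropout masks, data shuffling). The helper names are hypothetical; the torch calls are standard, and CUDA state is included only when a GPU is present. NumPy's generator would need the same treatment if your data pipeline uses it.

```python
import random

import torch


def capture_rng_state():
    """Hypothetical helper: snapshot Python, torch CPU, and (if present) CUDA RNGs."""
    state = {"python": random.getstate(), "torch_cpu": torch.get_rng_state()}
    if torch.cuda.is_available():
        state["torch_cuda"] = torch.cuda.get_rng_state_all()
    return state


def restore_rng_state(state):
    """Restore a snapshot produced by capture_rng_state()."""
    random.setstate(state["python"])
    torch.set_rng_state(state["torch_cpu"])
    if "torch_cuda" in state and torch.cuda.is_available():
        torch.cuda.set_rng_state_all(state["torch_cuda"])
```

Store the returned dict under one key of the checkpoint and call restore_rng_state after loading the model and optimizer.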
Production Insight
Set up a dead-man's switch: a cron job that checks whether the training process is alive and whether a new loss value has been logged in the last 10 minutes. If it's stuck, kill it and restart from the last checkpoint. Also consider torch.distributed.elastic (torchrun with --max-restarts) for fault-tolerant training with automatic restarts.
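The dead-man's switch reduces to a staleness check. A sketch, assuming the training loop touches a heartbeat file (hypothetical path) after every logged step and the cron job runs the check; killing and restarting the job is left to your scheduler.

```python
import os
import time


def touch_heartbeat(path):
    """Hypothetical helper: called by the training loop after each logged step."""
    with open(path, "a"):
        pass
    os.utime(path, None)  # set the modification time to "now"


def is_stale(path, max_age_s=600, now=None):
    """True if the heartbeat is missing or older than max_age_s (default 10 min)."""
    now = time.time() if now is None else now
    try:
        age = now - os.path.getmtime(path)
    except FileNotFoundError:
        return True
    return age > max_age_s
```

Using the file's modification time keeps the check independent of log format, so it works whether the job logs to W&B, MLflow, or plain text.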
Key Takeaway
Production distributed training requires proactive monitoring (loss, grad norm, NCCL time), structured logging, and a clear incident response playbook. Use gradient clipping, checkpointing every N steps, and fault-tolerant frameworks. Always test on a small cluster before scaling to thousands of GPUs.
● Production incident · POST-MORTEM · severity: high

The 8-GPU Training That Took 3 Weeks Instead of 2 Days

Symptom
Training throughput was 10x slower than expected; GPU utilization hovered at 30%.
Assumption
The team assumed data parallelism would work because the model 'fit' in GPU memory (they saw no OOM errors).
Root cause
Some parameters were being offloaded to CPU when GPU memory was tight (an offload option in the training stack, not core PyTorch's default behavior), causing constant CPU-GPU transfers that dominated training time.
Fix
Switched to FSDP (fully sharded data parallelism) to shard parameters, gradients, and optimizer state across GPUs, eliminating CPU offloading and achieving 90% GPU utilization.
Key lesson
  • Always verify that the entire model (parameters, gradients, optimizer states) fits in GPU memory, not just the forward pass.
  • Profile memory usage with tools like torch.cuda.memory_summary() before scaling.
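  • Use sharding (ZeRO/FSDP) or model parallelism when the full training state (parameters, gradients, optimizer states) exceeds about 70% of GPU memory, leaving headroom for activations and avoiding silent offloading.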
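The fits-in-memory check from the first lesson can be done on paper before launching. A sketch using the common accounting for mixed-precision Adam of 16 bytes per parameter (fp16 weights and gradients plus fp32 master weights and two Adam moments), with ZeRO-style sharding dividing the relevant parts by the data-parallel degree. Activations are deliberately excluded, which is why the 70% headroom rule matters; the function names and the 7B example are hypothetical.

```python
def training_state_gb(num_params, zero_stage=0, dp_degree=1):
    """Hypothetical helper: per-GPU GB for params + grads + Adam state (no activations).

    Bytes per parameter: 2 (fp16 params) + 2 (fp16 grads) + 12 (fp32 master,
    momentum, variance). ZeRO stage 1 shards the 12, stage 2 also the grads,
    stage 3 also the params, each across dp_degree GPUs.
    """
    params, grads, optim = 2.0, 2.0, 12.0
    if zero_stage >= 1:
        optim /= dp_degree
    if zero_stage >= 2:
        grads /= dp_degree
    if zero_stage >= 3:
        params /= dp_degree
    return num_params * (params + grads + optim) / 1e9


def fits(num_params, gpu_gb, headroom=0.7, **kw):
    """Apply the 70% rule: the rest is left for activations and buffers."""
    return training_state_gb(num_params, **kw) <= headroom * gpu_gb


# Hypothetical 7B-parameter model on 8 x 80 GB GPUs:
print(training_state_gb(7e9))                             # 112.0 GB: won't fit
print(training_state_gb(7e9, zero_stage=3, dp_degree=8))  # 14.0 GB per GPU
```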
Production debug guide · Quick actions for common production issues
Symptom · 01
Low GPU utilization (<50%)
Fix
Check if pipeline bubble is too large (increase micro-batches) or if communication is blocking (profile NCCL timings).
Symptom · 02
OOM errors during training
Fix
Reduce batch size, enable gradient checkpointing, or shard or split the model (FSDP or tensor parallelism).
Symptom · 03
Training loss diverges or doesn't converge
Fix
Verify that gradient synchronization is correct (check the all-reduce implementation) and that the learning rate is scaled appropriately for the batch size.
Symptom · 04
Slow inter-node communication
Fix
Check network bandwidth (ib_write_bw for InfiniBand), ensure NCCL uses the correct network interface, and consider gradient compression.
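For Symptom 03, "scaled appropriately for batch size" most often means the linear scaling rule: multiply the learning rate by the same factor as the global batch, usually combined with a warmup. A sketch under that assumption; it is a widely used heuristic rather than a guarantee, and adaptive optimizers sometimes do better with square-root scaling. The function name and numbers are hypothetical.

```python
def scaled_lr(base_lr, base_batch, per_gpu_batch, world_size,
              accum_steps=1, rule="linear"):
    """Hypothetical helper: rescale an LR tuned at base_batch to the new global batch."""
    global_batch = per_gpu_batch * world_size * accum_steps
    ratio = global_batch / base_batch
    if rule == "linear":
        return base_lr * ratio
    if rule == "sqrt":
        return base_lr * ratio ** 0.5
    raise ValueError(f"unknown rule: {rule}")


# Tuned at lr=0.1 with batch 256; now 8 GPUs x 64 per GPU x 2 accumulation steps = 1024.
print(scaled_lr(0.1, 256, 64, 8, accum_steps=2))  # 0.4
```

Note that gradient accumulation counts toward the global batch, a detail that is easy to miss when the per-GPU batch is unchanged.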
★ Quick Debug Cheat Sheet · Immediate actions for the three most common distributed training failures
GPU OOM
Immediate action
Reduce batch size by 50% and enable gradient accumulation.
Commands
torch.cuda.empty_cache()
torch.cuda.memory_summary()
Fix now
Switch to FSDP with sharding_strategy=ShardingStrategy.SHARD_GRAD_OP (or FULL_SHARD to also shard parameters)
Training stalls (no progress)
Immediate action
Check if any worker is hanging (NCCL timeout).
Commands
export NCCL_DEBUG=INFO
torch.distributed.barrier()
Fix now
Set timeout in torch.distributed.init_process_group(backend='nccl', timeout=datetime.timedelta(seconds=1800))
Loss not decreasing
Immediate action
Verify gradient norms are non-zero and not exploding.
Commands
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
wandb.log({'grad_norm': torch.nn.utils.get_total_norm([p.grad for p in model.parameters() if p.grad is not None])})
Fix now
Reduce learning rate by 10x and check data loading for corruption
Parallelism Strategies at a Glance
| Strategy | Memory per device | Communication pattern | Best for | Key limitation |
|---|---|---|---|---|
| Data parallelism | Full model + batch | All-reduce (gradients) | Models fitting on one GPU | Communication overhead at scale |
| Pipeline parallelism | 1/N of the model's layers | Point-to-point (stage boundaries) | Very deep models | Pipeline bubble idle time |
| Tensor parallelism | 1/N of each layer | All-reduce / all-gather within each layer | Large hidden dimensions | Requires high-bandwidth intra-node links |
| Hybrid (DP+PP+TP) | Fraction of the model | Mixed (all-reduce, point-to-point) | Massive models (100B+ params) | Complex scheduling and tuning |

Key takeaways

1. Data parallelism is simple but limited by model size and by the communication overhead of gradient sync.
2. Once a single GPU cannot hold the model, you need sharding (ZeRO/FSDP) or model parallelism; pipeline-style model parallelism introduces pipeline bubbles.
3. Pipeline parallelism with micro-batching approaches linear scaling when the number of micro-batches is much larger than the number of pipeline stages.
4. Tensor parallelism reduces memory per operation but requires high-bandwidth intra-node connections (NVLink).
5. Always profile communication vs. computation to identify the true bottleneck before choosing a strategy.
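The micro-batching point can be made concrete with the GPipe-style bubble estimate: with p stages and m micro-batches, the idle fraction of a step is (p - 1) / (m + p - 1), assuming equal stage times. A sketch (function name hypothetical) showing how more micro-batches fill the pipeline:

```python
def bubble_fraction(num_stages, num_micro_batches):
    """Hypothetical helper: idle fraction of a GPipe-style schedule.

    (p - 1) / (m + p - 1), assuming every stage takes the same time per
    micro-batch.
    """
    p, m = num_stages, num_micro_batches
    if p < 1 or m < 1:
        raise ValueError("need at least one stage and one micro-batch")
    return (p - 1) / (m + p - 1)


for m in (1, 4, 16, 64):
    print(m, round(bubble_fraction(4, m), 3))
# 4 stages: 1 -> 0.75, 4 -> 0.429, 16 -> 0.158, 64 -> 0.045
```

At 16 micro-batches for 4 stages (the "4x the number of stages" guideline), roughly 16% of each step is still idle, which is why deeper pipelines need even more micro-batches.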

Common mistakes to avoid


Using data parallelism when the model doesn't fit on one GPU

Symptom
Out-of-memory (OOM) errors or swapping to CPU, causing extreme slowdowns.
Fix
Switch to model parallelism or ZeRO to shard the model across devices.

Not tuning the number of micro-batches in pipeline parallelism

Symptom
Low GPU utilization (many idle devices) and poor throughput scaling.
Fix
Increase micro-batches to at least 4x the number of pipeline stages to fill the pipeline.

Ignoring network topology when choosing parallelism strategy

Symptom
High communication latency between nodes, negating speedup from parallelism.
Fix
Use tensor parallelism only within a node (NVLink); use data/pipeline parallelism across nodes with high-bandwidth interconnects (e.g., InfiniBand).

Assuming linear scaling with more GPUs

Symptom
Diminishing returns or even slowdown beyond a certain number of GPUs.
Fix
Profile communication overhead and compute utilization, and run a strong-scaling sweep (measure throughput at several GPU counts) to find where adding devices stops paying off for your model and batch size.
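The network-topology mistake above maps directly onto how ranks are grouped. A sketch, assuming ranks are numbered node by node (rank = node * gpus_per_node + local GPU, the usual torchrun layout) and ignoring pipeline parallelism: tensor-parallel groups are contiguous ranks inside one node, and data-parallel groups stride across them. Function names are hypothetical; with torch.distributed, every rank must call new_group for every group, in the same order, even groups it doesn't belong to.

```python
def tp_dp_groups(world_size, gpus_per_node, tp_size):
    """Hypothetical helper: return (tp_groups, dp_groups) as lists of rank lists."""
    if gpus_per_node % tp_size != 0:
        raise ValueError("tp_size must divide gpus_per_node so TP stays on NVLink")
    if world_size % tp_size != 0:
        raise ValueError("tp_size must divide world_size")
    # Contiguous blocks of tp_size ranks never cross a node boundary.
    tp_groups = [list(range(start, start + tp_size))
                 for start in range(0, world_size, tp_size)]
    # Ranks holding the same tensor shard (same offset inside their TP group)
    # form a data-parallel group and all-reduce gradients with each other.
    dp_groups = [list(range(offset, world_size, tp_size))
                 for offset in range(tp_size)]
    return tp_groups, dp_groups


# Wiring into torch.distributed (run identically on every rank):
# tp, dp = tp_dp_groups(dist.get_world_size(), 8, tp_size=8)
# my_tp = my_dp = None
# for ranks in tp:
#     g = dist.new_group(ranks)  # every rank calls this for every group
#     if dist.get_rank() in ranks:
#         my_tp = g
# for ranks in dp:
#     g = dist.new_group(ranks)
#     if dist.get_rank() in ranks:
#         my_dp = g
```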
INTERVIEW PREP · PRACTICE MODE

Interview Questions on This Topic

Q01 · SENIOR
Explain the difference between synchronous and asynchronous gradient upd...
Q02 · SENIOR
Describe the pipeline bubble in GPipe-style pipeline parallelism. How do...
Q03 · SENIOR
What is the role of the all-reduce operation in distributed training? Na...
Q01 of 03 · SENIOR

Explain the difference between synchronous and asynchronous gradient updates in data parallelism. Which is more common in production and why?

ANSWER
Synchronous gradient updates wait for every worker to finish its forward/backward pass, then average gradients and apply the same update everywhere. This keeps all replicas' weights identical but exposes the job to stragglers: each step runs at the speed of the slowest worker. Asynchronous updates let workers apply updates independently, which can be faster but produces stale gradients and potential convergence issues. In production, synchronous all-reduce (often with gradient accumulation to amortize communication) is far more common because replicas stay consistent, runs are easier to reproduce and debug, and convergence behaves like single-device training with a larger batch, especially for large models.
FAQ · 4 QUESTIONS

Frequently Asked Questions

01
When should I use data parallelism vs. model parallelism?
02
What is the main bottleneck in distributed training?
03
How do I choose the number of pipeline stages?
04
What is the difference between pipeline parallelism and tensor parallelism?