Multi-GPU Training Architectures
The Production Crisis at 3 AM
The alert fires at 3:17 AM. Your team's flagship language model - 70 billion parameters, 6 weeks into training on 512 H100s - has stalled. GPU utilization has dropped from 82% to 31% across the entire cluster. The training loss curve on your dashboard looks like a cliff edge.
You SSH into the head node. nvidia-smi shows all GPUs active but idle-busy, not compute-busy. You pull the NCCL logs. The all-reduce collective for gradient synchronization is taking 4.1 seconds per step instead of the usual 0.4 seconds. Ten times slower. You dig further: one of the InfiniBand switch links in rack 7 has silently degraded to half bandwidth. All 512 GPUs are waiting on that single bottleneck.
You reroute traffic, the step time drops back to 0.38 seconds, and the cluster returns to life. But you have lost three hours of training time - roughly $4,200 at cloud H100 rates. And this was a network failure, the kind you can diagnose. The harder failures are architectural: wrong parallelism strategy for model size, communication volume that dominates compute, pipeline bubbles eating 40% of your compute budget.
That scenario plays out every week at every serious AI lab. GPT-4 did not train on one GPU. Llama 3 405B did not train on one machine. The moment a model exceeds the memory of a single GPU - roughly 80 GB for H100 SXM - you need to split the work across hardware. The question is not whether to distribute but how, and the wrong answer costs millions of dollars and weeks of wall-clock time.
This lesson is about the three fundamental strategies for distributing model training across multiple GPUs: data parallelism, tensor parallelism, and pipeline parallelism. Each solves a different constraint. Each has a different communication cost. And the best practitioners know how to combine all three - what researchers call 3D parallelism - to saturate hundreds or thousands of GPUs efficiently.
Why This Exists - The Single-GPU Wall
Before multi-GPU training, researchers hit a hard wall in 2016-2017. ResNet-50 fit on a single GPU. VGG-16 barely fit. But the models researchers wanted to train - GPT-1 at 117M parameters was a shock to the system in 2018 - were pushing past the memory capacity of any single accelerator available.
The naive approach was to train on CPU, or to train on a single GPU but with very small batch sizes. Both failed in practice. CPU training for modern deep networks is 100-1000x slower than GPU training. And very small batch sizes create training instability - the gradient noise is so high that the optimizer makes poor update decisions, and you need many more steps to converge, which further compounds the wall-clock time problem.
The "embarrassingly parallel" observation came first: if you have 8 GPUs and a batch of 8192 samples, each GPU can process 1024 samples independently and then share gradients. This is data parallelism, and it works beautifully for small models. The problem appears when the model itself does not fit in one GPU's memory. A GPT-3 style model with 175B parameters requires roughly 350 GB just for the weights in FP16 - more than four H100s worth of memory for weights alone, before activations or optimizer states.
That is the problem multi-GPU training architectures solve: distributing both computation and memory across hardware so that models that physically cannot exist on one device can still be trained efficiently.
Historical Context - How We Got Here
The first serious multi-GPU training paper that got traction was the 2012 AlexNet paper by Krizhevsky et al. at the University of Toronto. AlexNet used two GTX 580 GPUs with 3 GB each, splitting the model across them - a form of primitive model parallelism. But this was ad hoc, not systematic.
The systematic understanding came from Google in 2016 with the DistBelief paper's successor work, and then crystallized in the 2017 "Accurate, Large Minibatch SGD" paper from Facebook AI Research (Goyal et al.). That paper showed you could train ResNet-50 in 1 hour using 256 GPUs with data parallelism, as long as you scaled the learning rate linearly with batch size. This was the "linear scaling rule" and it unlocked industrial-scale data parallel training.
Tensor parallelism was formalized by the Megatron-LM team at NVIDIA in 2019 (Shoeybi et al.). The key insight - that transformer attention heads and MLP weight matrices could be split across GPUs along specific dimensions with minimal communication overhead - made it possible to train models with billions of parameters across machines. The original Megatron paper trained an 8.3B parameter transformer, which was enormous for 2019.
Pipeline parallelism at scale was described in the GPipe paper (Huang et al., Google Brain, 2019) and PipeDream (Narayanan et al., Microsoft/Stanford, 2019). GPipe introduced micro-batching to hide pipeline bubbles. PipeDream introduced 1F1B (one-forward-one-backward) scheduling to reduce the memory cost of in-flight activations.
The synthesis of all three - 3D parallelism - was popularized by the DeepSpeed team at Microsoft in 2020 and analyzed at scale in the 2021 Megatron-LM paper (Narayanan et al., from NVIDIA, Stanford, and Microsoft), which showed it scaling to a 1 trillion parameter model. This is the foundation that underlies every serious large model training run today.
Core Concepts - Intuition First
Data Parallelism: Copy Everything, Synchronize Gradients
Imagine you are baking 100 identical loaves of bread. You have 8 bakers. The simplest strategy: give each baker the full recipe (the model weights), split the 100 loaves across all 8 bakers, and at the end of the day, average what each baker learned about how to improve the recipe.
That is data parallelism. Each GPU holds a complete copy of the model. The training batch is split across GPUs. Each GPU runs forward pass, computes loss, runs backward pass to get gradients. Then all GPUs average their gradients via an all-reduce collective operation. Each GPU applies the averaged gradient to update its local copy of the weights.
The key property: the model on every GPU stays identical after every step. They start identical, they process different data, they compute different gradients, and then the all-reduce brings the gradients to consensus.
When it works: the model fits in one GPU's memory. You have many GPUs. You want to process large batches fast.
When it breaks: the model is too large for a single GPU. Or the model has so many parameters that the gradient communication volume overwhelms the network.
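The claim that averaged per-GPU gradients match the full-batch gradient can be checked on a toy problem. The sketch below assumes a one-parameter least-squares model and equal-sized shards; `grad_mse` and `data_parallel_grad` are illustrative names, not library functions.

```python
def grad_mse(w, xs, ys):
    """Gradient of mean squared error for the model y_hat = w * x."""
    n = len(xs)
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / n


def data_parallel_grad(w, xs, ys, num_gpus):
    """Split the batch into equal shards, compute one gradient per shard,
    then average the shard gradients (the role played by all-reduce)."""
    assert len(xs) % num_gpus == 0, "sketch assumes equal-sized shards"
    shard = len(xs) // num_gpus
    local_grads = [
        grad_mse(w, xs[i * shard:(i + 1) * shard], ys[i * shard:(i + 1) * shard])
        for i in range(num_gpus)
    ]
    return sum(local_grads) / num_gpus


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0]
full_batch = grad_mse(0.5, xs, ys)
averaged = data_parallel_grad(0.5, xs, ys, num_gpus=4)
```

With unequal shard sizes the simple mean would need to be weighted by shard size, which is one reason samplers pad the dataset so every rank sees the same number of samples.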
Tensor Parallelism: Split Individual Layers
Now imagine the recipe is so large it does not fit on one baker's workbench. You need to literally split the recipe across two bakers who work side by side. When they need to compute something, they each do half the computation and share intermediate results.
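That is tensor parallelism. Individual weight matrices are split across GPUs. A transformer's MLP block has two linear layers: $Y = \text{GeLU}(XA)$ followed by $Z = YB$, where $A \in \mathbb{R}^{h \times f}$ and $B \in \mathbb{R}^{f \times h}$ for hidden size $h$ and FFN size $f$. In Megatron-LM style tensor parallelism, $A$ is split column-wise across GPUs and $B$ is split row-wise. Each GPU does half the computation, and the results are summed via an all-reduce.
Column splitting: $A = [A_1, A_2]$ where each $A_i \in \mathbb{R}^{h \times f/2}$. GPU 1 computes $Y_1 = \text{GeLU}(XA_1)$, GPU 2 computes $Y_2 = \text{GeLU}(XA_2)$.
Row splitting: $B = [B_1; B_2]$ where each $B_i \in \mathbb{R}^{f/2 \times h}$. GPU 1 computes $Z_1 = Y_1 B_1$, GPU 2 computes $Z_2 = Y_2 B_2$. The final result $Z = Z_1 + Z_2$ requires an all-reduce.
This pairing - column split in the first matrix, row split in the second - is the Megatron trick. It requires exactly one all-reduce at the end of the MLP block (and one more for the attention block), minimizing forward-pass communication to two all-reduces per transformer layer. The backward pass adds two more.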
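The column-then-row pairing can be verified numerically without any GPUs. The following is a minimal sketch in pure Python that simulates the tensor-parallel ranks in a loop and replaces the all-reduce with a plain sum; the helper names and the toy matrices are assumptions made for illustration.

```python
import math


def gelu(x):
    return 0.5 * x * (1.0 + math.erf(x / math.sqrt(2.0)))


def matmul(a, b):
    return [[sum(a[i][k] * b[k][j] for k in range(len(b)))
             for j in range(len(b[0]))] for i in range(len(a))]


def mlp_reference(x, a, b):
    """Unsplit MLP: Z = GeLU(X A) B."""
    y = [[gelu(v) for v in row] for row in matmul(x, a)]
    return matmul(y, b)


def mlp_tensor_parallel(x, a, b, tp):
    """Simulate tp ranks: column-shard A, row-shard B, then sum the partials."""
    ffn = len(b)
    assert ffn % tp == 0
    chunk = ffn // tp
    partials = []
    for rank in range(tp):
        lo, hi = rank * chunk, (rank + 1) * chunk
        a_i = [row[lo:hi] for row in a]  # column shard of A
        b_i = b[lo:hi]                   # matching row shard of B
        y_i = [[gelu(v) for v in row] for row in matmul(x, a_i)]
        partials.append(matmul(y_i, b_i))
    # Stand-in for the all-reduce: element-wise sum over ranks
    return [[sum(p[i][j] for p in partials) for j in range(len(b[0]))]
            for i in range(len(x))]


x = [[1.0, -2.0], [0.5, 3.0]]
a = [[0.1, -0.2, 0.3, 0.4], [0.5, 0.6, -0.7, 0.8]]
b = [[1.0, 0.0], [0.5, -1.0], [2.0, 1.0], [-1.5, 0.25]]
reference = mlp_reference(x, a, b)
split_two = mlp_tensor_parallel(x, a, b, tp=2)
```

GeLU can be applied locally because it is element-wise and each rank owns complete output columns of $XA$; a row split of $A$ would instead require a sum before the nonlinearity.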
When it works: individual layers are too large for one GPU. You have NVLink-connected GPUs with very high bandwidth (900 GB/s bidirectional with NVLink 4.0). The all-reduce latency inside the layer is small relative to compute.
When it breaks: GPUs are connected only by PCIe or slow Ethernet. The all-reduce inside each layer adds up across hundreds of layers and dominates total step time.
Pipeline Parallelism: Assembly Line the Layers
Now imagine you have 4 bakers in sequence, each specializing in a different phase: mixing, kneading, baking, decorating. Each baker only knows their part of the recipe. Bread flows through as a pipeline.
Pipeline parallelism assigns consecutive groups of layers to different GPUs. GPU 1 runs layers 1-8, GPU 2 runs layers 9-16, GPU 3 runs layers 17-24, GPU 4 runs layers 25-32. The forward pass flows through all GPUs in sequence: GPU 1 computes its layers, sends activations to GPU 2, which computes its layers, sends to GPU 3, and so on. The backward pass flows in reverse.
The fundamental problem with naive pipeline parallelism is the "pipeline bubble". While GPU 4 is doing the forward pass on microbatch 1, GPUs 1, 2, 3 are idle. While GPU 1 is doing the backward pass on microbatch 1, GPUs 2, 3, 4 are idle. The fraction of time wasted in bubbles for a pipeline of depth $d$ with $m$ microbatches is:
$$\text{bubble fraction} = \frac{d - 1}{d - 1 + m}$$
With $d = 4$ stages and $m = 1$ microbatch, bubble fraction is 75%. With $m = 5$ microbatches, it drops to 37.5%. With $m = 30$, it is 9.1%. So you inject many small microbatches to keep the pipeline full.
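The bubble fraction is easy to tabulate before committing to a configuration. This small sketch assumes the formula (d - 1) / (d - 1 + m), with `d` the pipeline depth and `m` the number of microbatches; the function name is illustrative.

```python
def bubble_fraction(depth, num_microbatches):
    """Fraction of pipeline time spent idle: (d - 1) / (d - 1 + m)."""
    return (depth - 1) / (depth - 1 + num_microbatches)


# Tabulate a 4-stage pipeline for a few microbatch counts
table = {m: bubble_fraction(4, m) for m in (1, 5, 30)}
```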
When it works: the model is very deep. Layers can be grouped into stages of roughly equal compute. You have enough microbatches to amortize the bubble cost.
When it breaks: layers have very unequal compute (the last stage is a bottleneck). The number of microbatches is small (e.g., batch size constraints). The pipeline is very deep (many stages).
The Math of Communication Volume
Understanding when parallelism strategies become communication-bound requires computing exactly how many bytes must move across the network per training step.
Data Parallelism Communication
In data parallelism with $N$ GPUs, each GPU computes full gradients for all $P$ parameters. The all-reduce operation must aggregate these across all GPUs.
With the ring all-reduce algorithm (used by NCCL), the total data moved per GPU is:
$$V_{\text{DP}} = 2 \cdot \frac{N - 1}{N} \cdot P \cdot \text{(bytes per element)}$$
For FP32 gradients (4 bytes each) with 175B parameters and a GPU count $N$ large enough that $(N-1)/N \approx 1$:
$$V_{\text{DP}} \approx 2 \times 175 \times 10^9 \times 4 \text{ bytes} = 1.4 \text{ TB}$$
At HDR InfiniBand bandwidth of 200 Gb/s (25 GB/s), this all-reduce takes roughly 55 seconds. That is completely communication-bound. This is why vanilla data parallelism does not scale to very large models.
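The arithmetic above can be reproduced with a few lines. This sketch assumes the standard ring all-reduce volume of 2 (N - 1) / N times the buffer size per GPU and a link that sustains its nominal bandwidth; the GPU count of 64 is an illustrative choice, and the function names are not from any library.

```python
def ring_allreduce_bytes_per_gpu(num_params, bytes_per_elem, num_gpus):
    """Bytes each GPU sends (and receives) in one ring all-reduce."""
    return 2 * (num_gpus - 1) / num_gpus * num_params * bytes_per_elem


def transfer_seconds(num_bytes, bandwidth_bytes_per_s):
    """Lower bound on transfer time, ignoring latency and protocol overhead."""
    return num_bytes / bandwidth_bytes_per_s


volume = ring_allreduce_bytes_per_gpu(175e9, 4, num_gpus=64)  # FP32 gradients
seconds = transfer_seconds(volume, 25e9)                      # 200 Gb/s = 25 GB/s
```

Because the (N - 1) / N factor approaches 1 quickly, the per-GPU volume is almost independent of cluster size; what changes with scale is latency and the chance of a slow link.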
Tensor Parallelism Communication
In Megatron-style tensor parallelism with $t$ tensor-parallel GPUs, a transformer layer requires 2 all-reduce operations per forward pass and 2 per backward pass. Each all-reduce moves activations of shape $[b, s, h]$ where $b$ is batch size, $s$ is sequence length, and $h$ is hidden dimension.
For a model with hidden dim 8192, batch 4, sequence 2048, in BF16 (2 bytes):
$$V_{\text{TP}} = 4 \times (b \cdot s \cdot h) \times 2 \text{ bytes} = 4 \times (4 \times 2048 \times 8192) \times 2 \text{ bytes} \approx 537 \text{ MB per layer}$$
Across 96 transformer layers: 51 GB per training step. At NVLink 4.0 bandwidth of 900 GB/s bidirectional (450 GB/s unidirectional), this takes about 0.11 seconds - fast enough to overlap with compute.
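A short calculator makes it easy to rerun this estimate for other shapes. The sketch counts the all-reduce payload only (4 all-reduces per layer, each of size b x s x h), as the text does; it deliberately ignores the ring factor and any overlap with compute, and the function name is an assumption.

```python
def tp_allreduce_bytes_per_step(batch, seq_len, hidden, num_layers,
                                bytes_per_elem=2, allreduces_per_layer=4):
    """Total all-reduce payload per training step for tensor parallelism."""
    payload = batch * seq_len * hidden * bytes_per_elem
    return payload * allreduces_per_layer * num_layers


total_bytes = tp_allreduce_bytes_per_step(batch=4, seq_len=2048, hidden=8192,
                                          num_layers=96)
seconds_on_nvlink = total_bytes / 450e9  # 450 GB/s unidirectional
```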
Pipeline Parallelism Communication
Pipeline parallelism only communicates activations between adjacent pipeline stages. The communication per microbatch crossing a stage boundary is:
$$V_{\text{PP}} = b_{\text{micro}} \cdot s \cdot h \cdot \text{(bytes per element)}$$
Where $b_{\text{micro}}$ is microbatch size. For microbatch size 1, sequence 2048, hidden 8192, BF16: $1 \times 2048 \times 8192 \times 2$ bytes $\approx 33.5$ MB per stage crossing. With 4 pipeline stages, that is 3 stage crossings in the forward pass. Pipeline communication is the cheapest of the three parallelism strategies - which is why it can work over slower inter-node connections.
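To see how the per-crossing cost adds up over a full step, the sketch below multiplies it out. It assumes the backward pass sends an activation gradient of the same shape across each boundary, so every microbatch crosses every boundary twice; the function names are illustrative.

```python
def stage_boundary_bytes(micro_batch, seq_len, hidden, bytes_per_elem=2):
    """Activation bytes sent when one microbatch crosses one stage boundary."""
    return micro_batch * seq_len * hidden * bytes_per_elem


def pipeline_bytes_per_step(micro_batch, seq_len, hidden, num_stages,
                            num_microbatches, bytes_per_elem=2):
    """Point-to-point bytes per training step across the whole pipeline."""
    crossings = num_stages - 1
    per_crossing = stage_boundary_bytes(micro_batch, seq_len, hidden, bytes_per_elem)
    # Factor 2: activations in the forward pass, activation gradients in the backward pass
    return 2 * crossings * num_microbatches * per_crossing


one_crossing = stage_boundary_bytes(1, 2048, 8192)
per_step = pipeline_bytes_per_step(1, 2048, 8192, num_stages=4, num_microbatches=8)
```

Each individual link only carries its own boundary's share of this total, which is why the traffic fits comfortably on inter-node links.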
3D Parallelism: Combining All Three
The Megatron-LM scaling paper (Narayanan et al., 2021) showed that you can compose all three strategies simultaneously. The configuration for a 1T parameter model training on 3072 GPUs was:
- Tensor parallelism: $\text{TP} = 8$ (within a node, over NVLink)
- Pipeline parallelism: $\text{PP} = 64$ (across nodes, using InfiniBand)
- Data parallelism: $\text{DP} = 6$ (across pipeline groups, using InfiniBand)
Total GPUs: $8 \times 64 \times 6 = 3072$.
The hierarchy maps onto hardware: tensor parallelism is most communication-intensive, so it lives within the NVLink domain of a single DGX node. Pipeline parallelism crosses node boundaries but communicates less frequently (once per microbatch per stage). Data parallelism also crosses node boundaries but communicates only once per full forward-backward pass.
3D Parallelism Hierarchy:
  Within node (NVLink, 900 GB/s):
    GPU 0, 1, 2, 3, 4, 5, 6, 7 -> tensor parallel group
  Across nodes in a pipeline (InfiniBand, 400 Gb/s):
    Node 0 (layers 1-4) -> Node 1 (layers 5-8) -> ... -> Node 7 (layers 29-32)
  Across pipeline replica groups (InfiniBand):
    Replica 0 (nodes 0-7) <-> Replica 1 (nodes 8-15) <-> ... -> all-reduce gradients
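One way to turn this hierarchy into code is to map each global rank to a (data, pipeline, tensor) coordinate. The sketch below assumes the tensor-parallel index varies fastest, so that consecutive ranks share a node, and the data-parallel index varies slowest, matching the diagram. Real frameworks may order the dimensions differently, and `rank_to_coords` is a hypothetical helper.

```python
def rank_to_coords(rank, tp, pp, dp):
    """Map a global rank to (dp_rank, pp_rank, tp_rank), tensor index fastest."""
    assert 0 <= rank < tp * pp * dp, "rank out of range"
    tp_rank = rank % tp
    pp_rank = (rank // tp) % pp
    dp_rank = rank // (tp * pp)
    return dp_rank, pp_rank, tp_rank


def tensor_parallel_group(rank, tp):
    """Global ranks that share a tensor-parallel group with `rank`."""
    start = (rank // tp) * tp
    return list(range(start, start + tp))


coords = rank_to_coords(520, tp=8, pp=64, dp=6)
group = tensor_parallel_group(520, tp=8)
```

With 8 GPUs per node and TP = 8, every tensor-parallel group produced this way sits inside one node, which is the property the hierarchy depends on.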
Code Examples
PyTorch DDP - Data Parallel Training
import os

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler


def setup(rank, world_size):
    """Initialize the distributed process group and bind this process to a GPU."""
    dist.init_process_group(
        backend="nccl",        # NCCL for GPU-GPU communication
        init_method="env://",  # reads MASTER_ADDR, MASTER_PORT from env
        world_size=world_size,
        rank=rank,
    )
    # torchrun exports LOCAL_RANK (the GPU index within the node). Falling back
    # to the global rank is only correct for single-node jobs.
    local_rank = int(os.environ.get("LOCAL_RANK", rank))
    torch.cuda.set_device(local_rank)
    return local_rank


def cleanup():
    dist.destroy_process_group()


def train_ddp(rank, world_size, model_fn, dataset):
    local_rank = setup(rank, world_size)

    # Each process gets its own device
    device = torch.device(f"cuda:{local_rank}")

    # Create model and move to device
    model = model_fn().to(device)

    # Wrap with DDP - this hooks into backward pass to trigger all-reduce
    # find_unused_parameters=False is faster when all params participate
    model = DDP(
        model,
        device_ids=[local_rank],
        find_unused_parameters=False,
        # Gradients become views into the all-reduce buckets, which saves
        # roughly one gradient-sized copy of memory
        gradient_as_bucket_view=True,
    )

    # DistributedSampler ensures each GPU sees non-overlapping data
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True,
    )
    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=64,    # per-GPU batch size
        sampler=sampler,
        num_workers=4,
        pin_memory=True,  # speeds up host-to-device transfer
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(10):
        # Important: set epoch so sampler shuffles differently each epoch
        sampler.set_epoch(epoch)

        for batch_idx, (inputs, targets) in enumerate(loader):
            inputs = inputs.to(device, non_blocking=True)
            targets = targets.to(device, non_blocking=True)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()  # DDP hooks trigger all-reduce here

            # Gradient clipping before optimizer step
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            if rank == 0 and batch_idx % 100 == 0:
                print(f"Epoch {epoch}, Step {batch_idx}, Loss: {loss.item():.4f}")

    cleanup()


# Launch with torchrun (it sets RANK, LOCAL_RANK and WORLD_SIZE per process):
#   torchrun --nproc_per_node=8 train.py
#   and call train_ddp(int(os.environ["RANK"]), int(os.environ["WORLD_SIZE"]), model_fn, dataset)
# Or with torch.multiprocessing.spawn (MASTER_ADDR and MASTER_PORT must be set):
#   mp.spawn(train_ddp, args=(8, model_fn, dataset), nprocs=8)
PyTorch FSDP - Fully Sharded Data Parallel
FSDP shards parameters, gradients, and optimizer states across GPUs. This is the key difference from DDP: DDP replicates all of these, FSDP shards them.
import functools

import torch
import torch.distributed as dist
from torch.distributed.fsdp import (
    FullyShardedDataParallel as FSDP,
    MixedPrecision,
    BackwardPrefetch,
    ShardingStrategy,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from transformers import LlamaForCausalLM, LlamaConfig
from transformers.models.llama.modeling_llama import LlamaDecoderLayer


def setup_fsdp_training(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # Single-node assumption: the global rank equals the local GPU index.
    # On multi-node jobs, use the LOCAL_RANK environment variable instead.
    torch.cuda.set_device(rank)

    # Mixed precision policy: compute in BF16, masters in FP32
    mp_policy = MixedPrecision(
        param_dtype=torch.bfloat16,
        reduce_dtype=torch.bfloat16,  # gradients are reduce-scattered in BF16
        buffer_dtype=torch.bfloat16,
    )

    # Wrap policy: each transformer layer is its own FSDP unit
    # This means each layer's parameters are sharded across all GPUs
    auto_wrap_policy = functools.partial(
        transformer_auto_wrap_policy,
        transformer_layer_cls={LlamaDecoderLayer},
    )

    config = LlamaConfig(
        hidden_size=4096,
        intermediate_size=11008,
        num_hidden_layers=32,
        num_attention_heads=32,
        vocab_size=32000,
    )

    with torch.device("meta"):
        # Create model on meta device - no actual memory allocation
        model = LlamaForCausalLM(config)

    # FSDP with FULL_SHARD: shards params, grads, optimizer states
    # This is ZeRO Stage 3 semantics
    model = FSDP(
        model,
        sharding_strategy=ShardingStrategy.FULL_SHARD,
        mixed_precision=mp_policy,
        auto_wrap_policy=auto_wrap_policy,
        backward_prefetch=BackwardPrefetch.BACKWARD_PRE,
        # param_init_fn materializes each module from the meta device.
        # to_empty() leaves the weights uninitialized, so load a checkpoint
        # or re-initialize the parameters before training.
        param_init_fn=lambda module: module.to_empty(
            device=torch.device("cuda"),
            recurse=False,
        ),
        device_id=torch.cuda.current_device(),
    )
    return model


# Key FSDP behaviors:
# - Forward pass: each FSDP unit all-gathers its full params before compute,
#   then immediately discards them to save memory
# - Backward pass: all-gather params again for gradient computation,
#   then reduce-scatter the gradients so each GPU holds its gradient shard
# - Optimizer step: each GPU only updates its own parameter shard

# Memory comparison for a 7B parameter model on 8 GPUs (FP32, before activations):
# DDP:  7B params x 4 bytes x 4 (param + grad + 2 Adam moments) = 112 GB per GPU
# FSDP: 112 GB / 8 GPUs = 14 GB per GPU (+ activation memory)
Megatron-LM Style Tensor Parallelism
import torch
import torch.nn as nn
import torch.distributed as dist


class _CopyToTPRegion(torch.autograd.Function):
    """Identity in forward; all-reduce of the input gradient in backward."""

    @staticmethod
    def forward(ctx, x, group):
        ctx.group = group
        return x

    @staticmethod
    def backward(ctx, grad_output):
        grad = grad_output.clone()
        dist.all_reduce(grad, op=dist.ReduceOp.SUM, group=ctx.group)
        return grad, None


class _ReduceFromTPRegion(torch.autograd.Function):
    """All-reduce in forward; identity in backward."""

    @staticmethod
    def forward(ctx, x, group):
        out = x.clone()
        dist.all_reduce(out, op=dist.ReduceOp.SUM, group=group)
        return out

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output, None


class ColumnParallelLinear(nn.Module):
    """
    Split weight matrix column-wise across tensor-parallel GPUs.
    Input X is replicated on all GPUs.
    Output is split: each GPU holds a chunk of the output features.

    For an MLP: Y = GeLU(X * A) where A is split column-wise.
    Each GPU computes: Y_i = GeLU(X * A_i) where A_i is its column chunk.
    """

    def __init__(self, in_features, out_features, tp_group):
        super().__init__()
        self.tp_group = tp_group
        self.tp_size = dist.get_world_size(tp_group)
        self.tp_rank = dist.get_rank(tp_group)

        # Each GPU holds 1/tp_size of the output features
        assert out_features % self.tp_size == 0
        self.local_out = out_features // self.tp_size

        # Weight shard stored as [local_out, in_features], the layout
        # F.linear expects (no bias splitting shown)
        self.weight = nn.Parameter(torch.empty(self.local_out, in_features))
        nn.init.kaiming_uniform_(self.weight)

    def forward(self, x):
        # x is identical on all tp GPUs (replicated input). The wrapper is a
        # no-op in forward and sums the input gradient across ranks in backward.
        x = _CopyToTPRegion.apply(x, self.tp_group)
        # Each GPU computes its output chunk
        return torch.nn.functional.linear(x, self.weight)


class RowParallelLinear(nn.Module):
    """
    Split weight matrix row-wise across tensor-parallel GPUs.
    Input is already split (comes from ColumnParallelLinear output).
    Output requires all-reduce to sum partial results.

    For an MLP: Z = Y * B where B is split row-wise.
    GPU i computes: Z_i = Y_i * B_i (partial result).
    All-reduce: Z = sum(Z_i) over all tp GPUs.
    """

    def __init__(self, in_features, out_features, tp_group):
        super().__init__()
        self.tp_group = tp_group
        self.tp_size = dist.get_world_size(tp_group)
        assert in_features % self.tp_size == 0
        self.local_in = in_features // self.tp_size

        # Weight shard stored as [out_features, local_in]
        self.weight = nn.Parameter(torch.empty(out_features, self.local_in))
        nn.init.kaiming_uniform_(self.weight)

    def forward(self, x):
        # x is the split output from ColumnParallelLinear
        # x shape: [batch, seq, local_in]
        partial = torch.nn.functional.linear(x, self.weight)
        # partial shape: [batch, seq, out_features] but only a partial sum
        # All-reduce: sum across all tp GPUs to get full output
        # This is the single forward communication for the full MLP block
        return _ReduceFromTPRegion.apply(partial, self.tp_group)


class TensorParallelMLP(nn.Module):
    """
    Complete tensor-parallel MLP block.
    Requires exactly 2 all-reduce operations total:
    - 1 in forward pass (in RowParallelLinear)
    - 1 in backward pass (gradient of input to ColumnParallelLinear)
    """

    def __init__(self, hidden_dim, ffn_dim, tp_group):
        super().__init__()
        self.fc1 = ColumnParallelLinear(hidden_dim, ffn_dim, tp_group)
        self.fc2 = RowParallelLinear(ffn_dim, hidden_dim, tp_group)
        self.activation = nn.GELU()

    def forward(self, x):
        # x: [batch, seq, hidden_dim] - replicated on all tp GPUs
        h = self.fc1(x)          # [batch, seq, ffn_dim/tp_size] - split
        h = self.activation(h)   # activation applied locally
        out = self.fc2(h)        # [batch, seq, hidden_dim] + all-reduce
        return out
Benchmarking Parallelism Strategies
import time

import torch
import torch.distributed as dist


def benchmark_allreduce(tensor_size_gb, backend="nccl"):
    """
    Benchmark all-reduce throughput for given tensor size.
    Run this on your actual cluster to understand communication limits.
    Assumes the process group is initialized and torch.cuda.set_device()
    has already been called with the local GPU index.
    """
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    device = torch.device("cuda", torch.cuda.current_device())

    # Tensor size in elements (float32, 4 bytes each)
    num_elements = int(tensor_size_gb * 1e9 / 4)
    tensor = torch.randn(num_elements, device=device)

    # Warmup
    for _ in range(3):
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    torch.cuda.synchronize()

    # Benchmark
    n_iters = 10
    start = time.perf_counter()
    for _ in range(n_iters):
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    torch.cuda.synchronize()
    elapsed = time.perf_counter() - start

    avg_time = elapsed / n_iters
    # Ring all-reduce transfers 2*(N-1)/N * size bytes through each link
    effective_bytes = 2 * (world_size - 1) / world_size * tensor_size_gb * 1e9
    bandwidth_gb_s = effective_bytes / avg_time / 1e9

    if rank == 0:
        print(f"Tensor: {tensor_size_gb:.2f} GB, "
              f"Time: {avg_time*1000:.1f} ms, "
              f"Bandwidth: {bandwidth_gb_s:.1f} GB/s")


# Run to characterize your cluster's all-reduce performance:
# torchrun --nproc_per_node=8 benchmark.py
# Then compare against theoretical peak to find bottlenecks
Production Engineering Notes
Choosing Tensor Parallel Degree
Tensor parallelism degree should almost always be bounded by the NVLink domain. An H100 DGX node has 8 GPUs connected via NVLink 4.0 at 900 GB/s bidirectional. Setting TP = 8 keeps all tensor-parallel communication within this high-bandwidth domain.
Setting TP = 16 would require crossing InfiniBand links at 400 Gb/s (50 GB/s) for the all-reduces inside each transformer layer. For a model with 128 transformer layers, each requiring 2 all-reduces of ~500 MB each, you would move 128 GB of data per training step across InfiniBand just for tensor-parallel communication. At 50 GB/s that is 2.6 seconds per step - utterly unusable.
Rule of thumb: TP degree = GPUs per node (typically 8). Never exceed the NVLink domain for tensor parallelism.
Choosing Pipeline Parallel Degree
Pipeline parallelism crosses node boundaries but only communicates activations between adjacent stages, not doing broadcast across all nodes. The communication is point-to-point, not collective.
Pipeline depth should be chosen so that:
- The bubble fraction (d-1)/(d-1+m) stays below 5-10%
- Each pipeline stage has balanced compute (roughly equal FLOP count)
- The number of microbatches m >= 4*(d-1) to keep bubble below 20%
For a 96-layer model on 8 pipeline stages: 12 layers per stage. With microbatch size 4 and 8 pipeline stages, you need at least 28 microbatches for 20% bubble or 112 microbatches for 6% bubble. This constrains your minimum global batch size.
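The microbatch counts quoted above follow from inverting the bubble formula. A minimal sketch, assuming the formula (d - 1) / (d - 1 + m) and an integer percentage target so that the ceiling can be computed with exact integer arithmetic; the helper names are illustrative.

```python
def min_microbatches(depth, max_bubble_percent):
    """Smallest m with (d - 1) / (d - 1 + m) <= max_bubble_percent / 100."""
    numerator = (depth - 1) * (100 - max_bubble_percent)
    return -(-numerator // max_bubble_percent)  # ceiling division on integers


def bubble_percent(depth, num_microbatches):
    return 100 * (depth - 1) / (depth - 1 + num_microbatches)


needed_for_20 = min_microbatches(8, 20)
bubble_at_112 = bubble_percent(8, 112)
```

Multiplying the result by the microbatch size and the data-parallel degree gives the smallest global batch that meets the bubble target.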
FSDP vs DDP Decision
Use DDP when:
- Model fits comfortably in GPU memory (< 60% of VRAM for a 80 GB H100)
- You want simplicity - DDP has fewer gotchas
- Training stability is critical - DDP's behavior is well-understood
Use FSDP when:
- Model is 7B+ parameters per GPU
- You need ZeRO Stage 2 or 3 memory savings
- You are using mixed precision and can afford the all-gather overhead
- Optimizer states (Adam: 2x param size) would OOM without sharding
FSDP has a higher communication overhead than DDP for small models because it does all-gather before every forward pass. For a 7B model on 8 GPUs, FSDP saves ~60 GB of memory per GPU but adds ~15% communication overhead versus DDP.
Gradient Accumulation with Pipeline Parallelism
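When using pipeline parallelism, gradient accumulation is mandatory - not optional. The number of gradient accumulation steps equals the number of microbatches ($m$). This means your effective global batch size is:
$$B_{\text{global}} = b \times m \times \text{DP}$$
Where $b$ is microbatch size, $m$ is microbatches per pipeline flush, and $\text{DP}$ is data-parallel degree.
For a 3D parallelism setup whose product $b \times m \times \text{DP}$ is 3072, the global batch is 3072 samples. This matters for learning rate scaling: when the global batch grows by a factor of $k$, the linear scaling rule multiplies the learning rate by $k$, which tends to hold in smaller-batch regimes, while a gentler rule such as $\sqrt{k}$ is commonly used at very large batch sizes.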
Monitoring Distributed Training Health
# Check GPU utilization across all nodes
for node in $(cat hostfile); do
    ssh "$node" nvidia-smi --query-gpu=index,utilization.gpu,memory.used \
        --format=csv,noheader
done
# NCCL debug output - set before launching
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL
# Check for stragglers - GPUs taking much longer than others
# Look for variance in step times across ranks
# In PyTorch, torch.distributed.monitored_barrier() can detect hangs
# (it is supported on the Gloo backend, so use a Gloo process group for it)
# Profile communication vs compute ratio
export NCCL_ALGO=Ring # force ring algorithm
export NCCL_PROTO=Simple # force simple protocol for debugging
Common Mistakes
:::danger Dead processes and zombie training runs
When a single GPU in a distributed job fails or OOMs, the entire job can hang rather than crash cleanly. The dist.barrier() call that every rank expects will hang indefinitely when one rank has died. Always set NCCL_ASYNC_ERROR_HANDLING=1 (named TORCH_NCCL_ASYNC_ERROR_HANDLING in recent PyTorch releases) and implement a watchdog that kills the job if no progress is made within a timeout. Without this, a failed GPU in a 512-GPU job will silently consume cluster resources for hours.
:::
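A progress watchdog can be very small. The sketch below is one possible shape, assuming the training loop calls `heartbeat()` after every step and some supervisor polls `stalled()`; `ProgressWatchdog` is a hypothetical class, and the injectable clock exists only to make it testable.

```python
import time


class ProgressWatchdog:
    """Tracks the time of the last completed step and reports stalls."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self.timeout_s = timeout_s
        self._clock = clock
        self._last_progress = clock()

    def heartbeat(self):
        """Call after every successful training step."""
        self._last_progress = self._clock()

    def stalled(self):
        """True when no heartbeat has arrived within the timeout."""
        return self._clock() - self._last_progress > self.timeout_s


# Example with a fake clock so the behaviour is deterministic
now = [0.0]
watchdog = ProgressWatchdog(timeout_s=10.0, clock=lambda: now[0])
now[0] = 5.0
watchdog.heartbeat()
now[0] = 15.5
is_stalled = watchdog.stalled()
```

In a real job the check would run outside the training thread (a side thread or a separate supervisor process), because a rank blocked inside a collective cannot check its own watchdog.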
:::danger Wrong gradient synchronization frequency with gradient accumulation
A common mistake with DDP and gradient accumulation: calling loss.backward() multiple times without the model.no_sync() context manager triggers an all-reduce after every backward call, not just after the last accumulation step. This is N times more communication than needed and will destroy throughput. Always use:
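import contextlib

for step in range(accumulation_steps):
    inputs, targets = next(batches)  # `batches`: any iterator over micro-batches (illustrative)
    # Skip the all-reduce on every micro-step except the last one
    with model.no_sync() if step < accumulation_steps - 1 else contextlib.nullcontext():
        loss = criterion(model(inputs), targets) / accumulation_steps
        loss.backward()
optimizer.step()
optimizer.zero_grad()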
:::
:::warning Mismatched pipeline stage sizes
In pipeline parallelism, if one stage has significantly more compute than others (e.g., the last stage has the language model head on top of the transformer layers, which involves a very large vocabulary projection), that stage becomes a bottleneck. All other stages will sit idle waiting for it. Profile per-stage compute time with torch.cuda.synchronize() timing before committing to a pipeline split. Aim for less than 10% variance in stage compute time.
:::
:::warning FSDP with non-transformer architectures
FSDP's transformer_auto_wrap_policy works well for transformer architectures where each layer is a natural sharding unit. For CNNs or custom architectures, the auto-wrap policy may create FSDP units that are too small (causing excessive all-gather overhead) or too large (causing OOM during the all-gather of one unit). Always explicitly specify your wrapping policy and validate that the memory profile is what you expect before a long training run.
:::
:::warning Forgetting to set the DistributedSampler epoch
If you forget to call sampler.set_epoch(epoch) at the start of each epoch in DDP training, the sampler will use the same random seed every epoch. Every GPU will see the exact same data ordering across all epochs, eliminating the benefit of shuffling and potentially causing overfitting to the order of data in the first epoch. This is a silent bug - training will appear to work, but generalization will be worse.
:::
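The effect of the epoch on shuffling can be imitated in plain Python. This sketch mimics the idea of seeding the shuffle with seed + epoch and giving each rank a strided slice; it uses the standard `random` module rather than PyTorch, so the actual orderings differ from what DistributedSampler produces, and both function names are illustrative.

```python
import random


def epoch_order(num_samples, seed, epoch):
    """Deterministic shuffle that changes only when the epoch changes."""
    rng = random.Random(seed + epoch)
    order = list(range(num_samples))
    rng.shuffle(order)
    return order


def rank_indices(order, rank, world_size):
    """Strided slice of the shuffled order owned by one rank."""
    return order[rank::world_size]


same_epoch_twice = epoch_order(100, seed=0, epoch=0) == epoch_order(100, seed=0, epoch=0)
next_epoch_differs = epoch_order(100, seed=0, epoch=0) != epoch_order(100, seed=0, epoch=1)
shards = [rank_indices(epoch_order(100, 0, 0), r, 4) for r in range(4)]
```

If the epoch is never advanced, the first comparison is what happens every epoch: each rank replays exactly the same order.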
Choosing Parallelism: A Decision Framework
When you sit down with a model spec sheet and a cluster spec, the decision process is systematic. Here is how to work through it.
Step 1: Does the model fit on one GPU?
Compute parameter memory: $2P$ bytes (BF16) for the model alone. Add optimizer states (Adam's two FP32 moment estimates: $8P$ bytes). Add gradient buffer ($2P$ bytes). Total: roughly $12P$ bytes for mixed-precision Adam training, or $16P$ bytes if you also keep an FP32 master copy of the weights.
For an H100 with 80 GB VRAM: maximum parameter count before memory pressure = $80 \times 10^9 / 12 \approx 6.7$ billion parameters, before counting activations. So any model above 6-7B requires either FSDP (which shards memory across GPUs) or model parallelism.
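Step 1 reduces to a bytes-per-parameter budget. The sketch below assumes BF16 parameters and gradients (2 bytes each) plus two FP32 Adam moments (8 bytes), with an optional FP32 master copy; activations are not counted, and the function names are illustrative.

```python
def training_bytes_per_param(param_bytes=2, grad_bytes=2, adam_moment_bytes=8,
                             master_copy_bytes=0):
    """Persistent training state per parameter, excluding activations."""
    return param_bytes + grad_bytes + adam_moment_bytes + master_copy_bytes


def max_params_on_gpu(vram_bytes, bytes_per_param):
    """Upper bound on parameter count that fits in the given memory."""
    return vram_bytes / bytes_per_param


without_master = max_params_on_gpu(80e9, training_bytes_per_param())
with_master = max_params_on_gpu(80e9, training_bytes_per_param(master_copy_bytes=4))
```

The practical limit is lower than either number because activations, temporary buffers, and allocator fragmentation all need headroom.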
Step 2: What is your NVLink domain size?
Modern DGX nodes have 8 GPUs connected via NVLink. Set your tensor-parallel degree to this number: TP = 8. Do not exceed the NVLink domain. If you have a custom cluster with only PCIe connectivity, set TP = 1 (no tensor parallelism) and rely on pipeline and data parallelism.
Step 3: How many layers?
Pipeline parallelism works by assigning equal groups of consecutive layers to pipeline stages. For efficiency, you want at least 4-8 layers per stage. If the model has 96 layers and you use PP = 8, that is 12 layers per stage - reasonable. If you use PP = 32, that is 3 layers per stage - too fine-grained, and load imbalance becomes a problem.
Choose PP such that: layers-per-stage >= 4 AND bubble fraction < 10%.
Step 4: Compute required microbatches for bubble tolerance
From the bubble formula: $m \geq 9(d - 1)$ to keep bubble at or below 10% (where $d$ is pipeline depth). For PP = 8: need $m \geq 63$ microbatches. For PP = 4: need $m \geq 27$.
If your batch size constraints prevent sufficient microbatches (e.g., memory limits prevent large batches), reduce PP depth.
Step 5: Remaining GPUs go to data parallelism
After assigning TP and PP, remaining GPUs form data-parallel replica groups. This determines your effective global batch size per step.
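Steps 2 through 5 can be strung together as a small planning function. This is a sketch under the assumptions stated in the steps (TP equals the GPUs per node, layers divide evenly into stages, leftover GPUs become data-parallel replicas); `plan_3d` and its return keys are hypothetical.

```python
def plan_3d(total_gpus, gpus_per_node, num_layers, pp, num_microbatches):
    """Derive a (TP, PP, DP) layout and the resulting pipeline bubble."""
    tp = gpus_per_node  # keep tensor parallelism inside the NVLink domain
    if num_layers % pp != 0:
        raise ValueError("layers must divide evenly into pipeline stages")
    if total_gpus % (tp * pp) != 0:
        raise ValueError("total GPUs must be a multiple of TP * PP")
    return {
        "tp": tp,
        "pp": pp,
        "dp": total_gpus // (tp * pp),
        "layers_per_stage": num_layers // pp,
        "bubble_fraction": (pp - 1) / (pp - 1 + num_microbatches),
    }


plan = plan_3d(total_gpus=512, gpus_per_node=8, num_layers=96, pp=8,
               num_microbatches=63)
```

A fuller planner would also check per-GPU memory after sharding, which this sketch leaves out.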
Summary table for common model sizes:
| Model Size | TP | PP | DP | Min GPUs | Rec. GPUs |
|---|---|---|---|---|---|
| 1-7B | 1 | 1 | N | 1 | 8-64 |
| 7-13B | 8 | 1 | N | 8 | 16-64 |
| 13-70B | 8 | 4 | N | 32 | 128-512 |
| 70-175B | 8 | 8 | N | 64 | 256-1024 |
| 175B-1T | 8 | 16 | N | 128 | 1024-4096 |
TP stays at 8 (one DGX node). PP scales with model depth. DP provides throughput scaling once memory requirements are met.
Weak vs Strong Scaling Efficiency
Two important metrics for distributed training:
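Strong scaling keeps the global batch size fixed and adds more GPUs. Ideal: with 2x GPUs, training takes 1/2 the time. Efficiency is measured as $E_{\text{strong}} = T_1 / (N \cdot T_N)$, where $T_N$ is the time to complete the fixed workload on $N$ GPUs.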
In practice, communication overhead means efficiency drops below 1.0. For large models with high compute-to-communication ratios, you can achieve 85-95% strong scaling efficiency with good NVLink/InfiniBand topology.
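Weak scaling increases both GPUs and batch size proportionally. Ideal: with 2x GPUs processing 2x data per step, training time per step stays constant. Efficiency is measured as $E_{\text{weak}} = T_1 / T_N$, where $T_N$ is the time per step on $N$ GPUs.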
Weak scaling is almost always easier to achieve than strong scaling because you are not reducing per-GPU compute - only adding communication overhead.
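Both efficiencies are simple ratios of measured times. The sketch below uses the usual definitions (strong: T_1 / (N * T_N) for a fixed workload; weak: T_1 / T_N for a per-GPU workload held constant); the timings in the example are made up for illustration.

```python
def strong_scaling_efficiency(t1, num_gpus, tn):
    """Fixed total work: ideal time on N GPUs is t1 / N."""
    return t1 / (num_gpus * tn)


def weak_scaling_efficiency(t1, tn):
    """Fixed work per GPU: ideal step time stays at t1."""
    return t1 / tn


# Hypothetical measurements: 100 s on 1 GPU, 14 s on 8 GPUs for the same job
strong = strong_scaling_efficiency(100.0, 8, 14.0)
# Hypothetical step times: 2.0 s on 1 GPU, 2.5 s on 8 GPUs with 8x the batch
weak = weak_scaling_efficiency(2.0, 2.5)
```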
For a 1T parameter model on 3072 A100s, the Megatron-LM scaling paper (Narayanan et al., 2021) reported reaching approximately 52% of theoretical peak FLOP/s, which is the quantity now usually called Model FLOP Utilization (MFU). Communication, pipeline bubbles, and framework overhead consumed the rest. Getting above 40% MFU on very large multi-node runs is considered excellent.
Interview Q&A
Q1: What is the key difference between PyTorch DDP and FSDP, and when would you choose each?
DDP replicates the full model on every GPU and synchronizes gradients via all-reduce after each backward pass. Each GPU holds: full parameters (P), full gradients (P), and full optimizer states (for Adam: 2P). Memory per GPU is $16P$ bytes at FP32, or roughly $12P$ bytes for mixed-precision with Adam (BF16 parameters and gradients plus two FP32 moments).
FSDP shards all three across GPUs. With $N$ GPUs, memory per GPU is $2P/N$ bytes for BF16 parameters, and optimizer states and gradients are sharded the same way. This is equivalent to ZeRO Stage 3. The trade-off is that FSDP must all-gather parameters before each forward and backward pass through each FSDP unit, adding communication overhead that DDP does not have.
Choose DDP when the model fits comfortably (< 60% VRAM, accounting for activations). Choose FSDP when you need to train models that would OOM with DDP, specifically 7B+ parameter models on 80 GB GPUs. Also use FSDP when optimizer state memory (2x param size for Adam) is the bottleneck - FSDP eliminates this by sharding optimizer states.
Q2: Explain the Megatron-LM column-row splitting trick for tensor parallelism. Why are exactly two all-reduces needed per MLP block, not one or four?
The MLP block computes $Z = \text{GeLU}(XA)B$. Splitting $A$ column-wise means each GPU computes a local $Y_i = \text{GeLU}(XA_i)$ - no communication needed because X is replicated on all GPUs. The column split of A naturally creates a row split of the input to B, so splitting B row-wise means each GPU computes a partial sum $Z_i = Y_i B_i$. To get the full output Z, one all-reduce sums these partial results.
That is one all-reduce in the forward pass. In the backward pass, the gradient needs to flow back through the row-parallel linear, and then the column-parallel linear's input needs its gradient aggregated - this requires one more all-reduce. Total: exactly 2 per MLP block.
Why not one? You cannot avoid the second one without changing the computation structure. Why not four? Because the column-to-row matching eliminates the intermediate all-reduce between the two linear layers. This is the elegance of the Megatron trick - it pairs the splits so the only communication is at the boundary of the full MLP block.
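The forward half of this argument can be checked numerically. The sketch below simulates the "GPUs" as loop iterations in plain Python and replaces the all-reduce with an element-wise sum. The helper names are hypothetical, the tanh form of GeLU is an arbitrary choice, and dropout is omitted. It only shows that the column-then-row split reproduces the unsharded result with a single reduction.

```python
import math


def gelu(x):
    # tanh approximation of GeLU (an assumption; any element-wise op works)
    return 0.5 * x * (1.0 + math.tanh(math.sqrt(2.0 / math.pi) * (x + 0.044715 * x ** 3)))


def matmul(a, b):
    return [[sum(a[i][k] * b[k][j] for k in range(len(b)))
             for j in range(len(b[0]))] for i in range(len(a))]


def mlp_reference(x, a, b):
    """Unsharded Z = GeLU(X A) B."""
    y = [[gelu(v) for v in row] for row in matmul(x, a)]
    return matmul(y, b)


def mlp_tensor_parallel(x, a, b, world_size):
    """Column-split A, row-split B, then one simulated all-reduce."""
    hidden = len(a[0])
    assert hidden % world_size == 0, "hidden size must divide evenly"
    shard = hidden // world_size
    partials = []
    for rank in range(world_size):
        cols = range(rank * shard, (rank + 1) * shard)
        a_i = [[row[j] for j in cols] for row in a]  # column shard of A
        b_i = [b[j] for j in cols]                   # matching row shard of B
        y_i = [[gelu(v) for v in row] for row in matmul(x, a_i)]  # local only
        partials.append(matmul(y_i, b_i))            # partial sum Z_i
    # The single forward all-reduce: element-wise sum over ranks.
    rows, out = len(partials[0]), len(partials[0][0])
    return [[sum(p[i][j] for p in partials) for j in range(out)] for i in range(rows)]


if __name__ == "__main__":
    x = [[0.5, -1.0, 2.0], [1.5, 0.25, -0.75]]
    a = [[0.1, -0.2, 0.3, 0.4], [0.5, 0.6, -0.7, 0.8], [-0.9, 1.0, 1.1, -1.2]]
    b = [[0.2, -0.1, 0.4], [0.3, 0.5, -0.6], [-0.7, 0.8, 0.9], [1.0, -1.1, 1.2]]
    print(mlp_reference(x, a, b))
    print(mlp_tensor_parallel(x, a, b, world_size=2))
```

Note that no communication happens between the two matmuls inside the loop: each rank feeds its own `y_i` straight into its own `b_i`, which is the pairing the answer describes.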
Q3: What is the pipeline bubble and how do you calculate its cost? What is the 1F1B schedule and how does it reduce memory?
The pipeline bubble is the idle time at the start and end of each pipeline forward-backward pass. With $p$ pipeline stages, later stages sit idle until the first microbatch reaches them, and the first stage cannot start a backward pass until that microbatch has traversed every stage and its gradient has come back. Similarly, at the end of the step, some stages finish early while others are still draining their last microbatches. The bubble fraction is $(p-1)/(m+p-1)$ of the step time, or approximately $(p-1)/m$ when $m \gg p$, where $m$ is the number of microbatches per pipeline flush.
The naive GPipe schedule stores all intermediate activations for all microbatches simultaneously (needed for the backward passes). Memory scales as $O(m)$ per stage.
The 1F1B (one-forward-one-backward) schedule interleaves forward and backward passes so that after the initial pipeline fill, each stage does one forward and one backward pass in alternation. This means the backward pass for microbatch 1 starts immediately after all stages have processed it, before all microbatches have been processed. This reduces peak activation memory from $O(m)$ to $O(p)$ - a factor of $m/p$ improvement. The bubble fraction is the same as GPipe, but memory is dramatically reduced.
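The bubble and memory trade-off can be put into numbers with a small sketch. It assumes every stage takes the same time per microbatch and that the backward pass is a fixed multiple of the forward pass, which is the idealized model behind the $(p-1)/(m+p-1)$ formula. The function names are hypothetical.

```python
def bubble_fraction(p, m):
    """Idle fraction of one pipeline step with p stages and m microbatches."""
    return (p - 1) / (m + p - 1)


def peak_inflight_microbatches(p, m, schedule):
    """Microbatches whose activations the first stage holds at peak."""
    if schedule == "gpipe":
        return m              # all forwards finish before any backward
    if schedule == "1f1b":
        return min(p, m)      # at most p microbatches in flight
    raise ValueError(f"unknown schedule: {schedule}")


if __name__ == "__main__":
    p = 4
    for m in (4, 8, 30):
        print(f"p={p} m={m}: bubble={bubble_fraction(p, m):.1%}, "
              f"gpipe holds {peak_inflight_microbatches(p, m, 'gpipe')}, "
              f"1f1b holds {peak_inflight_microbatches(p, m, '1f1b')}")
```

Raising `m` shrinks the bubble under both schedules, but only under 1F1B does it leave peak activation memory unchanged.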
Q4: A 65B parameter model in BF16 needs to be trained on H100s with 80 GB VRAM each. Walk through how you would decide on the parallelism strategy.
First, compute memory requirements. The model in BF16 uses $65 \times 10^9 \times 2$ bytes = 130 GB for parameters alone. This does not fit on one GPU. With optimizer states (AdamW in BF16 masters with FP32 moments): roughly $65 \times 10^9 \times 14$ bytes = 910 GB for parameters + gradients + optimizer states combined.
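Minimum GPU count for memory: with FSDP/ZeRO-3 across $N$ GPUs, parameter memory per GPU is $130/N$ GB and the full training state is $910/N$ GB. For $N = 8$, parameters take only 16.25 GB per GPU, but the full sharded state is about 114 GB, which still exceeds 80 GB. At $N = 16$ the state drops to about 57 GB per GPU, leaving roughly 23 GB for activations. At typical sequence lengths and batch sizes for 65B models, that usually calls for activation checkpointing or more GPUs.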
Proposed configuration for a DGX H100 cluster (8 GPUs per node):
- TP = 8 (within each node, over NVLink) - handles the wide layers
- PP = 4 (across 4 nodes) - handles depth, 65B models have ~80 layers, 20 per stage
- DP = 4 (4 pipeline replicas for data parallelism)
- Total: 8 x 4 x 4 = 128 GPUs across 16 nodes
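Pipeline micro-batches: to keep the bubble below 10% with $p = 4$ stages, the rule of thumb $(p-1)/m \leq 0.1$ gives $m \geq 30$ microbatches; the exact idle fraction is then $3/33 \approx 9\%$. With microbatch size 2 and 30 microbatches: global batch = 2 x 30 x 4 = 240 samples per step.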
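The arithmetic in this configuration is easy to get wrong by hand, so a small sanity check helps. `ParallelConfig` is a hypothetical helper, not a Megatron-LM or DeepSpeed API. It assumes model state is split evenly over the TP and PP axes and fully replicated over the DP axis (no ZeRO sharding).

```python
from dataclasses import dataclass


@dataclass
class ParallelConfig:
    tp: int                  # tensor-parallel degree (within a node)
    pp: int                  # pipeline-parallel degree (stages)
    dp: int                  # data-parallel replicas
    micro_batch_size: int
    num_microbatches: int
    gpus_per_node: int = 8

    @property
    def world_size(self):
        return self.tp * self.pp * self.dp

    @property
    def num_nodes(self):
        return self.world_size // self.gpus_per_node

    @property
    def global_batch(self):
        return self.micro_batch_size * self.num_microbatches * self.dp

    def layers_per_stage(self, num_layers):
        if num_layers % self.pp:
            raise ValueError("layers do not divide evenly across stages")
        return num_layers // self.pp

    def state_gb_per_gpu(self, total_state_gb):
        # Assumption: TP and PP split the state, plain DP replicates it.
        return total_state_gb / (self.tp * self.pp)


if __name__ == "__main__":
    cfg = ParallelConfig(tp=8, pp=4, dp=4, micro_batch_size=2, num_microbatches=30)
    print(cfg.world_size, cfg.num_nodes, cfg.global_batch)
    print(cfg.layers_per_stage(80), round(cfg.state_gb_per_gpu(910), 1))
```

Under these assumptions the 910 GB of training state works out to roughly 28 GB per GPU, which leaves most of the 80 GB for activations.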
Q5: How does FSDP's all-gather overhead interact with NVLink and InfiniBand bandwidth? What is the practical throughput impact?
FSDP all-gathers parameters before each FSDP unit's forward pass and backward pass. For a model with $L$ FSDP units (typically one per transformer layer), this means $2L$ all-gather operations per training step (once forward, once backward).
Each all-gather reassembles the full parameters of one layer from their shards. For a 7B model with 32 transformer layers, each layer has roughly $7 \times 10^9 / 32 \approx 219$M parameters. In BF16, that is 438 MB per layer. Each all-gather across 8 NVLink-connected GPUs moves $438 \times 7/8 \approx 383$ MB per GPU.
At NVLink 4.0 bandwidth of 450 GB/s unidirectional: one all-gather takes $383 / 450 \approx 0.85$ ms. For 32 layers x 2 (forward + backward) = 64 all-gathers per step: $64 \times 0.85 \approx 54$ ms of all-gather communication per step.
If your forward-backward compute time is 500 ms per step, FSDP adds roughly 11% overhead. This can be partially hidden by prefetching the next layer's parameters while computing the current layer - FSDP's backward_prefetch=BackwardPrefetch.BACKWARD_PRE does exactly this, typically reducing visible overhead to 3-5%.
The critical point: this only works with NVLink. Over InfiniBand at 50 GB/s, the same all-gather takes $383 / 50 \approx 7.7$ ms per layer, and 64 all-gathers = 493 ms - roughly equal to the 500 ms of compute, so without overlap training runs about 2x slower. FSDP across nodes over InfiniBand requires careful tuning or switching to pipeline parallelism to reduce inter-node communication frequency.
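The estimate above reduces to a few lines of arithmetic. The sketch below uses a simple bandwidth-only model with no latency term and no overlap with compute; the function names are hypothetical, and the bandwidth figures are the ones quoted in the answer.

```python
def allgather_ms(layer_params, world_size, bandwidth_gb_s, bytes_per_param=2):
    """Time for one all-gather of a layer's parameters, in milliseconds.

    Each GPU already holds 1/world_size of the layer and must receive the
    remaining (world_size - 1) / world_size of it.
    """
    layer_bytes = layer_params * bytes_per_param
    received_bytes = layer_bytes * (world_size - 1) / world_size
    return received_bytes / (bandwidth_gb_s * 1e9) * 1e3


def step_allgather_ms(total_params, num_layers, world_size, bandwidth_gb_s):
    """All-gather time per training step: one per layer, forward and backward."""
    per_layer = allgather_ms(total_params / num_layers, world_size, bandwidth_gb_s)
    return 2 * num_layers * per_layer


if __name__ == "__main__":
    compute_ms = 500.0
    for name, bw in (("NVLink", 450), ("InfiniBand", 50)):
        comm = step_allgather_ms(7e9, 32, 8, bw)
        print(f"{name}: {comm:.0f} ms all-gather, "
              f"{comm / compute_ms:.0%} of compute if not overlapped")
```

Without intermediate rounding this gives about 54 ms over NVLink and about 490 ms over InfiniBand, in line with the figures in the text.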
Q6: What is weak vs strong scaling efficiency, and what MFU should you realistically target for large model training?
Strong scaling efficiency measures how well training time improves as you add GPUs while keeping global batch size fixed. Perfect strong scaling would mean 2x GPUs = 2x faster. In practice, efficiency degrades as GPU count $N$ grows: ring all-reduce takes $2(N-1)$ steps, so its latency grows with $N$, and the communication volume per GPU stays roughly constant regardless of $N$ while the compute per GPU shrinks.
Weak scaling efficiency measures throughput (samples/second) as you add GPUs while also increasing global batch size proportionally. This is easier to achieve because you are not reducing per-GPU work - you are just adding communication overhead.
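Both definitions translate directly into ratios of measured numbers. The sketch below uses hypothetical function names, and the example measurements are made up for illustration, not taken from any benchmark.

```python
def strong_scaling_efficiency(t_base, n_base, t_scaled, n_scaled):
    """Fixed global batch: measured speedup divided by ideal speedup."""
    speedup = t_base / t_scaled
    ideal = n_scaled / n_base
    return speedup / ideal


def weak_scaling_efficiency(thr_base, n_base, thr_scaled, n_scaled):
    """Batch grows with GPU count: per-GPU throughput relative to baseline."""
    return (thr_scaled / n_scaled) / (thr_base / n_base)


if __name__ == "__main__":
    # Illustrative numbers only: step time in seconds, throughput in samples/s.
    print(strong_scaling_efficiency(t_base=8.0, n_base=8, t_scaled=1.25, n_scaled=64))
    print(weak_scaling_efficiency(thr_base=1000, n_base=8, thr_scaled=7200, n_scaled=64))
```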
Model FLOP Utilization (MFU) = actual training throughput in FLOP/s divided by the GPU's peak FLOP/s. For an H100 at BF16 peak of 989 TFLOP/s:
- A single GPU training a small model: 40-60% MFU (framework overhead)
- 8 GPUs within a DGX node (NVLink): 50-65% MFU (excellent)
- 64 GPUs across 8 nodes (InfiniBand): 45-60% MFU
- 512 GPUs, 3D parallelism: 40-55% MFU
- 3000+ GPUs, frontier models: 35-52% MFU
The Megatron-DeepSpeed 1T parameter paper reported 52% MFU on 3072 A100s. GPT-4 training reportedly achieved around 32-38% MFU on custom clusters. Anything above 40% at scale (100+ GPUs) is engineering excellence. Below 30% suggests significant bottlenecks in communication, load balancing, or checkpointing.
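MFU can be estimated from measured token throughput. The sketch below relies on the common approximation of 6 FLOPs per parameter per token for a forward plus backward pass, which ignores attention FLOPs and any activation recomputation. The function name and the example throughput are hypothetical.

```python
def mfu(num_params, tokens_per_second, num_gpus, peak_flops_per_gpu=989e12):
    """Model FLOP Utilization from cluster-wide token throughput.

    Assumes ~6 FLOPs per parameter per token (forward + backward).
    The default peak is the H100 BF16 figure quoted above.
    """
    achieved_flops = 6 * num_params * tokens_per_second
    return achieved_flops / (num_gpus * peak_flops_per_gpu)


if __name__ == "__main__":
    # Hypothetical run: 70B parameters, 512 GPUs, 500k tokens/s overall.
    print(f"MFU = {mfu(70e9, 5e5, 512):.1%}")
```

Because the numerator counts only the FLOPs the model mathematically requires, recomputing activations lowers MFU even when the GPUs are fully busy.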
