

Distributed Training Strategies

The 70B Parameter Decision

The engineering team needed to train a 70B parameter language model. They had a cluster of 64 A100 80GB GPUs. The question: how should they distribute the training?

The options seemed overwhelming: data parallel (DDP), fully sharded data parallel (FSDP/ZeRO-3), tensor parallel, pipeline parallel, or some combination. Each blog post they found advocated a different approach. The Megatron-LM paper described "3D parallelism" combining all three but gave no guidance on how to choose the right dimensions for their setup.

After two weeks of experimentation, they discovered something not in any blog post: the right combination depended on their specific cluster's network topology. Their 64 GPUs were organized in 8 nodes of 8 GPUs each, with NVLink within nodes (600 GB/s) and InfiniBand between nodes (200 Gbps = 25 GB/s). The correct strategy was to maximize communication within nodes (where NVLink was fast) and minimize communication across nodes (where InfiniBand was the bottleneck).

This lesson gives you the framework to make this decision systematically: understand what each parallelism strategy communicates, at what volume, and why the network topology determines the right combination.


Why One GPU Is Never Enough for Large Models

The math is simple. A 70B parameter model in mixed precision:

  • Weights: 140 GB (fp16)
  • Gradients: 140 GB (fp16)
  • Optimizer states: 840 GB (fp32 master weights plus Adam's two moment estimates, 12 bytes per parameter)
  • Total: 1.12 TB

No single GPU has 1.12 TB of VRAM. A standard H100 has 80 GB. Training a 70B model therefore requires at least 14 H100s just to hold the training state (before counting activations). In practice, 32–128 GPUs are needed for reasonable throughput.
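The arithmetic above can be reproduced in a few lines. This is a sketch that assumes the same mixed-precision Adam layout: 2-byte weights and gradients, plus 12 bytes of fp32 master weights and Adam moments per parameter. It ignores activations. `training_state_gb` and `min_gpus_for_state` are illustrative names.

```python
import math

# Bytes per parameter under mixed-precision Adam (activations excluded)
BYTES_PER_PARAM = {
    "weights_fp16": 2,
    "grads_fp16": 2,
    "optimizer_fp32": 12,  # fp32 master weights + Adam m + Adam v (4 bytes each)
}


def training_state_gb(n_params: float) -> dict:
    """Per-component and total training-state size in GB (1 GB = 1e9 bytes)."""
    sizes = {name: n_params * b / 1e9 for name, b in BYTES_PER_PARAM.items()}
    sizes["total"] = sum(sizes.values())
    return sizes


def min_gpus_for_state(n_params: float, vram_gb: float) -> int:
    """Lower bound on GPU count needed just to hold the training state."""
    return math.ceil(training_state_gb(n_params)["total"] / vram_gb)


if __name__ == "__main__":
    for name, gb in training_state_gb(70e9).items():
        print(f"{name:>15}: {gb:7.1f} GB")
    print("80 GB GPUs needed:", min_gpus_for_state(70e9, 80))
```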

Distributed training is not a performance optimization - for large models, it is a necessity.


Data Parallelism (DDP)

Data Parallelism is the simplest distributed training strategy. Each GPU has a full copy of the model. Training data is split across GPUs. Gradients are synchronized after each backward pass.

How it works:

  1. Split the batch: GPU 0 gets examples 0–63, GPU 1 gets 64–127, etc.
  2. Each GPU runs forward and backward pass independently
  3. All-reduce: sum gradients across all GPUs, then divide by the number of GPUs so that every GPU holds the average gradient
  4. All GPUs update their local model copy with the same gradient

The result: training behaves as if you used a single GPU with a batch size equal to the sum of all per-GPU batches.
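The equivalence can be checked numerically. This pure-Python sketch uses a toy one-weight model $\hat{y} = wx$ with mean-squared-error loss, and it assumes equal-sized shards. `grad_mse`, `simulated_all_reduce_mean` and `data_parallel_grad` are illustrative helpers, not PyTorch APIs. Averaging the per-shard mean gradients gives the full-batch mean gradient.

```python
def grad_mse(w: float, xs: list[float], ys: list[float]) -> float:
    """Gradient d/dw of mean((w*x - y)^2) over one shard of examples."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def simulated_all_reduce_mean(values: list[float]) -> float:
    """Sum the per-GPU values, then divide by the number of GPUs."""
    return sum(values) / len(values)


def data_parallel_grad(w: float, xs: list[float], ys: list[float], n_gpus: int) -> float:
    """Each simulated GPU computes a local gradient on its shard, then all-reduce-mean."""
    shard = len(xs) // n_gpus  # assumes the batch divides evenly across GPUs
    local_grads = [
        grad_mse(w, xs[r * shard:(r + 1) * shard], ys[r * shard:(r + 1) * shard])
        for r in range(n_gpus)
    ]
    return simulated_all_reduce_mean(local_grads)


xs = [float(i) for i in range(8)]
ys = [3.0 * x + 1.0 for x in xs]
full_batch = grad_mse(0.5, xs, ys)
data_parallel = data_parallel_grad(0.5, xs, ys, n_gpus=4)
print(full_batch, data_parallel)
```

The exact match depends on equal shard sizes. By default, `DistributedSampler` adds extra samples so that the dataset divides evenly across ranks.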

The communication cost: one all-reduce per training step, synchronizing all gradients. For a 7B parameter model with fp32 gradients (4 bytes each), that is 28 GB of gradients per step.

```python
import os

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler


def setup_ddp() -> int:
    """Initialize the process group; torchrun sets LOCAL_RANK, RANK, and WORLD_SIZE."""
    dist.init_process_group(backend="nccl")  # NCCL for GPU-to-GPU communication
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    return local_rank


def cleanup_ddp():
    dist.destroy_process_group()


def train_with_ddp(model_class, dataset, n_epochs: int = 3):
    """
    DDP training: each process owns one GPU.
    Launch with: torchrun --nproc_per_node=8 train_ddp.py
    """
    local_rank = setup_ddp()
    device = torch.device(f"cuda:{local_rank}")

    # Each process creates its own full model replica
    model = model_class().to(device)

    # Wrap with DDP: handles gradient synchronization automatically.
    # Set find_unused_parameters=True only if some parameters don't contribute to the loss.
    ddp_model = DDP(model, device_ids=[local_rank], find_unused_parameters=False)

    # DistributedSampler gives each rank a disjoint subset of the dataset
    sampler = DistributedSampler(dataset, shuffle=True)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)  # 32 examples per GPU

    optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=1e-4)

    for epoch in range(n_epochs):
        sampler.set_epoch(epoch)  # Important: a different shuffle every epoch
        for step, (inputs, labels) in enumerate(loader):
            inputs = inputs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)

            # bf16 autocast for the forward pass; parameters and gradients stay fp32
            with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
                output = ddp_model(inputs)
                loss = nn.functional.cross_entropy(output, labels)

            loss.backward()  # DDP all-reduces (averages) gradients during backward

            optimizer.step()

            # Log from global rank 0 only (local_rank 0 exists on every node)
            if dist.get_rank() == 0 and step % 100 == 0:
                print(f"Epoch {epoch}, Step {step}, Loss: {loss.item():.4f}")

    cleanup_ddp()
```

DDP's Gradient Synchronization

DDP uses a "bucket" strategy. It groups gradients into buckets (25 MB by default, set by `bucket_cap_mb`). During the backward pass, it launches a bucket's all-reduce as soon as every gradient in that bucket has been computed, which overlaps communication with computation. Backward runs from the last layer to the first, so the later layers' gradients are ready first. Their buckets are all-reduced while earlier layers are still computing gradients.

This overlap is critical. Without it, all-reduce waits until the entire backward pass completes. With it, communication latency is partially hidden by computation.
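The bucketing idea can be simulated without GPUs. This is a simplified sketch, not DDP's actual implementation, and it makes three assumptions:

  • Parameters are assigned to buckets in reverse registration order, which is roughly the order gradients become ready in backward.
  • Each bucket is capped at a byte size.
  • A bucket's all-reduce can start as soon as its last gradient arrives.

`build_buckets` and `all_reduce_launch_times` are illustrative names.

```python
def build_buckets(param_bytes: list[int], cap_bytes: int) -> list[list[int]]:
    """Group parameter indices into buckets, walking parameters in reverse order."""
    buckets, current, current_size = [], [], 0
    for idx in reversed(range(len(param_bytes))):
        # Close the current bucket if adding this parameter would exceed the cap
        if current and current_size + param_bytes[idx] > cap_bytes:
            buckets.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += param_bytes[idx]
    if current:
        buckets.append(current)
    return buckets


def all_reduce_launch_times(buckets: list[list[int]], grad_ready_time: dict) -> list[int]:
    """A bucket can be all-reduced once every gradient in it is ready."""
    return [max(grad_ready_time[i] for i in bucket) for bucket in buckets]


# Toy model: 6 layers of 10 MB each, listed in forward (registration) order
sizes = [10 * 2**20] * 6
# Backward computes the last layer first: layer 5 is ready at t=1, layer 0 at t=6
ready = {i: len(sizes) - i for i in range(len(sizes))}
buckets = build_buckets(sizes, cap_bytes=25 * 2**20)
print(buckets)                                  # [[5, 4], [3, 2], [1, 0]]
print(all_reduce_launch_times(buckets, ready))  # [2, 4, 6]
```

Here the first two buckets are all-reduced while layers 3 to 0 are still in backward. Only the last bucket's all-reduce starts after the backward pass has finished.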


Fully Sharded Data Parallel (FSDP)

DDP requires each GPU to hold a full copy of all model parameters, gradients, and optimizer states. When that training state exceeds a single GPU's VRAM, DDP is impossible.

FSDP (PyTorch's implementation of ZeRO-3) shards everything: model parameters, gradients, and optimizer states are split across GPUs. Each GPU holds only 1/N of each component.

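Trade-off: before each layer's forward computation, FSDP must all-gather that layer's parameters to assemble the full tensor from shards, and it discards them afterward. It all-gathers them again for the backward pass, and it reduce-scatters the gradients so that each GPU keeps only its shard. This extra communication typically reduces training speed by 10–25% compared to DDP, depending on interconnect and model, but it enables training models that would not fit with DDP.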
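The per-GPU memory difference is easy to estimate. This rough sketch assumes 16 bytes of training state per parameter, matching the memory breakdown earlier. It ignores activations, communication buffers, and the layer that FSDP temporarily holds in gathered form. `per_gpu_state_gb` is an illustrative helper.

```python
def per_gpu_state_gb(n_params: float, n_gpus: int, strategy: str,
                     bytes_per_param: int = 16) -> float:
    """Training state each GPU must hold, in GB (activations excluded)."""
    total_gb = n_params * bytes_per_param / 1e9
    if strategy == "ddp":
        return total_gb            # every GPU holds a full replica
    if strategy == "fsdp":
        return total_gb / n_gpus   # ZeRO-3: params, grads, optimizer states all sharded
    raise ValueError(f"unknown strategy: {strategy}")


for name in ("ddp", "fsdp"):
    gb = per_gpu_state_gb(7e9, n_gpus=8, strategy=name)
    print(f"7B model on 8 GPUs with {name.upper()}: {gb:.0f} GB per GPU")
```

With DDP, the 7B model needs 112 GB per GPU, which is more than an 80 GB A100 holds. Sharding across 8 GPUs with FSDP brings this down to 14 GB, leaving room for activations.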

```python
import functools

import torch
import torch.nn as nn
from torch.distributed.fsdp import (
    BackwardPrefetch,
    FullyShardedDataParallel as FSDP,
    MixedPrecision,
    ShardingStrategy,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy


def setup_fsdp(model: nn.Module, transformer_layer_class: type) -> FSDP:
    """
    Wrap a model with FSDP (ZeRO-3). Call after dist.init_process_group()
    and torch.cuda.set_device(local_rank).
    transformer_layer_class is the class of individual transformer layers
    (e.g., GPT2Block, BertLayer) - each becomes its own FSDP unit.
    """
    # Mixed precision: parameters are cast to bf16 for compute and gradients are
    # reduced in bf16; the sharded parameters that the optimizer updates keep
    # their original dtype (typically fp32).
    mp_policy = MixedPrecision(
        param_dtype=torch.bfloat16,
        reduce_dtype=torch.bfloat16,
        buffer_dtype=torch.bfloat16,
    )

    # Wrap each transformer layer in its own FSDP unit.
    # This enables layer-by-layer all-gather (only one layer is fully materialized at a time).
    wrap_policy = functools.partial(
        transformer_auto_wrap_policy,
        transformer_layer_cls={transformer_layer_class},
    )

    fsdp_model = FSDP(
        model,
        auto_wrap_policy=wrap_policy,
        mixed_precision=mp_policy,
        sharding_strategy=ShardingStrategy.FULL_SHARD,  # ZeRO-3
        backward_prefetch=BackwardPrefetch.BACKWARD_PRE,  # prefetch next unit's params in backward
        device_id=torch.cuda.current_device(),
        sync_module_states=True,  # broadcast rank 0's initial params so all ranks start in sync
    )

    return fsdp_model
```

Tensor Parallelism

Tensor Parallelism splits individual layer computations across GPUs. Rather than each GPU having the full weight matrix, each GPU has a column (or row) slice of the weight matrix.

For a linear layer $Y = XW$ with $W \in \mathbb{R}^{d \times d'}$:

  • Split $W$ into $N$ column groups: $W = [W_1 \mid W_2 \mid \dots \mid W_N]$
  • GPU $i$ computes $Y_i = X W_i$
  • All-gather the results: $Y = [Y_1 \mid Y_2 \mid \dots \mid Y_N]$
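The column split can be checked in pure Python. In this sketch the matrices are tiny lists of lists, each "GPU" is one loop iteration, and concatenation stands in for the all-gather. `matmul`, `split_columns` and `concat_columns` are illustrative helpers.

```python
def matmul(a, b):
    """Plain matrix product of lists-of-lists: (n x d) @ (d x m) -> (n x m)."""
    return [[sum(a[i][k] * b[k][j] for k in range(len(b))) for j in range(len(b[0]))]
            for i in range(len(a))]


def split_columns(w, n_parts):
    """Split W (d x d') into n_parts column blocks W_1 .. W_N."""
    cols = len(w[0]) // n_parts  # assumes d' divides evenly
    return [[row[p * cols:(p + 1) * cols] for row in w] for p in range(n_parts)]


def concat_columns(blocks):
    """[Y_1 | Y_2 | ... | Y_N]: stitch column blocks back together (the all-gather)."""
    return [sum((blk[i] for blk in blocks), []) for i in range(len(blocks[0]))]


X = [[1, 2, 3], [4, 5, 6]]                       # (batch=2, d=3), replicated on every GPU
W = [[1, 0, 2, -1], [0, 1, 1, 3], [2, 2, 0, 1]]  # (d=3, d'=4)

local_outputs = [matmul(X, w_i) for w_i in split_columns(W, n_parts=2)]  # GPU i: X @ W_i
Y_parallel = concat_columns(local_outputs)
assert Y_parallel == matmul(X, W)
print(Y_parallel)  # [[7, 8, 4, 8], [16, 17, 13, 17]]
```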

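Splitting $W$ this way partitions the most compute-intensive operation (the matrix multiply) across GPUs. Megatron-LM pairs the splits to minimize communication. In attention, the QKV projection is column-split, so each GPU gets a subset of attention heads. The output projection is row-split, so each GPU produces a partial sum of the full output. As a result, each attention block needs only one all-reduce in the forward pass (and one in the backward pass). The MLP block uses the same column-then-row pattern.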
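The column-then-row pattern can be sketched on a two-layer MLP $Z = \mathrm{ReLU}(XA)\,B$. This sketch uses plain Python lists, with ReLU standing in for the nonlinearity and an element-wise sum standing in for the all-reduce. `tensor_parallel_mlp` is a hypothetical helper, not Megatron-LM's API.

```python
def matmul(a, b):
    return [[sum(a[i][k] * b[k][j] for k in range(len(b))) for j in range(len(b[0]))]
            for i in range(len(a))]


def relu(m):
    return [[max(0, v) for v in row] for row in m]


def add(m1, m2):
    return [[p + q for p, q in zip(r1, r2)] for r1, r2 in zip(m1, m2)]


def tensor_parallel_mlp(x, a, b, n_gpus):
    """Column-split A, row-split B, one all-reduce (sum) at the end."""
    chunk = len(a[0]) // n_gpus  # assumes the hidden size divides evenly
    partials = []
    for g in range(n_gpus):
        cols = slice(g * chunk, (g + 1) * chunk)
        a_g = [row[cols] for row in a]     # column slice of A
        b_g = b[cols]                      # matching row slice of B
        h_g = relu(matmul(x, a_g))         # element-wise ReLU: no communication needed
        partials.append(matmul(h_g, b_g))  # full-shape partial result on GPU g
    out = partials[0]
    for p in partials[1:]:                 # the all-reduce: element-wise sum
        out = add(out, p)
    return out


X = [[1, -2], [3, 1]]
A = [[1, -1, 2, 0], [0, 1, -1, 1]]
B = [[1, 0], [2, 1], [-1, 1], [0, 3]]
reference = matmul(relu(matmul(X, A)), B)
assert tensor_parallel_mlp(X, A, B, n_gpus=2) == reference
print(reference)  # [[-3, 4], [-2, 8]]
```

The column split of $A$ is what makes this work. Each GPU owns complete hidden units, so it can apply the nonlinearity locally. If $A$ were row-split instead, an all-reduce would be needed before the ReLU.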

When to use tensor parallelism:

  • The model is too large to train efficiently with FSDP alone
  • You have NVLink-connected GPUs (tensor parallelism requires very low-latency communication)
  • The communication overhead (all-reduces every layer) is justified by the memory savings

Tensor parallelism degree is typically 2–8 (within a single NVLink-connected node). Crossing NVLink domain boundaries for tensor parallelism is very expensive.

```python
# Tensor parallelism is complex to implement from scratch.
# In practice, use Megatron-LM or ColossalAI for production tensor-parallel training.
import torch
import torch.distributed as dist
import torch.nn as nn


# Conceptual illustration of a column-parallel linear layer:
class ColumnParallelLinear(nn.Module):
    """
    Each rank holds a column slice of the weight matrix (stored nn.Linear-style
    as rows of an (out, in) tensor). Input X is replicated across all ranks;
    output Y is all-gathered across ranks.
    """

    def __init__(
        self,
        in_features: int,
        out_features: int,
        world_size: int,
        rank: int,
    ):
        super().__init__()
        assert out_features % world_size == 0, "out_features must divide evenly"
        self.world_size = world_size
        self.rank = rank  # which slice this rank owns (e.g., when loading full weights)
        self.local_out = out_features // world_size

        # Each rank stores only its (local_out, in_features) slice
        self.weight = nn.Parameter(torch.empty(self.local_out, in_features))
        nn.init.kaiming_uniform_(self.weight)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Local matmul: (batch, in) @ (in, local_out) -> (batch, local_out)
        local_output = x @ self.weight.T

        # All-gather every rank's slice, then concatenate to (batch, out_features).
        # dist.all_gather is not autograd-aware, so gradients will not flow back
        # through it; training code needs an autograd-aware collective
        # (e.g., torch.distributed.nn.functional.all_gather).
        gathered = [torch.empty_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(gathered, local_output)
        return torch.cat(gathered, dim=-1)
```

Pipeline Parallelism

Pipeline parallelism partitions model layers across GPUs. GPU 0 runs layers 0–5, GPU 1 runs layers 6–11, and so on. Data flows through the pipeline: GPU 0 processes the first microbatch, then GPU 1 processes it while GPU 0 processes the next microbatch.

The pipeline bubble: without careful scheduling, GPUs sit idle waiting for their turn in the pipeline. A naive pipeline pushes one batch through $p$ stages, so only one stage is busy at a time and utilization is $1/p$. With 8 GPUs, that is only 1/8 = 12.5% utilization. Splitting the batch into $k$ microbatches raises utilization to $k/(k+p-1)$.

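Microbatching and 1F1B scheduling: GPipe splits each batch into $k$ microbatches, so that different stages work on different microbatches at the same time. This shrinks the bubble fraction to $(p-1)/(k+p-1)$, where $p$ is the number of pipeline stages. PipeDream's 1F1B (one-forward-one-backward) schedule interleaves the forward and backward passes of different microbatches on each GPU. It has the same bubble fraction, but it bounds the number of microbatches whose activations each stage must keep, which lowers peak memory.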
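The bubble formula can be checked by simulating a fill-and-drain (GPipe-style) forward schedule. This simplified sketch assumes every stage takes one time unit per microbatch and models only the forward pass. If backward time per stage is also uniform, adding backward passes changes the absolute idle time but not the fraction. `gpipe_forward_schedule` and `bubble_fraction` are illustrative names.

```python
def gpipe_forward_schedule(p: int, k: int) -> list[list]:
    """grid[t][s] = microbatch processed by stage s at time t, or None if idle."""
    n_steps = k + p - 1
    grid = [[None] * p for _ in range(n_steps)]
    for m in range(k):
        for s in range(p):
            grid[m + s][s] = m  # microbatch m reaches stage s after s time steps
    return grid


def bubble_fraction(p: int, k: int) -> float:
    """Fraction of (time step, stage) slots in which a stage is idle."""
    grid = gpipe_forward_schedule(p, k)
    idle = sum(cell is None for row in grid for cell in row)
    return idle / (len(grid) * p)


for p, k in [(8, 1), (8, 4), (8, 32)]:
    simulated = bubble_fraction(p, k)
    formula = (p - 1) / (k + p - 1)
    print(f"p={p}, k={k}: bubble {simulated:.1%} (formula {formula:.1%})")
```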

When to use pipeline parallelism:

  • Model is extremely deep (100+ layers)
  • Inter-node bandwidth is the bottleneck (pipeline requires less cross-node communication than tensor parallel)
  • You can use a large enough microbatch count ($k \geq 4p$) to amortize the pipeline bubble

3D Parallelism

For very large models (100B+), Megatron-LM (Narayanan et al., 2021) combines all three strategies:

  • Data parallel: across nodes
  • Tensor parallel: within a node (NVLink domain, typically 8 GPUs)
  • Pipeline parallel: across nodes, with each pipeline stage being one tensor-parallel group

The organization: if you have 64 GPUs in an 8×8 grid (8 nodes, 8 GPUs each):

  • Tensor parallel degree = 8: within each node, all 8 GPUs form one tensor-parallel group, each holding a 1/8 slice of every layer the group runs
  • Pipeline parallel degree = 4: 4 pipeline stages, each running on one node, so one model replica spans 4 nodes
  • Data parallel degree = 2: two copies of the full model (4 nodes each), each processing half the batch

This arrangement keeps the per-layer tensor-parallel all-reduces on NVLink. Cross-node traffic is limited to two things: point-to-point activation transfers at pipeline-stage boundaries, and the data-parallel gradient all-reduce.
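This sketch maps global ranks onto the 8×8 layout. It assumes 8 GPUs per node, with consecutive ranks on the same node. It also assumes a rank ordering chosen for illustration: the tensor-parallel rank varies fastest, then the pipeline stage, then the data-parallel replica. Frameworks such as Megatron-LM define their own orderings. `rank_to_coords` is a hypothetical helper.

```python
GPUS_PER_NODE = 8
TP, PP, DP = 8, 4, 2  # 8 * 4 * 2 = 64 GPUs


def rank_to_coords(rank: int) -> dict:
    """Decompose a global rank into tensor/pipeline/data-parallel coordinates."""
    return {
        "tp": rank % TP,
        "pp": (rank // TP) % PP,
        "dp": rank // (TP * PP),
        "node": rank // GPUS_PER_NODE,
    }


coords = [rank_to_coords(r) for r in range(TP * PP * DP)]

# Each tensor-parallel group (same pp and dp, all 8 tp ranks) lives on one node,
# so the per-layer all-reduces stay on NVLink.
for dp in range(DP):
    for pp in range(PP):
        nodes = {c["node"] for c in coords if c["pp"] == pp and c["dp"] == dp}
        assert len(nodes) == 1
        print(f"replica {dp}, stage {pp} -> node {nodes.pop()}")
```

Under this ordering, replica 0 occupies nodes 0–3 and replica 1 occupies nodes 4–7. Each data-parallel gradient all-reduce therefore pairs a GPU with its counterpart four nodes away. That traffic crosses InfiniBand, but it happens once per step rather than once per layer.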


Choosing the Right Strategy

```python
import math


def recommend_parallelism_strategy(
    n_parameters: float,
    n_gpus: int,
    vram_per_gpu_gb: float,
    nvlink_available: bool,
    gpus_per_node: int = 8,
) -> dict:
    """
    Rule-based recommendation for parallelism strategy.
    Real decisions require benchmarking, but this gives a starting point.
    """
    # Training-state memory (activations excluded; optimizer states dominate):
    # fp16 weights + fp16 grads + fp32 master + fp32 Adam m + fp32 Adam v
    bytes_per_param = 2 + 2 + 4 + 4 + 4  # = 16 bytes for mixed precision training
    total_memory_gb = (n_parameters * bytes_per_param) / 1e9

    # Budget 80% of VRAM for training state, leaving headroom for activations
    gpus_for_memory = math.ceil(total_memory_gb / (vram_per_gpu_gb * 0.8))

    strategy = {}

    if gpus_for_memory > n_gpus:
        # No split of the state fits in aggregate VRAM: add GPUs or offload to CPU/NVMe
        strategy["primary"] = "Insufficient GPU memory: add GPUs or offload"
        tp_degree, pp_degree, dp_degree = 1, 1, n_gpus

    elif gpus_for_memory <= 1:
        # The full training state fits on one GPU: replicate it with DDP
        strategy["primary"] = "Single GPU or DDP"
        tp_degree, pp_degree, dp_degree = 1, 1, n_gpus

    elif n_parameters > 70e9:
        # Very large models: tensor parallel inside the NVLink domain (one node),
        # pipeline stages across nodes, data parallel over the remaining GPUs
        tp_degree = gpus_per_node if nvlink_available else 1
        pp_degree = max(2, math.ceil(gpus_for_memory / tp_degree))
        dp_degree = max(1, n_gpus // (tp_degree * pp_degree))
        strategy["primary"] = "3D Parallelism (Megatron-LM style)"

    else:
        # FSDP (ZeRO-3): every GPU is a data-parallel rank holding 1/n_gpus of the state
        strategy["primary"] = "FSDP (ZeRO-3)"
        tp_degree, pp_degree, dp_degree = 1, 1, n_gpus

    strategy["tensor_parallel_degree"] = tp_degree
    strategy["pipeline_parallel_degree"] = pp_degree
    strategy["data_parallel_degree"] = dp_degree
    strategy["total_memory_required_gb"] = round(total_memory_gb, 1)
    strategy["gpus_needed_for_memory"] = gpus_for_memory
    return strategy
```

Communication Primitives

Understanding the underlying communication operations helps you reason about distributed training performance.

All-Reduce: Each GPU contributes a tensor; all GPUs receive the element-wise sum. Used by DDP for gradient synchronization.

All-Gather: Each GPU contributes a tensor slice; all GPUs receive the full tensor. Used by FSDP to reconstruct sharded parameters before computation.

Reduce-Scatter: The element-wise reduction of a tensor, but instead of all GPUs receiving the full result, GPU $i$ receives only the $i$-th shard. Used by FSDP during the backward pass.
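The semantics of the three primitives fit in a few lines of pure Python, with each "GPU" represented by one list. This sketch models only what each rank ends up holding, not how NCCL moves the data. The function names mirror the collectives but are illustrative, not `torch.distributed` calls. The sketch also checks an identity that both ring all-reduce and FSDP rely on: an all-reduce equals a reduce-scatter followed by an all-gather.

```python
def all_reduce(tensors):
    """Every rank receives the element-wise sum of all ranks' tensors."""
    total = [sum(vals) for vals in zip(*tensors)]
    return [list(total) for _ in tensors]


def reduce_scatter(tensors):
    """Rank i receives only the i-th shard of the element-wise sum."""
    n = len(tensors)
    total = [sum(vals) for vals in zip(*tensors)]
    shard = len(total) // n  # assumes the length divides evenly
    return [total[i * shard:(i + 1) * shard] for i in range(n)]


def all_gather(shards):
    """Every rank receives the concatenation of all ranks' shards."""
    full = [x for shard in shards for x in shard]
    return [list(full) for _ in shards]


grads = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400], [0, 0, 0, 1]]
assert all_gather(reduce_scatter(grads)) == all_reduce(grads)
print(reduce_scatter(grads))  # [[111], [222], [333], [445]]
```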

For a ring all-reduce (the most common NCCL implementation):

$$\text{time} = 2 \times \frac{n_{\text{elements}} \times \text{dtype\_bytes}}{\text{bandwidth}} \times \frac{N-1}{N}$$

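where $N$ is the number of GPUs. For 8 GPUs and 28 GB of gradients over NVLink (600 GB/s):

$$\text{time} \approx 2 \times \frac{28 \times 10^9}{600 \times 10^9} \times \frac{7}{8} \approx 0.082 \text{ seconds}$$

That is an 82 ms all-reduce. Suppose your forward+backward pass takes 200 ms and none of the communication overlaps with it. Communication is then 82/282 ≈ 29% of total step time: significant but manageable. Treat this figure as an optimistic lower bound, for two reasons. First, 600 GB/s is the A100's total bidirectional NVLink bandwidth, or 300 GB/s in each direction. Second, the formula ignores latency. Measured times are therefore typically higher.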
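The estimate can be wrapped in a small function. This is a sketch of the bandwidth-only ring model, so it inherits the same caveats: it ignores per-step latency and assumes the quoted bandwidth is actually achieved. `ring_all_reduce_seconds` is an illustrative name.

```python
def ring_all_reduce_seconds(n_bytes: float, bandwidth_bytes_per_s: float, n_gpus: int) -> float:
    """Bandwidth term of ring all-reduce: each GPU sends 2 * (N-1)/N of the data."""
    return 2 * n_bytes / bandwidth_bytes_per_s * (n_gpus - 1) / n_gpus


grad_bytes = 7e9 * 4      # 7B parameters, fp32 gradients = 28 GB
t_comm = ring_all_reduce_seconds(grad_bytes, 600e9, n_gpus=8)
t_compute = 0.200         # forward + backward, seconds
share = t_comm / (t_compute + t_comm)  # assumes no compute/communication overlap
print(f"all-reduce {t_comm * 1e3:.0f} ms, {share:.0%} of step time")
```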


Production Engineering Notes

Overlap communication with computation. DDP's bucket-based gradient synchronization overlaps all-reduce with the backward pass. Keep find_unused_parameters=False when possible: setting it to True makes DDP traverse the autograd graph after every forward pass to find unused parameters, which adds overhead. For FSDP, use BackwardPrefetch.BACKWARD_PRE, which prefetches the next layer's parameters during the backward pass while the current layer's gradients are computed.

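Use gradient accumulation to amortize communication cost. With gradient accumulation, you run multiple forward+backward passes before each optimizer step, spreading the communication cost over more computation. With DDP, this only saves communication if the non-final passes run inside `ddp_model.no_sync()`. Otherwise, DDP still all-reduces on every backward pass. For pipeline parallelism, accumulating over many microbatches is essential to reduce the bubble ratio.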
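The accounting can be sketched in pure Python. The sketch assumes two simulated data-parallel workers, a toy scalar gradient per microbatch, and a counter in place of real communication. `SimulatedDataParallel` and `train_step` are illustrative names. Accumulating locally and all-reducing once per optimizer step gives the same averaged gradient as all-reducing after every microbatch, with a quarter of the collectives here. In PyTorch DDP, the "sync once" path corresponds to running the first microbatches inside `no_sync()`.

```python
class SimulatedDataParallel:
    """Counts all-reduce calls across simulated workers; no real communication."""

    def __init__(self, n_workers: int):
        self.n_workers = n_workers
        self.all_reduce_calls = 0

    def all_reduce_mean(self, per_worker: list[float]) -> list[float]:
        self.all_reduce_calls += 1
        mean = sum(per_worker) / self.n_workers
        return [mean] * self.n_workers


def train_step(micro_grads, accum_steps, sync_every_micro_step):
    """micro_grads[m][w] is worker w's gradient on microbatch m."""
    dp = SimulatedDataParallel(n_workers=len(micro_grads[0]))
    accumulated = [0.0] * dp.n_workers
    for m in range(accum_steps):
        local = micro_grads[m]
        if sync_every_micro_step:
            local = dp.all_reduce_mean(local)  # DDP's behavior without no_sync()
        accumulated = [a + g for a, g in zip(accumulated, local)]
    if not sync_every_micro_step:
        accumulated = dp.all_reduce_mean(accumulated)  # one sync at the last micro-step
    # Average over micro-steps so the update matches one large batch
    return [a / accum_steps for a in accumulated], dp.all_reduce_calls


grads = [[1.0, 3.0], [2.0, 2.0], [0.5, 1.5], [4.0, 0.0]]  # 4 micro-steps x 2 workers
every, calls_every = train_step(grads, 4, sync_every_micro_step=True)
once, calls_once = train_step(grads, 4, sync_every_micro_step=False)
print(every, calls_every)  # [1.75, 1.75] 4
print(once, calls_once)    # [1.75, 1.75] 1
```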

Profile with NCCL logs and the PyTorch Profiler. Set the NCCL_DEBUG=INFO environment variable to log NCCL's setup, including the detected topology and which transports (NVLink, InfiniBand) it uses. The PyTorch Profiler with with_stack=True shows where all-reduce operations are initiated. If all-reduce takes more than 30% of step time, investigate the network topology or reduce gradient communication frequency.


Common Mistakes

:::danger Using DDP when the model does not fit on a single GPU
DDP replicates the full model on each GPU. If the model + gradients + optimizer states exceed GPU VRAM, DDP fails with OOM even if you have many GPUs. For models that do not fit in a single GPU's memory, use FSDP (ZeRO-3) or tensor parallelism.
:::

:::warning Setting pipeline parallel degree without enough microbatches
Pipeline parallelism with $p=8$ pipeline stages and only $k=4$ microbatches per step results in a pipeline bubble of $(p-1)/(k+p-1) = 7/11 \approx 64\%$, so your GPUs sit idle almost two thirds of the time. Set the microbatch count to $k \geq 2p$ (ideally $4p$) to keep pipeline utilization above 67% (ideally 80%+).
:::

:::tip For most models under 30B parameters, FSDP is the right choice
Unless you need the sophistication of 3D parallelism (which requires a dedicated infrastructure team to maintain), FSDP covers most training scenarios up to 30B parameters on standard GPU clusters. It integrates directly into PyTorch, does not require custom model modifications, and performs within 15–20% of Megatron-LM on standard transformer architectures.
:::


Interview Questions

Q1: What is data parallelism and what is its memory and communication cost?

Data parallelism replicates the full model on each GPU and splits the input batch. Each GPU runs a full forward and backward pass on its subset, then an all-reduce averages gradients across GPUs. Memory cost: each GPU needs the full model + gradients + optimizer states, so it does not scale beyond the single-GPU memory limit. Communication cost: one all-reduce per step, in which each GPU sends (and receives) about 2× the gradient size, precisely $2(N-1)/N$ of it. This is because a ring all-reduce is a reduce-scatter followed by an all-gather. For a 7B model (28 GB of fp32 gradients) across 8 GPUs on NVLink, that is about 82 ms per step in the idealized bandwidth model. Data parallelism is simple, well-supported, and works well for models that fit in a single GPU's memory.

Q2: When would you use tensor parallelism over FSDP?

Tensor parallelism splits individual layer computations across GPUs - each GPU computes with only a column/row slice of weight matrices. It is appropriate when: (1) the model is too large even for FSDP to handle efficiently (100B+ parameters), (2) you have NVLink-connected GPUs (tensor parallel requires all-reduces every layer - PCIe bandwidth makes this catastrophically slow), and (3) you need the absolute maximum throughput for a fixed GPU count. FSDP is simpler, works over PCIe, and handles most models up to 30B well. Use tensor parallelism only when you have both the model scale and the NVLink hardware to justify the complexity.

Q3: Explain the pipeline bubble problem and how to minimize it.

In a $p$-stage pipeline with $k$ microbatches per step, the pipeline bubble (fraction of time with idle GPUs) is approximately $(p-1)/(k+p-1)$. With $p=8$ and $k=1$, the bubble is 87.5%: at any moment, 7 of the 8 GPUs are idle. The solution is to increase the microbatch count $k$. With $k=32$: $(8-1)/(32+8-1) = 7/39 \approx 18\%$ bubble. The 1F1B (one forward, one backward) scheduling in PipeDream reduces peak memory by allowing backward passes to start before all microbatch forwards complete, which keeps a large $k$ affordable. In practice, choose the pipeline degree so that stage boundaries fall between nodes, minimizing cross-node communication. Set the microbatch count large enough to reduce the bubble below 20%.

Q4: How does gradient synchronization overhead scale with the number of GPUs and model size?

All-reduce time for ring all-reduce: $2 \times \frac{\text{gradient\_bytes}}{\text{bandwidth}} \times \frac{N-1}{N}$. The $(N-1)/N$ factor approaches 1 quickly, so gradient size dominates, not GPU count. Doubling the number of GPUs barely changes the bandwidth term: it goes from 87.5% of its large-$N$ limit at $N=8$ to 93.75% at $N=16$. What matters most is gradient size (linear in parameters) and bandwidth. In this bandwidth-only model, DDP communication scales well to large GPU counts. There are two caveats. A ring's latency term grows with $N$, since it takes $2(N-1)$ sequential steps. And once the ring spans nodes, the relevant bandwidth drops from NVLink to InfiniBand.

Q5: You are training a 30B parameter model on 32 A100 80GB GPUs in a cluster with NVLink within nodes and InfiniBand between nodes. What parallelism strategy do you recommend?

30B parameters in mixed precision: 16 bytes/param = 480 GB total, which does not fit on a single 80 GB GPU. The cluster has 32 GPUs (4 nodes × 8 GPUs each).

Option A is FSDP across all 32 GPUs. Each GPU holds 480/32 = 15 GB of state. However, FSDP's per-layer all-gathers and gradient reduce-scatters span all 32 GPUs and therefore cross InfiniBand between nodes. Expected throughput: moderate.

Option B is tensor parallel degree 8 (within each node) plus data parallel degree 4 (across nodes). Each GPU holds 1/8 of the model, or 480/8 = 60 GB of state. This is less if the data-parallel dimension also shards optimizer states, ZeRO-1 style. It fits, but leaves limited room for activations. The per-layer tensor-parallel all-reduces stay within nodes on fast NVLink. The only cross-node traffic is the data-parallel gradient all-reduce, in which each GPU synchronizes only its 1/8 slice of the gradients.

Recommended: Option B if you have Megatron-LM or NeMo infrastructure. Choose Option A (FSDP) if you need a simpler implementation and can accept 15–20% lower throughput. The choice depends on your team's infrastructure expertise.

© 2026 EngineersOfAI. All rights reserved.