
Memory Profiling and Debugging

The Production Scenario

A text classification service processes documents for a legal firm. Each request runs a BERT inference. The service starts at 2.1 GB RSS. After 24 hours it is at 4.8 GB. After 72 hours it is at 9.3 GB. The server has 16 GB RAM. At roughly 96 hours it will OOM and restart. The service has been restarting every four days for three months. The team knows there is a memory leak. Nobody has found it.

The problem with long-running memory leaks is that they are invisible at small scale. The unit tests pass. The load tests pass (they run for 10 minutes). The staging environment looks fine. Only in production, after hours of continuous traffic, does the growth become measurable.

The engineer who finally diagnosed this had to learn a different way of thinking about the problem. The usual approach - "let me add some print statements" - does not work for memory. You cannot see where the memory is going just by reading the code, because the cause is often something that looks completely innocent: a class attribute, a dictionary update, a logging call. The fix is systematic: take two memory snapshots far apart in time, compare them, and let the numbers tell you where the memory went.

In this case, the leak was in the request logging system. Each request logged a RequestRecord object to a module-level deque with maxlen=10000. But RequestRecord contained a reference to the tokenized input (input_ids, a list of 512 integers). 10,000 records times 512 integers times 28 bytes per Python integer is roughly 143 MB, not counting the list and dict overhead. The deque filled to its 10,000-record cap and then plateaued - but the team was watching RSS climb for hours, so this fixed 143 MB was not the leak they were hunting. The actual growth came from something else: the logging system also kept a running statistics dict with per-token frequency counts. After processing millions of documents, the dict had millions of keys. That was the leak.

This lesson is a toolkit. Every technique here exists because no single tool finds every leak. You will use them all at different stages.


Why This Exists

Memory bugs in production are hard to find because the evidence accumulates slowly. A function that allocates 100 KB and never frees it is undetectable in testing (where it runs once) but catastrophic in production (where it runs a million times). The tools in this lesson solve the evidence-gathering problem: they let you observe memory state at two points in time and compare what changed.

The tools divide into two categories. Profilers show you where memory is currently being used - what objects exist, how many bytes they consume, and what allocated them. Detectors find specific classes of bugs: use-after-free (accessing freed memory), double-free (freeing memory twice), and buffer overflows. In Python, pure-Python code cannot have use-after-free or double-free (the GC prevents this). But Python C extensions - NumPy, PyTorch, custom operators - absolutely can, and these bugs are among the hardest to diagnose.


Historical Context

Valgrind was created by Julian Seward and first released in 2002. Valgrind Memcheck works by running the target program on a simulated CPU that tracks the validity and addressability of every byte of memory. It can detect use-after-free, buffer overflows, and memory leaks with extremely high accuracy. The cost is a 10-20x slowdown, making it impractical for production use but invaluable for C extension development.

AddressSanitizer (ASAN) was developed at Google (Konstantin Serebryany and others) in 2012. It works by instrumenting the compiled binary (adding checks at every memory access) rather than simulating a CPU, making it 2x slower rather than 20x. ASAN is built into both GCC and Clang and can be used to build Python itself with memory checking enabled.

Python's tracemalloc module was added in Python 3.4 by Victor Stinner (2013). Before tracemalloc, the main options were heap inspectors such as Meliae, or external tools that could not distinguish Python allocations from C extension allocations. tracemalloc integrates directly with Python's memory allocator hooks, giving accurate per-call-site allocation tracking with modest overhead (roughly 10%).

memory_profiler (developed by Fabian Pedregosa and Olivier Grisel, around 2011) predates tracemalloc and takes a different approach: it samples process RSS at each line boundary of a decorated function. This is coarser (line-level rather than allocation-level), but because it measures RSS rather than hooking the allocator, it works for any Python code, including C extensions.


Core Concepts

Memory Metrics: RSS vs VMS vs PSS vs USS

Before profiling, you need to know which memory metric to measure. They mean different things:

  • RSS (Resident Set Size) - physical RAM currently used by the process, including shared libraries. If libc is mapped into both your process and 50 other processes, all 51 processes report libc's pages in their RSS, so RSS commonly overestimates a process's true footprint.
  • VMS (Virtual Memory Size) - total virtual address space reserved, including unused pages, swap, and memory-mapped files. Usually much larger than RSS. Rarely useful for leak detection.
  • PSS (Proportional Set Size) - each shared page is counted fractionally (divided by number of processes sharing it). PSS is more accurate than RSS for measuring "actual cost" of a process, but requires reading /proc/PID/smaps (Linux-only).
  • USS (Unique Set Size) - memory pages that belong exclusively to this process (not shared with any other process). This is the most accurate measure of what your process uniquely consumes. Available via psutil's memory_full_info() on Linux, macOS, and Windows.
import psutil
import os

def memory_report(label=""):
    """Report all four memory metrics for the current process."""
    process = psutil.Process(os.getpid())
    mem = process.memory_full_info()  # requires elevated permissions on some systems

    print(f"Memory report [{label}]:")
    print(f"  RSS: {mem.rss / 1024**2:>8.1f} MB (resident physical memory)")
    print(f"  VMS: {mem.vms / 1024**2:>8.1f} MB (virtual memory size)")

    if hasattr(mem, 'pss'):
        print(f"  PSS: {mem.pss / 1024**2:>8.1f} MB (proportional, Linux only)")
    if hasattr(mem, 'uss'):
        print(f"  USS: {mem.uss / 1024**2:>8.1f} MB (unique to this process)")

    return mem.rss

# Simpler version that works without elevated permissions
def rss_mb():
    """Get RSS in MB. Works everywhere."""
    return psutil.Process(os.getpid()).memory_info().rss / 1024**2

print(f"Initial RSS: {rss_mb():.1f} MB")

# Demonstrate RSS growth
before = rss_mb()
large_list = [bytearray(1024 * 1024) for _ in range(100)]  # 100 MB
after_alloc = rss_mb()
print(f"After 100 MB allocation: {after_alloc:.1f} MB (+{after_alloc-before:.1f} MB)")

del large_list
import gc
gc.collect()
after_free = rss_mb()
print(f"After free: {after_free:.1f} MB ({after_free-before:.1f} MB retained)")
# glibc often retains freed memory in its heap - this is not a leak,
# but it looks like one if you only measure RSS
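
One way to confirm that this retained memory is glibc heap caching rather than a Python-level leak is to ask glibc to release its free arenas back to the OS. Below is a minimal sketch via ctypes; it assumes Linux with glibc (malloc_trim is glibc-specific) and simply does nothing elsewhere.

import ctypes
import gc

def trim_glibc_heap():
    """Ask glibc to return cached free memory to the OS (Linux/glibc only).

    If RSS drops noticeably after this call, the retained memory was
    allocator caching, not a leak. A diagnostic sketch, not a production fix.
    """
    gc.collect()
    try:
        libc = ctypes.CDLL("libc.so.6")
        libc.malloc_trim(0)  # 0 = trim as much free memory as possible
        return True
    except (OSError, AttributeError):
        return False  # not Linux/glibc - nothing to do

if trim_glibc_heap():
    print(f"RSS after malloc_trim: {rss_mb():.1f} MB")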

Reading /proc/PID/status for RSS Tracking

On Linux, you can read memory directly from the kernel without psutil:

import os
import re
import gc
import psutil

def read_proc_status(pid=None):
    """Read memory stats from /proc/PID/status on Linux."""
    if pid is None:
        pid = os.getpid()

    try:
        with open(f'/proc/{pid}/status', 'r') as f:
            content = f.read()
    except FileNotFoundError:
        return {}

    fields = {}
    for line in content.splitlines():
        match = re.match(r'(\w+):\s+(\d+)\s+kB', line)
        if match:
            fields[match.group(1)] = int(match.group(2))

    interesting = ['VmRSS', 'VmSize', 'VmPeak', 'VmData', 'VmStk', 'VmSwap']
    result = {}
    for field in interesting:
        if field in fields:
            result[field] = fields[field]
    return result

stats = read_proc_status()
if stats:
    print("Memory from /proc/self/status:")
    for k, v in stats.items():
        print(f"  {k}: {v:,} kB ({v/1024:.1f} MB)")

# psutil RSS monitoring daemon pattern
import threading
import time

class RSSMonitor:
    """Background thread that samples RSS every N seconds."""

    def __init__(self, interval=5.0, max_samples=1000):
        self.interval = interval
        self.max_samples = max_samples
        self.samples = []
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def stop(self):
        self._stop.set()
        self._thread.join(timeout=self.interval * 2)

    def _run(self):
        process = psutil.Process(os.getpid())
        while not self._stop.wait(self.interval):
            try:
                rss = process.memory_info().rss / 1024**2
                self.samples.append((time.time(), rss))
                if len(self.samples) > self.max_samples:
                    self.samples = self.samples[-self.max_samples:]
            except psutil.NoSuchProcess:
                break

    def report(self):
        if not self.samples:
            return
        rsses = [s[1] for s in self.samples]
        print(f"RSS stats ({len(self.samples)} samples, {self.interval}s interval):")
        print(f"  Min:     {min(rsses):.1f} MB")
        print(f"  Max:     {max(rsses):.1f} MB")
        print(f"  Current: {rsses[-1]:.1f} MB")
        print(f"  Growth:  {rsses[-1] - rsses[0]:.1f} MB over {len(rsses)*self.interval:.0f}s")

# Usage
monitor = RSSMonitor(interval=0.5)
monitor.start()
time.sleep(0.5)

# Simulate workload
data = [list(range(10000)) for _ in range(100)]
time.sleep(1.0)
del data
gc.collect()
time.sleep(1.0)

monitor.stop()
monitor.report()

tracemalloc: The Primary Python Memory Profiler

tracemalloc is the most accurate tool for finding Python-level memory growth. It works by hooking into Python's allocator to record the traceback (call stack) of every allocation:

import tracemalloc
import linecache
import os
import gc

# tracemalloc overhead: ~10% slower execution, some memory for tracking metadata
# Use 10-25 frames for detailed tracebacks; 1 frame for minimal overhead
tracemalloc.start(25)

def format_bytes(n):
    if n < 1024:
        return f"{n} B"
    elif n < 1024**2:
        return f"{n/1024:.1f} KB"
    else:
        return f"{n/1024**2:.1f} MB"

def snapshot_report(snapshot, title="Snapshot", top_n=15, min_size_kb=10):
    """Display a tracemalloc snapshot in human-readable form."""
    snapshot = snapshot.filter_traces([
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<frozen importlib._bootstrap_external>"),
        tracemalloc.Filter(False, linecache.__file__),
    ])

    stats = snapshot.statistics('traceback')

    print(f"\n{'='*60}")
    print(f"{title}")
    print(f"{'='*60}")

    total = sum(s.size for s in stats)
    print(f"Total tracked: {format_bytes(total)}")
    print(f"\nTop {top_n} allocations (>= {min_size_kb} KB):")

    shown = 0
    for stat in stats[:top_n]:
        if stat.size < min_size_kb * 1024:
            continue
        shown += 1
        print(f"\n  #{shown}: {format_bytes(stat.size)} "
              f"({stat.count} objects)")
        for line in stat.traceback.format()[:5]:
            print(f"    {line}")

def diff_report(snap1, snap2, title="Memory Growth", top_n=10):
    """Show what grew between two snapshots."""
    snap2 = snap2.filter_traces([
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<frozen importlib._bootstrap_external>"),
    ])

    stats = snap2.compare_to(snap1, 'traceback')

    print(f"\n{'='*60}")
    print(f"{title}")
    print(f"{'='*60}")

    total_growth = sum(s.size_diff for s in stats if s.size_diff > 0)
    print(f"Total growth: {format_bytes(total_growth)}")

    print(f"\nTop {top_n} growers:")
    count = 0
    for stat in stats:
        if stat.size_diff <= 0:
            continue
        count += 1
        if count > top_n:
            break
        print(f"\n  #{count}: +{format_bytes(stat.size_diff)} "
              f"({stat.count_diff:+d} objects)")
        for line in stat.traceback.format()[:3]:
            print(f"    {line}")

# Take baseline snapshot after warmup
gc.collect()
baseline = tracemalloc.take_snapshot()

# Simulate a leaky workload: accumulates data without clearing
running_stats = {}  # module-level accumulator - the "leak"

def process_request(request_id, tokens):
    """Simulate processing that has a subtle leak."""
    for token in tokens:
        if token not in running_stats:
            running_stats[token] = {'count': 0, 'requests': []}
        running_stats[token]['count'] += 1
        running_stats[token]['requests'].append(request_id)  # grows without bound!

# Process some requests
import random
for req_id in range(500):
    tokens = [f"token_{random.randint(0, 100)}" for _ in range(50)]
    process_request(req_id, tokens)

after_500 = tracemalloc.take_snapshot()

# Process 500 more
for req_id in range(500, 1000):
    tokens = [f"token_{random.randint(0, 100)}" for _ in range(50)]
    process_request(req_id, tokens)

after_1000 = tracemalloc.take_snapshot()

# Report
snapshot_report(after_1000, "After 1000 requests")
diff_report(baseline, after_1000, "Growth from baseline to 1000 requests")
diff_report(after_500, after_1000, "Growth from 500 to 1000 requests (shows steady leak)")

tracemalloc.stop()

memory_profiler: Line-Level Memory Usage

memory_profiler complements tracemalloc by showing RSS change per line, which is useful when you want to understand the memory cost of a specific function:

# Install: pip install memory-profiler
# Usage: mprof run script.py (generates memory usage over time)
# Or use the @profile decorator (requires running with: python -m memory_profiler)

from memory_profiler import profile
import numpy as np
import time
import gc

@profile
def data_loading_pipeline(n_samples=10000, feature_dim=768):
    """
    A data loading function with annotated memory usage per line.
    Run with: python -m memory_profiler this_script.py
    """
    # Load raw data as Python list of dicts
    raw_data = [
        {'id': i, 'features': [float(j) for j in range(feature_dim)]}
        for i in range(n_samples)
    ]  # <-- large: n_samples * feature_dim Python floats = ~130 MB for 10K * 768

    # Convert to NumPy array: massive reduction
    feature_matrix = np.array(
        [sample['features'] for sample in raw_data],
        dtype=np.float32
    )  # <-- 10000 * 768 * 4 bytes = ~29 MB (NumPy, no per-element overhead)

    # Release the Python list - should reduce RSS significantly
    del raw_data
    gc.collect()

    # Process the NumPy matrix
    norms = np.linalg.norm(feature_matrix, axis=1, keepdims=True)
    normalized = feature_matrix / norms  # <-- additional ~29 MB

    del feature_matrix
    del norms

    return normalized

# Running memory_profiler programmatically (no decorator needed)
from memory_profiler import memory_usage

def measured_function():
    data = [list(range(1000)) for _ in range(1000)]
    time.sleep(0.1)
    del data
    gc.collect()

# Sample memory every 0.01 seconds during execution
mem_usage = memory_usage(measured_function, interval=0.01, timeout=10)
print(f"\nMemory usage over time (RSS MB):")
print(f"  Start: {mem_usage[0]:.1f} MB")
print(f"  Peak:  {max(mem_usage):.1f} MB")
print(f"  End:   {mem_usage[-1]:.1f} MB")
print(f"  Delta: {max(mem_usage) - mem_usage[0]:.1f} MB peak increase")

Valgrind Memcheck for C Extensions

When a Python C extension (a NumPy custom op, a PyTorch custom kernel, a compiled Cython module) crashes with a segfault or corrupts memory, Valgrind Memcheck is the definitive tool. It runs the entire program on a simulated CPU that tracks the validity of every byte:

# Build Python with debug flags (no optimizations, debug symbols)
# Or use pyenv to install a debug Python:
# pyenv install --debug 3.11.6

# Run Python under Valgrind
# --tool=memcheck: use the memory checker (default)
# --leak-check=full: show all leaks at exit
# --show-leak-kinds=all: include indirect and still-reachable
# --track-origins=yes: show where uninitialized values came from (slower)
# --suppressions: suppress known Python false positives
#   (valgrind-python.supp ships in the CPython source tree under Misc/;
#    adjust the path below to wherever it lives on your system)

valgrind \
--tool=memcheck \
--leak-check=full \
--show-leak-kinds=all \
--track-origins=yes \
--suppressions=$(python3 -c "import sys; print(sys.prefix)")/lib/python3.11/test/valgrind-python.supp \
python3 -c "
import numpy as np
# Test your C extension here
arr = np.zeros(1000, dtype=np.float64)
result = arr.sum()
print(result)
"

# Valgrind output format:
# ==12345== Invalid read of size 8
# ==12345== at 0x... (function in your extension)
# ==12345== by 0x... (caller)
# ==12345== Address 0x... is 16 bytes after a block of size 80 alloc'd
# ==12345== at 0x... (malloc)
# This means: you read 8 bytes past the end of an 80-byte allocation
# Common Valgrind errors and what they mean:

# "Invalid read of size N" at ADDRESS
# - You are reading from memory you do not own
# - Causes: array out-of-bounds, use-after-free, uninitialized pointer

# "Invalid write of size N" at ADDRESS
# - You are writing to memory you do not own
# - More dangerous than invalid read (corrupts state)

# "Conditional jump or move depends on uninitialised value(s)"
# - Using a variable before initializing it
# - With --track-origins=yes, Valgrind shows where it was allocated

# "N bytes in 1 blocks are definitely lost in loss record M of N"
# - You allocated memory and never freed it (true leak)

# "N bytes in 1 blocks are possibly lost"
# - You have a pointer but it does not point to the start of the block
# - Could be a pointer into the middle of an array (not always a bug)

AddressSanitizer (ASAN): Faster than Valgrind

ASAN is a compiler-based sanitizer that detects memory errors at runtime with much less overhead than Valgrind (2x vs 20x):

# Build Python with ASAN enabled (CPython development)
# This is for developing/testing C extensions that will be loaded into Python

# Option 1: Build a debug Python with ASAN
./configure --with-address-sanitizer --without-pymalloc
make -j$(nproc)

# Option 2: Use an ASAN-instrumented Python from your distro
# (some distributions provide python3-dbg packages)

# Run with ASAN
ASAN_OPTIONS="detect_leaks=1:abort_on_error=0" \
LD_PRELOAD=$(gcc -print-file-name=libasan.so) \
python3 your_extension_test.py

# ASAN output format:
# ==12345==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x...
# READ of size 8 at 0x... thread T0
# #0 0x... in your_function extensions/mymodule.c:42
# #1 0x... in PyObject_Call ...
# 0x... is located 0 bytes to the right of 64-byte region [0x..., 0x...)
# allocated by thread T0 here:
# #0 0x... in malloc
# #1 0x... in your_alloc_function extensions/mymodule.c:15

# ASAN detects:
# - Heap buffer overflow (write past end of malloc'd region)
# - Use-after-free
# - Double-free
# - Stack buffer overflow
# - Memory leaks (with LeakSanitizer enabled)
# Testing a C extension with ASAN from Python
# This shows how to structure tests that expose memory bugs

import ctypes
import os

# Suppose you have a C extension 'myextension.so' that you suspect has a bug
# Build it with: gcc -g -fsanitize=address -shared -fPIC mymodule.c -o mymodule_asan.so

# Simple demonstration: use ctypes to call malloc/free directly
# and intentionally trigger an error for illustration

def demonstrate_asan_detection():
    """
    Shows what ASAN would catch if this were a C extension.
    This is Python-safe code showing the concepts.
    """
    print("ASAN-detectable errors (would crash a C extension):")
    print()
    print("1. Heap buffer overflow:")
    print("   char* buf = malloc(8);")
    print("   buf[8] = 'x';   // writes 1 byte past end - ASAN catches this")
    print()
    print("2. Use-after-free:")
    print("   char* ptr = malloc(64);")
    print("   free(ptr);")
    print("   ptr[0] = 'x';   // ASAN catches this")
    print()
    print("3. Double-free:")
    print("   char* ptr = malloc(64);")
    print("   free(ptr);")
    print("   free(ptr);      // ASAN catches this")
    print()
    print("4. Memory leak (LeakSanitizer):")
    print("   char* ptr = malloc(1024);")
    print("   // never freed before program exit - LSan reports this")

demonstrate_asan_detection()

Common Memory Leaks in Python ML Systems

The most common sources of memory leaks in Python ML services:

import sys
import gc
import weakref

# ============================================================
# LEAK PATTERN 1: Global variable accumulation
# ============================================================
request_log = []  # grows without bound

def handle_request_bad(request_data):
    result = {"input": request_data, "output": [float(i) for i in range(100)]}
    request_log.append(result)  # LEAK: request_log grows forever
    return result["output"]

# Fix: use a bounded structure
from collections import deque
request_log_fixed = deque(maxlen=1000)  # automatically discards old entries

def handle_request_good(request_data):
    result = {"output": [float(i) for i in range(100)]}
    request_log_fixed.append(result["output"])  # and don't store input
    return result["output"]

# ============================================================
# LEAK PATTERN 2: Circular references in callbacks
# ============================================================
class ModelWrapper:
    def __init__(self, model_name):
        self.model_name = model_name
        self.callbacks = []  # list of callback functions
        self.history = []    # training history

    def register_callback(self, fn):
        self.callbacks.append(fn)

    def train_step(self, loss):
        self.history.append(loss)

# This creates a cycle: wrapper -> callbacks -> lambda -> wrapper
def create_leaky_wrapper():
    wrapper = ModelWrapper("gpt2")
    wrapper.register_callback(
        lambda loss: wrapper.history  # captures 'wrapper' - cycle!
    )
    return wrapper

# Fix: use weakref
def create_safe_wrapper():
    wrapper = ModelWrapper("gpt2")
    wrapper_ref = weakref.ref(wrapper)

    def safe_callback(loss):
        w = wrapper_ref()
        if w is not None:
            return w.history

    wrapper.register_callback(safe_callback)
    return wrapper

# ============================================================
# LEAK PATTERN 3: Exception traceback retention
# ============================================================
stored_exceptions = []  # common in error reporting systems

def process_with_error_storage(data):
    try:
        result = 1 / data  # may raise ZeroDivisionError
        return result
    except ZeroDivisionError:
        exc_info = sys.exc_info()
        # LEAK: exc_info[2] is the traceback, which holds a reference
        # to the stack frame, which holds all local variables
        stored_exceptions.append(exc_info)  # holds entire frame!
        return None

# Fix: extract what you need and drop the traceback
import traceback as tb_module

safe_exceptions = []

def process_with_safe_error_storage(data):
    try:
        return 1 / data
    except ZeroDivisionError:
        # Extract string representation - no frame references
        safe_exceptions.append(tb_module.format_exc())
        return None

# ============================================================
# LEAK PATTERN 4: Unclosed file handles (affects DataLoaders)
# ============================================================
def leaky_data_reader(filepath):
    f = open(filepath)  # LEAK: file never closed if function raises
    data = f.read()     # if this raises, f is never closed
    f.close()
    return data

def safe_data_reader(filepath):
    with open(filepath) as f:  # always closed, even if read() raises
        return f.read()

# Check for open file handles
def count_open_files():
    import psutil
    proc = psutil.Process()
    return len(proc.open_files())

print(f"Open files: {count_open_files()}")

# ============================================================
# LEAK PATTERN 5: DataLoader worker leaks
# ============================================================
# PyTorch DataLoader workers are forked processes
# Each worker gets a copy of all Python objects in the parent process
# including large module-level variables

# Pattern that causes workers to use excessive memory:
# BAD: large data cached at module level before forking
HUGE_VOCAB = {f"word_{i}": i for i in range(500000)}  # 500K entries at module level
# Every DataLoader worker inherits this 500K dict

# Better: load data lazily inside the worker, or use shared memory
# torch.multiprocessing.Manager().dict() for shared state
# or memory-mapped files (numpy.memmap) for read-only data
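
To make that last fix concrete, here is a minimal sketch of a dataset that memory-maps a read-only array instead of copying it into every worker. The file name features.npy is an assumption for illustration; in a real pipeline this class would subclass torch.utils.data.Dataset.

import numpy as np

class MemmapFeatureDataset:
    """Sketch: map a read-only array per worker instead of copying it at fork.

    Assumes 'features.npy' was written with np.save.
    """

    def __init__(self, path="features.npy"):
        self.path = path
        self._arr = None  # opened lazily, once per worker process

    def _ensure_open(self):
        if self._arr is None:
            # mmap_mode='r' maps the file read-only; pages come from the OS
            # page cache and are shared across workers instead of duplicated
            self._arr = np.load(self.path, mmap_mode='r')
        return self._arr

    def __len__(self):
        return self._ensure_open().shape[0]

    def __getitem__(self, idx):
        return np.array(self._ensure_open()[idx])  # copy only the requested row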

DataLoader Memory Leak Detection

PyTorch DataLoaders run workers in separate processes. Memory leaks in workers are particularly insidious because the worker processes are hidden:

import os
import psutil
import time

def find_dataloader_worker_processes():
    """Find DataLoader worker processes spawned by the current process."""
    current_pid = os.getpid()
    current_proc = psutil.Process(current_pid)

    children = current_proc.children(recursive=True)
    print(f"Child processes of PID {current_pid}:")
    total_rss = 0
    for child in children:
        try:
            rss = child.memory_info().rss / 1024**2
            cmdline = ' '.join(child.cmdline()[:3])
            print(f"  PID {child.pid}: {rss:.1f} MB - {cmdline[:50]}")
            total_rss += rss
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
    print(f"  Total child RSS: {total_rss:.1f} MB")

# Demonstrate with a mock multi-process scenario
def monitor_worker_memory(dataloader, n_batches=10):
    """
    Monitor memory growth across DataLoader workers.
    Only works with num_workers > 0 (spawns actual worker processes).
    """
    parent_rss_before = psutil.Process().memory_info().rss / 1024**2

    batch_count = 0
    for batch in dataloader:
        batch_count += 1
        if batch_count >= n_batches:
            break

        if batch_count % 5 == 0:
            parent_rss = psutil.Process().memory_info().rss / 1024**2
            children = psutil.Process().children(recursive=True)
            child_rss = sum(
                c.memory_info().rss / 1024**2
                for c in children
                if c.is_running()
            )
            print(f"Batch {batch_count}: parent={parent_rss:.1f}MB, "
                  f"workers={child_rss:.1f}MB, "
                  f"total={parent_rss+child_rss:.1f}MB")

    parent_rss_after = psutil.Process().memory_info().rss / 1024**2
    print(f"Parent RSS change: {parent_rss_after - parent_rss_before:+.1f} MB")

# Usage with PyTorch (if available)
try:
    import torch
    from torch.utils.data import Dataset, DataLoader

    class SimpleDataset(Dataset):
        def __init__(self, size=1000):
            # This data is shared with workers via fork
            # Large datasets here are copied to each worker
            self.data = list(range(size))

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            return torch.tensor(self.data[idx], dtype=torch.float32)

    dataset = SimpleDataset(size=10000)
    dataloader = DataLoader(
        dataset,
        batch_size=32,
        num_workers=2,            # spawns 2 worker processes
        persistent_workers=True   # workers stay alive between epochs
    )

    print("Monitoring DataLoader worker memory:")
    monitor_worker_memory(dataloader, n_batches=20)

except ImportError:
    print("PyTorch not available - DataLoader demo skipped")

GPU Memory Debugging

import torch
import gc

def full_gpu_memory_debug(device: int = 0) -> None:
    """
    Complete GPU memory debug procedure.
    Call this when you hit CUDA OOM or notice unexpected GPU memory growth.
    """
    if not torch.cuda.is_available():
        print("CUDA not available")
        return

    print("=" * 60)
    print("GPU Memory Debug Report")
    print("=" * 60)

    # 1. Basic stats
    allocated = torch.cuda.memory_allocated(device) / 1024**3
    reserved = torch.cuda.memory_reserved(device) / 1024**3
    total = torch.cuda.get_device_properties(device).total_memory / 1024**3

    print(f"\n[1] Memory overview (device {device}):")
    print(f"  Total GPU:  {total:.2f} GB")
    print(f"  Reserved:   {reserved:.2f} GB ({reserved/total:.1%} of total)")
    print(f"  Allocated:  {allocated:.2f} GB ({allocated/total:.1%} of total)")
    print(f"  Cache free: {reserved-allocated:.2f} GB")
    print(f"  OS free:    {total-reserved:.2f} GB")

    # 2. Memory stats for fragmentation analysis
    stats = torch.cuda.memory_stats(device)
    print(f"\n[2] Allocator statistics:")
    key_stats = {
        'Peak allocated (GB)': stats.get('active_bytes.all.peak', 0) / 1024**3,
        'Alloc retries': stats.get('num_alloc_retries', 0),
        'OOM events': stats.get('num_ooms', 0),
        'Fragment bytes (MB)': stats.get('inactive_split_bytes.all.current', 0) / 1024**2,
    }
    for k, v in key_stats.items():
        print(f"  {k:30s}: {v:.3f}" if isinstance(v, float) else f"  {k:30s}: {v}")

    # 3. Try to find large tensors in Python's object graph
    print(f"\n[3] Large CUDA tensors in Python object graph:")
    large_tensors = []
    for obj in gc.get_objects():
        if isinstance(obj, torch.Tensor) and obj.is_cuda:
            size_mb = obj.element_size() * obj.nelement() / 1024**2
            if size_mb > 10:  # only show tensors > 10 MB
                large_tensors.append((size_mb, obj.shape, obj.dtype))

    large_tensors.sort(reverse=True)
    for size_mb, shape, dtype in large_tensors[:10]:
        print(f"  {size_mb:8.1f} MB  shape={shape}  dtype={dtype}")

    if not large_tensors:
        print("  No large tensors found in Python object graph")
        print("  (Memory may be held by C++ objects or autograd graph)")

    # 4. Recommendation
    print(f"\n[4] Diagnosis:")
    frag = reserved - allocated
    if stats.get('num_ooms', 0) > 0:
        print(f"  OOM events detected: {stats['num_ooms']}")
    if frag > 1.0:
        print(f"  High fragmentation: {frag:.1f} GB in allocator cache")
        print(f"  Try: torch.cuda.empty_cache()")
    if stats.get('num_alloc_retries', 0) > 0:
        print(f"  Alloc retries: {stats['num_alloc_retries']} (fragmentation indicator)")
    if allocated / total > 0.9:
        print(f"  Near capacity: {allocated/total:.1%} of GPU memory allocated")
        print(f"  Consider: gradient checkpointing, smaller batch size, model parallelism")

def detect_gpu_memory_leak(device: int = 0, n_steps: int = 100) -> None:
    """
    Detect memory leaks in a training loop by monitoring allocation growth.
    Run your training step at the marked sample point below.
    """
    if not torch.cuda.is_available():
        print("CUDA not available")
        return

    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats(device)

    allocation_history = []

    for step in range(n_steps):
        # Sample point: run your training step here
        # For demo, just allocate a fixed-size tensor and delete it
        t = torch.zeros(1024, 1024, device=f'cuda:{device}')
        result = t.mean()
        del t
        del result

        allocated_mb = torch.cuda.memory_allocated(device) / 1024**2
        allocation_history.append(allocated_mb)

    # Check for monotonic growth (leak signature)
    first_10 = sum(allocation_history[:10]) / 10
    last_10 = sum(allocation_history[-10:]) / 10
    growth = last_10 - first_10

    print(f"Memory leak detection ({n_steps} steps):")
    print(f"  First 10 steps avg: {first_10:.1f} MB")
    print(f"  Last 10 steps avg:  {last_10:.1f} MB")
    print(f"  Growth:             {growth:+.1f} MB")

    if growth > 10:
        print(f"  LEAK DETECTED: {growth:.1f} MB growth over {n_steps} steps")
        print(f"  Likely causes:")
        print(f"    - Tensors held by autograd graph not being freed")
        print(f"    - loss.item() not called (retains computation graph)")
        print(f"    - Tensors stored in Python lists for logging")
    else:
        print(f"  No significant leak detected")

if torch.cuda.is_available():
    full_gpu_memory_debug()
    detect_gpu_memory_leak(n_steps=50)

torch.cuda.memory_snapshot(): Detailed Block Analysis

PyTorch 2.0+ provides a detailed memory snapshot that shows every block in the caching allocator:

import torch

def analyze_memory_snapshot(device: int = 0) -> None:
    """
    Use PyTorch's memory snapshot for deep analysis.
    Requires PyTorch >= 2.0.
    """
    if not torch.cuda.is_available():
        return

    try:
        torch.cuda.memory._record_memory_history(
            max_entries=100000,
            context="all"
        )
        print("Memory history recording started")

        # Run your workload here
        tensors = [
            torch.zeros(256, 256, device=f'cuda:{device}') for _ in range(10)
        ]

        # Take snapshot
        snapshot = torch.cuda.memory._snapshot()

        # Summarize
        segments = snapshot.get('segments', [])
        print(f"\nMemory snapshot summary:")
        print(f"  Segments: {len(segments)}")

        total_reserved = 0
        total_allocated = 0

        for seg in segments:
            seg_reserved = seg.get('total_size', 0)
            seg_allocated = sum(
                block.get('size', 0)
                for block in seg.get('blocks', [])
                if block.get('state') == 'active_allocated'
            )
            total_reserved += seg_reserved
            total_allocated += seg_allocated

        print(f"  Reserved:  {total_reserved / 1024**2:.1f} MB")
        print(f"  Allocated: {total_allocated / 1024**2:.1f} MB")

        # Optionally save snapshot for visualization
        # import pickle
        # with open('/tmp/cuda_snapshot.pkl', 'wb') as f:
        #     pickle.dump(snapshot, f)
        # Visualize with: python -m torch.cuda._memory_viz trace_plot /tmp/cuda_snapshot.pkl

        del tensors
        torch.cuda.empty_cache()

    except AttributeError:
        print("Memory snapshot requires PyTorch >= 2.0")
    except Exception as e:
        print(f"Snapshot failed: {e}")
    finally:
        try:
            torch.cuda.memory._record_memory_history(enabled=None)
        except Exception:
            pass

if torch.cuda.is_available():
    analyze_memory_snapshot()

Production Debugging Workflow

The systematic approach to diagnosing memory growth in production:

import tracemalloc
import gc
import psutil
import os
import time
import threading
from collections import deque

class ProductionMemoryMonitor:
    """
    Production-grade memory monitoring for long-running ML services.

    Usage:
        monitor = ProductionMemoryMonitor()
        monitor.start()
        # ... run your service ...
        monitor.report()
        monitor.stop()
    """

    def __init__(
        self,
        rss_interval: float = 60.0,         # sample RSS every 60 seconds
        snapshot_interval: float = 600.0,   # take tracemalloc snapshot every 10 min
        alert_threshold_mb: float = 500.0,  # alert if RSS grows > 500 MB
    ):
        self.rss_interval = rss_interval
        self.snapshot_interval = snapshot_interval
        self.alert_threshold_mb = alert_threshold_mb

        self._rss_history = deque(maxlen=1440)  # 24 hours at 1-minute intervals
        self._snapshots = []
        self._baseline_rss = None
        self._stop = threading.Event()

        # Start tracemalloc
        tracemalloc.start(10)
        self._baseline_snapshot = None

    def start(self):
        """Start background monitoring."""
        gc.collect()
        self._baseline_rss = psutil.Process(os.getpid()).memory_info().rss / 1024**2
        self._baseline_snapshot = tracemalloc.take_snapshot()

        self._rss_thread = threading.Thread(target=self._monitor_rss, daemon=True)
        self._snap_thread = threading.Thread(target=self._monitor_snapshots, daemon=True)
        self._rss_thread.start()
        self._snap_thread.start()

        print(f"Memory monitoring started. Baseline RSS: {self._baseline_rss:.1f} MB")
        return self

    def _monitor_rss(self):
        process = psutil.Process(os.getpid())
        while not self._stop.wait(self.rss_interval):
            try:
                rss = process.memory_info().rss / 1024**2
                self._rss_history.append((time.time(), rss))

                growth = rss - self._baseline_rss
                if growth > self.alert_threshold_mb:
                    print(f"[MEMORY ALERT] RSS grew by {growth:.1f} MB "
                          f"(baseline={self._baseline_rss:.1f}, "
                          f"current={rss:.1f})")
            except Exception as e:
                print(f"RSS monitoring error: {e}")

    def _monitor_snapshots(self):
        while not self._stop.wait(self.snapshot_interval):
            try:
                gc.collect()
                snap = tracemalloc.take_snapshot()
                self._snapshots.append((time.time(), snap))
                if len(self._snapshots) > 10:
                    self._snapshots = self._snapshots[-10:]
            except Exception as e:
                print(f"Snapshot error: {e}")

    def report(self):
        """Print a comprehensive memory report."""
        if not self._rss_history:
            print("No data collected yet")
            return

        rss_values = [r for _, r in self._rss_history]
        current_rss = rss_values[-1]
        growth = current_rss - self._baseline_rss

        print(f"\n{'='*60}")
        print("Production Memory Report")
        print(f"{'='*60}")
        print(f"Baseline RSS: {self._baseline_rss:.1f} MB")
        print(f"Current RSS:  {current_rss:.1f} MB")
        print(f"Growth:       {growth:+.1f} MB")
        print(f"Peak RSS:     {max(rss_values):.1f} MB")
        print(f"Samples:      {len(rss_values)}")

        # Compute growth rate (least-squares slope over sample index)
        if len(rss_values) > 2:
            n = len(rss_values)
            x_mean = (n - 1) / 2
            y_mean = sum(rss_values) / n
            slope_num = sum((i - x_mean) * (rss_values[i] - y_mean)
                            for i in range(n))
            slope_den = sum((i - x_mean)**2 for i in range(n))
            if slope_den > 0:
                slope = slope_num / slope_den
                rate_per_hour = slope * 3600 / self.rss_interval
                print(f"Growth rate:  {rate_per_hour:+.1f} MB/hour")

        # Tracemalloc comparison
        if self._baseline_snapshot and self._snapshots:
            _, latest_snapshot = self._snapshots[-1]
            stats = latest_snapshot.compare_to(
                self._baseline_snapshot, 'lineno'
            )
            print(f"\nTop allocations since baseline:")
            for stat in stats[:5]:
                if stat.size_diff > 1024 * 100:  # > 100 KB
                    print(f"  +{stat.size_diff/1024:.0f} KB: {stat}")

    def stop(self):
        self._stop.set()
        tracemalloc.stop()

# Usage in a serving application
monitor = ProductionMemoryMonitor(
    rss_interval=1.0,       # fast for demo
    snapshot_interval=5.0,
    alert_threshold_mb=50.0
)
monitor.start()

# Simulate ~10 seconds of request handling with a small leak
leak_accumulator = []
for i in range(100):
    leak_accumulator.append({'data': list(range(100)), 'id': i})
    time.sleep(0.1)

monitor.report()
monitor.stop()
danger

Never call tracemalloc.start() in production without measuring its overhead first. For most Python ML services, the overhead is 5-15% slower execution and ~10% more memory (for tracking metadata). In latency-sensitive serving (target P99 under 50 ms), this may be unacceptable. Consider enabling tracemalloc only on a canary instance (one replica out of N) rather than fleet-wide, and compare performance metrics between instrumented and uninstrumented replicas.
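
A minimal sketch of that canary pattern, assuming the deployment system sets an environment variable (TRACE_MEMORY is a made-up name here) only on the instrumented replica:

import os
import tracemalloc

# Hypothetical env var, set only on the canary replica by the deployment system
if os.environ.get("TRACE_MEMORY") == "1":
    tracemalloc.start(10)  # modest frame depth to keep overhead down

def maybe_memory_report():
    """Log tracked memory on the canary; a no-op on uninstrumented replicas."""
    if tracemalloc.is_tracing():
        current, peak = tracemalloc.get_traced_memory()
        print(f"tracemalloc: current={current/1024**2:.1f} MB, "
              f"peak={peak/1024**2:.1f} MB")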

warning

gc.get_objects() is expensive: it acquires the GIL and iterates through all tracked Python objects, which can take several milliseconds in a large process. Never call this in a request handler. Use it only in diagnostic scripts or background threads during maintenance windows.
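
If you do need an object census, something like the sketch below belongs in a diagnostic script or a maintenance-window job, never in a request handler:

import gc
from collections import Counter

def object_census(top_n=15):
    """Count live tracked objects by type. Expensive: walks the whole heap."""
    counts = Counter(type(obj).__name__ for obj in gc.get_objects())
    for type_name, count in counts.most_common(top_n):
        print(f"{count:>10,}  {type_name}")

# Run offline or during a maintenance window
object_census()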


Interview Q&A

Q1: What is the difference between RSS, PSS, and USS, and which should you use to detect a Python memory leak?

RSS (Resident Set Size) is the physical RAM used by the process, including pages shared with other processes (like shared libraries). On a machine running 50 Python processes, each one reports libc's 2 MB in its RSS, even though the physical RAM is shared. PSS (Proportional Set Size) counts shared pages fractionally: if a 2 MB shared library is shared among 50 processes, each gets 2 MB / 50 = 40 KB credit. PSS is more accurate but requires reading /proc/PID/smaps. USS (Unique Set Size) counts only pages that belong exclusively to your process - the most precise measure of what your process "costs." For detecting a Python-level memory leak (the common case), RSS is usually sufficient because Python allocations are not shared and show up clearly as RSS growth. For distinguishing a real leak from shared-library loading effects, use USS. psutil exposes USS (and, on Linux, PSS) via memory_full_info().

Q2: Walk me through how you would use tracemalloc to find a memory leak in a production service.

Five steps. First, start tracemalloc early in the process startup with a sufficient frame depth: tracemalloc.start(25). Too few frames makes tracebacks useless; 25 is usually enough to identify the call site. Second, let the service handle some requests to warm up (imports, JIT compilation, initial model loading will show up as allocations you do not care about). Third, take a baseline snapshot: snap1 = tracemalloc.take_snapshot(). Fourth, let the service run under production-like load for 10-60 minutes. Fifth, take a second snapshot and compare: stats = snap2.compare_to(snap1, 'lineno'). Sort by size_diff descending and look for allocations that grew monotonically. The traceback tells you the exact file and line that allocated the growing objects. Common findings: a module-level list appended to on every request, a dict accumulating keys without eviction, or a cache with no size limit.
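
Condensed into code, those five steps look roughly like this; the warm-up and load-generation calls are placeholders, and 'lineno' grouping is shown as a cheaper alternative to 'traceback':

import gc
import tracemalloc

tracemalloc.start(25)                # 1. start early, with enough frames

# warm_up_service()                  # 2. placeholder: let imports/model loading settle
gc.collect()
snap1 = tracemalloc.take_snapshot()  # 3. baseline

# run_production_like_load()         # 4. placeholder: 10-60 minutes of traffic
gc.collect()
snap2 = tracemalloc.take_snapshot()

for stat in snap2.compare_to(snap1, 'lineno')[:10]:  # 5. diff, biggest growth first
    print(stat)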

Q3: When should you use Valgrind vs ASAN for debugging a Python C extension?

Use ASAN for regular development. It requires compiling the extension (and ideally Python itself) with -fsanitize=address, which is standard in development builds. ASAN adds ~2x runtime overhead and catches heap buffer overflows, use-after-free, and double-free in real time with a precise error report including the call stack at both the allocation and the bad access. Use Valgrind Memcheck when ASAN is not available (pre-compiled binary, no source access, or you need to test a release build), or when you need leak detection without recompilation (Valgrind works on unmodified binaries). Valgrind is also better for detecting reads of uninitialized memory (--track-origins=yes traces where uninitialized values originated). The 20x slowdown means Valgrind tests must use very small datasets. One practical pattern: use ASAN in your CI pipeline (catches errors fast), and Valgrind in a nightly leak-detection job (thorough but slow).

Q4: What are the most common memory leaks in PyTorch training loops, and how do you detect each?

Three most common. First, retaining computation graphs: if you store loss or intermediate tensors in a list without calling .item() or .detach(), the autograd graph attached to them stays alive. Detection: watch torch.cuda.memory_allocated() across steps; if it grows monotonically it is almost always this. Fix: use loss.item() for scalars, .detach() for tensors you want to keep.

Second, global metrics buffers: code that logs losses.append(loss.item()) every step and never clears the list. After 100K steps this list has 100K Python floats. Detection: tracemalloc diff shows growth in list allocations. Fix: clear the buffer at epoch boundaries or use a fixed-size deque.

Third, DataLoader worker leaks: each worker inherits the parent's memory at fork time. Large datasets, tokenizers, or vocabs loaded at module level before creating the DataLoader are copied to every worker. A 1 GB vocab loaded before 8 workers means 8 GB used for the vocab alone. Detection: check child process RSS with psutil.Process().children(). Fix: load large data lazily inside Dataset.__getitem__, or use memory-mapped files (numpy.memmap, torch.load with mmap_mode).
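
A minimal sketch of the first pattern and its fix; model, optimizer, loss_fn, and batches are placeholders for whatever the training loop already has:

def train_steps(model, optimizer, loss_fn, batches):
    """Sketch only: shows autograd-graph retention vs. the .item() fix."""
    losses = []
    for x, y in batches:
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()

        # LEAK: losses.append(loss) would keep each step's autograd history
        # alive, because the stored tensor still references its grad_fn chain.
        losses.append(loss.item())     # fix: store a plain Python float

        # For tensors you actually want to keep, detach them from the graph:
        # kept_hidden = hidden.detach().cpu()
    return losses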

Q5: How does memory_profiler's @profile decorator work, and when is it more useful than tracemalloc?

memory_profiler's @profile decorator works by sampling RSS at every line boundary of the decorated function: it records RSS before and after each line executes, so the output shows the RSS change per line, making it easy to see which lines cause large allocations. This is more useful than tracemalloc in two scenarios. First, when you want to understand a specific function's memory behavior line by line (tracemalloc gives you allocation sites but not line-level deltas within a function). Second, for diagnosing memory allocated outside Python's allocator APIs: tracemalloc only sees allocations routed through Python's allocators, so a C extension that grabs 2 GB with raw malloc shows up as near-zero in tracemalloc but clearly in memory_profiler (because RSS increases). The downside is that RSS sampling is coarse and includes allocator caching effects, so very fast functions may not show meaningful data.

Q6: What is the correct procedure when a training job crashes with CUDA OOM at a specific step?

Four-step procedure. Step 1: determine if it is genuine OOM or fragmentation OOM. Wrap the crashing step in a try/except and call torch.cuda.memory_stats() in the except clause. If reserved - allocated > 1 GB, it is fragmentation - try torch.cuda.empty_cache() before the allocation. Step 2: if genuine OOM, add memory profiling. Use torch.cuda.memory._record_memory_history() and take a snapshot just before the crash. This shows every live allocation with the Python call site. Step 3: look for unexpected accumulation. The most common culprits are: (a) the autograd graph of a loss that was not properly detached, (b) activation tensors from gradient checkpointing that were not released, (c) optimizer state (for Adam, this is 2x the model size in fp32). Step 4: structural fixes. Enable gradient checkpointing (model.gradient_checkpointing_enable()), reduce batch size, use mixed precision (bf16 halves parameter memory), or use ZeRO sharding via DeepSpeed or FSDP.
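
A sketch of step 1, assuming a recent PyTorch (torch.cuda.OutOfMemoryError exists from roughly 1.13 onward); train_step stands in for whatever callable runs the crashing step:

import torch

def run_step_with_oom_diagnosis(train_step, device=0):
    """Distinguish fragmentation OOM from genuine OOM around one step."""
    try:
        return train_step()
    except torch.cuda.OutOfMemoryError:
        allocated = torch.cuda.memory_allocated(device)
        reserved = torch.cuda.memory_reserved(device)
        gap_gb = (reserved - allocated) / 1024**3
        if gap_gb > 1.0:
            # Fragmentation: the caching allocator holds blocks it cannot reuse
            print(f"Likely fragmentation OOM: {gap_gb:.1f} GB reserved but unallocated")
            torch.cuda.empty_cache()
            return train_step()        # retry once after releasing the cache
        print("Genuine OOM: reduce batch size or enable gradient checkpointing")
        raise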

Q7: How do you detect that a memory issue is allocator fragmentation rather than a Python-level leak?

The key diagnostic is comparing Python-tracked allocations against OS-observed RSS. If RSS is much larger than the memory tracked by Python, the gap is in C-level allocations or the allocator itself. Procedure: (1) Check Python-tracked memory: tracemalloc.get_traced_memory() returns current and peak bytes tracked. (2) Check RSS: psutil.Process().memory_info().rss. (3) If RSS minus tracked memory exceeds roughly 500 MB, the gap is in C-level allocations (NumPy buffers, PyTorch CPU tensors) or allocator fragmentation. To distinguish: check torch.cuda.memory_allocated() for GPU tensors and arr.nbytes for NumPy arrays, and compare their sum against RSS. If the accounting still does not add up, run the service under jemalloc (LD_PRELOAD=libjemalloc.so) or tune glibc's trim threshold (MALLOC_TRIM_THRESHOLD_) - if RSS drops significantly under jemalloc's more aggressive purging, the original issue was allocator fragmentation in glibc, not a real leak.
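
The first three steps of that procedure in code; the 500 MB threshold is the heuristic from the answer above:

import os
import psutil
import tracemalloc

def python_vs_rss_gap(threshold_mb=500):
    """Compare Python-tracked allocations against OS-observed RSS."""
    if not tracemalloc.is_tracing():
        print("Start tracemalloc first to get Python-tracked bytes")
        return
    tracked_current, _ = tracemalloc.get_traced_memory()   # (1) Python-tracked
    rss = psutil.Process(os.getpid()).memory_info().rss    # (2) OS-observed
    gap_mb = (rss - tracked_current) / 1024**2              # (3) the gap
    print(f"RSS: {rss/1024**2:.1f} MB, tracked: {tracked_current/1024**2:.1f} MB, "
          f"gap: {gap_mb:.1f} MB")
    if gap_mb > threshold_mb:
        print("Gap exceeds threshold: suspect C-level allocations "
              "(NumPy buffers, torch CPU tensors) or allocator fragmentation")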
