
File Systems and IO Patterns

The Checkpoint That Would Not Write

A team was training a 65-billion-parameter model on 512 GPUs. After 72 hours of training, a gradient explosion killed the run. The automatic checkpoint system should have saved state every 2 hours, giving them at most 2 hours of lost work. Instead, they lost the entire 72 hours.

The investigation revealed a chain of failures. The checkpoint code called torch.save() to write a 130 GB checkpoint file, then continued training. torch.save() returned quickly - in about 4 seconds. The team had assumed this meant the data was on disk. It was not. Linux had buffered the entire 130 GB in the page cache as dirty pages. The actual disk writes happened asynchronously in the background over the next 40 minutes.

When the gradient explosion crashed the run 30 minutes into writing the checkpoint and took the node down with it, the dirty pages still sitting in the page cache were lost. The checkpoint file on disk was incomplete - 45 GB out of 130 GB written. Every previous checkpoint had been quietly corrupted the same way: a race between "written to page cache" and "the machine goes down before the kernel flushes to disk."

The fix required understanding three things: what write() actually does (copies to page cache, does not guarantee durability), what fsync() does (flushes page cache to disk, blocks until done), and the performance cost of calling fsync() after every checkpoint (40 minutes of disk-bound wait during which the GPUs sit idle). The final solution used a background thread with fsync() and a flag to indicate checkpoint durability, allowing training to continue while the checkpoint was being persisted.

This incident captures everything important about file systems for ML engineers. The gap between "I called write()" and "the data is on disk" is not a quirk - it is a fundamental design decision that trades durability for performance. Understanding where that gap is, how large it can be, and how to close it when you need to is what separates an ML engineer who loses 72 hours of training from one who does not.


Why This Exists - The Storage Hierarchy Problem

A modern NVMe SSD reads sequential data at roughly 7 GB/s. A CPU can process data from L3 cache at roughly 200 GB/s. Even the fastest storage device is 30x slower than cache. The file system's job is to bridge this gap - to make storage appear as fast as possible while maintaining the semantics programmers expect: data written should be readable, and data explicitly committed should survive crashes.

The key abstraction is the page cache: a portion of RAM that the kernel uses to buffer file data. Reads are served from the page cache when possible (no disk access). Writes go to the page cache first and are flushed to disk by background kernel threads. This makes most I/O operations appear nearly instant, at the cost of durability: a crash before the page cache is flushed loses data.

File systems are the layer that manages how data is organized on disk: where files live, how they are indexed, how metadata is maintained, and how the system recovers from crashes. Different file systems make different trade-offs between throughput, latency, reliability, and feature set - and those trade-offs matter for ML workloads.
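
To see the page cache doing this work, time the same sequential read twice. Assuming a large local file that is not already cached (the filename below is a placeholder), the first pass is disk-bound and the second is served from RAM:

import time

def time_read(path: str, chunk_size: int = 4 * 1024 * 1024) -> float:
    """Time one full sequential read of a file."""
    start = time.perf_counter()
    with open(path, "rb") as f:
        while f.read(chunk_size):
            pass
    return time.perf_counter() - start

# Hypothetical multi-GB file; drop the page cache first (or use a fresh boot)
# for a genuinely cold first read.
cold = time_read("train_shard_000.bin")   # first read: limited by the disk
warm = time_read("train_shard_000.bin")   # second read: served from the page cache
print(f"cold: {cold:.2f}s  warm: {warm:.2f}s")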


Historical Context

1969 - Unix file system. The original Unix file system (UFS) introduced the concepts that all modern file systems build on: inodes, directory entries as names-to-inode mappings, and data blocks. Everything in Unix is a file - including devices, pipes, and sockets.

1992 - ext (Extended File System). The first Linux-specific file system, designed to overcome limitations in Minix's file system. ext2 (1993) added longer filenames and larger files. ext3 (2001) added journaling for crash recovery. ext4 (2008) added extents (contiguous block allocation), reducing fragmentation for large files like ML model checkpoints.

1994 - XFS. SGI created XFS for high-throughput workloads on IRIX. It used B-tree-based allocation and journaling from the start. XFS excels at large files and high-concurrency I/O - making it the preferred choice for HPC and ML storage servers. Merged into Linux 2.4.25 (2004).

2001 - The VFS layer matures. Linux's Virtual File System (VFS) abstraction allows mounting different file system types transparently. Applications use the same open()/read()/write() calls regardless of whether the underlying storage is ext4, XFS, NFS, or a FUSE filesystem.

2019 - io_uring. Jens Axboe introduced io_uring in Linux 5.1 - a shared ring buffer interface between userspace and kernel that eliminates the system call overhead for I/O operations. io_uring allows batching hundreds of I/O operations with a single system call, reducing overhead by 2-5x compared to traditional read()/write() calls.


Core Concepts

The VFS Layer

The VFS presents a single file API - open(), read(), write(), stat() - regardless of the underlying file system. Beneath it, every file on a Unix file system is represented by an inode - a metadata structure that stores:

  • File size, ownership (UID/GID), permissions
  • Timestamps (access time, modification time, change time)
  • Number of hard links to this inode
  • Pointers to data blocks (or extents for large files)

A hardlink is a directory entry that points to an existing inode. Multiple directory entries can point to the same inode - the inode's link count tracks how many. The file data is only deleted when the link count drops to zero AND no process has the file open. Hardlinks enable space-efficient model versioning: multiple checkpoint "versions" share the data of files that did not change between versions.

A symlink is a file that contains a path to another file. It is a level of indirection - accessing a symlink accesses the target. Symlinks can be broken (if the target is deleted). They can cross file system boundaries (hardlinks cannot).

import os
import stat
from pathlib import Path

def inode_info(path: str) -> dict:
    """Get inode information for a file."""
    st = os.stat(path)
    return {
        "inode": st.st_ino,
        "nlinks": st.st_nlink,      # hard link count
        "size_bytes": st.st_size,
        "blocks": st.st_blocks,     # 512-byte blocks allocated (not file size!)
        "uid": st.st_uid,
        "gid": st.st_gid,
        "mode": oct(stat.S_IMODE(st.st_mode)),
    }

def hardlink_for_model_versioning(
    base_checkpoint: str,
    versioned_path: str,
) -> None:
    """
    Create hardlinks for zero-copy checkpoint versioning.
    Both paths point to the same inodes - no extra disk space is used.
    Note: writing through either path modifies the shared data. The pattern
    is safe because checkpoint writers replace files (write a new file, then
    rename), which creates a new inode and leaves the old version untouched
    behind the other link.
    """
    # PyTorch checkpoints are often a directory with many tensor files
    src = Path(base_checkpoint)
    dst = Path(versioned_path)
    dst.mkdir(parents=True, exist_ok=True)

    for src_file in src.rglob("*"):
        if src_file.is_file():
            dst_file = dst / src_file.relative_to(src)
            dst_file.parent.mkdir(parents=True, exist_ok=True)
            os.link(str(src_file), str(dst_file))  # hardlink
            print(f"Hardlinked: {src_file.name} -> {dst_file}")

    print(f"Hardlinked checkpoint with inode sharing: {src} -> {dst}")

def symlink_for_latest_checkpoint(checkpoint_dir: str, checkpoint_name: str) -> None:
    """
    Update a 'latest' symlink to point to the most recent checkpoint.
    Common pattern: 'checkpoint_latest' -> 'checkpoint_step_50000'
    """
    latest_link = Path(checkpoint_dir) / "checkpoint_latest"
    target = Path(checkpoint_name)

    # Remove the old symlink if it exists
    if latest_link.is_symlink():
        latest_link.unlink()

    # Create the new symlink (relative path for portability)
    os.symlink(target.name, str(latest_link))
    print(f"Updated symlink: checkpoint_latest -> {target.name}")

The Page Cache and Dirty Pages

When you write to a file with write(), the kernel copies the data to the page cache - a region of RAM managed by the kernel. The actual disk write happens later, by kernel background threads (pdflush/writeback threads). The written pages are called "dirty pages" until they are flushed to disk.

write() call flow:
Application ──write()──> Page Cache (dirty) ──flush──> Disk

read() call flow (cache hit):
Application <──memcpy── Page Cache (clean)

read() call flow (cache miss):
Application <──memcpy── Page Cache <──disk read── Disk

The kernel controls when to flush dirty pages via two parameters:

# Start background writeback when dirty pages exceed X% of RAM
cat /proc/sys/vm/dirty_background_ratio # default: 10
# Block new writes and flush synchronously when dirty pages exceed X% of RAM
cat /proc/sys/vm/dirty_ratio # default: 20

On a 256 GB training machine, 10% dirty_background_ratio means 25 GB of dirty pages before background writeback starts. A 130 GB checkpoint write fills the page cache well beyond this threshold, triggering aggressive writeback - but the training process doesn't know when writeback is complete.
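
A small helper makes these thresholds concrete for whatever machine you are on (a sketch; it reads the standard procfs paths and ignores the dirty_bytes overrides, which take precedence over the ratios when set):

def dirty_page_thresholds() -> dict:
    """Convert vm.dirty_*_ratio into absolute byte thresholds for this machine."""
    with open("/proc/meminfo") as f:
        mem_total_kb = int(next(line for line in f if line.startswith("MemTotal:")).split()[1])

    def read_int(path: str) -> int:
        with open(path) as f:
            return int(f.read().strip())

    bg_ratio = read_int("/proc/sys/vm/dirty_background_ratio")
    ratio = read_int("/proc/sys/vm/dirty_ratio")

    total_gb = mem_total_kb / 1024 / 1024
    return {
        "ram_gb": round(total_gb, 1),
        "background_writeback_starts_gb": round(total_gb * bg_ratio / 100, 1),  # kernel starts flushing
        "writers_start_blocking_gb": round(total_gb * ratio / 100, 1),          # write() begins to stall
    }

print(dirty_page_thresholds())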

import time

def get_page_cache_stats() -> dict:
    """Read page cache dirty/writeback stats from /proc/meminfo."""
    stats = {}
    with open("/proc/meminfo") as f:
        for line in f:
            for key in ["Cached:", "Dirty:", "Writeback:", "Buffers:"]:
                if line.startswith(key):
                    kb = int(line.split()[1])
                    stats[key.rstrip(":")] = f"{kb / 1024:.0f} MB"
    return stats

def wait_for_dirty_pages_to_flush(poll_interval: float = 1.0) -> float:
    """
    Wait until all dirty pages have been written to disk.
    Useful after torch.save() to know when data is actually on disk.
    """
    start = time.perf_counter()
    while True:
        dirty_kb = 0
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("Dirty:"):
                    dirty_kb = int(line.split()[1])
                    break
        if dirty_kb < 1024:  # less than 1 MB dirty - effectively flushed
            return time.perf_counter() - start
        time.sleep(poll_interval)

fsync and Write Barriers

fsync(fd) flushes all dirty data for a specific file to disk and blocks until the storage device acknowledges receipt. This is the correct way to guarantee checkpoint durability.

fdatasync(fd) is like fsync but only flushes data blocks, not metadata. Faster for large data files where you do not care about atime/mtime updates.

import os
import time
import threading
from pathlib import Path
import torch

def save_checkpoint_durable(
    checkpoint: dict,
    path: str,
    background: bool = True,
) -> threading.Thread | None:
    """
    Save a PyTorch checkpoint with durability guarantees.

    If background=True: returns immediately and saves in a background thread.
    The caller can check the returned thread's is_alive() to know when it is done.

    If background=False: blocks until the checkpoint is on disk.
    """

    def _save_and_fsync():
        tmp_path = path + ".tmp"
        try:
            # 1. Write to a temp file (lands in the page cache)
            torch.save(checkpoint, tmp_path)

            # 2. Force the data to disk
            with open(tmp_path, "rb") as f:
                os.fsync(f.fileno())  # blocks until the disk acknowledges

            # 3. Atomic rename (readers see the old or the new file, never a partial one)
            os.replace(tmp_path, path)

            # 4. fsync the directory entry (makes the rename itself durable)
            dir_fd = os.open(str(Path(path).parent), os.O_RDONLY)
            try:
                os.fsync(dir_fd)
            finally:
                os.close(dir_fd)

            print(f"Checkpoint durably saved to {path}")

        except Exception:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise

    if background:
        t = threading.Thread(target=_save_and_fsync, daemon=True)
        t.start()
        return t
    else:
        _save_and_fsync()
        return None

def benchmark_fsync_cost(path: str, size_mb: int = 100) -> dict:
    """Measure the cost of fsync for a given file size."""
    data = b"x" * (size_mb * 1024 * 1024)

    # Write without fsync
    start = time.perf_counter()
    with open(path, "wb") as f:
        f.write(data)
    write_time = time.perf_counter() - start

    # Write with fsync
    start = time.perf_counter()
    with open(path, "wb") as f:
        f.write(data)
        f.flush()                 # flush Python's buffer to the kernel first
        os.fsync(f.fileno())
    write_fsync_time = time.perf_counter() - start

    os.unlink(path)
    return {
        "size_mb": size_mb,
        "write_only_seconds": write_time,
        "write_plus_fsync_seconds": write_fsync_time,
        "fsync_overhead_seconds": write_fsync_time - write_time,
    }

Buffered I/O vs Direct I/O (O_DIRECT)

Buffered I/O (default): data flows through the page cache. Reads from cache are fast (no disk access). Writes go to cache first, flushed later. The kernel automatically reads ahead (prefetches) sequential data.

Direct I/O (O_DIRECT): data bypasses the page cache entirely. The read/write goes directly between userspace buffer and disk. Requires sector-aligned buffers (typically 512 bytes or 4096 bytes).

Direct I/O is useful when:

  1. You manage your own buffer/cache (databases, custom ML data loaders)
  2. You are reading data that will not be re-read (prevents cache pollution)
  3. You are benchmarking raw disk performance (avoids cache distortion)

Direct I/O is harmful when:

  1. You re-read data (each read hits disk, no caching)
  2. Your access pattern is random (you lose readahead and any chance that neighboring pages are already cached)
  3. You have small I/O operations (DMA requires aligned buffers, small ops have high overhead)

import os
import ctypes
import time

# os.O_DIRECT is exposed by Python's os module on Linux; fall back to the
# x86-64 constant elsewhere
O_DIRECT = getattr(os, "O_DIRECT", 0x4000)

def open_direct(path: str, flags: int = os.O_RDONLY) -> int:
    """
    Open a file with O_DIRECT for unbuffered I/O.
    Returns a raw file descriptor (not a Python file object).
    """
    return os.open(path, flags | O_DIRECT)

def read_direct_aligned(fd: int, size: int, offset: int = 0) -> bytes:
    """
    Read from a file opened with O_DIRECT.
    Buffer address, offset, and size must be aligned to the device block size.
    """
    ALIGNMENT = 4096  # safe default for modern SSDs

    # Over-allocate so an aligned region can be carved out inside the buffer
    buf_size = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1)
    raw = (ctypes.c_char * (buf_size + ALIGNMENT))()
    addr = ctypes.addressof(raw)
    aligned_addr = (addr + ALIGNMENT - 1) & ~(ALIGNMENT - 1)
    aligned_buf = (ctypes.c_char * buf_size).from_address(aligned_addr)

    os.lseek(fd, offset, os.SEEK_SET)
    n = os.readv(fd, [memoryview(aligned_buf)])
    return bytes(aligned_buf[:n])[:size]

def benchmark_buffered_vs_direct(path: str) -> dict:
    """
    Compare buffered vs direct I/O throughput for a sequential read.
    Creates a 1 GB test file first if it does not exist.
    """
    size_gb = 1
    chunk_size = 1 * 1024 * 1024  # 1 MB reads

    # Create the test file if needed
    if not os.path.exists(path):
        print(f"Creating {size_gb} GB test file...")
        with open(path, "wb") as f:
            for _ in range(size_gb * 1024):
                f.write(os.urandom(1024 * 1024))

    file_size = os.path.getsize(path)

    # Drop the page cache so the buffered read starts cold (root required)
    try:
        with open("/proc/sys/vm/drop_caches", "w") as f:
            f.write("3")
    except PermissionError:
        pass  # will measure with a warm cache if not root

    # Buffered read
    start = time.perf_counter()
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
    buffered_time = time.perf_counter() - start
    buffered_throughput = file_size / buffered_time / 1e9

    # Direct I/O read (bypasses the page cache, so no cache drop needed)
    fd = os.open(path, os.O_RDONLY | O_DIRECT)
    try:
        start = time.perf_counter()
        offset = 0
        while offset < file_size:
            data = read_direct_aligned(fd, chunk_size, offset)
            if not data:
                break
            offset += len(data)
        direct_time = time.perf_counter() - start
    finally:
        os.close(fd)
    direct_throughput = file_size / direct_time / 1e9

    return {
        "buffered_gbps": buffered_throughput,
        "direct_gbps": direct_throughput,
    }

mmap vs read() for ML Datasets

mmap maps a file directly into the process's virtual address space. The OS page cache serves as the backing store. This gives you pointer-based access to file data with zero-copy reads and automatic prefetching.

import mmap
import os
import glob
import struct
import numpy as np
from typing import Iterator

class ShardedMmapDataset:
    """
    Large dataset stored as fixed-size binary shards.
    Uses mmap for zero-copy access - the OS page cache handles caching.
    Suitable for datasets too large to fit in RAM.
    """

    MAGIC = b"SHARD001"
    HEADER_SIZE = 64  # bytes

    def __init__(self, shard_dir: str, dtype=np.float32):
        self.shard_dir = shard_dir
        self.dtype = np.dtype(dtype)  # normalize to np.dtype so .itemsize works
        self.shards = []
        self._load_shards()

    def _load_shards(self):
        shard_paths = sorted(glob.glob(os.path.join(self.shard_dir, "*.shard")))
        for path in shard_paths:
            f = open(path, "rb")
            # Validate the magic header
            magic = f.read(8)
            assert magic == self.MAGIC, f"Invalid shard file: {path}"

            # Read shard metadata
            n_samples, sample_dim = struct.unpack("!QQ", f.read(16))
            f.read(self.HEADER_SIZE - 24)  # padding

            mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            self.shards.append({
                "path": path,
                "file": f,
                "mmap": mm,
                "n_samples": n_samples,
                "sample_dim": sample_dim,
                "item_bytes": n_samples * sample_dim * self.dtype.itemsize,  # total payload bytes
            })

    def __len__(self) -> int:
        return sum(s["n_samples"] for s in self.shards)

    def __getitem__(self, idx: int) -> np.ndarray:
        # Find which shard contains this index
        cumulative = 0
        for shard in self.shards:
            if idx < cumulative + shard["n_samples"]:
                local_idx = idx - cumulative
                offset = self.HEADER_SIZE + local_idx * shard["sample_dim"] * self.dtype.itemsize
                size = shard["sample_dim"] * self.dtype.itemsize
                raw = shard["mmap"][offset: offset + size]
                return np.frombuffer(raw, dtype=self.dtype).copy()
            cumulative += shard["n_samples"]
        raise IndexError(f"Index {idx} out of range")

    def sequential_iter(self, batch_size: int = 32) -> Iterator[np.ndarray]:
        """
        Sequential scan - the OS prefetcher detects the pattern and reads ahead.
        This is the most cache-efficient access pattern.
        """
        for shard in self.shards:
            n = shard["n_samples"]
            dim = shard["sample_dim"]
            item_bytes = dim * self.dtype.itemsize

            for batch_start in range(0, n, batch_size):
                batch_end = min(batch_start + batch_size, n)
                offset = self.HEADER_SIZE + batch_start * item_bytes
                size = (batch_end - batch_start) * item_bytes
                raw = shard["mmap"][offset: offset + size]
                yield np.frombuffer(raw, dtype=self.dtype).reshape(-1, dim).copy()

    def __del__(self):
        for shard in self.shards:
            shard["mmap"].close()
            shard["file"].close()

# HuggingFace datasets uses the Apache Arrow IPC format with mmap,
# which gives zero-copy access to tokenized text datasets
from datasets import load_dataset

def efficient_hf_dataset_access():
    """
    HuggingFace datasets with mmap-backed Arrow format.
    """
    dataset = load_dataset(
        "wikipedia",
        "20220301.en",
        split="train",
        keep_in_memory=False,  # use mmap instead of loading into RAM
    )

    # Accessing a column reads from the mmapped Arrow file
    # (no full-dataset load into RAM)
    texts = dataset["text"]

    # Tokenize with batched processing (efficient for mmap-backed data)
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    def tokenize_batch(examples):
        return tokenizer(
            examples["text"],
            truncation=True,
            max_length=512,
            padding="max_length",
        )

    tokenized = dataset.map(
        tokenize_batch,
        batched=True,
        batch_size=1000,
        num_proc=8,  # parallel tokenization
        remove_columns=["text"],
    )

    return tokenized

io_uring: The Future of Async I/O

io_uring (introduced in Linux 5.1, 2019) provides a shared ring buffer interface for submitting and completing I/O operations with minimal system call overhead. Two ring buffers - a submission queue (SQ) and a completion queue (CQ) - are shared between kernel and userspace.

# io_uring via liburing Python bindings
# pip install liburing (a Python wrapper around the liburing C library)

# Low-level io_uring constants (for understanding the interface)
import os

# io_uring system call numbers (x86-64)
SYS_io_uring_setup = 425
SYS_io_uring_enter = 426
SYS_io_uring_register = 427

# io_uring_params flags
IORING_SETUP_SQPOLL = 0x2  # kernel thread polls the SQ (zero-syscall submission)
IORING_SETUP_IOPOLL = 0x8  # poll-mode I/O (for NVMe devices with polling support)

# Driving io_uring from Python requires the liburing bindings or raw syscalls
# via ctypes. For most data-pipeline code, asyncio-level concurrency already
# hides I/O latency; the examples below use aiofiles, which dispatches blocking
# file operations to a thread pool (it does not itself use io_uring).
import asyncio
import aiofiles
from typing import List

async def async_read_dataset_files(
    file_paths: List[str],
    chunk_size: int = 64 * 1024,  # 64 KB chunks
) -> List[bytes]:
    """
    Read multiple files concurrently using async I/O.
    Each blocking read runs in aiofiles' thread pool, so reads overlap.
    """
    async def read_one_file(path: str) -> bytes:
        chunks = []
        async with aiofiles.open(path, "rb") as f:
            while True:
                chunk = await f.read(chunk_size)
                if not chunk:
                    break
                chunks.append(chunk)
        return b"".join(chunks)

    # Read all files concurrently
    tasks = [read_one_file(path) for path in file_paths]
    results = await asyncio.gather(*tasks)
    return list(results)

async def async_write_checkpoint_with_verify(
    checkpoint_data: bytes,
    path: str,
) -> None:
    """
    Async checkpoint write with verification.
    """
    import hashlib

    # Compute the checksum before writing
    checksum = hashlib.sha256(checkpoint_data).hexdigest()

    tmp_path = path + ".tmp"
    async with aiofiles.open(tmp_path, "wb") as f:
        await f.write(checkpoint_data)
        await f.flush()
        # fsync via the underlying OS file descriptor
        os.fsync(f.fileno())

    # Atomic rename
    os.replace(tmp_path, path)

    # Write the checksum file
    async with aiofiles.open(path + ".sha256", "w") as f:
        await f.write(checksum)

    print(f"Checkpoint saved: {len(checkpoint_data) / 1e9:.2f} GB, sha256: {checksum[:16]}...")

# Batch read for DataLoader prefetching
async def prefetch_batch_files(
    paths: List[str],
    prefetch_count: int = 8,
) -> asyncio.Queue:
    """
    Producer coroutine that prefetches files into a queue.
    The consumer (training loop) pops from the queue.
    """
    queue = asyncio.Queue(maxsize=prefetch_count)

    async def producer():
        for path in paths:
            data = await read_with_readahead_hint(path)
            await queue.put(data)
        await queue.put(None)  # sentinel

    asyncio.create_task(producer())
    return queue

async def read_with_readahead_hint(path: str) -> bytes:
    """
    Read a file after advising the kernel that access will be sequential.
    posix_fadvise(POSIX_FADV_SEQUENTIAL) triggers aggressive readahead.
    """
    async with aiofiles.open(path, "rb") as f:
        # Hint sequential access on the underlying fd (Linux/POSIX only)
        os.posix_fadvise(f.fileno(), 0, 0, os.POSIX_FADV_SEQUENTIAL)
        return await f.read()

Sequential vs Random I/O for ML Workloads

The access pattern of your I/O workload determines which storage technology and file format is optimal.

Sequential I/O: reads data in order from beginning to end. The OS readahead prefetcher detects the pattern and prefetches pages before they are needed. HDDs can achieve 200 MB/s sequential but only 1-5 MB/s random (seek time dominates). SSDs: 5-7 GB/s sequential, 1-3 GB/s random. Sequential I/O is ideal for streaming large dataset shards.

Random I/O: accesses data at arbitrary offsets - the pattern produced by shuffled, map-style datasets where each sample lives at a different offset. It requires either a fast SSD (NVMe with 1M+ IOPS) or a data format that packs samples tightly (WebDataset, HDF5, LMDB).

import os
import time
import random

def measure_sequential_throughput(path: str, chunk_mb: int = 4) -> float:
    """Measure sequential read throughput in GB/s."""
    chunk_size = chunk_mb * 1024 * 1024
    total_bytes = 0

    # Drop the page cache first (requires root)
    try:
        with open("/proc/sys/vm/drop_caches", "w") as f:
            f.write("1")
    except PermissionError:
        pass  # measure from cache if not root

    start = time.perf_counter()
    with open(path, "rb") as f:
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            total_bytes += len(data)

    elapsed = time.perf_counter() - start
    return total_bytes / elapsed / 1e9  # GB/s

def measure_random_iops(path: str, io_size: int = 4096, n_reads: int = 10000) -> float:
    """Measure random read IOPS (I/O operations per second)."""
    file_size = os.path.getsize(path)
    max_offset = file_size - io_size
    offsets = [random.randint(0, max_offset // io_size) * io_size
               for _ in range(n_reads)]

    # Plain buffered reads: O_DIRECT would require block-aligned buffers.
    # Drop the page cache beforehand (as above) for a cold-cache measurement.
    fd = os.open(path, os.O_RDONLY)

    start = time.perf_counter()
    try:
        for offset in offsets:
            os.pread(fd, io_size, offset)
    finally:
        os.close(fd)

    elapsed = time.perf_counter() - start
    return n_reads / elapsed  # IOPS

# ML-specific I/O patterns:
# 1. Training on ImageNet (random access): use NVMe RAID or LMDB
# 2. Training on text (sequential shards): HDD or network storage is fine
# 3. Checkpoint writes: large sequential write; use fsync; NVMe preferred
# 4. Model loading for inference: sequential; page cache is warm after the first load

Filesystem Tuning for ML Workloads

# ============================================================
# ext4 tuning for ML workloads
# ============================================================

# Mount options for training data (in /etc/fstab):
# noatime: don't update access time on reads (saves writes)
# nodiratime: don't update dir access time
# data=writeback: fastest writes (no journaling for data, metadata journaled)
# barrier=0: disable write barriers (faster, but DANGEROUS without battery-backed/UPS power)
# commit=60: journal commit interval in seconds (default 5s, longer = less IO)

# Example fstab entry:
# /dev/nvme0n1p1 /data ext4 defaults,noatime,nodiratime,data=writeback,commit=60 0 0

# After mounting, tune readahead for large sequential files (in blocks of 512B)
# 8192 = 4 MB readahead - good for large dataset shards
sudo blockdev --setra 8192 /dev/nvme0n1

# Or set in udev rules for persistence:
echo 'SUBSYSTEM=="block", KERNEL=="nvme*", ACTION=="add", ATTR{bdi/read_ahead_kb}="4096"' \
| sudo tee /etc/udev/rules.d/99-nvme-readahead.rules

# ============================================================
# XFS tuning (preferred for large file ML workloads)
# ============================================================
# XFS-specific mount options:
# logbsize=256k: increase journal buffer (default 32KB)
# largeio: hint that workload uses large I/O (>= 64KB chunks)
# inode64: enable inodes beyond 1TB for large filesystems

# Format with large stripe unit for RAID:
# mkfs.xfs -d su=256k,sw=4 /dev/md0 # 256KB stripe, 4-disk RAID5

# ============================================================
# tmpfs for fast scratch space
# ============================================================
# Use tmpfs for temporary data that must be written/read quickly
# Backed by RAM (and swap if RAM is full)

sudo mkdir /mnt/fast_scratch
sudo mount -t tmpfs -o size=32g,uid=$(id -u),gid=$(id -g) tmpfs /mnt/fast_scratch

# Ideal for: tokenized batch staging, small dataset subsets, temp checkpoints
# Performance: RAM speed (~50 GB/s sequential) - 10x faster than NVMe

# ============================================================
# Filesystem stats
# ============================================================
import os
import subprocess

def filesystem_stats(path: str = ".") -> dict:
    """Get filesystem statistics for the given path."""
    stat = os.statvfs(path)

    # statvfs fields
    total_bytes = stat.f_blocks * stat.f_frsize
    free_bytes = stat.f_bfree * stat.f_frsize
    avail_bytes = stat.f_bavail * stat.f_frsize  # available to non-root
    used_bytes = total_bytes - free_bytes
    inode_total = stat.f_files
    inode_free = stat.f_ffree

    return {
        "total_gb": total_bytes / 1e9,
        "used_gb": used_bytes / 1e9,
        "available_gb": avail_bytes / 1e9,
        "use_percent": 100 * used_bytes / total_bytes if total_bytes > 0 else 0,
        "inodes_total": inode_total,
        "inodes_free": inode_free,
        "inodes_used_percent": 100 * (1 - inode_free / inode_total) if inode_total > 0 else 0,
        "block_size_bytes": stat.f_bsize,
        "fragment_size_bytes": stat.f_frsize,
    }

def check_inode_exhaustion(path: str = ".") -> bool:
    """
    ML datasets with millions of small files can exhaust inodes.
    A filesystem at 100% inode usage cannot create new files even
    if there is plenty of disk space.
    """
    stats = filesystem_stats(path)
    inode_used_pct = stats["inodes_used_percent"]
    if inode_used_pct > 80:
        print(f"WARNING: {inode_used_pct:.1f}% of inodes used at {path}")
        print("Consider archiving small files into tar/HDF5/WebDataset format")
        return True
    return False

def tune_dirty_pages_for_checkpoint(size_gb: float = 100.0) -> None:
    """
    Raise the dirty page limit before writing a large checkpoint so the
    write is not throttled mid-way. Restore the default afterwards.
    """
    # Find total RAM to convert the checkpoint size into a percentage
    result = subprocess.run(["free", "-b"], capture_output=True, text=True)
    total_ram = 0
    for line in result.stdout.split("\n"):
        if line.startswith("Mem:"):
            total_ram = int(line.split()[1])
            break

    # Set dirty_ratio high enough to buffer the whole checkpoint
    checkpoint_pct = int(100 * size_gb * 1e9 / total_ram) + 5

    try:
        with open("/proc/sys/vm/dirty_ratio", "w") as f:
            f.write(str(min(95, checkpoint_pct)))
        print(f"Set dirty_ratio to {checkpoint_pct}% for a {size_gb} GB checkpoint")
    except PermissionError:
        print("Cannot tune dirty_ratio without root")

NFS, GPFS, and Lustre for HPC

Most large-scale training runs use distributed file systems rather than local storage.

NFS (Network File System): Simple, widely supported. Works fine for small clusters. Becomes a bottleneck at scale because all I/O goes through a single NFS server. Metadata operations (stat, readdir) are synchronous and slow on large directories.

GPFS/Spectrum Scale (IBM): High-performance parallel file system used in IBM HPC clusters. Supports striping data across many storage nodes for parallel access. Provides filesystem quotas, snapshots, and tiered storage.

Lustre: The dominant file system for TOP500 supercomputers and large ML clusters. Data is stored across multiple object storage servers (OSSes), with metadata handled by separate metadata servers (MDSes) backed by metadata targets (MDTs). A 100,000-node cluster can achieve aggregate I/O of terabytes per second by parallelizing reads across thousands of OSTs (Object Storage Targets).

import subprocess
from typing import Optional

def detect_filesystem_type(path: str) -> str:
    """Detect which filesystem type a path is on."""
    result = subprocess.run(
        ["df", "-T", path],
        capture_output=True, text=True,
    )
    if result.returncode == 0:
        lines = result.stdout.strip().split("\n")
        if len(lines) >= 2:
            return lines[1].split()[1]  # filesystem type field
    return "unknown"

def optimize_for_lustre(path: str, stripe_count: int = -1) -> None:
    """
    Set Lustre striping for a file or directory.
    stripe_count=-1 means stripe across all OSTs (maximum parallelism).
    For a 100 GB file, striping across all OSTs multiplies read bandwidth.
    """
    try:
        subprocess.run(
            ["lfs", "setstripe", "-c", str(stripe_count), path],
            check=True,
        )
        print(f"Set Lustre stripe count to {stripe_count} for {path}")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("lfs command not available - not on a Lustre filesystem")

def get_lustre_stats(path: str) -> Optional[dict]:
    """Get Lustre file layout information."""
    result = subprocess.run(
        ["lfs", "getstripe", path],
        capture_output=True, text=True,
    )
    if result.returncode == 0:
        return {"raw_output": result.stdout}
    return None

# Best practices for Lustre in ML training:
# 1. Use a large stripe size (1 MB+) for checkpoint files
#    lfs setstripe -c -1 -S 1m /scratch/checkpoints/
#
# 2. Use a moderate stripe count for dataset shards
#    lfs setstripe -c 8 -S 128k /scratch/datasets/
#
# 3. Avoid millions of small files - use tar archives or WebDataset
#    The Lustre metadata server is the bottleneck for small-file operations
#
# 4. Set the stripe layout on destination directories before copying -
#    new files inherit the directory's default layout
#
# 5. Re-stripe existing files with lfs migrate (e.g. lfs migrate -c 8 <file>)
#    instead of copying them by hand

Object Storage (S3) vs POSIX

Amazon S3 and compatible object stores (GCS, Azure Blob) are not POSIX file systems. They have different semantics:

  • No random writes: objects are written whole and atomically. You cannot append to or modify part of an existing S3 object, and POSIX-emulation layers such as s3fs only approximate these semantics.
  • Listing consistency: S3 now provides strong read-after-write consistency, but some S3-compatible stores are still eventually consistent - after writing an object, list_objects may not immediately show it.
  • High latency per operation: each S3 PUT/GET is a separate HTTP request (5-50ms latency). Throughput requires parallelism.
  • High aggregate throughput: S3 can sustain hundreds of GB/s aggregate if you parallelize requests.

import os
import concurrent.futures
from typing import List

import boto3
from boto3.s3.transfer import TransferConfig

def parallel_s3_download(
    bucket: str,
    s3_keys: List[str],
    local_dir: str,
    n_workers: int = 32,
    chunk_size: int = 64 * 1024 * 1024,  # 64 MB parts
) -> List[str]:
    """
    Download S3 objects in parallel using multipart downloads.
    This is how you reach S3's full bandwidth (vs serial downloads).
    """
    s3_client = boto3.client("s3")
    local_paths = []

    def download_one(key: str) -> str:
        local_path = os.path.join(local_dir, os.path.basename(key))
        # boto3's download_file uses multipart download automatically
        s3_client.download_file(
            bucket, key, local_path,
            Config=TransferConfig(
                multipart_threshold=chunk_size,
                max_concurrency=8,  # per-object parallelism
                multipart_chunksize=chunk_size,
                use_threads=True,
            ),
        )
        return local_path

    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = {executor.submit(download_one, key): key for key in s3_keys}
        for future in concurrent.futures.as_completed(futures):
            try:
                local_paths.append(future.result())
            except Exception as e:
                print(f"Failed to download {futures[future]}: {e}")

    return local_paths

# Streaming from S3 for training (no local disk required)
# Use when the dataset is larger than local disk capacity
# (the smart_open package wraps S3, GCS, and Azure Blob with streaming file objects)

def stream_training_shards_from_s3(
    bucket: str,
    prefix: str,
    batch_size: int = 32,
):
    """
    Stream WebDataset shards directly from S3.
    No local disk required. Ideal for very large datasets.
    """
    import webdataset as wds

    # WebDataset expands brace patterns into the list of shard URLs
    urls = f"s3://{bucket}/{prefix}/shard-{{000000..000999}}.tar"

    dataset = (
        wds.WebDataset(urls)
        .shuffle(1000)
        .decode("pil")              # decode images
        .to_tuple("jpg", "cls")     # extract (image, label) tuples
        .map_tuple(transform, int)
        .batched(batch_size)
    )

    return dataset

def transform(img):
    """Simple transform placeholder."""
    import torchvision.transforms as T
    return T.Compose([T.Resize(224), T.ToTensor()])(img)

Production Engineering Notes

Readahead Tuning

# View current readahead setting (in 512-byte blocks)
blockdev --getra /dev/nvme0n1

# For sequential streaming of large dataset files: increase readahead
# 4096 blocks = 2 MB readahead (default is usually 128 KB)
sudo blockdev --setra 4096 /dev/nvme0n1

# For random I/O (inference serving, random sample access): reduce readahead
# Readahead for random I/O just wastes cache and causes extra I/O
sudo blockdev --setra 256 /dev/nvme0n1

# Per-file readahead hints using posix_fadvise (built into Python's os module on Linux)
import os

def advise_sequential_access(fd: int, offset: int = 0, length: int = 0) -> None:
    """Tell the kernel this file will be read sequentially - increases readahead."""
    os.posix_fadvise(fd, offset, length, os.POSIX_FADV_SEQUENTIAL)

def evict_from_page_cache(fd: int, offset: int = 0, length: int = 0) -> None:
    """
    Evict pages after reading - prevents the dataset from pushing
    model/activation pages out of the page cache.
    Important when the dataset is larger than RAM.
    """
    os.posix_fadvise(fd, offset, length, os.POSIX_FADV_DONTNEED)

# Other useful hints: os.POSIX_FADV_WILLNEED (trigger readahead now),
# os.POSIX_FADV_RANDOM (disable readahead for random access)

Dirty Page Ratio Tuning for Checkpointing

# Default: 20% dirty ratio (on 256 GB machine = 51 GB can be dirty before blocking)
cat /proc/sys/vm/dirty_ratio

# For checkpoint-heavy workloads where you call fsync explicitly:
# Lower dirty_ratio so background writeback happens more frequently
# Reduces the "burst" of writes when checkpoint is triggered
sudo sysctl -w vm.dirty_ratio=5
sudo sysctl -w vm.dirty_background_ratio=2

# Monitor dirty pages in real time
watch -n 1 "grep -E 'Dirty|Writeback' /proc/meminfo"

# If Writeback is consistently > 0, your storage cannot keep up
# with the write rate - time to upgrade storage or reduce checkpoint frequency

:::danger Fatal I/O Mistakes for ML Engineers

Assuming write() guarantees durability. It does not. write() copies data to the page cache. The process can exit and the data is safe. But a power failure, kernel panic, or storage device failure before the page cache is flushed will lose data. For checkpoints: always call fsync() after writing, or use a temp file + atomic rename pattern. The first checkpoint you lose to this mistake will cost hours or days of training time.

Using O_DIRECT without properly aligned buffers. Direct I/O requires buffer addresses and sizes to be aligned to the block size (typically 512 bytes or 4096 bytes). An unaligned buffer silently falls back to buffered I/O on some kernels, or raises EINVAL on others. Never use O_DIRECT without ctypes-allocated aligned buffers or the io_uring abstraction.

Storing millions of small files on Lustre or NFS. A 1M-image ImageNet dataset as individual JPEG files has excellent performance on local NVMe but will bring a Lustre MDT to its knees. The MDT handles all metadata operations (open, close, stat, readdir) and is shared across the entire cluster. 16 DataLoader workers each opening 100 images per second = 1600 open() calls per second per training job, multiplied across 100 concurrent training jobs = 160,000 metadata ops/second. The fix: archive into WebDataset (tarball shards), LMDB, or HDF5 before copying to the shared filesystem.

:::
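
To make that fix concrete, here is a minimal sketch (the function name and shard naming are illustrative, not a specific library's API) that packs a directory of small image files into tar shards readable by WebDataset:

import os
import tarfile

def pack_images_into_shards(image_dir: str, out_dir: str, shard_size: int = 10000) -> None:
    """Pack individual image files into tar shards so the shared filesystem
    sees a few large sequential files instead of millions of small ones
    (one open() per shard instead of one per image)."""
    os.makedirs(out_dir, exist_ok=True)
    files = sorted(
        os.path.join(image_dir, name)
        for name in os.listdir(image_dir)
        if name.lower().endswith((".jpg", ".jpeg", ".png"))
    )

    for shard_start in range(0, len(files), shard_size):
        shard_path = os.path.join(out_dir, f"shard-{shard_start // shard_size:06d}.tar")
        # Uncompressed tar keeps reads sequential and cheap to decode
        with tarfile.open(shard_path, "w") as tar:
            for path in files[shard_start:shard_start + shard_size]:
                tar.add(path, arcname=os.path.basename(path))
        print(f"Wrote {shard_path}")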

:::warning Page Cache Interference Between Training and Serving

When a training job and an inference server share the same machine, they compete for the page cache. The training DataLoader reads dataset files (gigabytes per minute), evicting the model weights from the page cache. The inference server suddenly experiences major page faults on the next request as it re-loads model pages from disk.

Fix: use posix_fadvise(POSIX_FADV_DONTNEED) after each training batch to voluntarily evict dataset pages from the page cache. This gives the inference server's model pages priority in the cache.

import os

def evict_dataset_pages_after_batch(file_path: str, offset: int, length: int) -> None:
    """Call after each batch read to release those pages from the page cache."""
    with open(file_path, "rb") as f:
        os.posix_fadvise(f.fileno(), offset, length, os.POSIX_FADV_DONTNEED)

For dedicated training machines, this is unnecessary. For mixed training + serving, it is critical.

:::


Interview Questions and Answers

Q1: A training job calls torch.save(model, "checkpoint.pt") and the process exits immediately after. Is the checkpoint on disk? What would you do to guarantee it?

Not necessarily. torch.save() calls Python's pickle serializer and writes to a Python file object backed by a write() syscall. write() copies data to the kernel's page cache and returns immediately. The physical disk write happens asynchronously, scheduled by the kernel's writeback threads based on vm.dirty_ratio thresholds. If the process exits before the kernel flushes the dirty pages, the data is safe (the kernel continues flushing after the process exits). If the machine crashes before flushing, the checkpoint is lost.

To guarantee durability:

import os
import torch

# Method 1: fsync after write (synchronous, blocks until the disk acknowledges)
with open("checkpoint.pt", "wb") as f:
    torch.save(model, f)
    f.flush()              # flush Python's buffer to the kernel
    os.fsync(f.fileno())   # flush the kernel page cache to disk

# Method 2: atomic temp file + fsync + rename
# Prevents a partial checkpoint if a crash occurs during the write
import tempfile
from pathlib import Path

tmp_fd, tmp_path = tempfile.mkstemp(dir=Path("checkpoint.pt").parent, suffix=".tmp")
try:
    with os.fdopen(tmp_fd, "wb") as f:
        torch.save(model, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, "checkpoint.pt")  # atomic on POSIX
except BaseException:
    os.unlink(tmp_path)
    raise

Q2: Explain the difference between ext4 and XFS for ML storage workloads. When would you choose each?

ext4 is a mature journaling file system that supports extents (contiguous block allocation), delayed allocation (batch block assignments for better locality), and a flexible block group layout. It handles mixed workloads well and has excellent tooling. Limitations: a single directory with millions of files (common for ImageNet-style datasets) performs poorly because ext4's htree directory indexing has bounded scalability.

XFS uses B-tree-based allocation throughout (extents, free space, inodes) and scales to hundreds of millions of files in a single directory. It uses aggressive preallocation for growing files. XFS performs better for large file sequential I/O (model checkpoints, dataset shards) because its delayed allocation and preallocation reduce fragmentation. It has better sustained write performance because its journal uses a circular buffer with lower overhead.

Choose ext4 for: single-node workstations, mixed workloads, cloud VMs where ext4 is the default. Choose XFS for: dedicated training storage servers, filesystems with millions of files (ImageNet directories), large checkpoint workloads (>10 GB per checkpoint), and any setup where you have control over the format step (mkfs.xfs -d su=256k,sw=4 for RAID-optimized allocation).

Q3: What is O_DIRECT and when should you use it for ML data loading? What are the risks?

O_DIRECT is a file open flag that bypasses the kernel's page cache. Data flows directly between userspace buffers and the storage device. Requirements: the userspace buffer address and the read/write size must be aligned to the device's block size (typically 512 bytes or 4096 bytes).

Use O_DIRECT for ML data loading when: (1) you have a larger-than-RAM dataset and you want to avoid polluting the page cache with data that will only be read once per epoch, preventing your model weights from being evicted; (2) you manage your own I/O buffer pool and want predictable latency without page cache pressure; (3) you are benchmarking raw storage device throughput.

Risks: unaligned buffers raise EINVAL. For random I/O patterns, O_DIRECT is slower than buffered I/O because you lose the readahead and caching benefits. For re-read data (second epoch), O_DIRECT is catastrophically slow because every read hits disk.

The modern alternative: use posix_fadvise(POSIX_FADV_DONTNEED) after reading each batch - this evicts the batch from the page cache after use, preventing cache pollution without the alignment requirements of O_DIRECT.
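
A minimal sketch of that alternative using Python's built-in os.posix_fadvise (Linux/POSIX only; the 8 MB chunk size is an arbitrary choice):

import os

def read_once_without_cache_pollution(path: str, chunk_size: int = 8 * 1024 * 1024) -> int:
    """Stream a file once, dropping each chunk from the page cache after use."""
    total = 0
    with open(path, "rb") as f:
        fd = f.fileno()
        offset = 0
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            total += len(chunk)
            # Tell the kernel we are done with this byte range (read pages are
            # clean, so DONTNEED can drop them immediately)
            os.posix_fadvise(fd, offset, len(chunk), os.POSIX_FADV_DONTNEED)
            offset += len(chunk)
    return total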

Q4: Describe the io_uring interface. How is it different from epoll and why does it matter for ML data pipelines?

io_uring uses two ring buffers shared between kernel and userspace: a Submission Queue (SQ) where userspace writes I/O requests, and a Completion Queue (CQ) where the kernel writes results. The key difference: with traditional read()/write(), each I/O operation requires a system call (context switch to kernel, ~100ns overhead). With io_uring, you can submit hundreds of I/O operations by writing to the SQ ring and making a single io_uring_enter() syscall (or zero syscalls in SQPOLL mode where a kernel thread polls the SQ continuously).

epoll is a different mechanism: it is an event notification system that tells you when a file descriptor is ready for I/O. You still call read()/write() individually. epoll is best for network sockets. io_uring is best for file I/O where you want to batch many reads/writes.

For ML data pipelines: a DataLoader that reads 32 images per batch can submit all 32 read() requests to io_uring in one syscall, then process the completions as they arrive. The kernel overlaps the I/O operations with each other and with CPU-side processing. Compared to 32 sequential read() calls (32 system call round-trips), io_uring reduces overhead by roughly 30-50% for small-file random I/O patterns.

Q5: Your training job on a Lustre cluster shows 5x lower dataset read throughput than expected based on the cluster's aggregate bandwidth specification. What causes would you investigate?

First, check stripe configuration. If the dataset files were copied to Lustre without setting a stripe count, they default to 1 stripe (one OST). All reads for that file go to a single OST regardless of how many OSTs exist. Fix: lfs setstripe -c -1 /scratch/dataset/ before copying data, or lfs migrate -c -1 /scratch/dataset/ to re-stripe existing data.

Second, check MDT saturation. Each open(), stat(), and close() hits the MDT. With millions of small files and many concurrent DataLoader workers, the MDT becomes the bottleneck. lfs mdstat shows MDT load. Fix: archive small files into tar shards (WebDataset format) to eliminate per-file metadata operations.

Third, check network bandwidth between the compute nodes and the Lustre OSSes. lctl get_param llite.*.stats shows read/write bytes from the client perspective. If the aggregate is lower than expected, check for network congestion.

Fourth, check for lock contention. Lustre uses distributed locking (DLM). If many processes try to read the same file simultaneously, lock contention can serialize access. Fix: stripe files across OSTs so different processes access different OSTs.

Fifth, check I/O size. Lustre performs best with large I/O operations (1 MB+ per read). DataLoader workers that read 4 KB images one-by-one each make individual small Lustre RPCs. Use WebDataset or LMDB to pack many samples into large sequential reads.
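
A quick way to script the first check (a sketch around the lfs CLI, assuming lfs getstripe -c prints the stripe count for a file):

import subprocess

def warn_if_unstriped(path: str, min_stripes: int = 4) -> None:
    """Warn when a large file is striped over too few OSTs to use the cluster's bandwidth."""
    try:
        out = subprocess.run(
            ["lfs", "getstripe", "-c", path],
            capture_output=True, text=True, check=True,
        )
        stripe_count = int(out.stdout.split()[0])
    except (FileNotFoundError, subprocess.CalledProcessError, ValueError, IndexError):
        print(f"{path}: not on Lustre or lfs unavailable")
        return
    if stripe_count < min_stripes:
        print(f"{path}: stripe count {stripe_count} - reads hit only {stripe_count} OST(s); "
              f"consider 'lfs migrate -c -1 {path}'")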

Q6: Explain the atomic temp file + rename pattern for checkpoint saving. Why is it necessary? Are there any failure modes?

The pattern:

import os
import torch

# Write to a temp file on the same filesystem
torch.save(model, "checkpoint.pt.tmp")
with open("checkpoint.pt.tmp", "rb") as f:
    os.fsync(f.fileno())
# Atomic rename
os.replace("checkpoint.pt.tmp", "checkpoint.pt")

It is necessary because writing directly to checkpoint.pt leaves a window where the file is partially written. If the process crashes during the write, checkpoint.pt is corrupted (header written, but data truncated). A reader cannot distinguish a partial write from a complete one. The temp file pattern ensures that checkpoint.pt always either exists in its old complete form or its new complete form - never a partial form. os.replace() is atomic on POSIX file systems (it is a rename, which modifies only the directory entry atomically).

Failure modes: (1) The temp file write succeeds, but the rename fails (e.g., cross-device rename, permission issue). checkpoint.pt is still the old version, but temp space is consumed. (2) The temp file is on a different filesystem than the destination - os.replace() raises OSError (EXDEV) on a cross-device rename. Always use tempfile.mkstemp(dir=same_filesystem) to ensure the temp file and the destination are on the same device. (3) On NFS, rename() may not be atomic - two writers renaming simultaneously can create a race. For distributed training checkpointing, use a distributed coordination layer (etcd, ZooKeeper, or file locking with fcntl.flock) to ensure only one process writes the checkpoint.
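
As a sketch of that last point, single-writer coordination with fcntl.flock might look like the following (the lock-file path is an illustrative choice; flock behavior over NFS depends on the client and server, which is why multi-node jobs usually elect rank 0 or use an external coordination service instead):

import fcntl
import os
import torch

def save_checkpoint_single_writer(state: dict, path: str, lock_path: str = "checkpoint.lock") -> bool:
    """Only the process that wins the lock writes; the others skip this round."""
    lock_fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # non-blocking: losers skip
        except BlockingIOError:
            return False  # another process is already writing this checkpoint

        tmp_path = path + ".tmp"
        torch.save(state, tmp_path)
        with open(tmp_path, "rb") as f:
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # atomic rename, as above
        return True
    finally:
        os.close(lock_fd)  # closing the fd also releases the flock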

© 2026 EngineersOfAI. All rights reserved.